Merge branch 'refactor/web-workers' into 'develop'

perf:  seperate web and queue workers

Closes #10610

See merge request firefish/firefish!10552
This commit is contained in:
Kainoa Kanter 2023-09-02 17:00:02 +00:00
commit 09318e6596
6 changed files with 71 additions and 22 deletions

View file

@ -143,11 +143,11 @@ reservedUsernames: [
# Whether disable HSTS # Whether disable HSTS
#disableHsts: true #disableHsts: true
# Number of worker processes # Number of worker processes by type.
#clusterLimit: 1 # The sum must not exceed the number of available cores.
#clusterLimits:
# Worker only mode # web: 1
#onlyQueueProcessor: 1 # queue: 1
# Job concurrency per worker # Job concurrency per worker
# deliverJobConcurrency: 128 # deliverJobConcurrency: 128

View file

@ -19,7 +19,12 @@ const ev = new Xev();
* Init process * Init process
*/ */
export default async function () { export default async function () {
process.title = `Firefish (${cluster.isPrimary ? "master" : "worker"})`; const mode =
process.env.mode && ["web", "queue"].includes(process.env.mode)
? `(${process.env.mode})`
: "";
const type = cluster.isPrimary ? "(master)" : "(worker)"
process.title = `Firefish ${mode} ${type}`;
if (cluster.isPrimary || envOption.disableClustering) { if (cluster.isPrimary || envOption.disableClustering) {
await masterMain(); await masterMain();

View file

@ -111,7 +111,7 @@ export async function masterMain() {
bootLogger.succ("Firefish initialized"); bootLogger.succ("Firefish initialized");
if (!envOption.disableClustering) { if (!envOption.disableClustering) {
await spawnWorkers(config.clusterLimit); await spawnWorkers(config.clusterLimits);
} }
bootLogger.succ( bootLogger.succ(
@ -120,7 +120,11 @@ export async function masterMain() {
true, true,
); );
if (!envOption.noDaemons && !config.onlyQueueProcessor) { if (
!envOption.noDaemons &&
config.clusterLimits?.web &&
config.clusterLimits?.web >= 1
) {
import("../daemons/server-stats.js").then((x) => x.default()); import("../daemons/server-stats.js").then((x) => x.default());
import("../daemons/queue-stats.js").then((x) => x.default()); import("../daemons/queue-stats.js").then((x) => x.default());
import("../daemons/janitor.js").then((x) => x.default()); import("../daemons/janitor.js").then((x) => x.default());
@ -136,7 +140,7 @@ function showEnvironment(): void {
if (env !== "production") { if (env !== "production") {
logger.warn("The environment is not in production mode."); logger.warn("The environment is not in production mode.");
logger.warn("DO NOT USE FOR PRODUCTION PURPOSE!", null, true); logger.warn("DO NOT USE THIS IN PRODUCTION!", null, true);
} }
} }
@ -194,19 +198,33 @@ async function connectDb(): Promise<void> {
} }
} }
async function spawnWorkers(limit = 1) { async function spawnWorkers(
const workers = Math.min(limit, os.cpus().length); clusterLimits: Required<Config["clusterLimits"]>,
bootLogger.info(`Starting ${workers} worker${workers === 1 ? "" : "s"}...`); ): Promise<void> {
await Promise.all([...Array(workers)].map(spawnWorker)); const modes = ["web", "queue"];
const cpus = os.cpus().length;
for (const mode of modes.filter((mode) => clusterLimits[mode] > cpus)) {
bootLogger.warn(
`configuration warning: cluster limit for ${mode} exceeds number of cores (${cpus})`,
);
}
const total = modes.reduce((acc, mode) => acc + clusterLimits[mode], 0);
const workers = new Array(total);
workers.fill("web", 0, clusterLimits?.web);
workers.fill("queue", clusterLimits?.web);
bootLogger.info(`Starting ${clusterLimits?.web} web workers and ${clusterLimits?.queue} queue workers (total ${total})...`);
await Promise.all(workers.map((mode) => spawnWorker(mode)));
bootLogger.succ("All workers started"); bootLogger.succ("All workers started");
} }
function spawnWorker(): Promise<void> { function spawnWorker(mode: "web" | "queue"): Promise<void> {
return new Promise((res) => { return new Promise((res) => {
const worker = cluster.fork(); const worker = cluster.fork({ mode });
worker.on("message", (message) => { worker.on("message", (message) => {
if (message === "listenFailed") { if (message === "listenFailed") {
bootLogger.error("The server Listen failed due to the previous error."); bootLogger.error("The server listen failed due to the previous error.");
process.exit(1); process.exit(1);
} }
if (message !== "ready") return; if (message !== "ready") return;

View file

@ -1,6 +1,7 @@
import cluster from "node:cluster"; import cluster from "node:cluster";
import { initDb } from "../db/postgre.js"; import { initDb } from "../db/postgre.js";
import config from "@/config/index.js"; import config from "@/config/index.js";
import os from "node:os";
/** /**
* Init worker process * Init worker process
@ -8,13 +9,20 @@ import config from "@/config/index.js";
export async function workerMain() { export async function workerMain() {
await initDb(); await initDb();
if (!config.onlyQueueProcessor) { if (!process.env.mode || process.env.mode === "web") {
// start server // start server
await import("../server/index.js").then((x) => x.default()); await import("../server/index.js").then((x) => x.default());
} }
// start job queue if (!process.env.mode || process.env.mode === "queue") {
import("../queue/index.js").then((x) => x.default()); // start job queue
import("../queue/index.js").then((x) => x.default());
if (process.env.mode === "queue") {
// if this is an exclusive queue worker, renice to have higher priority
os.setPriority(os.constants.priority.PRIORITY_BELOW_NORMAL);
}
}
if (cluster.isWorker) { if (cluster.isWorker) {
// Send a 'ready' message to parent process // Send a 'ready' message to parent process

View file

@ -59,6 +59,23 @@ export default function load() {
if (config.cacheServer && !config.cacheServer.prefix) if (config.cacheServer && !config.cacheServer.prefix)
config.cacheServer.prefix = mixin.hostname; config.cacheServer.prefix = mixin.hostname;
if (!config.clusterLimits) {
config.clusterLimits = {
web: 1,
queue: 1,
};
} else {
config.clusterLimits = {
web: 1,
queue: 1,
...config.clusterLimits,
};
if (config.clusterLimits.web! < 1 || config.clusterLimits.queue! < 1) {
throw new Error("Invalid cluster limits");
}
}
return Object.assign(config, mixin); return Object.assign(config, mixin);
} }

View file

@ -69,9 +69,10 @@ export type Source = {
accesslog?: string; accesslog?: string;
clusterLimit?: number; clusterLimits?: {
web?: number;
onlyQueueProcessor?: boolean; queue?: number;
};
cuid?: { cuid?: {
length?: number; length?: number;