From b31ebb9f1e52e4f204ebac4db9a3681042dfff52 Mon Sep 17 00:00:00 2001 From: Kainoa Kanter Date: Sat, 2 Sep 2023 17:00:02 +0000 Subject: [PATCH] perf: :zap: seperate web and queue workers --- .config/example.yml | 10 ++++---- packages/backend/src/boot/index.ts | 7 ++++- packages/backend/src/boot/master.ts | 38 ++++++++++++++++++++-------- packages/backend/src/boot/worker.ts | 14 +++++++--- packages/backend/src/config/load.ts | 17 +++++++++++++ packages/backend/src/config/types.ts | 7 ++--- 6 files changed, 71 insertions(+), 22 deletions(-) diff --git a/.config/example.yml b/.config/example.yml index c4c6340076..49922d65ae 100644 --- a/.config/example.yml +++ b/.config/example.yml @@ -143,11 +143,11 @@ reservedUsernames: [ # Whether disable HSTS #disableHsts: true -# Number of worker processes -#clusterLimit: 1 - -# Worker only mode -#onlyQueueProcessor: 1 +# Number of worker processes by type. +# The sum must not exceed the number of available cores. +#clusterLimits: +# web: 1 +# queue: 1 # Job concurrency per worker # deliverJobConcurrency: 128 diff --git a/packages/backend/src/boot/index.ts b/packages/backend/src/boot/index.ts index c78d888383..c09f3d2b37 100644 --- a/packages/backend/src/boot/index.ts +++ b/packages/backend/src/boot/index.ts @@ -19,7 +19,12 @@ const ev = new Xev(); * Init process */ export default async function () { - process.title = `Firefish (${cluster.isPrimary ? "master" : "worker"})`; + const mode = + process.env.mode && ["web", "queue"].includes(process.env.mode) + ? `(${process.env.mode})` + : ""; + const type = cluster.isPrimary ? "(master)" : "(worker)" + process.title = `Firefish ${mode} ${type}`; if (cluster.isPrimary || envOption.disableClustering) { await masterMain(); diff --git a/packages/backend/src/boot/master.ts b/packages/backend/src/boot/master.ts index 1a40d0f3da..bdb528d2f0 100644 --- a/packages/backend/src/boot/master.ts +++ b/packages/backend/src/boot/master.ts @@ -111,7 +111,7 @@ export async function masterMain() { bootLogger.succ("Firefish initialized"); if (!envOption.disableClustering) { - await spawnWorkers(config.clusterLimit); + await spawnWorkers(config.clusterLimits); } bootLogger.succ( @@ -120,7 +120,11 @@ export async function masterMain() { true, ); - if (!envOption.noDaemons && !config.onlyQueueProcessor) { + if ( + !envOption.noDaemons && + config.clusterLimits?.web && + config.clusterLimits?.web >= 1 + ) { import("../daemons/server-stats.js").then((x) => x.default()); import("../daemons/queue-stats.js").then((x) => x.default()); import("../daemons/janitor.js").then((x) => x.default()); @@ -136,7 +140,7 @@ function showEnvironment(): void { if (env !== "production") { logger.warn("The environment is not in production mode."); - logger.warn("DO NOT USE FOR PRODUCTION PURPOSE!", null, true); + logger.warn("DO NOT USE THIS IN PRODUCTION!", null, true); } } @@ -194,19 +198,33 @@ async function connectDb(): Promise { } } -async function spawnWorkers(limit = 1) { - const workers = Math.min(limit, os.cpus().length); - bootLogger.info(`Starting ${workers} worker${workers === 1 ? "" : "s"}...`); - await Promise.all([...Array(workers)].map(spawnWorker)); +async function spawnWorkers( + clusterLimits: Required, +): Promise { + const modes = ["web", "queue"]; + const cpus = os.cpus().length; + for (const mode of modes.filter((mode) => clusterLimits[mode] > cpus)) { + bootLogger.warn( + `configuration warning: cluster limit for ${mode} exceeds number of cores (${cpus})`, + ); + } + + const total = modes.reduce((acc, mode) => acc + clusterLimits[mode], 0); + const workers = new Array(total); + workers.fill("web", 0, clusterLimits?.web); + workers.fill("queue", clusterLimits?.web); + + bootLogger.info(`Starting ${clusterLimits?.web} web workers and ${clusterLimits?.queue} queue workers (total ${total})...`); + await Promise.all(workers.map((mode) => spawnWorker(mode))); bootLogger.succ("All workers started"); } -function spawnWorker(): Promise { +function spawnWorker(mode: "web" | "queue"): Promise { return new Promise((res) => { - const worker = cluster.fork(); + const worker = cluster.fork({ mode }); worker.on("message", (message) => { if (message === "listenFailed") { - bootLogger.error("The server Listen failed due to the previous error."); + bootLogger.error("The server listen failed due to the previous error."); process.exit(1); } if (message !== "ready") return; diff --git a/packages/backend/src/boot/worker.ts b/packages/backend/src/boot/worker.ts index 052c7397f3..236621b010 100644 --- a/packages/backend/src/boot/worker.ts +++ b/packages/backend/src/boot/worker.ts @@ -1,6 +1,7 @@ import cluster from "node:cluster"; import { initDb } from "../db/postgre.js"; import config from "@/config/index.js"; +import os from "node:os"; /** * Init worker process @@ -8,13 +9,20 @@ import config from "@/config/index.js"; export async function workerMain() { await initDb(); - if (!config.onlyQueueProcessor) { + if (!process.env.mode || process.env.mode === "web") { // start server await import("../server/index.js").then((x) => x.default()); } - // start job queue - import("../queue/index.js").then((x) => x.default()); + if (!process.env.mode || process.env.mode === "queue") { + // start job queue + import("../queue/index.js").then((x) => x.default()); + + if (process.env.mode === "queue") { + // if this is an exclusive queue worker, renice to have higher priority + os.setPriority(os.constants.priority.PRIORITY_BELOW_NORMAL); + } + } if (cluster.isWorker) { // Send a 'ready' message to parent process diff --git a/packages/backend/src/config/load.ts b/packages/backend/src/config/load.ts index d8dea793e9..2ff3309264 100644 --- a/packages/backend/src/config/load.ts +++ b/packages/backend/src/config/load.ts @@ -59,6 +59,23 @@ export default function load() { if (config.cacheServer && !config.cacheServer.prefix) config.cacheServer.prefix = mixin.hostname; + if (!config.clusterLimits) { + config.clusterLimits = { + web: 1, + queue: 1, + }; + } else { + config.clusterLimits = { + web: 1, + queue: 1, + ...config.clusterLimits, + }; + + if (config.clusterLimits.web! < 1 || config.clusterLimits.queue! < 1) { + throw new Error("Invalid cluster limits"); + } + } + return Object.assign(config, mixin); } diff --git a/packages/backend/src/config/types.ts b/packages/backend/src/config/types.ts index 52854db5a5..13d87fc19c 100644 --- a/packages/backend/src/config/types.ts +++ b/packages/backend/src/config/types.ts @@ -69,9 +69,10 @@ export type Source = { accesslog?: string; - clusterLimit?: number; - - onlyQueueProcessor?: boolean; + clusterLimits?: { + web?: number; + queue?: number; + }; cuid?: { length?: number;