diff --git a/.config/example.yml b/.config/example.yml index 16fa67142e..66ad869022 100644 --- a/.config/example.yml +++ b/.config/example.yml @@ -128,6 +128,7 @@ reservedUsernames: [ # Job concurrency per worker # deliverJobConcurrency: 128 # inboxJobConcurrency: 16 +# relationshipJobConcurrency: 16 # Job rate limiter # deliverJobPerSec: 128 diff --git a/packages/backend/package.json b/packages/backend/package.json index 47e9c415cb..3d9419eca7 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -53,7 +53,7 @@ "axios": "^1.3.2", "bcryptjs": "2.4.3", "blurhash": "1.1.5", - "bull": "4.10.4", + "bullmq": "^3.15.0", "cacheable-lookup": "7.0.0", "calckey-js": "workspace:*", "cbor": "8.1.0", @@ -143,7 +143,6 @@ "@swc/core": "^1.3.50", "@types/adm-zip": "^0.5.0", "@types/bcryptjs": "2.4.2", - "@types/bull": "3.15.9", "@types/cbor": "6.0.0", "@types/escape-regexp": "0.0.1", "@types/fluent-ffmpeg": "2.1.20", diff --git a/packages/backend/src/daemons/queue-stats.ts b/packages/backend/src/daemons/queue-stats.ts index 381b52a916..205728685d 100644 --- a/packages/backend/src/daemons/queue-stats.ts +++ b/packages/backend/src/daemons/queue-stats.ts @@ -1,5 +1,8 @@ import Xev from "xev"; import { deliverQueue, inboxQueue } from "../queue/queues.js"; +import * as Bull from "bullmq"; +import config from "@/config/index.js"; +import { QUEUE, baseQueueOptions } from "@/queue/const.js"; const ev = new Xev(); @@ -18,11 +21,20 @@ export default function () { let activeDeliverJobs = 0; let activeInboxJobs = 0; - deliverQueue.on("global:active", () => { + const deliverQueueEvents = new Bull.QueueEvents( + QUEUE.DELIVER, + baseQueueOptions(config, QUEUE.DELIVER), + ); + const inboxQueueEvents = new Bull.QueueEvents( + QUEUE.INBOX, + baseQueueOptions(config, QUEUE.INBOX), + ); + + deliverQueueEvents.on("active", () => { activeDeliverJobs++; }); - inboxQueue.on("global:active", () => { + inboxQueueEvents.on("active", () => { activeInboxJobs++; }); diff --git a/packages/backend/src/misc/id/aid.ts b/packages/backend/src/misc/id/aid.ts deleted file mode 100644 index a12360360b..0000000000 --- a/packages/backend/src/misc/id/aid.ts +++ /dev/null @@ -1,25 +0,0 @@ -// AID -// 長さ8の[2000年1月1日からの経過ミリ秒をbase36でエンコードしたもの] + 長さ2の[ノイズ文字列] - -import * as crypto from "node:crypto"; - -const TIME2000 = 946684800000; -let counter = crypto.randomBytes(2).readUInt16LE(0); - -function getTime(time: number) { - time = time - TIME2000; - if (time < 0) time = 0; - - return time.toString(36).padStart(8, "0"); -} - -function getNoise() { - return counter.toString(36).padStart(2, "0").slice(-2); -} - -export function genAid(date: Date): string { - const t = date.getTime(); - if (isNaN(t)) throw "Failed to create AID: Invalid Date"; - counter++; - return getTime(t) + getNoise(); -} diff --git a/packages/backend/src/misc/id/meid.ts b/packages/backend/src/misc/id/meid.ts deleted file mode 100644 index ee78eb8d14..0000000000 --- a/packages/backend/src/misc/id/meid.ts +++ /dev/null @@ -1,26 +0,0 @@ -const CHARS = "0123456789abcdef"; - -function getTime(time: number) { - if (time < 0) time = 0; - if (time === 0) { - return CHARS[0]; - } - - time += 0x800000000000; - - return time.toString(16).padStart(12, CHARS[0]); -} - -function getRandom() { - let str = ""; - - for (let i = 0; i < 12; i++) { - str += CHARS[Math.floor(Math.random() * CHARS.length)]; - } - - return str; -} - -export function genMeid(date: Date): string { - return getTime(date.getTime()) + getRandom(); -} diff --git a/packages/backend/src/misc/id/meidg.ts b/packages/backend/src/misc/id/meidg.ts deleted file mode 100644 index 4fd39a8b41..0000000000 --- a/packages/backend/src/misc/id/meidg.ts +++ /dev/null @@ -1,28 +0,0 @@ -const CHARS = "0123456789abcdef"; - -// 4bit Fixed hex value 'g' -// 44bit UNIX Time ms in Hex -// 48bit Random value in Hex - -function getTime(time: number) { - if (time < 0) time = 0; - if (time === 0) { - return CHARS[0]; - } - - return time.toString(16).padStart(11, CHARS[0]); -} - -function getRandom() { - let str = ""; - - for (let i = 0; i < 12; i++) { - str += CHARS[Math.floor(Math.random() * CHARS.length)]; - } - - return str; -} - -export function genMeidg(date: Date): string { - return `g${getTime(date.getTime())}${getRandom()}`; -} diff --git a/packages/backend/src/misc/id/object-id.ts b/packages/backend/src/misc/id/object-id.ts deleted file mode 100644 index 45822f0acc..0000000000 --- a/packages/backend/src/misc/id/object-id.ts +++ /dev/null @@ -1,26 +0,0 @@ -const CHARS = "0123456789abcdef"; - -function getTime(time: number) { - if (time < 0) time = 0; - if (time === 0) { - return CHARS[0]; - } - - time = Math.floor(time / 1000); - - return time.toString(16).padStart(8, CHARS[0]); -} - -function getRandom() { - let str = ""; - - for (let i = 0; i < 16; i++) { - str += CHARS[Math.floor(Math.random() * CHARS.length)]; - } - - return str; -} - -export function genObjectId(date: Date): string { - return getTime(date.getTime()) + getRandom(); -} diff --git a/packages/backend/src/queue/const.ts b/packages/backend/src/queue/const.ts new file mode 100644 index 0000000000..48304b30d2 --- /dev/null +++ b/packages/backend/src/queue/const.ts @@ -0,0 +1,32 @@ +import type * as Bull from "bullmq"; + +export const QUEUE = { + DELIVER: "deliver", + INBOX: "inbox", + SYSTEM: "system", + ENDED_POLL_NOTIFICATION: "endedPollNotification", + DB: "db", + OBJECT_STORAGE: "objectStorage", + WEBHOOK_DELIVER: "webhookDeliver", +}; + +export function baseQueueOptions( + config: any, + queueName: typeof QUEUE[keyof typeof QUEUE], +): Bull.QueueOptions { + return { + connection: { + port: config.redisForJobQueue.port, + host: config.redisForJobQueue.host, + family: + config.redisForJobQueue.family == null + ? 0 + : config.redisForJobQueue.family, + password: config.redisForJobQueue.pass, + db: config.redisForJobQueue.db ?? 0, + }, + prefix: config.redisForJobQueue.prefix + ? `${config.redisForJobQueue.prefix}:queue:${queueName}` + : `queue:${queueName}`, + }; +} diff --git a/packages/backend/src/queue/get-job-info.ts b/packages/backend/src/queue/get-job-info.ts deleted file mode 100644 index ae3532cdae..0000000000 --- a/packages/backend/src/queue/get-job-info.ts +++ /dev/null @@ -1,18 +0,0 @@ -import type Bull from "bull"; - -export function getJobInfo(job: Bull.Job, increment = false) { - const age = Date.now() - job.timestamp; - - const formated = - age > 60000 - ? `${Math.floor(age / 1000 / 60)}m` - : age > 10000 - ? `${Math.floor(age / 1000)}s` - : `${age}ms`; - - // onActiveとかonCompletedのattemptsMadeがなぜか0始まりなのでインクリメントする - const currentAttempts = job.attemptsMade + (increment ? 1 : 0); - const maxAttempts = job.opts ? job.opts.attempts : 0; - - return `id=${job.id} attempts=${currentAttempts}/${maxAttempts} age=${formated}`; -} diff --git a/packages/backend/src/queue/index.ts b/packages/backend/src/queue/index.ts index a84a446fe7..2a2d871882 100644 --- a/packages/backend/src/queue/index.ts +++ b/packages/backend/src/queue/index.ts @@ -1,6 +1,9 @@ import type httpSignature from "@peertube/http-signature"; import { v4 as uuid } from "uuid"; +import * as Bull from "bullmq"; +import { QUEUE, baseQueueOptions } from "./const.js"; + import config from "@/config/index.js"; import type { DriveFile } from "@/models/entities/drive-file.js"; import type { IActivity } from "@/remote/activitypub/type.js"; @@ -16,7 +19,6 @@ import processWebhookDeliver from "./processors/webhook-deliver.js"; import processBackground from "./processors/background/index.js"; import { endedPollNotification } from "./processors/ended-poll-notification.js"; import { queueLogger } from "./logger.js"; -import { getJobInfo } from "./get-job-info.js"; import { systemQueue, dbQueue, @@ -29,6 +31,16 @@ import { } from "./queues.js"; import type { ThinUser } from "./types.js"; +// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019 +function httpRelatedBackoff(attemptsMade: number) { + const baseDelay = 60 * 1000; // 1min + const maxBackoff = 8 * 60 * 60 * 1000; // 8hours + let backoff = (Math.pow(2, attemptsMade) - 1) * baseDelay; + backoff = Math.min(backoff, maxBackoff); + backoff += Math.round(backoff * Math.random() * 0.2); + return backoff; +} + function renderError(e: Error): any { return { stack: e.stack, @@ -37,127 +49,78 @@ function renderError(e: Error): any { }; } -const systemLogger = queueLogger.createSubLogger("system"); -const deliverLogger = queueLogger.createSubLogger("deliver"); -const webhookLogger = queueLogger.createSubLogger("webhook"); -const inboxLogger = queueLogger.createSubLogger("inbox"); -const dbLogger = queueLogger.createSubLogger("db"); -const objectStorageLogger = queueLogger.createSubLogger("objectStorage"); +const queueEventLogger = queueLogger.createSubLogger("queueEvent"); +const queues = [ + { queue: systemQueue, logger: systemLogger }, + { queue: dbQueue, logger: dbLogger }, + { queue: deliverQueue, logger: deliverLogger }, + { queue: inboxQueue, logger: inboxLogger }, + { queue: objectStorageQueue, logger: objectStorageLogger }, + { queue: endedPollNotificationQueue, logger: pollLogger }, + { queue: webhookDeliverQueue, logger: webhookLogger }, + { queue: backgroundQueue, logger: systemLogger }, +]; -systemQueue - .on("waiting", (jobId) => systemLogger.debug(`waiting id=${jobId}`)) - .on("active", (job) => systemLogger.debug(`active id=${job.id}`)) - .on("completed", (job, result) => - systemLogger.debug(`completed(${result}) id=${job.id}`), - ) - .on("failed", (job, err) => - systemLogger.warn(`failed(${err}) id=${job.id}`, { +queues.forEach(({ queue, logger }) => { + const queueName = queue.name; + const queueEvents = new Bull.QueueEvents(queueName); + queueEvents.on("waiting", (jobId) => queueEventLogger.debug(`waiting id=${jobId}`)); + queueEvents.on("active", (job) => queueEventLogger.debug(`active id=${job.jobId}`)); + queueEvents.on("completed", (job, result) => + queueEventLogger.debug(`completed(${result}) id=${job.jobId}`), + ); + queueEvents.on("failed", (job, err) => + queueEventLogger.warn(`failed(${err}) id=${job.jobId}`, { job, e: renderError(err), }), - ) - .on("error", (job: any, err: Error) => - systemLogger.error(`error ${err}`, { job, e: renderError(err) }), - ) - .on("stalled", (job) => systemLogger.warn(`stalled id=${job.id}`)); - -deliverQueue - .on("waiting", (jobId) => deliverLogger.debug(`waiting id=${jobId}`)) - .on("active", (job) => - deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`), - ) - .on("completed", (job, result) => - deliverLogger.debug( - `completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`, - ), - ) - .on("failed", (job, err) => - deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`), - ) - .on("error", (job: any, err: Error) => - deliverLogger.error(`error ${err}`, { job, e: renderError(err) }), - ) - .on("stalled", (job) => - deliverLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`), ); - -inboxQueue - .on("waiting", (jobId) => inboxLogger.debug(`waiting id=${jobId}`)) - .on("active", (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`)) - .on("completed", (job, result) => - inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`), - ) - .on("failed", (job, err) => - inboxLogger.warn( - `failed(${err}) ${getJobInfo(job)} activity=${ - job.data.activity ? job.data.activity.id : "none" - }`, - { job, e: renderError(err) }, - ), - ) - .on("error", (job: any, err: Error) => - inboxLogger.error(`error ${err}`, { job, e: renderError(err) }), - ) - .on("stalled", (job) => - inboxLogger.warn( - `stalled ${getJobInfo(job)} activity=${ - job.data.activity ? job.data.activity.id : "none" - }`, - ), - ); - -dbQueue - .on("waiting", (jobId) => dbLogger.debug(`waiting id=${jobId}`)) - .on("active", (job) => dbLogger.debug(`active id=${job.id}`)) - .on("completed", (job, result) => - dbLogger.debug(`completed(${result}) id=${job.id}`), - ) - .on("failed", (job, err) => - dbLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) }), - ) - .on("error", (job: any, err: Error) => - dbLogger.error(`error ${err}`, { job, e: renderError(err) }), - ) - .on("stalled", (job) => dbLogger.warn(`stalled id=${job.id}`)); - -objectStorageQueue - .on("waiting", (jobId) => objectStorageLogger.debug(`waiting id=${jobId}`)) - .on("active", (job) => objectStorageLogger.debug(`active id=${job.id}`)) - .on("completed", (job, result) => - objectStorageLogger.debug(`completed(${result}) id=${job.id}`), - ) - .on("failed", (job, err) => - objectStorageLogger.warn(`failed(${err}) id=${job.id}`, { - job, + queueEvents.on("error", (err) => + queueEventLogger.error(`error(${err})`, { e: renderError(err), }), - ) - .on("error", (job: any, err: Error) => - objectStorageLogger.error(`error ${err}`, { job, e: renderError(err) }), - ) - .on("stalled", (job) => objectStorageLogger.warn(`stalled id=${job.id}`)); - -webhookDeliverQueue - .on("waiting", (jobId) => webhookLogger.debug(`waiting id=${jobId}`)) - .on("active", (job) => - webhookLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`), - ) - .on("completed", (job, result) => - webhookLogger.debug( - `completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`, - ), - ) - .on("failed", (job, err) => - webhookLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`), - ) - .on("error", (job: any, err: Error) => - webhookLogger.error(`error ${err}`, { job, e: renderError(err) }), - ) - .on("stalled", (job) => - webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`), ); + queueEvents.on("stalled", (job) => queueEventLogger.warn(`stalled id=${job.jobId}`)); +}); -export function deliver(user: ThinUser, content: unknown, to: string | null) { +// const processPair = ({ queue, processor }: { queue: Bull.Queue; processor: Function }) => { +// new Bull.Worker(queue.name, (job) => processor(job), { +// concurrency: 1, +// ...baseQueueOptions, + +// }); +// }; + +// const processors = [ +// { queue: systemQueue, processor: processSystemQueue }, +// { queue: dbQueue, processor: processDb }, +// { queue: deliverQueue, processor: processDeliver }, +// { queue: inboxQueue, processor: processInbox }, +// { queue: objectStorageQueue, processor: processObjectStorage }, +// { queue: endedPollNotificationQueue, processor: endedPollNotification }, +// { queue: webhookDeliverQueue, processor: processWebhookDeliver }, +// { queue: backgroundQueue, processor: processBackground }, +// ]; + +// processors.forEach(processPair); + +// Make queue workers for each queue + +const systemQueueWorker = new Bull.Worker(QUEUE.SYSTEM, (job) => processSystemQueue(job), { + concurrency: 10, + limiter: { + duration: 1000, + max: 1, + }, +}); + + +export function deliver( + user: ThinUser, + content: IActivity | null, + to: string | null, + isSharedInbox: boolean, +) { if (content == null) return null; if (to == null) return null; @@ -167,15 +130,15 @@ export function deliver(user: ThinUser, content: unknown, to: string | null) { }, content, to, + isSharedInbox, }; - return deliverQueue.add(data, { + return deliverQueue.add(to, data, { attempts: config.deliverJobMaxAttempts || 12, - timeout: 1 * 60 * 1000, // 1min backoff: { type: "apBackoff", }, - removeOnComplete: true, + removeOnComplete: 1000, removeOnFail: true, }); } @@ -528,8 +491,7 @@ export default function () { "tickCharts", {}, { - repeat: { cron: "55 * * * *" }, - removeOnComplete: true, + repeat: { pattern: "55 * * * *" }, }, ); @@ -537,7 +499,7 @@ export default function () { "resyncCharts", {}, { - repeat: { cron: "0 0 * * *" }, + repeat: { pattern: "0 0 * * *" }, removeOnComplete: true, }, ); @@ -546,7 +508,7 @@ export default function () { "cleanCharts", {}, { - repeat: { cron: "0 0 * * *" }, + repeat: { pattern: "0 0 * * *" }, removeOnComplete: true, }, ); @@ -555,7 +517,7 @@ export default function () { "clean", {}, { - repeat: { cron: "0 0 * * *" }, + repeat: { pattern: "0 0 * * *" }, removeOnComplete: true, }, ); @@ -564,7 +526,7 @@ export default function () { "checkExpiredMutings", {}, { - repeat: { cron: "*/5 * * * *" }, + repeat: { pattern: "*/5 * * * *" }, removeOnComplete: true, }, ); @@ -572,7 +534,10 @@ export default function () { systemQueue.add( "setLocalEmojiSizes", {}, - { removeOnComplete: true, removeOnFail: true }, + { + removeOnComplete: true, + removeOnFail: true, + }, ); processSystemQueue(systemQueue); @@ -582,10 +547,10 @@ export function destroy() { deliverQueue.once("cleaned", (jobs, status) => { deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`); }); - deliverQueue.clean(0, "delayed"); + deliverQueue.drain(); inboxQueue.once("cleaned", (jobs, status) => { inboxLogger.succ(`Cleaned ${jobs.length} ${status} jobs`); }); - inboxQueue.clean(0, "delayed"); + inboxQueue.drain(); } diff --git a/packages/backend/src/queue/initialize.ts b/packages/backend/src/queue/initialize.ts index 0686fe9cd3..533ed77953 100644 --- a/packages/backend/src/queue/initialize.ts +++ b/packages/backend/src/queue/initialize.ts @@ -1,31 +1,35 @@ -import Bull from "bull"; +import * as Bull from "bullmq"; import config from "@/config/index.js"; +import { QUEUE, baseQueueOptions } from "@/queue/const.js"; -export function initialize(name: string, limitPerSec = -1) { - return new Bull(name, { - redis: { +export function initialize(name: string) { + return new Bull.Queue(name, { + connection: { port: config.redis.port, host: config.redis.host, family: config.redis.family == null ? 0 : config.redis.family, password: config.redis.pass, db: config.redis.db || 0, }, - prefix: config.redis.prefix ? `${config.redis.prefix}:queue` : "queue", - limiter: - limitPerSec > 0 - ? { - max: limitPerSec, - duration: 1000, - } - : undefined, - settings: { - stalledInterval: 60, - maxStalledCount: 2, - backoffStrategies: { - apBackoff, - }, - }, }); + // return new Bull(name, { + // redis: + // prefix: config.redis.prefix ? `${config.redis.prefix}:queue` : "queue", + // limiter: + // limitPerSec > 0 + // ? { + // max: limitPerSec, + // duration: 1000, + // } + // : undefined, + // settings: { + // stalledInterval: 60, + // maxStalledCount: 2, + // backoffStrategies: { + // apBackoff, + // }, + // }, + // }); } // ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019 diff --git a/packages/backend/src/queue/processors/background/index-all-notes.ts b/packages/backend/src/queue/processors/background/index-all-notes.ts index 10c332aa3b..9027cd62a2 100644 --- a/packages/backend/src/queue/processors/background/index-all-notes.ts +++ b/packages/backend/src/queue/processors/background/index-all-notes.ts @@ -1,4 +1,4 @@ -import type Bull from "bull"; +import type * as Bull from "bullmq"; import { queueLogger } from "../../logger.js"; import { Notes } from "@/models/index.js"; @@ -11,7 +11,6 @@ const logger = queueLogger.createSubLogger("index-all-notes"); export default async function indexAllNotes( job: Bull.Job>, - done: () => void, ): Promise { logger.info("Indexing all notes..."); @@ -47,7 +46,7 @@ export default async function indexAllNotes( } if (notes.length === 0) { - job.progress(100); + job.updateProgress(100); running = false; break; } @@ -70,7 +69,7 @@ export default async function indexAllNotes( indexedCount += chunk.length; const pct = (indexedCount / total) * 100; job.update({ indexedCount, cursor, total }); - job.progress(+pct.toFixed(1)); + job.updateProgress(+pct.toFixed(1)); logger.info(`Indexed notes ${indexedCount}/${total ? total : "?"}`); } cursor = notes[notes.length - 1].id; @@ -81,6 +80,5 @@ export default async function indexAllNotes( } } - done(); logger.info("All notes have been indexed."); } diff --git a/packages/backend/src/queue/processors/background/index.ts b/packages/backend/src/queue/processors/background/index.ts index 6674f954b0..db3feea021 100644 --- a/packages/backend/src/queue/processors/background/index.ts +++ b/packages/backend/src/queue/processors/background/index.ts @@ -1,4 +1,4 @@ -import type Bull from "bull"; +import type * as Bull from "bullmq"; import indexAllNotes from "./index-all-notes.js"; const jobs = { diff --git a/packages/backend/src/queue/processors/deliver.ts b/packages/backend/src/queue/processors/deliver.ts index 65471a559f..1cba0ea1f7 100644 --- a/packages/backend/src/queue/processors/deliver.ts +++ b/packages/backend/src/queue/processors/deliver.ts @@ -13,7 +13,7 @@ import { toPuny } from "@/misc/convert-host.js"; import { StatusError } from "@/misc/fetch.js"; import { shouldSkipInstance } from "@/misc/skipped-instances.js"; import type { DeliverJobData } from "@/queue/types.js"; -import type Bull from "bull"; +import * as Bull from "bullmq"; const logger = new Logger("deliver"); @@ -27,7 +27,7 @@ export default async (job: Bull.Job) => { try { if (latest !== (latest = JSON.stringify(job.data.content, null, 2))) { - logger.debug(`delivering ${latest}`); + logger.debug(`Delivering ${latest}`); } await request(job.data.user, job.data.to, job.data.content); @@ -72,7 +72,9 @@ export default async (job: Bull.Job) => { } // 5xx etc. - throw new Error(`${res.statusCode} ${res.statusMessage}`); + throw new Bull.UnrecoverableError( + `${res.statusCode} ${res.statusMessage}`, + ); } else { // DNS error, socket error, timeout ... throw res; diff --git a/packages/backend/src/queue/processors/ended-poll-notification.ts b/packages/backend/src/queue/processors/ended-poll-notification.ts index 9fe57d8da3..64655f10d2 100644 --- a/packages/backend/src/queue/processors/ended-poll-notification.ts +++ b/packages/backend/src/queue/processors/ended-poll-notification.ts @@ -1,4 +1,4 @@ -import type Bull from "bull"; +import type * as Bull from "bullmq"; import { In } from "typeorm"; import { Notes, Polls, PollVotes } from "@/models/index.js"; import { queueLogger } from "../logger.js"; @@ -9,11 +9,9 @@ const logger = queueLogger.createSubLogger("ended-poll-notification"); export async function endedPollNotification( job: Bull.Job, - done: any, ): Promise { const note = await Notes.findOneBy({ id: job.data.noteId }); if (note == null || !note.hasPoll) { - done(); return; } @@ -31,6 +29,4 @@ export async function endedPollNotification( noteId: note.id, }); } - - done(); } diff --git a/packages/backend/src/queue/processors/inbox.ts b/packages/backend/src/queue/processors/inbox.ts index ca063a6f3f..1f638fc32d 100644 --- a/packages/backend/src/queue/processors/inbox.ts +++ b/packages/backend/src/queue/processors/inbox.ts @@ -1,5 +1,5 @@ import { URL } from "node:url"; -import type Bull from "bull"; +import type * as Bull from "bullmq"; import httpSignature from "@peertube/http-signature"; import perform from "@/remote/activitypub/perform.js"; import Logger from "@/services/logger.js"; diff --git a/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts b/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts index fdfe05d1a6..039a1c3139 100644 --- a/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts +++ b/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts @@ -1,4 +1,4 @@ -import type Bull from "bull"; +import type * as Bull from "bullmq"; import { queueLogger } from "../../logger.js"; import { deleteFileSync } from "@/services/drive/delete-file.js"; @@ -9,7 +9,6 @@ const logger = queueLogger.createSubLogger("clean-remote-files"); export default async function cleanRemoteFiles( job: Bull.Job>, - done: any, ): Promise { logger.info("Deleting cached remote files..."); @@ -30,7 +29,7 @@ export default async function cleanRemoteFiles( }); if (files.length === 0) { - job.progress(100); + job.updateProgress(100); break; } @@ -45,9 +44,8 @@ export default async function cleanRemoteFiles( isLink: false, }); - job.progress(deletedCount / total); + job.updateProgress(deletedCount / total); } logger.succ("All cahced remote files has been deleted."); - done(); } diff --git a/packages/backend/src/queue/processors/object-storage/delete-file.ts b/packages/backend/src/queue/processors/object-storage/delete-file.ts index 174aa1906c..1cc64d4cae 100644 --- a/packages/backend/src/queue/processors/object-storage/delete-file.ts +++ b/packages/backend/src/queue/processors/object-storage/delete-file.ts @@ -1,5 +1,5 @@ import type { ObjectStorageFileJobData } from "@/queue/types.js"; -import type Bull from "bull"; +import type * as Bull from "bullmq"; import { deleteObjectStorageFile } from "@/services/drive/delete-file.js"; export default async (job: Bull.Job) => { diff --git a/packages/backend/src/queue/processors/object-storage/index.ts b/packages/backend/src/queue/processors/object-storage/index.ts index 5f90d4cd09..9cbb1b0747 100644 --- a/packages/backend/src/queue/processors/object-storage/index.ts +++ b/packages/backend/src/queue/processors/object-storage/index.ts @@ -1,4 +1,4 @@ -import type Bull from "bull"; +import type * as Bull from "bullmq"; import type { ObjectStorageJobData } from "@/queue/types.js"; import deleteFile from "./delete-file.js"; import cleanRemoteFiles from "./clean-remote-files.js"; diff --git a/packages/backend/src/queue/processors/system/check-expired-mutings.ts b/packages/backend/src/queue/processors/system/check-expired-mutings.ts index a482d0218a..10965f744d 100644 --- a/packages/backend/src/queue/processors/system/check-expired-mutings.ts +++ b/packages/backend/src/queue/processors/system/check-expired-mutings.ts @@ -1,4 +1,4 @@ -import type Bull from "bull"; +import type * as Bull from "bullmq"; import { In } from "typeorm"; import { Mutings } from "@/models/index.js"; import { queueLogger } from "../../logger.js"; @@ -8,7 +8,6 @@ const logger = queueLogger.createSubLogger("check-expired-mutings"); export async function checkExpiredMutings( job: Bull.Job>, - done: any, ): Promise { logger.info("Checking expired mutings..."); @@ -29,5 +28,4 @@ export async function checkExpiredMutings( } logger.succ("All expired mutings checked."); - done(); } diff --git a/packages/backend/src/queue/processors/system/clean-charts.ts b/packages/backend/src/queue/processors/system/clean-charts.ts index dde5d95fe3..bc980130ca 100644 --- a/packages/backend/src/queue/processors/system/clean-charts.ts +++ b/packages/backend/src/queue/processors/system/clean-charts.ts @@ -1,4 +1,4 @@ -import type Bull from "bull"; +import type * as Bull from "bullmq"; import { queueLogger } from "../../logger.js"; import { @@ -20,7 +20,6 @@ const logger = queueLogger.createSubLogger("clean-charts"); export async function cleanCharts( job: Bull.Job>, - done: any, ): Promise { logger.info("Clean charts..."); @@ -40,5 +39,4 @@ export async function cleanCharts( ]); logger.succ("All charts successfully cleaned."); - done(); } diff --git a/packages/backend/src/queue/processors/system/clean.ts b/packages/backend/src/queue/processors/system/clean.ts index fbd45b0bb9..f75f480521 100644 --- a/packages/backend/src/queue/processors/system/clean.ts +++ b/packages/backend/src/queue/processors/system/clean.ts @@ -1,4 +1,4 @@ -import type Bull from "bull"; +import type * as Bull from "bullmq"; import { LessThan } from "typeorm"; import { UserIps } from "@/models/index.js"; @@ -8,7 +8,6 @@ const logger = queueLogger.createSubLogger("clean"); export async function clean( job: Bull.Job>, - done: any, ): Promise { logger.info("Cleaning..."); @@ -17,5 +16,4 @@ export async function clean( }); logger.succ("Cleaned."); - done(); } diff --git a/packages/backend/src/queue/processors/system/index.ts b/packages/backend/src/queue/processors/system/index.ts index 53321de5f9..c930a4073b 100644 --- a/packages/backend/src/queue/processors/system/index.ts +++ b/packages/backend/src/queue/processors/system/index.ts @@ -1,4 +1,4 @@ -import type Bull from "bull"; +import type * as Bull from "bullmq"; import { tickCharts } from "./tick-charts.js"; import { resyncCharts } from "./resync-charts.js"; import { cleanCharts } from "./clean-charts.js"; @@ -13,14 +13,10 @@ const jobs = { checkExpiredMutings, clean, setLocalEmojiSizes, -} as Record< - string, - | Bull.ProcessCallbackFunction> - | Bull.ProcessPromiseFunction> ->; +}; export default function (dbQueue: Bull.Queue>) { for (const [k, v] of Object.entries(jobs)) { - dbQueue.process(k, v); + dbQueue.add(k, { v }); } } diff --git a/packages/backend/src/queue/processors/system/local-emoji-size.ts b/packages/backend/src/queue/processors/system/local-emoji-size.ts index d696bbd863..4255a53f3d 100644 --- a/packages/backend/src/queue/processors/system/local-emoji-size.ts +++ b/packages/backend/src/queue/processors/system/local-emoji-size.ts @@ -1,4 +1,4 @@ -import type Bull from "bull"; +import type * as Bull from "bullmq"; import { IsNull } from "typeorm"; import { Emojis } from "@/models/index.js"; @@ -9,7 +9,6 @@ const logger = queueLogger.createSubLogger("local-emoji-size"); export async function setLocalEmojiSizes( _job: Bull.Job>, - done: any, ): Promise { logger.info("Setting sizes of local emojis..."); @@ -38,5 +37,4 @@ export async function setLocalEmojiSizes( } logger.succ("Done."); - done(); } diff --git a/packages/backend/src/queue/processors/system/resync-charts.ts b/packages/backend/src/queue/processors/system/resync-charts.ts index dbea0df733..788b9ecfcc 100644 --- a/packages/backend/src/queue/processors/system/resync-charts.ts +++ b/packages/backend/src/queue/processors/system/resync-charts.ts @@ -1,4 +1,4 @@ -import type Bull from "bull"; +import type * as Bull from "bullmq"; import { queueLogger } from "../../logger.js"; import { driveChart, notesChart, usersChart } from "@/services/chart/index.js"; @@ -7,7 +7,6 @@ const logger = queueLogger.createSubLogger("resync-charts"); export async function resyncCharts( job: Bull.Job>, - done: any, ): Promise { logger.info("Resync charts..."); @@ -20,5 +19,4 @@ export async function resyncCharts( ]); logger.succ("All charts successfully resynced."); - done(); } diff --git a/packages/backend/src/queue/processors/system/tick-charts.ts b/packages/backend/src/queue/processors/system/tick-charts.ts index 33eed8a596..2c0bb44918 100644 --- a/packages/backend/src/queue/processors/system/tick-charts.ts +++ b/packages/backend/src/queue/processors/system/tick-charts.ts @@ -1,4 +1,4 @@ -import type Bull from "bull"; +import type * as Bull from "bullmq"; import { queueLogger } from "../../logger.js"; import { @@ -20,7 +20,6 @@ const logger = queueLogger.createSubLogger("tick-charts"); export async function tickCharts( job: Bull.Job>, - done: any, ): Promise { logger.info("Tick charts..."); @@ -40,5 +39,4 @@ export async function tickCharts( ]); logger.succ("All charts successfully ticked."); - done(); } diff --git a/packages/backend/src/queue/processors/webhook-deliver.ts b/packages/backend/src/queue/processors/webhook-deliver.ts index 0a54ae7d89..0731002a63 100644 --- a/packages/backend/src/queue/processors/webhook-deliver.ts +++ b/packages/backend/src/queue/processors/webhook-deliver.ts @@ -1,5 +1,5 @@ import { URL } from "node:url"; -import type Bull from "bull"; +import * as Bull from "bullmq"; import Logger from "@/services/logger.js"; import type { WebhookDeliverJobData } from "../types.js"; import { getResponse, StatusError } from "@/misc/fetch.js"; @@ -57,7 +57,9 @@ export default async (job: Bull.Job) => { } // 5xx etc. - throw new Error(`${res.statusCode} ${res.statusMessage}`); + throw new Bull.UnrecoverableError( + `${res.statusCode} ${res.statusMessage}`, + ); } else { // DNS error, socket error, timeout ... throw res; diff --git a/packages/backend/src/queue/queues.ts b/packages/backend/src/queue/queues.ts index 6b0eb2de42..68879db42d 100644 --- a/packages/backend/src/queue/queues.ts +++ b/packages/backend/src/queue/queues.ts @@ -14,18 +14,18 @@ export const endedPollNotificationQueue = initializeQueue("endedPollNotification"); export const deliverQueue = initializeQueue( "deliver", - config.deliverJobPerSec || 128, + // config.deliverJobPerSec || 128, ); export const inboxQueue = initializeQueue( "inbox", - config.inboxJobPerSec || 16, + // config.inboxJobPerSec || 16, ); -export const dbQueue = initializeQueue("db", 256); +export const dbQueue = initializeQueue("db", /*256*/); export const objectStorageQueue = initializeQueue("objectStorage"); export const webhookDeliverQueue = initializeQueue( "webhookDeliver", - 64, + // 64, ); export const backgroundQueue = initializeQueue>("bg"); diff --git a/packages/backend/src/queue/types.ts b/packages/backend/src/queue/types.ts index b72b127894..13805ccb30 100644 --- a/packages/backend/src/queue/types.ts +++ b/packages/backend/src/queue/types.ts @@ -12,6 +12,8 @@ export type DeliverJobData = { content: unknown; /** inbox URL to deliver */ to: string; + /** whether it is sharedInbox */ + isSharedInbox: boolean; }; export type InboxJobData = { diff --git a/packages/backend/src/remote/activitypub/deliver-manager.ts b/packages/backend/src/remote/activitypub/deliver-manager.ts index 400e047774..5e5630728b 100644 --- a/packages/backend/src/remote/activitypub/deliver-manager.ts +++ b/packages/backend/src/remote/activitypub/deliver-manager.ts @@ -78,7 +78,7 @@ export default class DeliverManager { public async execute() { if (!Users.isLocalUser(this.actor)) return; - const inboxes = new Set(); + const inboxes = new Map(); /* build inbox list @@ -90,7 +90,7 @@ export default class DeliverManager { // followers deliver // TODO: SELECT DISTINCT ON ("followerSharedInbox") "followerSharedInbox" みたいな問い合わせにすればよりパフォーマンス向上できそう // ただ、sharedInboxがnullなリモートユーザーも稀におり、その対応ができなさそう? - const followers = (await Followings.find({ + const followers = (await this.followingsRepository.find({ where: { followeeId: this.actor.id, followerHost: Not(IsNull()), @@ -105,8 +105,8 @@ export default class DeliverManager { }[]; for (const following of followers) { - const inbox = following.followerSharedInbox || following.followerInbox; - inboxes.add(inbox); + const inbox = following.followerSharedInbox ?? following.followerInbox; + inboxes.set(inbox, following.followerSharedInbox === null); } } @@ -120,21 +120,12 @@ export default class DeliverManager { // check that they actually have an inbox recipe.to.inbox != null, ) - .forEach((recipe) => inboxes.add(recipe.to.inbox!)); - - const instancesToSkip = await skippedInstances( - // get (unique) list of hosts - Array.from( - new Set(Array.from(inboxes).map((inbox) => new URL(inbox).host)), - ), - ); + .forEach((recipe) => inboxes.set(recipe.to.inbox!, false)); // deliver for (const inbox of inboxes) { - // skip instances as indicated - if (instancesToSkip.includes(new URL(inbox).host)) continue; - - deliver(this.actor, this.activity, inbox); + // inbox[0]: inbox, inbox[1]: whether it is sharedInbox + deliver(this.actor, this.activity, inbox[0], inbox[1]); } } } diff --git a/packages/backend/src/server/api/common/read-messaging-message.ts b/packages/backend/src/server/api/common/read-messaging-message.ts index fc22c843af..d2d56986a6 100644 --- a/packages/backend/src/server/api/common/read-messaging-message.ts +++ b/packages/backend/src/server/api/common/read-messaging-message.ts @@ -170,10 +170,10 @@ export async function deliverReadActivity( undefined, contents, ); - deliver(user, renderActivity(collection), recipient.inbox); + deliver(user, renderActivity(collection), recipient.inbox, false); } else { for (const content of contents) { - deliver(user, renderActivity(content), recipient.inbox); + deliver(user, renderActivity(content), recipient.inbox, false); } } } diff --git a/packages/backend/src/server/api/endpoints/admin/resolve-abuse-user-report.ts b/packages/backend/src/server/api/endpoints/admin/resolve-abuse-user-report.ts index c876a21984..90761dfc76 100644 --- a/packages/backend/src/server/api/endpoints/admin/resolve-abuse-user-report.ts +++ b/packages/backend/src/server/api/endpoints/admin/resolve-abuse-user-report.ts @@ -36,6 +36,7 @@ export default define(meta, paramDef, async (ps, me) => { actor, renderActivity(renderFlag(actor, [targetUser.uri!], report.comment)), targetUser.inbox, + false, ); } diff --git a/packages/backend/src/server/api/endpoints/i/import-posts.ts b/packages/backend/src/server/api/endpoints/i/import-posts.ts index 3adba0514e..9b2cccb0ac 100644 --- a/packages/backend/src/server/api/endpoints/i/import-posts.ts +++ b/packages/backend/src/server/api/endpoints/i/import-posts.ts @@ -51,5 +51,5 @@ export default define(meta, paramDef, async (ps, user) => { if (file == null) throw new ApiError(meta.errors.noSuchFile); if (file.size === 0) throw new ApiError(meta.errors.emptyFile); - createImportPostsJob(user, file.id, ps.signatureCheck); + createImportPostsJob(user, file.id, ps.signatureCheck || false); }); diff --git a/packages/backend/src/server/api/endpoints/notes/edit.ts b/packages/backend/src/server/api/endpoints/notes/edit.ts index 66db6f644f..b934bfea2d 100644 --- a/packages/backend/src/server/api/endpoints/notes/edit.ts +++ b/packages/backend/src/server/api/endpoints/notes/edit.ts @@ -592,7 +592,7 @@ export default define(meta, paramDef, async (ps, user) => { } if (publishing) { - index(note); + index(note, true); // Publish update event for the updated note details publishNoteStream(note.id, "updated", { diff --git a/packages/backend/src/server/api/endpoints/notes/polls/vote.ts b/packages/backend/src/server/api/endpoints/notes/polls/vote.ts index 0558aa1b8f..19e45bec08 100644 --- a/packages/backend/src/server/api/endpoints/notes/polls/vote.ts +++ b/packages/backend/src/server/api/endpoints/notes/polls/vote.ts @@ -176,6 +176,7 @@ export default define(meta, paramDef, async (ps, user) => { user, renderActivity(await renderVote(user, vote, note, poll, pollOwner)), pollOwner.inbox, + false, ); } diff --git a/packages/backend/src/server/api/endpoints/notes/search-by-tag.ts b/packages/backend/src/server/api/endpoints/notes/search-by-tag.ts index f988acaa51..5cdaea4094 100644 --- a/packages/backend/src/server/api/endpoints/notes/search-by-tag.ts +++ b/packages/backend/src/server/api/endpoints/notes/search-by-tag.ts @@ -93,7 +93,7 @@ export default define(meta, paramDef, async (ps, me) => { try { if (ps.tag) { - if (!safeForSql(normalizeForSearch(ps.tag))) throw "Injection"; + if (!safeForSql(normalizeForSearch(ps.tag))) throw new Error("Injection"); query.andWhere(`'{"${normalizeForSearch(ps.tag)}"}' <@ note.tags`); } else { query.andWhere( @@ -103,7 +103,7 @@ export default define(meta, paramDef, async (ps, me) => { new Brackets((qb) => { for (const tag of tags) { if (!safeForSql(normalizeForSearch(ps.tag))) - throw "Injection"; + throw new Error("Injection"); qb.andWhere(`'{"${normalizeForSearch(tag)}"}' <@ note.tags`); } }), diff --git a/packages/backend/src/services/blocking/create.ts b/packages/backend/src/services/blocking/create.ts index 60bd6e9431..b47a7fa185 100644 --- a/packages/backend/src/services/blocking/create.ts +++ b/packages/backend/src/services/blocking/create.ts @@ -43,7 +43,7 @@ export default async function (blocker: User, blockee: User) { if (Users.isLocalUser(blocker) && Users.isRemoteUser(blockee)) { const content = renderActivity(renderBlock(blocking)); - deliver(blocker, content, blockee.inbox); + deliver(blocker, content, blockee.inbox, false); } } @@ -91,7 +91,7 @@ async function cancelRequest(follower: User, followee: User) { const content = renderActivity( renderUndo(renderFollow(follower, followee), follower), ); - deliver(follower, content, followee.inbox); + deliver(follower, content, followee.inbox, false); } // リモートからフォローリクエストを受けていたらReject送信 @@ -102,7 +102,7 @@ async function cancelRequest(follower: User, followee: User) { followee, ), ); - deliver(followee, content, follower.inbox); + deliver(followee, content, follower.inbox, false); } } @@ -147,7 +147,7 @@ async function unFollow(follower: User, followee: User) { const content = renderActivity( renderUndo(renderFollow(follower, followee), follower), ); - deliver(follower, content, followee.inbox); + deliver(follower, content, followee.inbox, false); } } diff --git a/packages/backend/src/services/blocking/delete.ts b/packages/backend/src/services/blocking/delete.ts index 67f1e76f0e..b1ae356fda 100644 --- a/packages/backend/src/services/blocking/delete.ts +++ b/packages/backend/src/services/blocking/delete.ts @@ -32,6 +32,6 @@ export default async function (blocker: CacheableUser, blockee: CacheableUser) { // deliver if remote bloking if (Users.isLocalUser(blocker) && Users.isRemoteUser(blockee)) { const content = renderActivity(renderUndo(renderBlock(blocking), blocker)); - deliver(blocker, content, blockee.inbox); + deliver(blocker, content, blockee.inbox, false); } } diff --git a/packages/backend/src/services/following/create.ts b/packages/backend/src/services/following/create.ts index 3a77676b38..45190ac965 100644 --- a/packages/backend/src/services/following/create.ts +++ b/packages/backend/src/services/following/create.ts @@ -199,7 +199,7 @@ export default async function ( const content = renderActivity( renderReject(renderFollow(follower, followee, requestId), followee), ); - deliver(followee, content, follower.inbox); + deliver(followee, content, follower.inbox, false); return; } else if ( Users.isRemoteUser(follower) && @@ -278,6 +278,6 @@ export default async function ( const content = renderActivity( renderAccept(renderFollow(follower, followee, requestId), followee), ); - deliver(followee, content, follower.inbox); + deliver(followee, content, follower.inbox, false); } } diff --git a/packages/backend/src/services/following/delete.ts b/packages/backend/src/services/following/delete.ts index fae4bd3cec..ad17b0e488 100644 --- a/packages/backend/src/services/following/delete.ts +++ b/packages/backend/src/services/following/delete.ts @@ -72,7 +72,7 @@ export default async function ( const content = renderActivity( renderUndo(renderFollow(follower, followee), follower), ); - deliver(follower, content, followee.inbox); + deliver(follower, content, followee.inbox, false); } if (Users.isLocalUser(followee) && Users.isRemoteUser(follower)) { @@ -80,7 +80,7 @@ export default async function ( const content = renderActivity( renderReject(renderFollow(follower, followee), followee), ); - deliver(followee, content, follower.inbox); + deliver(followee, content, follower.inbox, false); } } diff --git a/packages/backend/src/services/following/reject.ts b/packages/backend/src/services/following/reject.ts index 7464219bf6..c5eb69a235 100644 --- a/packages/backend/src/services/following/reject.ts +++ b/packages/backend/src/services/following/reject.ts @@ -109,7 +109,7 @@ async function deliverReject(followee: Local, follower: Remote) { followee, ), ); - deliver(followee, content, follower.inbox); + deliver(followee, content, follower.inbox, false); } /** diff --git a/packages/backend/src/services/following/requests/accept.ts b/packages/backend/src/services/following/requests/accept.ts index 6aa17b09ad..2cb433eb78 100644 --- a/packages/backend/src/services/following/requests/accept.ts +++ b/packages/backend/src/services/following/requests/accept.ts @@ -40,7 +40,7 @@ export default async function ( followee, ), ); - deliver(followee, content, follower.inbox); + deliver(followee, content, follower.inbox, false); } Users.pack(followee.id, followee, { diff --git a/packages/backend/src/services/following/requests/cancel.ts b/packages/backend/src/services/following/requests/cancel.ts index 00daae380d..28549f2d5c 100644 --- a/packages/backend/src/services/following/requests/cancel.ts +++ b/packages/backend/src/services/following/requests/cancel.ts @@ -24,7 +24,7 @@ export default async function ( if (Users.isLocalUser(follower)) { // 本来このチェックは不要だけどTSに怒られるので - deliver(follower, content, followee.inbox); + deliver(follower, content, followee.inbox, false); } } diff --git a/packages/backend/src/services/following/requests/create.ts b/packages/backend/src/services/following/requests/create.ts index 50dbd9b3be..e206b86950 100644 --- a/packages/backend/src/services/following/requests/create.ts +++ b/packages/backend/src/services/following/requests/create.ts @@ -87,6 +87,6 @@ export default async function ( requestId ?? `${config.url}/follows/${followRequest.id}`, ), ); - deliver(follower, content, followee.inbox); + deliver(follower, content, followee.inbox, false); } } diff --git a/packages/backend/src/services/messages/create.ts b/packages/backend/src/services/messages/create.ts index 506f299966..c41c02d3e9 100644 --- a/packages/backend/src/services/messages/create.ts +++ b/packages/backend/src/services/messages/create.ts @@ -142,7 +142,7 @@ export async function createMessage( renderCreate(await renderNote(note, false, true), note), ); - deliver(user, activity, recipientUser.inbox); + deliver(user, activity, recipientUser.inbox, false); } return messageObj; } diff --git a/packages/backend/src/services/messages/delete.ts b/packages/backend/src/services/messages/delete.ts index 77caba80ce..744de74d04 100644 --- a/packages/backend/src/services/messages/delete.ts +++ b/packages/backend/src/services/messages/delete.ts @@ -42,7 +42,7 @@ async function postDeleteMessage(message: MessagingMessage) { user, ), ); - deliver(user, activity, recipient.inbox); + deliver(user, activity, recipient.inbox, false); } } else if (message.groupId) { publishGroupMessagingStream(message.groupId, "deleted", message.id); diff --git a/packages/backend/src/services/note/create.ts b/packages/backend/src/services/note/create.ts index bd54db7e24..5965359a6b 100644 --- a/packages/backend/src/services/note/create.ts +++ b/packages/backend/src/services/note/create.ts @@ -414,6 +414,7 @@ export default async ( if (data.poll?.expiresAt) { const delay = data.poll.expiresAt.getTime() - Date.now(); endedPollNotificationQueue.add( + note.id, { noteId: note.id, }, diff --git a/packages/backend/src/services/relay.ts b/packages/backend/src/services/relay.ts index 244e05c030..1a7580754c 100644 --- a/packages/backend/src/services/relay.ts +++ b/packages/backend/src/services/relay.ts @@ -39,7 +39,7 @@ export async function addRelay(inbox: string) { const relayActor = await getRelayActor(); const follow = await renderFollowRelay(relay, relayActor); const activity = renderActivity(follow); - deliver(relayActor, activity, relay.inbox); + deliver(relayActor, activity, relay.inbox, false); return relay; } @@ -57,7 +57,7 @@ export async function removeRelay(inbox: string) { const follow = renderFollowRelay(relay, relayActor); const undo = renderUndo(follow, relayActor); const activity = renderActivity(undo); - deliver(relayActor, activity, relay.inbox); + deliver(relayActor, activity, relay.inbox, false); await Relays.delete(relay.id); } @@ -104,6 +104,6 @@ export async function deliverToRelays( const signed = await attachLdSignature(copy, user); for (const relay of relays) { - deliver(user, signed, relay.inbox); + deliver(user, signed, relay.inbox, false); } } diff --git a/packages/backend/src/services/suspend-user.ts b/packages/backend/src/services/suspend-user.ts index f72b8ffcb1..8052249601 100644 --- a/packages/backend/src/services/suspend-user.ts +++ b/packages/backend/src/services/suspend-user.ts @@ -41,7 +41,7 @@ export async function doPostSuspend(user: { } for (const inbox of queue) { - deliver(user, content, inbox); + deliver(user, content, inbox, true); } } } diff --git a/packages/backend/src/services/unsuspend-user.ts b/packages/backend/src/services/unsuspend-user.ts index 69447a4a26..b169c46fb1 100644 --- a/packages/backend/src/services/unsuspend-user.ts +++ b/packages/backend/src/services/unsuspend-user.ts @@ -39,7 +39,7 @@ export async function doPostUnsuspend(user: User) { } for (const inbox of queue) { - deliver(user as any, content, inbox); + deliver(user as any, content, inbox, true); } } } diff --git a/packages/backend/test/utils.ts b/packages/backend/test/utils.ts index f3f68b2609..6e07fee97e 100644 --- a/packages/backend/test/utils.ts +++ b/packages/backend/test/utils.ts @@ -331,7 +331,8 @@ export function launchServer( } export async function initTestDb(justBorrow = false, initEntities?: any[]) { - if (process.env.NODE_ENV !== "test") throw "NODE_ENV is not a test"; + if (process.env.NODE_ENV !== "test") + throw new Error("NODE_ENV is not a test"); const db = new DataSource({ type: "postgres", diff --git a/packages/client/src/scripts/time.ts b/packages/client/src/scripts/time.ts index ddaa49352f..2589909eb7 100644 --- a/packages/client/src/scripts/time.ts +++ b/packages/client/src/scripts/time.ts @@ -20,7 +20,7 @@ export function dateUTC(time: number[]): Date { ? Date.UTC(time[0], time[1], time[2], time[3], time[4], time[5], time[6]) : null; - if (!d) throw "wrong number of arguments"; + if (!d) throw new Error("Wrong number of arguments"); return new Date(d); } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f001fec77b..e444572990 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -159,9 +159,9 @@ importers: blurhash: specifier: 1.1.5 version: 1.1.5 - bull: - specifier: 4.10.4 - version: 4.10.4 + bullmq: + specifier: ^3.15.0 + version: 3.15.0 cacheable-lookup: specifier: 7.0.0 version: 7.0.0 @@ -431,9 +431,6 @@ importers: '@types/bcryptjs': specifier: 2.4.2 version: 2.4.2 - '@types/bull': - specifier: 3.15.9 - version: 3.15.9 '@types/cbor': specifier: 6.0.0 version: 6.0.0 @@ -1879,6 +1876,7 @@ packages: /@ioredis/commands@1.2.0: resolution: {integrity: sha512-Sx1pU8EM64o2BrqNpEO1CNLtKQwyhuXuqyfH7oGKCk+1a33d2r5saW8zNwm3j6BTExtjrv2BxTgzzkMwts6vGg==} + dev: false /@istanbuljs/load-nyc-config@1.1.0: resolution: {integrity: sha512-VjeHSlIzpv/NyD3N0YuHfXOPDIixcA1q2ZV98wsMqcYlPmv2n3Yb2lYP9XMElnaFVXg5A7YLTeLu6V84uQDjmQ==} @@ -2248,48 +2246,48 @@ packages: os-filter-obj: 2.0.0 dev: true - /@msgpackr-extract/msgpackr-extract-darwin-arm64@2.2.0: - resolution: {integrity: sha512-Z9LFPzfoJi4mflGWV+rv7o7ZbMU5oAU9VmzCgL240KnqDW65Y2HFCT3MW06/ITJSnbVLacmcEJA8phywK7JinQ==} + /@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.2: + resolution: {integrity: sha512-9bfjwDxIDWmmOKusUcqdS4Rw+SETlp9Dy39Xui9BEGEk19dDwH0jhipwFzEff/pFg95NKymc6TOTbRKcWeRqyQ==} cpu: [arm64] os: [darwin] requiresBuild: true dev: false optional: true - /@msgpackr-extract/msgpackr-extract-darwin-x64@2.2.0: - resolution: {integrity: sha512-vq0tT8sjZsy4JdSqmadWVw6f66UXqUCabLmUVHZwUFzMgtgoIIQjT4VVRHKvlof3P/dMCkbMJ5hB1oJ9OWHaaw==} + /@msgpackr-extract/msgpackr-extract-darwin-x64@3.0.2: + resolution: {integrity: sha512-lwriRAHm1Yg4iDf23Oxm9n/t5Zpw1lVnxYU3HnJPTi2lJRkKTrps1KVgvL6m7WvmhYVt/FIsssWay+k45QHeuw==} cpu: [x64] os: [darwin] requiresBuild: true dev: false optional: true - /@msgpackr-extract/msgpackr-extract-linux-arm64@2.2.0: - resolution: {integrity: sha512-hlxxLdRmPyq16QCutUtP8Tm6RDWcyaLsRssaHROatgnkOxdleMTgetf9JsdncL8vLh7FVy/RN9i3XR5dnb9cRA==} + /@msgpackr-extract/msgpackr-extract-linux-arm64@3.0.2: + resolution: {integrity: sha512-FU20Bo66/f7He9Fp9sP2zaJ1Q8L9uLPZQDub/WlUip78JlPeMbVL8546HbZfcW9LNciEXc8d+tThSJjSC+tmsg==} cpu: [arm64] os: [linux] requiresBuild: true dev: false optional: true - /@msgpackr-extract/msgpackr-extract-linux-arm@2.2.0: - resolution: {integrity: sha512-SaJ3Qq4lX9Syd2xEo9u3qPxi/OB+5JO/ngJKK97XDpa1C587H9EWYO6KD8995DAjSinWvdHKRrCOXVUC5fvGOg==} + /@msgpackr-extract/msgpackr-extract-linux-arm@3.0.2: + resolution: {integrity: sha512-MOI9Dlfrpi2Cuc7i5dXdxPbFIgbDBGgKR5F2yWEa6FVEtSWncfVNKW5AKjImAQ6CZlBK9tympdsZJ2xThBiWWA==} cpu: [arm] os: [linux] requiresBuild: true dev: false optional: true - /@msgpackr-extract/msgpackr-extract-linux-x64@2.2.0: - resolution: {integrity: sha512-94y5PJrSOqUNcFKmOl7z319FelCLAE0rz/jPCWS+UtdMZvpa4jrQd+cJPQCLp2Fes1yAW/YUQj/Di6YVT3c3Iw==} + /@msgpackr-extract/msgpackr-extract-linux-x64@3.0.2: + resolution: {integrity: sha512-gsWNDCklNy7Ajk0vBBf9jEx04RUxuDQfBse918Ww+Qb9HCPoGzS+XJTLe96iN3BVK7grnLiYghP/M4L8VsaHeA==} cpu: [x64] os: [linux] requiresBuild: true dev: false optional: true - /@msgpackr-extract/msgpackr-extract-win32-x64@2.2.0: - resolution: {integrity: sha512-XrC0JzsqQSvOyM3t04FMLO6z5gCuhPE6k4FXuLK5xf52ZbdvcFe1yBmo7meCew9B8G2f0T9iu9t3kfTYRYROgA==} + /@msgpackr-extract/msgpackr-extract-win32-x64@3.0.2: + resolution: {integrity: sha512-O+6Gs8UeDbyFpbSh2CPEz/UOrrdWPTBYNblZK5CxxLisYt4kGX3Sc+czffFonyjiGSq3jWLwJS/CCJc7tBr4sQ==} cpu: [x64] os: [win32] requiresBuild: true @@ -3055,15 +3053,6 @@ packages: '@types/connect': 3.4.35 '@types/node': 18.11.18 - /@types/bull@3.15.9: - resolution: {integrity: sha512-MPUcyPPQauAmynoO3ezHAmCOhbB0pWmYyijr/5ctaCqhbKWsjW0YCod38ZcLzUBprosfZ9dPqfYIcfdKjk7RNQ==} - dependencies: - '@types/ioredis': 5.0.0 - '@types/redis': 2.8.32 - transitivePeerDependencies: - - supports-color - dev: true - /@types/cacheable-request@6.0.3: resolution: {integrity: sha512-IQ3EbTzGxIigb1I3qPZc1rWJnH0BmSKv5QYTalEwweFvyBDLSAe24zP0le/hyi7ecGfZVlIVAg4BZqb8WBwKqw==} dependencies: @@ -3214,15 +3203,6 @@ packages: /@types/http-errors@2.0.1: resolution: {integrity: sha512-/K3ds8TRAfBvi5vfjuz8y6+GiAYBZ0x4tXv1Av6CWBWn0IlADc+ZX9pMq7oU0fNQPnBwIZl3rmeLp6SBApbxSQ==} - /@types/ioredis@5.0.0: - resolution: {integrity: sha512-zJbJ3FVE17CNl5KXzdeSPtdltc4tMT3TzC6fxQS0sQngkbFZ6h+0uTafsRqu+eSLIugf6Yb0Ea0SUuRr42Nk9g==} - deprecated: This is a stub types definition. ioredis provides its own type definitions, so you do not need this installed. - dependencies: - ioredis: 5.3.2 - transitivePeerDependencies: - - supports-color - dev: true - /@types/istanbul-lib-coverage@2.0.4: resolution: {integrity: sha512-z/QT1XN4K4KYuslS23k62yDIDLwLFkzxOuMplDtObz0+y7VqJCaO2o+SPwHCvLFZh7xazvvoor2tA/hPz9ee7g==} dev: true @@ -3535,12 +3515,6 @@ packages: resolution: {integrity: sha512-GSMb93iSA8KKFDgVL2Wzs/kqrHMJcU8xhLdwI5omoACcj7K18SacklLtY1C4G02HC5drd6GygtsIaGbfxJSe0g==} dev: true - /@types/redis@2.8.32: - resolution: {integrity: sha512-7jkMKxcGq9p242exlbsVzuJb57KqHRhNl4dHoQu2Y5v9bCAbtIXXH0R3HleSQW4CTOqpHIYUW3t6tpUj4BVQ+w==} - dependencies: - '@types/node': 18.11.18 - dev: true - /@types/redis@4.0.11: resolution: {integrity: sha512-bI+gth8La8Wg/QCR1+V1fhrL9+LZUSWfcqpOj2Kc80ZQ4ffbdL173vQd5wovmoV9i071FU9oP2g6etLuEwb6Rg==} deprecated: This is a stub types definition. redis provides its own type definitions, so you do not need this installed. @@ -5080,18 +5054,17 @@ packages: node-gyp-build: 4.6.0 dev: false - /bull@4.10.4: - resolution: {integrity: sha512-o9m/7HjS/Or3vqRd59evBlWCXd9Lp+ALppKseoSKHaykK46SmRjAilX98PgmOz1yeVaurt8D5UtvEt4bUjM3eA==} - engines: {node: '>=12'} + /bullmq@3.15.0: + resolution: {integrity: sha512-U0LSRjuoyIBpnE62T4maCWMYEt3qdBCa1lnlPxYKQmRF/Y+FQ9W6iW5JvNNN+NA5Jet7k0uX71a93EX1zGnrhw==} dependencies: - cron-parser: 4.7.1 - debuglog: 1.0.1 - get-port: 5.1.1 + cron-parser: 4.8.1 + glob: 8.0.3 ioredis: 5.3.2 lodash: 4.17.21 - msgpackr: 1.8.1 + msgpackr: 1.9.3 semver: 7.3.8 - uuid: 8.3.2 + tslib: 2.4.1 + uuid: 9.0.0 transitivePeerDependencies: - supports-color dev: false @@ -5599,6 +5572,7 @@ packages: /cluster-key-slot@1.1.2: resolution: {integrity: sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==} engines: {node: '>=0.10.0'} + dev: false /co-body@5.2.0: resolution: {integrity: sha512-sX/LQ7LqUhgyaxzbe7IqwPeTr2yfpfUIQ/dgpKo6ZI4y4lpQA0YxAomWIY+7I7rHWcG02PG+OuPREzMW/5tszQ==} @@ -6090,11 +6064,11 @@ packages: /create-require@1.1.1: resolution: {integrity: sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==} - /cron-parser@4.7.1: - resolution: {integrity: sha512-WguFaoQ0hQ61SgsCZLHUcNbAvlK0lypKXu62ARguefYmjzaOXIVRNrAmyXzabTwUn4sQvQLkk6bjH+ipGfw8bA==} + /cron-parser@4.8.1: + resolution: {integrity: sha512-jbokKWGcyU4gl6jAfX97E1gDpY12DJ1cLJZmoDzaAln/shZ+S3KBFBuA2Q6WeUN4gJf/8klnV1EfvhA2lK5IRQ==} engines: {node: '>=12.0.0'} dependencies: - luxon: 3.2.1 + luxon: 3.3.0 dev: false /cropperjs@2.0.0-beta.2: @@ -6401,10 +6375,6 @@ packages: ms: 2.1.2 supports-color: 8.1.1 - /debuglog@1.0.1: - resolution: {integrity: sha512-syBZ+rnAK3EgMsH2aYEOLUW7mZSY9Gb+0wUMCFsZvcmiz+HigA0LOcq/HoQqVuGG+EKykunc7QG2bzrponfaSw==} - dev: false - /decamelize-keys@1.1.1: resolution: {integrity: sha512-WiPxgEirIV0/eIOMcnFBA3/IJZAZqKnwAwWyvvdi4lsr1WCN22nhdf/3db3DoZcUjTV2SqfzIwNyp6y2xs3nmg==} engines: {node: '>=0.10.0'} @@ -6542,6 +6512,7 @@ packages: /denque@2.1.0: resolution: {integrity: sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==} engines: {node: '>=0.10'} + dev: false /depd@1.1.2: resolution: {integrity: sha512-7emPTl6Dpo6JRXOXjLRxck+FlLRX5847cLKEn00PLAgc3g2hTZZgr+e4c2v6QpSmLeFP3n5yUo7ft6avBK/5jQ==} @@ -7924,11 +7895,6 @@ packages: through: 2.3.8 dev: false - /get-port@5.1.1: - resolution: {integrity: sha512-g/Q1aTSDOxFpchXC4i8ZWvxA1lnPqx/JHqcpIw0/LX9T8x/GBbi6YnlN5nhaKIFkT8oFsscUKgDJYxfwfS6QsQ==} - engines: {node: '>=8'} - dev: false - /get-stream@3.0.0: resolution: {integrity: sha512-GlhdIUuVakc8SJ6kK0zAFbiGzRFzNnY4jUuEbV9UROo4Y+0Ny4fjvcZFVTeDA4odpFyOQzaw6hXukJSq/f28sQ==} engines: {node: '>=4'} @@ -8758,6 +8724,7 @@ packages: standard-as-callback: 2.1.0 transitivePeerDependencies: - supports-color + dev: false /iota-array@1.0.0: resolution: {integrity: sha512-pZ2xT+LOHckCatGQ3DcG/a+QuEqvoxqkiL7tvE8nn3uuu+f6i1TtpB5/FtWFbxUuVr5PZCx8KskuGatbJDXOWA==} @@ -10462,6 +10429,7 @@ packages: /lodash.defaults@4.2.0: resolution: {integrity: sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ==} + dev: false /lodash.difference@4.5.0: resolution: {integrity: sha512-dS2j+W26TQ7taQBGN8Lbbq04ssV3emRw4NY58WErlTO29pIqS0HmoT5aJ9+TUQ1N3G+JOZSji4eugsWwGp9yPA==} @@ -10485,6 +10453,7 @@ packages: /lodash.isarguments@3.1.0: resolution: {integrity: sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==} + dev: false /lodash.isequal@4.5.0: resolution: {integrity: sha512-pDo3lu8Jhfjqls6GkMgpahsF9kCyayhgykjyLMNFTKWrpVdAQtYyB4muAMWozBB4ig/dtWAmsMxLEI8wuz+DYQ==} @@ -10593,8 +10562,8 @@ packages: engines: {node: '>=12'} dev: false - /luxon@3.2.1: - resolution: {integrity: sha512-QrwPArQCNLAKGO/C+ZIilgIuDnEnKx5QYODdDtbFaxzsbZcc/a7WFq7MhsVYgRlwawLtvOUESTlfJ+hc/USqPg==} + /luxon@3.3.0: + resolution: {integrity: sha512-An0UCfG/rSiqtAIiBPO0Y9/zAnHUZxAMiCpTd5h2smgsj7GGmcenvrvww2cqNA8/4A5ZrD1gJpHN2mIHZQF+Mg==} engines: {node: '>=12'} dev: false @@ -11033,26 +11002,26 @@ packages: /ms@2.1.3: resolution: {integrity: sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==} - /msgpackr-extract@2.2.0: - resolution: {integrity: sha512-0YcvWSv7ZOGl9Od6Y5iJ3XnPww8O7WLcpYMDwX+PAA/uXLDtyw94PJv9GLQV/nnp3cWlDhMoyKZIQLrx33sWog==} + /msgpackr-extract@3.0.2: + resolution: {integrity: sha512-SdzXp4kD/Qf8agZ9+iTu6eql0m3kWm1A2y1hkpTeVNENutaB0BwHlSvAIaMxwntmRUAUjon2V4L8Z/njd0Ct8A==} hasBin: true requiresBuild: true dependencies: - node-gyp-build-optional-packages: 5.0.3 + node-gyp-build-optional-packages: 5.0.7 optionalDependencies: - '@msgpackr-extract/msgpackr-extract-darwin-arm64': 2.2.0 - '@msgpackr-extract/msgpackr-extract-darwin-x64': 2.2.0 - '@msgpackr-extract/msgpackr-extract-linux-arm': 2.2.0 - '@msgpackr-extract/msgpackr-extract-linux-arm64': 2.2.0 - '@msgpackr-extract/msgpackr-extract-linux-x64': 2.2.0 - '@msgpackr-extract/msgpackr-extract-win32-x64': 2.2.0 + '@msgpackr-extract/msgpackr-extract-darwin-arm64': 3.0.2 + '@msgpackr-extract/msgpackr-extract-darwin-x64': 3.0.2 + '@msgpackr-extract/msgpackr-extract-linux-arm': 3.0.2 + '@msgpackr-extract/msgpackr-extract-linux-arm64': 3.0.2 + '@msgpackr-extract/msgpackr-extract-linux-x64': 3.0.2 + '@msgpackr-extract/msgpackr-extract-win32-x64': 3.0.2 dev: false optional: true - /msgpackr@1.8.1: - resolution: {integrity: sha512-05fT4J8ZqjYlR4QcRDIhLCYKUOHXk7C/xa62GzMKj74l3up9k2QZ3LgFc6qWdsPHl91QA2WLWqWc8b8t7GLNNw==} + /msgpackr@1.9.3: + resolution: {integrity: sha512-DIBUpLO8hZeXAt9Tud3PU9XwwV+Cfiquq9egBa52pSDcwKlBtzHnGR7y9jlUlWquCV6LxDY9qdfKCvory7XPTA==} optionalDependencies: - msgpackr-extract: 2.2.0 + msgpackr-extract: 3.0.2 dev: false /multer@1.4.4-lts.1: @@ -11254,8 +11223,8 @@ packages: fetch-blob: 3.2.0 formdata-polyfill: 4.0.10 - /node-gyp-build-optional-packages@5.0.3: - resolution: {integrity: sha512-k75jcVzk5wnnc/FMxsf4udAoTEUv2jY3ycfdSd3yWu6Cnd1oee6/CfZJApyscA4FJOmdoixWwiwOyf16RzD5JA==} + /node-gyp-build-optional-packages@5.0.7: + resolution: {integrity: sha512-YlCCc6Wffkx0kHkmam79GKvDQ6x+QZkMjFGrIMxgFNILFvGSbCp2fCBC55pGTT9gVaz8Na5CLmxt/urtzRv36w==} hasBin: true dev: false optional: true @@ -12897,6 +12866,7 @@ packages: /redis-errors@1.2.0: resolution: {integrity: sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==} engines: {node: '>=4'} + dev: false /redis-info@3.1.0: resolution: {integrity: sha512-ER4L9Sh/vm63DkIE0bkSjxluQlioBiBgf5w1UuldaW/3vPcecdljVDisZhmnCMvsxHNiARTTDDHGg9cGwTfrKg==} @@ -12914,6 +12884,7 @@ packages: engines: {node: '>=4'} dependencies: redis-errors: 1.2.0 + dev: false /redis@4.5.1: resolution: {integrity: sha512-oxXSoIqMJCQVBTfxP6BNTCtDMyh9G6Vi5wjdPdV/sRKkufyZslDqCScSGcOr6XGR/reAWZefz7E4leM31RgdBA==} @@ -13766,6 +13737,7 @@ packages: /standard-as-callback@2.1.0: resolution: {integrity: sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==} + dev: false /start-server-and-test@1.15.2: resolution: {integrity: sha512-t5xJX04Hg7hqxiKHMJBz/n4zIMsE6G7hpAcerFAH+4Vh9le/LeyFcJERJM7WLiPygWF9TOg33oroJF1XOzJtYQ==}