From 6d62cd42764d53cf8355b685a0d52c559e0e8a70 Mon Sep 17 00:00:00 2001 From: naskya Date: Sat, 27 Jul 2024 20:16:13 +0900 Subject: [PATCH] refactor (backend): port publishMainStream to backend-rs --- packages/backend-rs/index.d.ts | 36 ++++++++ packages/backend-rs/index.js | 2 + packages/backend-rs/src/service/stream.rs | 1 + .../backend-rs/src/service/stream/main.rs | 87 +++++++++++++++++++ .../api/common/read-messaging-message.ts | 7 +- .../server/api/common/read-notification.ts | 7 +- .../backend/src/server/api/common/signin.ts | 5 +- .../server/api/endpoints/admin/delete-2fa.ts | 4 +- .../api/endpoints/admin/delete-passkeys.ts | 4 +- .../endpoints/drive/files/upload-from-url.ts | 4 +- .../src/server/api/endpoints/i/2fa/done.ts | 4 +- .../server/api/endpoints/i/2fa/key-done.ts | 7 +- .../api/endpoints/i/2fa/password-less.ts | 4 +- .../server/api/endpoints/i/2fa/remove-key.ts | 7 +- .../server/api/endpoints/i/2fa/unregister.ts | 5 +- .../server/api/endpoints/i/2fa/update-key.ts | 4 +- .../src/server/api/endpoints/i/known-as.ts | 5 +- .../src/server/api/endpoints/i/move.ts | 5 +- .../i/read-all-messaging-messages.ts | 6 +- .../api/endpoints/i/read-all-unread-notes.ts | 6 +- .../api/endpoints/i/read-announcement.ts | 5 +- .../api/endpoints/i/regenerate-token.ts | 15 ++-- .../server/api/endpoints/i/registry/set.ts | 5 +- .../server/api/endpoints/i/update-email.ts | 5 +- .../src/server/api/endpoints/i/update.ts | 5 +- .../notifications/mark-all-as-read.ts | 5 +- .../src/server/api/endpoints/page-push.ts | 4 +- .../src/server/api/private/verify-email.ts | 6 +- .../backend/src/services/blocking/create.ts | 10 +-- .../src/services/create-notification.ts | 13 ++- .../backend/src/services/drive/add-file.ts | 11 ++- .../backend/src/services/following/create.ts | 15 ++-- .../backend/src/services/following/delete.ts | 5 +- .../backend/src/services/following/reject.ts | 5 +- .../src/services/following/requests/accept.ts | 4 +- .../src/services/following/requests/cancel.ts | 4 +- .../src/services/following/requests/create.ts | 7 +- packages/backend/src/services/i/update.ts | 5 +- .../backend/src/services/messages/create.ts | 25 ++++-- packages/backend/src/services/note/create.ts | 9 +- packages/backend/src/services/note/read.ts | 8 +- packages/backend/src/services/note/unread.ts | 9 +- packages/backend/src/services/stream.ts | 27 +++--- 43 files changed, 280 insertions(+), 137 deletions(-) create mode 100644 packages/backend-rs/src/service/stream/main.rs diff --git a/packages/backend-rs/index.d.ts b/packages/backend-rs/index.d.ts index c988e15f98..b67d586e8f 100644 --- a/packages/backend-rs/index.d.ts +++ b/packages/backend-rs/index.d.ts @@ -371,6 +371,40 @@ export interface Emoji { height: number | null } +export declare enum Event { + Notification = 0, + NewNotification = 1, + Mention = 2, + NewMention = 3, + Chat = 4, + NewChat = 5, + NewDm = 6, + Reply = 7, + Renote = 8, + Follow = 9, + Followed = 10, + Unfollow = 11, + NewFollowRequest = 12, + Page = 13, + ReadAllNotifications = 14, + ReadAllMentions = 15, + ReadNotifications = 16, + ReadAllDms = 17, + ReadAllChats = 18, + ReadAntenna = 19, + ReadAllAntennaPosts = 20, + NewAntennaPost = 21, + ReadAllAnnouncements = 22, + ReadAllChannelPosts = 23, + NewChannelPost = 24, + DriveFile = 25, + UrlUploadFinished = 26, + Me = 27, + RegenerateMyToken = 28, + Signin = 29, + Registry = 30 +} + export declare function extractHost(uri: string): string export declare function fetchMeta(): Promise @@ -1130,6 +1164,8 @@ export declare function publishToDriveFolderStream(userId: string, kind: DriveFo export declare function publishToGroupChatStream(groupId: string, kind: ChatEvent, object: any): Promise +export declare function publishToMainStream(userId: string, kind: Event, object: any): Promise + export declare function publishToModerationStream(moderatorId: string, report: AbuseUserReportLike): Promise export declare function publishToNotesStream(note: Note): Promise diff --git a/packages/backend-rs/index.js b/packages/backend-rs/index.js index 398a76358f..f62cbb3626 100644 --- a/packages/backend-rs/index.js +++ b/packages/backend-rs/index.js @@ -375,6 +375,7 @@ module.exports.decodeReaction = nativeBinding.decodeReaction module.exports.DriveFileEvent = nativeBinding.DriveFileEvent module.exports.DriveFileUsageHint = nativeBinding.DriveFileUsageHint module.exports.DriveFolderEvent = nativeBinding.DriveFolderEvent +module.exports.Event = nativeBinding.Event module.exports.extractHost = nativeBinding.extractHost module.exports.fetchMeta = nativeBinding.fetchMeta module.exports.fetchNodeinfo = nativeBinding.fetchNodeinfo @@ -424,6 +425,7 @@ module.exports.publishToChatStream = nativeBinding.publishToChatStream module.exports.publishToDriveFileStream = nativeBinding.publishToDriveFileStream module.exports.publishToDriveFolderStream = nativeBinding.publishToDriveFolderStream module.exports.publishToGroupChatStream = nativeBinding.publishToGroupChatStream +module.exports.publishToMainStream = nativeBinding.publishToMainStream module.exports.publishToModerationStream = nativeBinding.publishToModerationStream module.exports.publishToNotesStream = nativeBinding.publishToNotesStream module.exports.publishToNoteStream = nativeBinding.publishToNoteStream diff --git a/packages/backend-rs/src/service/stream.rs b/packages/backend-rs/src/service/stream.rs index fe8175900c..8998ee9a1f 100644 --- a/packages/backend-rs/src/service/stream.rs +++ b/packages/backend-rs/src/service/stream.rs @@ -5,6 +5,7 @@ pub mod chat_index; pub mod custom_emoji; pub mod drive; pub mod group_chat; +pub mod main; pub mod moderation; pub mod note; pub mod note_edit; diff --git a/packages/backend-rs/src/service/stream/main.rs b/packages/backend-rs/src/service/stream/main.rs new file mode 100644 index 0000000000..a56bfb9266 --- /dev/null +++ b/packages/backend-rs/src/service/stream/main.rs @@ -0,0 +1,87 @@ +use crate::service::stream::{publish_to_stream, Error, Stream}; + +#[macros::export] +pub enum Event { + Notification, + NewNotification, + Mention, + NewMention, + Chat, + NewChat, + NewDm, + Reply, + Renote, + Follow, + Followed, + Unfollow, + NewFollowRequest, + Page, + ReadAllNotifications, + ReadAllMentions, + ReadNotifications, + ReadAllDms, + ReadAllChats, + ReadAntenna, + ReadAllAntennaPosts, + NewAntennaPost, + ReadAllAnnouncements, + ReadAllChannelPosts, + NewChannelPost, + DriveFile, + UrlUploadFinished, + Me, + RegenerateMyToken, + Signin, + Registry, +} + +// We want to merge `kind` and `object` into a single enum +// https://github.com/napi-rs/napi-rs/issues/2036 + +#[macros::export(js_name = "publishToMainStream")] +pub async fn publish( + user_id: String, + kind: Event, + object: &serde_json::Value, +) -> Result<(), Error> { + let kind = match kind { + Event::Notification => "notification", + Event::Mention => "mention", + Event::Reply => "reply", + Event::Renote => "renote", + Event::Follow => "follow", + Event::Followed => "followed", + Event::Unfollow => "unfollow", + Event::Me => "meUpdated", + Event::Page => "pageEvent", + Event::UrlUploadFinished => "urlUploadFinished", + Event::ReadAllNotifications => "readAllNotifications", + Event::ReadNotifications => "readNotifications", + Event::NewNotification => "unreadNotification", + Event::NewMention => "unreadMention", + Event::ReadAllMentions => "readAllUnreadMentions", + Event::ReadAllDms => "readAllUnreadSpecifiedNotes", + Event::NewDm => "unreadSpecifiedNote", + Event::ReadAllChats => "readAllMessagingMessages", + Event::Chat => "messagingMessage", + Event::NewChat => "unreadMessagingMessage", + Event::ReadAllAntennaPosts => "readAllAntennas", + Event::NewAntennaPost => "unreadAntenna", + Event::ReadAllAnnouncements => "readAllAnnouncements", + Event::ReadAllChannelPosts => "readAllChannels", + Event::NewChannelPost => "unreadChannel", + Event::RegenerateMyToken => "myTokenRegenerated", + Event::Signin => "signin", + Event::Registry => "registryUpdated", + Event::DriveFile => "driveFileCreated", + Event::ReadAntenna => "readAntenna", + Event::NewFollowRequest => "receiveFollowRequest", + }; + + publish_to_stream( + &Stream::Main { user_id }, + Some(kind), + Some(serde_json::to_string(&object)?), + ) + .await +} diff --git a/packages/backend/src/server/api/common/read-messaging-message.ts b/packages/backend/src/server/api/common/read-messaging-message.ts index a2c8ba70cf..e15924d231 100644 --- a/packages/backend/src/server/api/common/read-messaging-message.ts +++ b/packages/backend/src/server/api/common/read-messaging-message.ts @@ -1,9 +1,10 @@ -import { publishMainStream } from "@/services/stream.js"; import { publishToChatStream, publishToGroupChatStream, publishToChatIndexStream, sendPushNotification, + publishToMainStream, + Event, } from "backend-rs"; import type { User, IRemoteUser } from "@/models/entities/user.js"; import type { MessagingMessage } from "@/models/entities/messaging-message.js"; @@ -61,7 +62,7 @@ export async function readUserMessagingMessage( if (!(await Users.getHasUnreadMessagingMessage(userId))) { // 全ての(いままで未読だった)自分宛てのメッセージを(これで)読みましたよというイベントを発行 - publishMainStream(userId, "readAllMessagingMessages"); + await publishToMainStream(userId, Event.ReadAllChats, {}); await sendPushNotification(userId, "readAllChats", {}); } else { // そのユーザーとのメッセージで未読がなければイベント発行 @@ -135,7 +136,7 @@ export async function readGroupMessagingMessage( if (!(await Users.getHasUnreadMessagingMessage(userId))) { // 全ての(いままで未読だった)自分宛てのメッセージを(これで)読みましたよというイベントを発行 - publishMainStream(userId, "readAllMessagingMessages"); + await publishToMainStream(userId, Event.ReadAllChats, {}); await sendPushNotification(userId, "readAllChats", {}); } else { // そのグループにおいて未読がなければイベント発行 diff --git a/packages/backend/src/server/api/common/read-notification.ts b/packages/backend/src/server/api/common/read-notification.ts index 6687f5bd9a..a25fad5392 100644 --- a/packages/backend/src/server/api/common/read-notification.ts +++ b/packages/backend/src/server/api/common/read-notification.ts @@ -1,6 +1,5 @@ import { In } from "typeorm"; -import { publishMainStream } from "@/services/stream.js"; -import { sendPushNotification } from "backend-rs"; +import { Event, publishToMainStream, sendPushNotification } from "backend-rs"; import type { User } from "@/models/entities/user.js"; import type { Notification } from "@/models/entities/notification.js"; import { Notifications, Users } from "@/models/index.js"; @@ -46,7 +45,7 @@ export async function readNotificationByQuery( } function postReadAllNotifications(userId: User["id"]) { - publishMainStream(userId, "readAllNotifications"); + publishToMainStream(userId, Event.ReadAllNotifications, {}); return sendPushNotification(userId, "readAllNotifications", {}); } @@ -54,7 +53,7 @@ function postReadNotifications( userId: User["id"], notificationIds: Notification["id"][], ) { - publishMainStream(userId, "readNotifications", notificationIds); + publishToMainStream(userId, Event.ReadNotifications, notificationIds); return sendPushNotification(userId, "readNotifications", { notificationIds, }); diff --git a/packages/backend/src/server/api/common/signin.ts b/packages/backend/src/server/api/common/signin.ts index bafec3dfad..e160d2d8c2 100644 --- a/packages/backend/src/server/api/common/signin.ts +++ b/packages/backend/src/server/api/common/signin.ts @@ -3,8 +3,7 @@ import type Koa from "koa"; import { config } from "@/config.js"; import type { ILocalUser } from "@/models/entities/user.js"; import { Signins } from "@/models/index.js"; -import { genIdAt } from "backend-rs"; -import { publishMainStream } from "@/services/stream.js"; +import { Event, genIdAt, publishToMainStream } from "backend-rs"; export default function (ctx: Koa.Context, user: ILocalUser, redirect = false) { if (redirect) { @@ -40,6 +39,6 @@ export default function (ctx: Koa.Context, user: ILocalUser, redirect = false) { }).then((x) => Signins.findOneByOrFail(x.identifiers[0])); // Publish signin event - publishMainStream(user.id, "signin", await Signins.pack(record)); + publishToMainStream(user.id, Event.Signin, await Signins.pack(record)); })(); } diff --git a/packages/backend/src/server/api/endpoints/admin/delete-2fa.ts b/packages/backend/src/server/api/endpoints/admin/delete-2fa.ts index 1cf60d9c52..85dc2febf4 100644 --- a/packages/backend/src/server/api/endpoints/admin/delete-2fa.ts +++ b/packages/backend/src/server/api/endpoints/admin/delete-2fa.ts @@ -1,5 +1,5 @@ import { Users, UserProfiles } from "@/models/index.js"; -import { publishMainStream } from "@/services/stream.js"; +import { Event, publishToMainStream } from "backend-rs"; import define from "@/server/api/define.js"; export const meta = { @@ -36,5 +36,5 @@ export default define(meta, paramDef, async (ps) => { includeSecrets: true, }); - publishMainStream(user.id, "meUpdated", iObj); + publishToMainStream(user.id, Event.Me, iObj); }); diff --git a/packages/backend/src/server/api/endpoints/admin/delete-passkeys.ts b/packages/backend/src/server/api/endpoints/admin/delete-passkeys.ts index 7a54f5d99f..028be28eb4 100644 --- a/packages/backend/src/server/api/endpoints/admin/delete-passkeys.ts +++ b/packages/backend/src/server/api/endpoints/admin/delete-passkeys.ts @@ -1,5 +1,5 @@ import { Users, UserProfiles, UserSecurityKeys } from "@/models/index.js"; -import { publishMainStream } from "@/services/stream.js"; +import { Event, publishToMainStream } from "backend-rs"; import define from "@/server/api/define.js"; export const meta = { @@ -38,5 +38,5 @@ export default define(meta, paramDef, async (ps) => { includeSecrets: true, }); - publishMainStream(user.id, "meUpdated", iObj); + publishToMainStream(user.id, Event.Me, iObj); }); diff --git a/packages/backend/src/server/api/endpoints/drive/files/upload-from-url.ts b/packages/backend/src/server/api/endpoints/drive/files/upload-from-url.ts index 97b3e4963c..b495541370 100644 --- a/packages/backend/src/server/api/endpoints/drive/files/upload-from-url.ts +++ b/packages/backend/src/server/api/endpoints/drive/files/upload-from-url.ts @@ -1,7 +1,7 @@ import { uploadFromUrl } from "@/services/drive/upload-from-url.js"; import define from "@/server/api/define.js"; import { DriveFiles } from "@/models/index.js"; -import { publishMainStream } from "@/services/stream.js"; +import { Event, publishToMainStream } from "backend-rs"; import { HOUR } from "@/const.js"; export const meta = { @@ -50,7 +50,7 @@ export default define(meta, paramDef, async (ps, user) => { comment: ps.comment, }).then((file) => { DriveFiles.pack(file, { self: true }).then((packedFile) => { - publishMainStream(user.id, "urlUploadFinished", { + publishToMainStream(user.id, Event.UrlUploadFinished, { marker: ps.marker, file: packedFile, }); diff --git a/packages/backend/src/server/api/endpoints/i/2fa/done.ts b/packages/backend/src/server/api/endpoints/i/2fa/done.ts index c1a7b16a84..69ff627490 100644 --- a/packages/backend/src/server/api/endpoints/i/2fa/done.ts +++ b/packages/backend/src/server/api/endpoints/i/2fa/done.ts @@ -1,4 +1,4 @@ -import { publishMainStream } from "@/services/stream.js"; +import { Event, publishToMainStream } from "backend-rs"; import * as OTPAuth from "otpauth"; import define from "@/server/api/define.js"; import { Users, UserProfiles } from "@/models/index.js"; @@ -47,5 +47,5 @@ export default define(meta, paramDef, async (ps, user) => { includeSecrets: true, }); - publishMainStream(user.id, "meUpdated", iObj); + publishToMainStream(user.id, Event.Me, iObj); }); diff --git a/packages/backend/src/server/api/endpoints/i/2fa/key-done.ts b/packages/backend/src/server/api/endpoints/i/2fa/key-done.ts index 01413d6f37..543744ddad 100644 --- a/packages/backend/src/server/api/endpoints/i/2fa/key-done.ts +++ b/packages/backend/src/server/api/endpoints/i/2fa/key-done.ts @@ -8,8 +8,7 @@ import { } from "@/models/index.js"; import { config } from "@/config.js"; import { procedures, hash } from "@/server/api/2fa.js"; -import { publishMainStream } from "@/services/stream.js"; -import { verifyPassword } from "backend-rs"; +import { Event, publishToMainStream, verifyPassword } from "backend-rs"; const rpIdHashReal = hash(Buffer.from(config.hostname, "utf-8")); @@ -132,9 +131,9 @@ export default define(meta, paramDef, async (ps, user) => { }); // Publish meUpdated event - publishMainStream( + publishToMainStream( user.id, - "meUpdated", + Event.Me, await Users.pack(user.id, user, { detail: true, includeSecrets: true, diff --git a/packages/backend/src/server/api/endpoints/i/2fa/password-less.ts b/packages/backend/src/server/api/endpoints/i/2fa/password-less.ts index 8125f817a0..b0a3b30106 100644 --- a/packages/backend/src/server/api/endpoints/i/2fa/password-less.ts +++ b/packages/backend/src/server/api/endpoints/i/2fa/password-less.ts @@ -1,6 +1,6 @@ import define from "@/server/api/define.js"; import { Users, UserProfiles, UserSecurityKeys } from "@/models/index.js"; -import { publishMainStream } from "@/services/stream.js"; +import { Event, publishToMainStream } from "backend-rs"; import { ApiError } from "@/server/api/error.js"; export const meta = { @@ -57,5 +57,5 @@ export default define(meta, paramDef, async (ps, user) => { includeSecrets: true, }); - publishMainStream(user.id, "meUpdated", iObj); + publishToMainStream(user.id, Event.Me, iObj); }); diff --git a/packages/backend/src/server/api/endpoints/i/2fa/remove-key.ts b/packages/backend/src/server/api/endpoints/i/2fa/remove-key.ts index 4259d8f70d..ac6e0d45af 100644 --- a/packages/backend/src/server/api/endpoints/i/2fa/remove-key.ts +++ b/packages/backend/src/server/api/endpoints/i/2fa/remove-key.ts @@ -1,7 +1,6 @@ -import { verifyPassword } from "backend-rs"; +import { Event, publishToMainStream, verifyPassword } from "backend-rs"; import define from "@/server/api/define.js"; import { UserProfiles, UserSecurityKeys, Users } from "@/models/index.js"; -import { publishMainStream } from "@/services/stream.js"; export const meta = { requireCredential: true, @@ -54,9 +53,9 @@ export default define(meta, paramDef, async (ps, user) => { } // Publish meUpdated event - publishMainStream( + publishToMainStream( user.id, - "meUpdated", + Event.Me, await Users.pack(user.id, user, { detail: true, includeSecrets: true, diff --git a/packages/backend/src/server/api/endpoints/i/2fa/unregister.ts b/packages/backend/src/server/api/endpoints/i/2fa/unregister.ts index 240ff2b34e..d3b6a9c12b 100644 --- a/packages/backend/src/server/api/endpoints/i/2fa/unregister.ts +++ b/packages/backend/src/server/api/endpoints/i/2fa/unregister.ts @@ -1,7 +1,6 @@ -import { publishMainStream } from "@/services/stream.js"; import define from "@/server/api/define.js"; import { Users, UserProfiles } from "@/models/index.js"; -import { verifyPassword } from "backend-rs"; +import { Event, publishToMainStream, verifyPassword } from "backend-rs"; export const meta = { requireCredential: true, @@ -38,5 +37,5 @@ export default define(meta, paramDef, async (ps, user) => { includeSecrets: true, }); - publishMainStream(user.id, "meUpdated", iObj); + publishToMainStream(user.id, Event.Me, iObj); }); diff --git a/packages/backend/src/server/api/endpoints/i/2fa/update-key.ts b/packages/backend/src/server/api/endpoints/i/2fa/update-key.ts index d77ecc88e8..f74cdbad51 100644 --- a/packages/backend/src/server/api/endpoints/i/2fa/update-key.ts +++ b/packages/backend/src/server/api/endpoints/i/2fa/update-key.ts @@ -1,7 +1,7 @@ -import { publishMainStream } from "@/services/stream.js"; import define from "@/server/api/define.js"; import { Users, UserSecurityKeys } from "@/models/index.js"; import { ApiError } from "@/server/api/error.js"; +import { Event, publishToMainStream } from "backend-rs"; export const meta = { requireCredential: true, @@ -54,5 +54,5 @@ export default define(meta, paramDef, async (ps, user) => { includeSecrets: true, }); - publishMainStream(user.id, "meUpdated", iObj); + publishToMainStream(user.id, Event.Me, iObj); }); diff --git a/packages/backend/src/server/api/endpoints/i/known-as.ts b/packages/backend/src/server/api/endpoints/i/known-as.ts index 63d21d93e3..fe26d25441 100644 --- a/packages/backend/src/server/api/endpoints/i/known-as.ts +++ b/packages/backend/src/server/api/endpoints/i/known-as.ts @@ -3,8 +3,7 @@ import { Users } from "@/models/index.js"; import { resolveUser } from "@/remote/resolve-user.js"; import acceptAllFollowRequests from "@/services/following/requests/accept-all.js"; import { publishToFollowers } from "@/services/i/update.js"; -import { publishMainStream } from "@/services/stream.js"; -import { stringToAcct } from "backend-rs"; +import { Event, publishToMainStream, stringToAcct } from "backend-rs"; import { DAY } from "@/const.js"; import { apiLogger } from "@/server/api/logger.js"; import define from "@/server/api/define.js"; @@ -97,7 +96,7 @@ export default define(meta, paramDef, async (ps, user) => { }); // Publish meUpdated event - publishMainStream(user.id, "meUpdated", iObj); + publishToMainStream(user.id, Event.Me, iObj); if (user.isLocked === false) { acceptAllFollowRequests(user); diff --git a/packages/backend/src/server/api/endpoints/i/move.ts b/packages/backend/src/server/api/endpoints/i/move.ts index e7808d7b0a..a0fe50bede 100644 --- a/packages/backend/src/server/api/endpoints/i/move.ts +++ b/packages/backend/src/server/api/endpoints/i/move.ts @@ -1,6 +1,6 @@ import type { User } from "@/models/entities/user.js"; import { resolveUser } from "@/remote/resolve-user.js"; -import { stringToAcct } from "backend-rs"; +import { Event, publishToMainStream, stringToAcct } from "backend-rs"; import { DAY } from "@/const.js"; import DeliverManager from "@/remote/activitypub/deliver-manager.js"; import { renderActivity } from "@/remote/activitypub/renderer/index.js"; @@ -12,7 +12,6 @@ import create from "@/services/following/create.js"; import { getUser } from "@/server/api/common/getters.js"; import { Followings, Users } from "@/models/index.js"; import { config } from "@/config.js"; -import { publishMainStream } from "@/services/stream.js"; import { inspect } from "node:util"; export const meta = { @@ -134,7 +133,7 @@ export default define(meta, paramDef, async (ps, user) => { dm.execute(); // Publish meUpdated event - publishMainStream(user.id, "meUpdated", iObj); + publishToMainStream(user.id, Event.Me, iObj); const followings = await Followings.findBy({ followeeId: user.id, diff --git a/packages/backend/src/server/api/endpoints/i/read-all-messaging-messages.ts b/packages/backend/src/server/api/endpoints/i/read-all-messaging-messages.ts index 505c424edb..2f3ea6855e 100644 --- a/packages/backend/src/server/api/endpoints/i/read-all-messaging-messages.ts +++ b/packages/backend/src/server/api/endpoints/i/read-all-messaging-messages.ts @@ -1,4 +1,4 @@ -import { publishMainStream } from "@/services/stream.js"; +import { Event, publishToMainStream } from "backend-rs"; import define from "@/server/api/define.js"; import { MessagingMessages, UserGroupJoinings } from "@/models/index.js"; @@ -16,7 +16,7 @@ export const paramDef = { required: [], } as const; -export default define(meta, paramDef, async (ps, user) => { +export default define(meta, paramDef, async (_ps, user) => { // Update documents await MessagingMessages.update( { @@ -44,5 +44,5 @@ export default define(meta, paramDef, async (ps, user) => { ), ); - publishMainStream(user.id, "readAllMessagingMessages"); + publishToMainStream(user.id, Event.ReadAllChats, {}); }); diff --git a/packages/backend/src/server/api/endpoints/i/read-all-unread-notes.ts b/packages/backend/src/server/api/endpoints/i/read-all-unread-notes.ts index 6f70e55a1a..e3f5a51473 100644 --- a/packages/backend/src/server/api/endpoints/i/read-all-unread-notes.ts +++ b/packages/backend/src/server/api/endpoints/i/read-all-unread-notes.ts @@ -1,4 +1,4 @@ -import { publishMainStream } from "@/services/stream.js"; +import { Event, publishToMainStream } from "backend-rs"; import define from "@/server/api/define.js"; import { NoteUnreads } from "@/models/index.js"; @@ -23,6 +23,6 @@ export default define(meta, paramDef, async (ps, user) => { }); // 全て既読になったイベントを発行 - publishMainStream(user.id, "readAllUnreadMentions"); - publishMainStream(user.id, "readAllUnreadSpecifiedNotes"); + publishToMainStream(user.id, Event.ReadAllMentions, {}); + publishToMainStream(user.id, Event.ReadAllDms, {}); }); diff --git a/packages/backend/src/server/api/endpoints/i/read-announcement.ts b/packages/backend/src/server/api/endpoints/i/read-announcement.ts index 7981292031..1ad817898f 100644 --- a/packages/backend/src/server/api/endpoints/i/read-announcement.ts +++ b/packages/backend/src/server/api/endpoints/i/read-announcement.ts @@ -1,8 +1,7 @@ import define from "@/server/api/define.js"; import { ApiError } from "@/server/api/error.js"; -import { genIdAt } from "backend-rs"; +import { Event, genIdAt, publishToMainStream } from "backend-rs"; import { AnnouncementReads, Announcements, Users } from "@/models/index.js"; -import { publishMainStream } from "@/services/stream.js"; export const meta = { tags: ["account"], @@ -59,6 +58,6 @@ export default define(meta, paramDef, async (ps, user) => { }); if (!(await Users.getHasUnreadAnnouncement(user.id))) { - publishMainStream(user.id, "readAllAnnouncements"); + publishToMainStream(user.id, Event.ReadAllAnnouncements, {}); } }); diff --git a/packages/backend/src/server/api/endpoints/i/regenerate-token.ts b/packages/backend/src/server/api/endpoints/i/regenerate-token.ts index 4b1b3450af..0f0f79c35e 100644 --- a/packages/backend/src/server/api/endpoints/i/regenerate-token.ts +++ b/packages/backend/src/server/api/endpoints/i/regenerate-token.ts @@ -1,11 +1,12 @@ -import { - publishInternalEvent, - publishMainStream, - publishUserEvent, -} from "@/services/stream.js"; +import { publishInternalEvent, publishUserEvent } from "@/services/stream.js"; import define from "@/server/api/define.js"; import { Users, UserProfiles } from "@/models/index.js"; -import { generateUserToken, verifyPassword } from "backend-rs"; +import { + Event, + generateUserToken, + publishToMainStream, + verifyPassword, +} from "backend-rs"; export const meta = { requireCredential: true, @@ -46,7 +47,7 @@ export default define(meta, paramDef, async (ps, user) => { oldToken, newToken, }); - publishMainStream(user.id, "myTokenRegenerated"); + publishToMainStream(user.id, Event.RegenerateMyToken, {}); // Terminate streaming setTimeout(() => { diff --git a/packages/backend/src/server/api/endpoints/i/registry/set.ts b/packages/backend/src/server/api/endpoints/i/registry/set.ts index e778bd98d8..32e2598c85 100644 --- a/packages/backend/src/server/api/endpoints/i/registry/set.ts +++ b/packages/backend/src/server/api/endpoints/i/registry/set.ts @@ -1,7 +1,6 @@ -import { publishMainStream } from "@/services/stream.js"; import define from "@/server/api/define.js"; import { RegistryItems } from "@/models/index.js"; -import { genIdAt } from "backend-rs"; +import { Event, genIdAt, publishToMainStream } from "backend-rs"; export const meta = { requireCredential: true, @@ -55,7 +54,7 @@ export default define(meta, paramDef, async (ps, user) => { } // TODO: サードパーティアプリが傍受出来てしまうのでどうにかする - publishMainStream(user.id, "registryUpdated", { + publishToMainStream(user.id, Event.Registry, { scope: ps.scope, key: ps.key, value: ps.value, diff --git a/packages/backend/src/server/api/endpoints/i/update-email.ts b/packages/backend/src/server/api/endpoints/i/update-email.ts index d894eb07e9..4d27846c88 100644 --- a/packages/backend/src/server/api/endpoints/i/update-email.ts +++ b/packages/backend/src/server/api/endpoints/i/update-email.ts @@ -1,4 +1,3 @@ -import { publishMainStream } from "@/services/stream.js"; import define from "@/server/api/define.js"; import rndstr from "rndstr"; import { config } from "@/config.js"; @@ -6,7 +5,7 @@ import { Users, UserProfiles } from "@/models/index.js"; import { sendEmail } from "@/services/send-email.js"; import { ApiError } from "@/server/api/error.js"; import { validateEmailForAccount } from "@/services/validate-email-for-account.js"; -import { verifyPassword } from "backend-rs"; +import { Event, publishToMainStream, verifyPassword } from "backend-rs"; import { HOUR } from "@/const.js"; export const meta = { @@ -72,7 +71,7 @@ export default define(meta, paramDef, async (ps, user) => { }); // Publish meUpdated event - publishMainStream(user.id, "meUpdated", iObj); + publishToMainStream(user.id, Event.Me, iObj); if (ps.email != null) { const code = rndstr("a-z0-9", 16); diff --git a/packages/backend/src/server/api/endpoints/i/update.ts b/packages/backend/src/server/api/endpoints/i/update.ts index dd0511cfed..0620f837ff 100644 --- a/packages/backend/src/server/api/endpoints/i/update.ts +++ b/packages/backend/src/server/api/endpoints/i/update.ts @@ -1,5 +1,6 @@ import * as mfm from "mfm-js"; -import { publishMainStream, publishUserEvent } from "@/services/stream.js"; +import { publishUserEvent } from "@/services/stream.js"; +import { Event, publishToMainStream } from "backend-rs"; import acceptAllFollowRequests from "@/services/following/requests/accept-all.js"; import { publishToFollowers } from "@/services/i/update.js"; import { extractCustomEmojisFromMfm } from "@/misc/extract-custom-emojis-from-mfm.js"; @@ -345,7 +346,7 @@ export default define(meta, paramDef, async (ps, _user, token) => { }); // Publish meUpdated event - publishMainStream(user.id, "meUpdated", iObj); + publishToMainStream(user.id, Event.Me, iObj); publishUserEvent( user.id, "updateUserProfile", diff --git a/packages/backend/src/server/api/endpoints/notifications/mark-all-as-read.ts b/packages/backend/src/server/api/endpoints/notifications/mark-all-as-read.ts index 6b6c207ab1..12005db818 100644 --- a/packages/backend/src/server/api/endpoints/notifications/mark-all-as-read.ts +++ b/packages/backend/src/server/api/endpoints/notifications/mark-all-as-read.ts @@ -1,5 +1,4 @@ -import { publishMainStream } from "@/services/stream.js"; -import { sendPushNotification } from "backend-rs"; +import { Event, publishToMainStream, sendPushNotification } from "backend-rs"; import { Notifications } from "@/models/index.js"; import define from "@/server/api/define.js"; @@ -30,6 +29,6 @@ export default define(meta, paramDef, async (_, user) => { ); // 全ての通知を読みましたよというイベントを発行 - publishMainStream(user.id, "readAllNotifications"); + await publishToMainStream(user.id, Event.ReadAllNotifications, {}); await sendPushNotification(user.id, "readAllNotifications", {}); }); diff --git a/packages/backend/src/server/api/endpoints/page-push.ts b/packages/backend/src/server/api/endpoints/page-push.ts index 1f87372d61..5221063637 100644 --- a/packages/backend/src/server/api/endpoints/page-push.ts +++ b/packages/backend/src/server/api/endpoints/page-push.ts @@ -1,4 +1,4 @@ -import { publishMainStream } from "@/services/stream.js"; +import { Event, publishToMainStream } from "backend-rs"; import { Users, Pages } from "@/models/index.js"; import define from "@/server/api/define.js"; import { ApiError } from "@/server/api/error.js"; @@ -32,7 +32,7 @@ export default define(meta, paramDef, async (ps, user) => { throw new ApiError(meta.errors.noSuchPage); } - publishMainStream(page.userId, "pageEvent", { + publishToMainStream(page.userId, Event.Page, { pageId: ps.pageId, event: ps.event, var: ps.var, diff --git a/packages/backend/src/server/api/private/verify-email.ts b/packages/backend/src/server/api/private/verify-email.ts index e6c8295d18..b5c8f9c954 100644 --- a/packages/backend/src/server/api/private/verify-email.ts +++ b/packages/backend/src/server/api/private/verify-email.ts @@ -1,6 +1,6 @@ import type Koa from "koa"; import { Users, UserProfiles } from "@/models/index.js"; -import { publishMainStream } from "@/services/stream.js"; +import { Event, publishToMainStream } from "backend-rs"; export default async (ctx: Koa.Context) => { const body = ctx.request.body; @@ -20,9 +20,9 @@ export default async (ctx: Koa.Context) => { }, ); - publishMainStream( + publishToMainStream( profile.userId, - "meUpdated", + Event.Me, await Users.pack( profile.userId, { id: profile.userId }, diff --git a/packages/backend/src/services/blocking/create.ts b/packages/backend/src/services/blocking/create.ts index 8c41d93665..02ac813967 100644 --- a/packages/backend/src/services/blocking/create.ts +++ b/packages/backend/src/services/blocking/create.ts @@ -1,4 +1,4 @@ -import { publishMainStream, publishUserEvent } from "@/services/stream.js"; +import { publishUserEvent } from "@/services/stream.js"; import { renderActivity } from "@/remote/activitypub/renderer/index.js"; import renderFollow from "@/remote/activitypub/renderer/follow.js"; import { renderUndo } from "@/remote/activitypub/renderer/undo.js"; @@ -15,7 +15,7 @@ import { UserListJoinings, UserLists, } from "@/models/index.js"; -import { genIdAt } from "backend-rs"; +import { Event, genIdAt, publishToMainStream } from "backend-rs"; import { getActiveWebhooks } from "@/misc/webhook-cache.js"; import { webhookDeliver } from "@/queue/index.js"; @@ -65,7 +65,7 @@ async function cancelRequest(follower: User, followee: User) { if (Users.isLocalUser(followee)) { Users.pack(followee, followee, { detail: true, - }).then((packed) => publishMainStream(followee.id, "meUpdated", packed)); + }).then((packed) => publishToMainStream(followee.id, Event.Me, packed)); } if (Users.isLocalUser(follower)) { @@ -73,7 +73,7 @@ async function cancelRequest(follower: User, followee: User) { detail: true, }).then(async (packed) => { publishUserEvent(follower.id, "unfollow", packed); - publishMainStream(follower.id, "unfollow", packed); + publishToMainStream(follower.id, Event.Unfollow, packed); const webhooks = (await getActiveWebhooks()).filter( (x) => x.userId === follower.id && x.on.includes("unfollow"), @@ -128,7 +128,7 @@ async function unFollow(follower: User, followee: User) { detail: true, }).then(async (packed) => { publishUserEvent(follower.id, "unfollow", packed); - publishMainStream(follower.id, "unfollow", packed); + publishToMainStream(follower.id, Event.Unfollow, packed); const webhooks = (await getActiveWebhooks()).filter( (x) => x.userId === follower.id && x.on.includes("unfollow"), diff --git a/packages/backend/src/services/create-notification.ts b/packages/backend/src/services/create-notification.ts index 6030f0a74d..c862c35d3f 100644 --- a/packages/backend/src/services/create-notification.ts +++ b/packages/backend/src/services/create-notification.ts @@ -1,4 +1,3 @@ -import { publishMainStream } from "@/services/stream.js"; import { Notifications, Mutings, @@ -7,7 +6,13 @@ import { Users, Followings, } from "@/models/index.js"; -import { genIdAt, isSilencedServer, sendPushNotification } from "backend-rs"; +import { + Event, + genIdAt, + isSilencedServer, + publishToMainStream, + sendPushNotification, +} from "backend-rs"; import type { User } from "@/models/entities/user.js"; import type { Notification } from "@/models/entities/notification.js"; import { sendEmailNotification } from "./send-email-notification.js"; @@ -76,7 +81,7 @@ export async function createNotification( const packed = await Notifications.pack(notification, {}); // Publish notification event - publishMainStream(notifieeId, "notification", packed); + publishToMainStream(notifieeId, Event.Notification, packed); // 2秒経っても(今回作成した)通知が既読にならなかったら「未読の通知がありますよ」イベントを発行する setTimeout(async () => { @@ -111,7 +116,7 @@ export async function createNotification( } //#endregion - publishMainStream(notifieeId, "unreadNotification", packed); + publishToMainStream(notifieeId, Event.NewNotification, packed); if (type === "follow") sendEmailNotification.follow( diff --git a/packages/backend/src/services/drive/add-file.ts b/packages/backend/src/services/drive/add-file.ts index f42a987356..541aab170e 100644 --- a/packages/backend/src/services/drive/add-file.ts +++ b/packages/backend/src/services/drive/add-file.ts @@ -5,8 +5,13 @@ import { v4 as uuid } from "uuid"; import type S3 from "aws-sdk/clients/s3.js"; // TODO: migrate to SDK v3 import sharp from "sharp"; import { IsNull } from "typeorm"; -import { publishMainStream } from "@/services/stream.js"; -import { fetchMeta, genId, publishToDriveFileStream } from "backend-rs"; +import { + Event, + fetchMeta, + genId, + publishToDriveFileStream, + publishToMainStream, +} from "backend-rs"; import { FILE_TYPE_BROWSERSAFE } from "@/const.js"; import { contentDisposition } from "@/misc/content-disposition.js"; import { getFileInfo } from "@/misc/get-file-info.js"; @@ -667,7 +672,7 @@ export async function addFile({ if (user != null) { DriveFiles.pack(file, { self: true }).then((packedFile) => { // Publish driveFileCreated event - publishMainStream(user.id, "driveFileCreated", packedFile); + publishToMainStream(user.id, Event.DriveFile, packedFile); publishToDriveFileStream(user.id, "create", toRustObject(file)); }); } diff --git a/packages/backend/src/services/following/create.ts b/packages/backend/src/services/following/create.ts index 6788cd13f6..1c962c9e9e 100644 --- a/packages/backend/src/services/following/create.ts +++ b/packages/backend/src/services/following/create.ts @@ -1,4 +1,4 @@ -import { publishMainStream, publishUserEvent } from "@/services/stream.js"; +import { publishUserEvent } from "@/services/stream.js"; import { renderActivity } from "@/remote/activitypub/renderer/index.js"; import renderFollow from "@/remote/activitypub/renderer/follow.js"; import renderAccept from "@/remote/activitypub/renderer/accept.js"; @@ -17,7 +17,12 @@ import { Instances, UserProfiles, } from "@/models/index.js"; -import { genIdAt, isSilencedServer } from "backend-rs"; +import { + Event, + genIdAt, + isSilencedServer, + publishToMainStream, +} from "backend-rs"; import { createNotification } from "@/services/create-notification.js"; import { isDuplicateKeyValueError } from "@/misc/is-duplicate-key-value-error.js"; import type { Packed } from "@/misc/schema.js"; @@ -126,9 +131,9 @@ export async function insertFollowingDoc( "follow", packed as Packed<"UserDetailedNotMe">, ); - publishMainStream( + publishToMainStream( follower.id, - "follow", + Event.Follow, packed as Packed<"UserDetailedNotMe">, ); @@ -146,7 +151,7 @@ export async function insertFollowingDoc( // Publish followed event if (Users.isLocalUser(followee)) { Users.pack(follower.id, followee).then(async (packed) => { - publishMainStream(followee.id, "followed", packed); + publishToMainStream(followee.id, Event.Followed, packed); const webhooks = (await getActiveWebhooks()).filter( (x) => x.userId === followee.id && x.on.includes("followed"), diff --git a/packages/backend/src/services/following/delete.ts b/packages/backend/src/services/following/delete.ts index 2fe863fc57..65d409f8bf 100644 --- a/packages/backend/src/services/following/delete.ts +++ b/packages/backend/src/services/following/delete.ts @@ -1,4 +1,5 @@ -import { publishMainStream, publishUserEvent } from "@/services/stream.js"; +import { publishUserEvent } from "@/services/stream.js"; +import { Event, publishToMainStream } from "backend-rs"; import { renderActivity } from "@/remote/activitypub/renderer/index.js"; import renderFollow from "@/remote/activitypub/renderer/follow.js"; import { renderUndo } from "@/remote/activitypub/renderer/undo.js"; @@ -51,7 +52,7 @@ export default async function ( detail: true, }).then(async (packed) => { publishUserEvent(follower.id, "unfollow", packed); - publishMainStream(follower.id, "unfollow", packed); + publishToMainStream(follower.id, Event.Unfollow, packed); const webhooks = (await getActiveWebhooks()).filter( (x) => x.userId === follower.id && x.on.includes("unfollow"), diff --git a/packages/backend/src/services/following/reject.ts b/packages/backend/src/services/following/reject.ts index 821fe9f639..71f0220b14 100644 --- a/packages/backend/src/services/following/reject.ts +++ b/packages/backend/src/services/following/reject.ts @@ -2,7 +2,8 @@ import { renderActivity } from "@/remote/activitypub/renderer/index.js"; import renderFollow from "@/remote/activitypub/renderer/follow.js"; import renderReject from "@/remote/activitypub/renderer/reject.js"; import { deliver, webhookDeliver } from "@/queue/index.js"; -import { publishMainStream, publishUserEvent } from "@/services/stream.js"; +import { publishUserEvent } from "@/services/stream.js"; +import { Event, publishToMainStream } from "backend-rs"; import type { ILocalUser, IRemoteUser } from "@/models/entities/user.js"; import { Users, FollowRequests, Followings } from "@/models/index.js"; import { decrementFollowing } from "./delete.js"; @@ -120,7 +121,7 @@ async function publishUnfollow(followee: Both, follower: Local) { }); publishUserEvent(follower.id, "unfollow", packedFollowee); - publishMainStream(follower.id, "unfollow", packedFollowee); + publishToMainStream(follower.id, Event.Unfollow, packedFollowee); const webhooks = (await getActiveWebhooks()).filter( (x) => x.userId === follower.id && x.on.includes("unfollow"), diff --git a/packages/backend/src/services/following/requests/accept.ts b/packages/backend/src/services/following/requests/accept.ts index f0425c20b9..bcd7d2cf4c 100644 --- a/packages/backend/src/services/following/requests/accept.ts +++ b/packages/backend/src/services/following/requests/accept.ts @@ -2,7 +2,7 @@ import { renderActivity } from "@/remote/activitypub/renderer/index.js"; import renderFollow from "@/remote/activitypub/renderer/follow.js"; import renderAccept from "@/remote/activitypub/renderer/accept.js"; import { deliver } from "@/queue/index.js"; -import { publishMainStream } from "@/services/stream.js"; +import { Event, publishToMainStream } from "backend-rs"; import { insertFollowingDoc } from "../create.js"; import type { User, CacheableUser } from "@/models/entities/user.js"; import { FollowRequests, Users } from "@/models/index.js"; @@ -44,5 +44,5 @@ export default async function ( Users.pack(followee.id, followee, { detail: true, - }).then((packed) => publishMainStream(followee.id, "meUpdated", packed)); + }).then((packed) => publishToMainStream(followee.id, Event.Me, packed)); } diff --git a/packages/backend/src/services/following/requests/cancel.ts b/packages/backend/src/services/following/requests/cancel.ts index f271b15058..eb1b2dcb49 100644 --- a/packages/backend/src/services/following/requests/cancel.ts +++ b/packages/backend/src/services/following/requests/cancel.ts @@ -2,7 +2,7 @@ import { renderActivity } from "@/remote/activitypub/renderer/index.js"; import renderFollow from "@/remote/activitypub/renderer/follow.js"; import { renderUndo } from "@/remote/activitypub/renderer/undo.js"; import { deliver } from "@/queue/index.js"; -import { publishMainStream } from "@/services/stream.js"; +import { Event, publishToMainStream } from "backend-rs"; import { IdentifiableError } from "@/misc/identifiable-error.js"; import type { User } from "@/models/entities/user.js"; import { Users, FollowRequests } from "@/models/index.js"; @@ -46,5 +46,5 @@ export default async function ( Users.pack(followee.id, followee, { detail: true, - }).then((packed) => publishMainStream(followee.id, "meUpdated", packed)); + }).then((packed) => publishToMainStream(followee.id, Event.Me, packed)); } diff --git a/packages/backend/src/services/following/requests/create.ts b/packages/backend/src/services/following/requests/create.ts index 648fd38a0a..40f383cfd3 100644 --- a/packages/backend/src/services/following/requests/create.ts +++ b/packages/backend/src/services/following/requests/create.ts @@ -1,10 +1,9 @@ -import { publishMainStream } from "@/services/stream.js"; import { renderActivity } from "@/remote/activitypub/renderer/index.js"; import renderFollow from "@/remote/activitypub/renderer/follow.js"; import { deliver } from "@/queue/index.js"; import type { User } from "@/models/entities/user.js"; import { Blockings, FollowRequests, Users } from "@/models/index.js"; -import { genIdAt } from "backend-rs"; +import { Event, genIdAt, publishToMainStream } from "backend-rs"; import { createNotification } from "@/services/create-notification.js"; import { config } from "@/config.js"; @@ -67,12 +66,12 @@ export default async function ( // Publish receiveRequest event if (Users.isLocalUser(followee)) { Users.pack(follower.id, followee).then((packed) => - publishMainStream(followee.id, "receiveFollowRequest", packed), + publishToMainStream(followee.id, Event.NewFollowRequest, packed), ); Users.pack(followee.id, followee, { detail: true, - }).then((packed) => publishMainStream(followee.id, "meUpdated", packed)); + }).then((packed) => publishToMainStream(followee.id, Event.Me, packed)); // 通知を作成 createNotification(followee.id, "receiveFollowRequest", { diff --git a/packages/backend/src/services/i/update.ts b/packages/backend/src/services/i/update.ts index 2f7e2b5e18..f2752f069b 100644 --- a/packages/backend/src/services/i/update.ts +++ b/packages/backend/src/services/i/update.ts @@ -11,7 +11,8 @@ import mfm from "mfm-js"; import { extractHashtags } from "@/misc/extract-hashtags.js"; import { normalizeForSearch } from "@/misc/normalize-for-search.js"; import { updateUsertags } from "@/services/update-hashtag.js"; -import { publishMainStream, publishUserEvent } from "@/services/stream.js"; +import { publishUserEvent } from "@/services/stream.js"; +import { Event, publishToMainStream } from "backend-rs"; import acceptAllFollowRequests from "@/services/following/requests/accept-all.js"; import { promiseEarlyReturn } from "@/prelude/promise.js"; @@ -79,7 +80,7 @@ export async function updateUserProfileData( includeSecrets: isSecure, }); - publishMainStream(user.id, "meUpdated", iObj); + publishToMainStream(user.id, Event.Me, iObj); publishUserEvent( user.id, "updateUserProfile", diff --git a/packages/backend/src/services/messages/create.ts b/packages/backend/src/services/messages/create.ts index 32c800f0b3..a19bc59168 100644 --- a/packages/backend/src/services/messages/create.ts +++ b/packages/backend/src/services/messages/create.ts @@ -14,9 +14,10 @@ import { publishToGroupChatStream, publishToChatIndexStream, toPuny, + publishToMainStream, + Event, } from "backend-rs"; import type { MessagingMessage } from "@/models/entities/messaging-message.js"; -import { publishMainStream } from "@/services/stream.js"; import { Not } from "typeorm"; import type { Note } from "@/models/entities/note.js"; import renderNote from "@/remote/activitypub/renderer/note.js"; @@ -62,8 +63,8 @@ export async function createMessage( messageObj, ), publishToChatIndexStream(message.userId, "message", messageObj), + publishToMainStream(message.userId, Event.Chat, messageObj), ]); - publishMainStream(message.userId, "messagingMessage", messageObj); } if (Users.isLocalUser(recipientUser)) { @@ -76,8 +77,8 @@ export async function createMessage( messageObj, ), publishToChatIndexStream(recipientUser.id, "message", messageObj), + publishToMainStream(recipientUser.id, Event.Chat, messageObj), ]); - publishMainStream(recipientUser.id, "messagingMessage", messageObj); } } else if (recipientGroup != null) { // group's stream @@ -88,8 +89,10 @@ export async function createMessage( userGroupId: recipientGroup.id, }); for await (const joining of joinings) { - await publishToChatIndexStream(joining.userId, "message", messageObj); - publishMainStream(joining.userId, "messagingMessage", messageObj); + await Promise.all([ + publishToChatIndexStream(joining.userId, "message", messageObj), + publishToMainStream(joining.userId, Event.Chat, messageObj), + ]); } } @@ -108,8 +111,10 @@ export async function createMessage( if (mute.map((m) => m.muteeId).includes(user.id)) return; //#endregion - publishMainStream(recipientUser.id, "unreadMessagingMessage", messageObj); - await sendPushNotification(recipientUser.id, "chat", messageObj); + await Promise.all([ + publishToMainStream(recipientUser.id, Event.NewChat, messageObj), + sendPushNotification(recipientUser.id, "chat", messageObj), + ]); } else if (recipientGroup) { const joinings = await UserGroupJoinings.findBy({ userGroupId: recipientGroup.id, @@ -117,8 +122,10 @@ export async function createMessage( }); for await (const joining of joinings) { if (freshMessage.reads.includes(joining.userId)) return; // 既読 - publishMainStream(joining.userId, "unreadMessagingMessage", messageObj); - await sendPushNotification(joining.userId, "chat", messageObj); + await Promise.all([ + publishToMainStream(joining.userId, Event.NewChat, messageObj), + sendPushNotification(joining.userId, "chat", messageObj), + ]); } } }, 2000); diff --git a/packages/backend/src/services/note/create.ts b/packages/backend/src/services/note/create.ts index 33f0913ea0..4b21c67141 100644 --- a/packages/backend/src/services/note/create.ts +++ b/packages/backend/src/services/note/create.ts @@ -1,5 +1,4 @@ import * as mfm from "mfm-js"; -import { publishMainStream } from "@/services/stream.js"; import DeliverManager from "@/remote/activitypub/deliver-manager.js"; import renderNote from "@/remote/activitypub/renderer/note.js"; import renderCreate from "@/remote/activitypub/renderer/create.js"; @@ -48,6 +47,8 @@ import { publishToNotesStream, publishToNoteStream, NoteEvent, + publishToMainStream, + Event, } from "backend-rs"; import { countSameRenotes } from "@/misc/count-same-renotes.js"; import { deliverToRelays, getCachedRelays } from "../relay.js"; @@ -508,7 +509,7 @@ export default async ( const packedReply = await Notes.pack(note, { id: data.reply.userId, }); - publishMainStream(data.reply.userId, "reply", packedReply); + publishToMainStream(data.reply.userId, Event.Reply, packedReply); const webhooks = (await getActiveWebhooks()).filter( (x) => @@ -548,7 +549,7 @@ export default async ( const packedRenote = await Notes.pack(note, { id: data.renote.userId, }); - publishMainStream(data.renote.userId, "renote", packedRenote); + publishToMainStream(data.renote.userId, Event.Renote, packedRenote); const renote = data.renote; const webhooks = (await getActiveWebhooks()).filter( @@ -874,7 +875,7 @@ async function createMentionedEvents( detail: true, }); - publishMainStream(u.id, "mention", detailPackedNote); + publishToMainStream(u.id, Event.Mention, detailPackedNote); const webhooks = (await getActiveWebhooks()).filter( (x) => x.userId === u.id && x.on.includes("mention"), diff --git a/packages/backend/src/services/note/read.ts b/packages/backend/src/services/note/read.ts index d7fda27a85..98d94e684a 100644 --- a/packages/backend/src/services/note/read.ts +++ b/packages/backend/src/services/note/read.ts @@ -1,4 +1,4 @@ -import { publishMainStream } from "@/services/stream.js"; +import { Event, publishToMainStream } from "backend-rs"; import type { Note } from "@/models/entities/note.js"; import type { User } from "@/models/entities/user.js"; import { NoteUnreads, Followings, ChannelFollowings } from "@/models/index.js"; @@ -84,7 +84,7 @@ export default async function ( }).then((mentionsCount) => { if (mentionsCount === 0) { // 全て既読になったイベントを発行 - publishMainStream(userId, "readAllUnreadMentions"); + publishToMainStream(userId, Event.ReadAllMentions, {}); } }); @@ -94,7 +94,7 @@ export default async function ( }).then((specifiedCount) => { if (specifiedCount === 0) { // 全て既読になったイベントを発行 - publishMainStream(userId, "readAllUnreadSpecifiedNotes"); + publishToMainStream(userId, Event.ReadAllDms, {}); } }); @@ -104,7 +104,7 @@ export default async function ( }).then((channelNoteCount) => { if (channelNoteCount === 0) { // 全て既読になったイベントを発行 - publishMainStream(userId, "readAllChannels"); + publishToMainStream(userId, Event.ReadAllChannelPosts, {}); } }); diff --git a/packages/backend/src/services/note/unread.ts b/packages/backend/src/services/note/unread.ts index d52ac46d4d..f17763290a 100644 --- a/packages/backend/src/services/note/unread.ts +++ b/packages/backend/src/services/note/unread.ts @@ -1,8 +1,7 @@ import type { Note } from "@/models/entities/note.js"; -import { publishMainStream } from "@/services/stream.js"; import type { User } from "@/models/entities/user.js"; import { Mutings, NoteThreadMutings, NoteUnreads } from "@/models/index.js"; -import { genId } from "backend-rs"; +import { Event, genId, publishToMainStream } from "backend-rs"; export async function insertNoteUnread( userId: User["id"], @@ -47,13 +46,13 @@ export async function insertNoteUnread( if (!exists) return; if (params.isMentioned) { - publishMainStream(userId, "unreadMention", note.id); + publishToMainStream(userId, Event.NewMention, note.id); } if (params.isSpecified) { - publishMainStream(userId, "unreadSpecifiedNote", note.id); + publishToMainStream(userId, Event.NewDm, note.id); } if (note.channelId) { - publishMainStream(userId, "unreadChannel", note.id); + publishToMainStream(userId, Event.NewChannelPost, note.id); } }, 2000); } diff --git a/packages/backend/src/services/stream.ts b/packages/backend/src/services/stream.ts index 79f813523e..71c6a3cc65 100644 --- a/packages/backend/src/services/stream.ts +++ b/packages/backend/src/services/stream.ts @@ -15,7 +15,7 @@ import type { // DriveStreamTypes, // GroupMessagingStreamTypes, InternalStreamTypes, - MainStreamTypes, + // MainStreamTypes, // MessagingIndexStreamTypes, // MessagingStreamTypes, // NoteStreamTypes, @@ -76,17 +76,18 @@ class Publisher { // ); // }; - public publishMainStream = ( - userId: User["id"], - type: K, - value?: MainStreamTypes[K], - ): void => { - this.publish( - `mainStream:${userId}`, - type, - typeof value === "undefined" ? null : value, - ); - }; + /* ported to backend-rs */ + // public publishMainStream = ( + // userId: User["id"], + // type: K, + // value?: MainStreamTypes[K], + // ): void => { + // this.publish( + // `mainStream:${userId}`, + // type, + // typeof value === "undefined" ? null : value, + // ); + // }; /* ported to backend-rs */ // public publishDriveStream = ( @@ -217,7 +218,7 @@ export default publisher; export const publishInternalEvent = publisher.publishInternalEvent; export const publishUserEvent = publisher.publishUserEvent; // export const publishBroadcastStream = publisher.publishBroadcastStream; -export const publishMainStream = publisher.publishMainStream; +// export const publishMainStream = publisher.publishMainStream; // export const publishDriveStream = publisher.publishDriveStream; // export const publishNoteStream = publisher.publishNoteStream; // export const publishNotesStream = publisher.publishNotesStream;