From b6498a12e8e76562a98eb23116cc695a816fa147 Mon Sep 17 00:00:00 2001 From: naskya Date: Sat, 27 Jul 2024 21:02:48 +0900 Subject: [PATCH] refactor (backend): port publishUserEvent to backend-rs --- packages/backend-rs/index.d.ts | 13 ++++++ packages/backend-rs/index.js | 2 + packages/backend-rs/src/service/stream.rs | 1 + .../backend-rs/src/service/stream/user.rs | 41 +++++++++++++++++++ .../system/check-expired-mutings.ts | 10 +++-- .../api/endpoints/admin/accounts/delete.ts | 4 +- .../api/endpoints/admin/suspend-user.ts | 4 +- .../server/api/endpoints/channels/follow.ts | 5 +-- .../server/api/endpoints/channels/unfollow.ts | 4 +- .../api/endpoints/i/regenerate-token.ts | 5 ++- .../server/api/endpoints/i/revoke-token.ts | 4 +- .../src/server/api/endpoints/i/update.ts | 14 ++++--- .../src/server/api/endpoints/mute/create.ts | 5 +-- .../src/server/api/endpoints/mute/delete.ts | 4 +- .../api/endpoints/renote-mute/create.ts | 2 +- .../api/endpoints/renote-mute/delete.ts | 2 +- .../server/api/endpoints/reply-mute/create.ts | 2 +- .../server/api/endpoints/reply-mute/delete.ts | 2 +- .../src/server/api/mastodon/helpers/list.ts | 4 +- .../src/server/api/mastodon/helpers/user.ts | 7 ++-- .../backend/src/server/api/stream/types.ts | 1 - .../backend/src/services/blocking/create.ts | 15 ++++--- .../backend/src/services/delete-account.ts | 4 +- .../backend/src/services/following/create.ts | 9 ++-- .../backend/src/services/following/delete.ts | 12 ++++-- .../backend/src/services/following/reject.ts | 10 +++-- packages/backend/src/services/i/update.ts | 16 +++++--- packages/backend/src/services/stream.ts | 29 ++++++------- 28 files changed, 155 insertions(+), 76 deletions(-) create mode 100644 packages/backend-rs/src/service/stream/user.rs diff --git a/packages/backend-rs/index.d.ts b/packages/backend-rs/index.d.ts index 8139084116..d939266207 100644 --- a/packages/backend-rs/index.d.ts +++ b/packages/backend-rs/index.d.ts @@ -1191,6 +1191,8 @@ export declare function publishToNoteStream(noteId: string, kind: NoteEvent, obj export declare function publishToNoteUpdatesStream(note: Note): Promise +export declare function publishToUserStream(userId: string, kind: UserEvent, object: any): Promise + export interface PugArgs { img: string | null title: string @@ -1482,6 +1484,17 @@ export type UserEmojiModPerm = 'add'| 'mod'| 'unauthorized'; +export declare enum UserEvent { + Disconnect = 0, + FollowChannel = 1, + UnfollowChannel = 2, + UpdateProfile = 3, + Mute = 4, + Unmute = 5, + Follow = 6, + Unfollow = 7 +} + export interface UserGroup { id: string createdAt: DateTimeWithTimeZone diff --git a/packages/backend-rs/index.js b/packages/backend-rs/index.js index 9d17a05028..9ca4963455 100644 --- a/packages/backend-rs/index.js +++ b/packages/backend-rs/index.js @@ -432,6 +432,7 @@ module.exports.publishToModerationStream = nativeBinding.publishToModerationStre module.exports.publishToNotesStream = nativeBinding.publishToNotesStream module.exports.publishToNoteStream = nativeBinding.publishToNoteStream module.exports.publishToNoteUpdatesStream = nativeBinding.publishToNoteUpdatesStream +module.exports.publishToUserStream = nativeBinding.publishToUserStream module.exports.PushNotificationKind = nativeBinding.PushNotificationKind module.exports.PushSubscriptionType = nativeBinding.PushSubscriptionType module.exports.RelayStatus = nativeBinding.RelayStatus @@ -454,6 +455,7 @@ module.exports.updateAntennasOnNewNote = nativeBinding.updateAntennasOnNewNote module.exports.updateMetaCache = nativeBinding.updateMetaCache module.exports.updateNodeinfoCache = nativeBinding.updateNodeinfoCache module.exports.UserEmojiModPerm = nativeBinding.UserEmojiModPerm +module.exports.UserEvent = nativeBinding.UserEvent module.exports.UserProfileFfvisibility = nativeBinding.UserProfileFfvisibility module.exports.UserProfileMutingNotificationTypes = nativeBinding.UserProfileMutingNotificationTypes module.exports.verifyPassword = nativeBinding.verifyPassword diff --git a/packages/backend-rs/src/service/stream.rs b/packages/backend-rs/src/service/stream.rs index 47b3891f63..bd412b26a9 100644 --- a/packages/backend-rs/src/service/stream.rs +++ b/packages/backend-rs/src/service/stream.rs @@ -11,6 +11,7 @@ pub mod moderation; pub mod note; pub mod note_edit; pub mod notes; +pub mod user; use crate::{ config::CONFIG, diff --git a/packages/backend-rs/src/service/stream/user.rs b/packages/backend-rs/src/service/stream/user.rs new file mode 100644 index 0000000000..fa18456891 --- /dev/null +++ b/packages/backend-rs/src/service/stream/user.rs @@ -0,0 +1,41 @@ +use crate::service::stream::{publish_to_stream, Error, Stream}; + +#[macros::export] +pub enum UserEvent { + Disconnect, + FollowChannel, + UnfollowChannel, + UpdateProfile, + Mute, + Unmute, + Follow, + Unfollow, +} + +// We want to merge `kind` and `object` into a single enum +// https://github.com/napi-rs/napi-rs/issues/2036 + +#[macros::export(js_name = "publishToUserStream")] +pub async fn publish( + user_id: String, + kind: UserEvent, + object: &serde_json::Value, +) -> Result<(), Error> { + let kind = match kind { + UserEvent::Disconnect => "terminate", + UserEvent::FollowChannel => "followChannel", + UserEvent::UnfollowChannel => "unfollowChannel", + UserEvent::UpdateProfile => "updateUserProfile", + UserEvent::Mute => "mute", + UserEvent::Unmute => "unmute", + UserEvent::Follow => "follow", + UserEvent::Unfollow => "unfollow", + }; + + publish_to_stream( + &Stream::User { user_id }, + Some(kind), + Some(serde_json::to_string(&object)?), + ) + .await +} diff --git a/packages/backend/src/queue/processors/system/check-expired-mutings.ts b/packages/backend/src/queue/processors/system/check-expired-mutings.ts index 05518ae8e1..8a189fd5cd 100644 --- a/packages/backend/src/queue/processors/system/check-expired-mutings.ts +++ b/packages/backend/src/queue/processors/system/check-expired-mutings.ts @@ -2,7 +2,7 @@ import type Bull from "bull"; import { In } from "typeorm"; import { Mutings } from "@/models/index.js"; import { queueLogger } from "../../logger.js"; -import { publishUserEvent } from "@/services/stream.js"; +import { publishToUserStream, UserEvent } from "backend-rs"; const logger = queueLogger.createSubLogger("check-expired-mutings"); @@ -23,9 +23,11 @@ export async function checkExpiredMutings( id: In(expired.map((m) => m.id)), }); - for (const m of expired) { - publishUserEvent(m.muterId, "unmute", m.mutee!); - } + await Promise.all( + expired.map((m) => + publishToUserStream(m.muterId, UserEvent.Unmute, m.mutee), + ), + ); } logger.info("All expired mutings checked."); diff --git a/packages/backend/src/server/api/endpoints/admin/accounts/delete.ts b/packages/backend/src/server/api/endpoints/admin/accounts/delete.ts index 4235180838..815b3b3061 100644 --- a/packages/backend/src/server/api/endpoints/admin/accounts/delete.ts +++ b/packages/backend/src/server/api/endpoints/admin/accounts/delete.ts @@ -1,7 +1,7 @@ import define from "@/server/api/define.js"; import { Users } from "@/models/index.js"; import { doPostSuspend } from "@/services/suspend-user.js"; -import { publishUserEvent } from "@/services/stream.js"; +import { publishToUserStream, UserEvent } from "backend-rs"; import { createDeleteAccountJob } from "@/queue/index.js"; export const meta = { @@ -53,6 +53,6 @@ export default define(meta, paramDef, async (ps, me) => { if (Users.isLocalUser(user)) { // Terminate streaming - publishUserEvent(user.id, "terminate", {}); + await publishToUserStream(user.id, UserEvent.Disconnect, {}); } }); diff --git a/packages/backend/src/server/api/endpoints/admin/suspend-user.ts b/packages/backend/src/server/api/endpoints/admin/suspend-user.ts index c6d6f47bc6..03fde41fca 100644 --- a/packages/backend/src/server/api/endpoints/admin/suspend-user.ts +++ b/packages/backend/src/server/api/endpoints/admin/suspend-user.ts @@ -4,7 +4,7 @@ import { Users, Followings, Notifications } from "@/models/index.js"; import type { User } from "@/models/entities/user.js"; import { insertModerationLog } from "@/services/insert-moderation-log.js"; import { doPostSuspend } from "@/services/suspend-user.js"; -import { publishUserEvent } from "@/services/stream.js"; +import { publishToUserStream, UserEvent } from "backend-rs"; export const meta = { tags: ["admin"], @@ -46,7 +46,7 @@ export default define(meta, paramDef, async (ps, me) => { // Terminate streaming if (Users.isLocalUser(user)) { - publishUserEvent(user.id, "terminate", {}); + await publishToUserStream(user.id, UserEvent.Disconnect, {}); } (async () => { diff --git a/packages/backend/src/server/api/endpoints/channels/follow.ts b/packages/backend/src/server/api/endpoints/channels/follow.ts index afc55ac1cf..9f8b0c2317 100644 --- a/packages/backend/src/server/api/endpoints/channels/follow.ts +++ b/packages/backend/src/server/api/endpoints/channels/follow.ts @@ -1,8 +1,7 @@ import define from "@/server/api/define.js"; import { ApiError } from "@/server/api/error.js"; import { Channels, ChannelFollowings } from "@/models/index.js"; -import { genIdAt } from "backend-rs"; -import { publishUserEvent } from "@/services/stream.js"; +import { genIdAt, publishToUserStream, UserEvent } from "backend-rs"; export const meta = { tags: ["channels"], @@ -46,5 +45,5 @@ export default define(meta, paramDef, async (ps, user) => { followeeId: channel.id, }); - publishUserEvent(user.id, "followChannel", channel); + await publishToUserStream(user.id, UserEvent.FollowChannel, channel); }); diff --git a/packages/backend/src/server/api/endpoints/channels/unfollow.ts b/packages/backend/src/server/api/endpoints/channels/unfollow.ts index 3371000394..6c0e26bb53 100644 --- a/packages/backend/src/server/api/endpoints/channels/unfollow.ts +++ b/packages/backend/src/server/api/endpoints/channels/unfollow.ts @@ -1,7 +1,7 @@ import define from "@/server/api/define.js"; import { ApiError } from "@/server/api/error.js"; import { Channels, ChannelFollowings } from "@/models/index.js"; -import { publishUserEvent } from "@/services/stream.js"; +import { publishToUserStream, UserEvent } from "backend-rs"; export const meta = { tags: ["channels"], @@ -41,5 +41,5 @@ export default define(meta, paramDef, async (ps, user) => { followeeId: channel.id, }); - publishUserEvent(user.id, "unfollowChannel", channel); + await publishToUserStream(user.id, UserEvent.UnfollowChannel, channel); }); diff --git a/packages/backend/src/server/api/endpoints/i/regenerate-token.ts b/packages/backend/src/server/api/endpoints/i/regenerate-token.ts index 5666a0011f..e17a483378 100644 --- a/packages/backend/src/server/api/endpoints/i/regenerate-token.ts +++ b/packages/backend/src/server/api/endpoints/i/regenerate-token.ts @@ -1,4 +1,3 @@ -import { publishUserEvent } from "@/services/stream.js"; import define from "@/server/api/define.js"; import { Users, UserProfiles } from "@/models/index.js"; import { @@ -7,6 +6,8 @@ import { InternalEvent, publishToInternalStream, publishToMainStream, + publishToUserStream, + UserEvent, verifyPassword, } from "backend-rs"; @@ -53,6 +54,6 @@ export default define(meta, paramDef, async (ps, user) => { // Terminate streaming setTimeout(() => { - publishUserEvent(user.id, "terminate", {}); + publishToUserStream(user.id, UserEvent.Disconnect, {}); }, 5000); }); diff --git a/packages/backend/src/server/api/endpoints/i/revoke-token.ts b/packages/backend/src/server/api/endpoints/i/revoke-token.ts index e38b330742..ba8d90f91a 100644 --- a/packages/backend/src/server/api/endpoints/i/revoke-token.ts +++ b/packages/backend/src/server/api/endpoints/i/revoke-token.ts @@ -1,6 +1,6 @@ import define from "@/server/api/define.js"; import { AccessTokens } from "@/models/index.js"; -import { publishUserEvent } from "@/services/stream.js"; +import { publishToUserStream, UserEvent } from "backend-rs"; export const meta = { requireCredential: true, @@ -26,6 +26,6 @@ export default define(meta, paramDef, async (ps, user) => { }); // Terminate streaming - publishUserEvent(user.id, "terminate"); + await publishToUserStream(user.id, UserEvent.Disconnect, {}); } }); diff --git a/packages/backend/src/server/api/endpoints/i/update.ts b/packages/backend/src/server/api/endpoints/i/update.ts index 0620f837ff..5f2a18867b 100644 --- a/packages/backend/src/server/api/endpoints/i/update.ts +++ b/packages/backend/src/server/api/endpoints/i/update.ts @@ -1,6 +1,10 @@ import * as mfm from "mfm-js"; -import { publishUserEvent } from "@/services/stream.js"; -import { Event, publishToMainStream } from "backend-rs"; +import { + Event, + publishToMainStream, + publishToUserStream, + UserEvent, +} from "backend-rs"; import acceptAllFollowRequests from "@/services/following/requests/accept-all.js"; import { publishToFollowers } from "@/services/i/update.js"; import { extractCustomEmojisFromMfm } from "@/misc/extract-custom-emojis-from-mfm.js"; @@ -346,10 +350,10 @@ export default define(meta, paramDef, async (ps, _user, token) => { }); // Publish meUpdated event - publishToMainStream(user.id, Event.Me, iObj); - publishUserEvent( + await publishToMainStream(user.id, Event.Me, iObj); + await publishToUserStream( user.id, - "updateUserProfile", + UserEvent.UpdateProfile, await UserProfiles.findOneBy({ userId: user.id }), ); diff --git a/packages/backend/src/server/api/endpoints/mute/create.ts b/packages/backend/src/server/api/endpoints/mute/create.ts index c8d6744b1b..a7c9fc6158 100644 --- a/packages/backend/src/server/api/endpoints/mute/create.ts +++ b/packages/backend/src/server/api/endpoints/mute/create.ts @@ -1,10 +1,9 @@ import define from "@/server/api/define.js"; import { ApiError } from "@/server/api/error.js"; import { getUser } from "@/server/api/common/getters.js"; -import { genIdAt } from "backend-rs"; +import { genIdAt, publishToUserStream, UserEvent } from "backend-rs"; import { Mutings, NoteWatchings } from "@/models/index.js"; import type { Muting } from "@/models/entities/muting.js"; -import { publishUserEvent } from "@/services/stream.js"; export const meta = { tags: ["account"], @@ -88,7 +87,7 @@ export default define(meta, paramDef, async (ps, user) => { muteeId: mutee.id, } as Muting); - publishUserEvent(user.id, "mute", mutee); + await publishToUserStream(user.id, UserEvent.Mute, mutee); NoteWatchings.delete({ userId: muter.id, diff --git a/packages/backend/src/server/api/endpoints/mute/delete.ts b/packages/backend/src/server/api/endpoints/mute/delete.ts index 8058e9a612..9d0dbb661e 100644 --- a/packages/backend/src/server/api/endpoints/mute/delete.ts +++ b/packages/backend/src/server/api/endpoints/mute/delete.ts @@ -2,7 +2,7 @@ import define from "@/server/api/define.js"; import { ApiError } from "@/server/api/error.js"; import { getUser } from "@/server/api/common/getters.js"; import { Mutings } from "@/models/index.js"; -import { publishUserEvent } from "@/services/stream.js"; +import { publishToUserStream, UserEvent } from "backend-rs"; export const meta = { tags: ["account"], @@ -70,5 +70,5 @@ export default define(meta, paramDef, async (ps, user) => { id: muting.id, }); - publishUserEvent(user.id, "unmute", mutee); + await publishToUserStream(user.id, UserEvent.Unmute, mutee); }); diff --git a/packages/backend/src/server/api/endpoints/renote-mute/create.ts b/packages/backend/src/server/api/endpoints/renote-mute/create.ts index 3b2c22a98a..760b819c64 100644 --- a/packages/backend/src/server/api/endpoints/renote-mute/create.ts +++ b/packages/backend/src/server/api/endpoints/renote-mute/create.ts @@ -66,5 +66,5 @@ export default define(meta, paramDef, async (ps, user) => { muteeId: mutee.id, } as RenoteMuting); - // publishUserEvent(user.id, "mute", mutee); + // await publishToUserStream(user.id, UserEvent.RenoteMute, mutee); }); diff --git a/packages/backend/src/server/api/endpoints/renote-mute/delete.ts b/packages/backend/src/server/api/endpoints/renote-mute/delete.ts index 6a824881b0..286db219db 100644 --- a/packages/backend/src/server/api/endpoints/renote-mute/delete.ts +++ b/packages/backend/src/server/api/endpoints/renote-mute/delete.ts @@ -59,5 +59,5 @@ export default define(meta, paramDef, async (ps, user) => { id: muting.id, }); - // publishUserEvent(user.id, "unmute", mutee); + // await publishToUserStream(user.id, UserEvent.RenoteUnMute, mutee); }); diff --git a/packages/backend/src/server/api/endpoints/reply-mute/create.ts b/packages/backend/src/server/api/endpoints/reply-mute/create.ts index 2d9f260b28..97a8faeb33 100644 --- a/packages/backend/src/server/api/endpoints/reply-mute/create.ts +++ b/packages/backend/src/server/api/endpoints/reply-mute/create.ts @@ -66,5 +66,5 @@ export default define(meta, paramDef, async (ps, user) => { muteeId: mutee.id, } as ReplyMuting); - // publishUserEvent(user.id, "mute", mutee); + // await publishToUserStream(user.id, UserEvent.ReplyMute, mutee); }); diff --git a/packages/backend/src/server/api/endpoints/reply-mute/delete.ts b/packages/backend/src/server/api/endpoints/reply-mute/delete.ts index 152d3fca60..28125d6a12 100644 --- a/packages/backend/src/server/api/endpoints/reply-mute/delete.ts +++ b/packages/backend/src/server/api/endpoints/reply-mute/delete.ts @@ -59,5 +59,5 @@ export default define(meta, paramDef, async (ps, user) => { id: record.id, }); - // publishUserEvent(user.id, "unmute", mutee); + // await publishToUserStream(user.id, UserEvent.ReplyUnmute, mutee); }); diff --git a/packages/backend/src/server/api/mastodon/helpers/list.ts b/packages/backend/src/server/api/mastodon/helpers/list.ts index b7564c1048..6937c2e023 100644 --- a/packages/backend/src/server/api/mastodon/helpers/list.ts +++ b/packages/backend/src/server/api/mastodon/helpers/list.ts @@ -195,9 +195,9 @@ export class ListHelpers { if (exclusive !== undefined) { UserListJoinings.findBy({ userListId: list.id }).then((members) => { for (const member of members) { - publishUserEvent( + await publishToUserStream( list.userId, - exclusive ? "userHidden" : "userUnhidden", + exclusive ? UserEvent.Hidden : UserEvent.Unhidden, member.userId, ); } diff --git a/packages/backend/src/server/api/mastodon/helpers/user.ts b/packages/backend/src/server/api/mastodon/helpers/user.ts index 609b2224cf..008429b9e1 100644 --- a/packages/backend/src/server/api/mastodon/helpers/user.ts +++ b/packages/backend/src/server/api/mastodon/helpers/user.ts @@ -27,9 +27,8 @@ import deleteFollowing from "@/services/following/delete.js"; import cancelFollowRequest from "@/services/following/requests/cancel.js"; import createBlocking from "@/services/blocking/create.js"; import deleteBlocking from "@/services/blocking/delete.js"; -import { genId } from "backend-rs"; +import { genId, publishToUserStream, UserEvent } from "backend-rs"; import type { Muting } from "@/models/entities/muting.js"; -import { publishUserEvent } from "@/services/stream.js"; import { UserConverter } from "@/server/api/mastodon/converters/user.js"; import acceptFollowRequest from "@/services/following/requests/accept.js"; import { rejectFollowRequest } from "@/services/following/reject.js"; @@ -152,7 +151,7 @@ export class UserHelpers { muteeId: target.id, } as Muting); - publishUserEvent(localUser.id, "mute", target); + await publishToUserStream(localUser.id, UserEvent.Mute, target); NoteWatchings.delete({ userId: localUser.id, @@ -177,7 +176,7 @@ export class UserHelpers { id: muting.id, }); - publishUserEvent(localUser.id, "unmute", target); + await publishToUserStream(localUser.id, UserEvent.Unmute, target); } return this.getUserRelationshipTo(target.id, localUser.id); diff --git a/packages/backend/src/server/api/stream/types.ts b/packages/backend/src/server/api/stream/types.ts index 138bcedfae..8f0d5a06af 100644 --- a/packages/backend/src/server/api/stream/types.ts +++ b/packages/backend/src/server/api/stream/types.ts @@ -63,7 +63,6 @@ export interface UserStreamTypes { unmute: User; follow: Packed<"UserDetailedNotMe">; unfollow: Packed<"User">; - userAdded: Packed<"User">; } export interface MainStreamTypes { diff --git a/packages/backend/src/services/blocking/create.ts b/packages/backend/src/services/blocking/create.ts index 02ac813967..1d7b80ffbf 100644 --- a/packages/backend/src/services/blocking/create.ts +++ b/packages/backend/src/services/blocking/create.ts @@ -1,4 +1,3 @@ -import { publishUserEvent } from "@/services/stream.js"; import { renderActivity } from "@/remote/activitypub/renderer/index.js"; import renderFollow from "@/remote/activitypub/renderer/follow.js"; import { renderUndo } from "@/remote/activitypub/renderer/undo.js"; @@ -15,7 +14,13 @@ import { UserListJoinings, UserLists, } from "@/models/index.js"; -import { Event, genIdAt, publishToMainStream } from "backend-rs"; +import { + Event, + genIdAt, + publishToMainStream, + publishToUserStream, + UserEvent, +} from "backend-rs"; import { getActiveWebhooks } from "@/misc/webhook-cache.js"; import { webhookDeliver } from "@/queue/index.js"; @@ -72,8 +77,8 @@ async function cancelRequest(follower: User, followee: User) { Users.pack(followee, follower, { detail: true, }).then(async (packed) => { - publishUserEvent(follower.id, "unfollow", packed); - publishToMainStream(follower.id, Event.Unfollow, packed); + await publishToUserStream(follower.id, UserEvent.Unfollow, packed); + await publishToMainStream(follower.id, Event.Unfollow, packed); const webhooks = (await getActiveWebhooks()).filter( (x) => x.userId === follower.id && x.on.includes("unfollow"), @@ -127,7 +132,7 @@ async function unFollow(follower: User, followee: User) { Users.pack(followee, follower, { detail: true, }).then(async (packed) => { - publishUserEvent(follower.id, "unfollow", packed); + publishToUserStream(follower.id, UserEvent.Unfollow, packed); publishToMainStream(follower.id, Event.Unfollow, packed); const webhooks = (await getActiveWebhooks()).filter( diff --git a/packages/backend/src/services/delete-account.ts b/packages/backend/src/services/delete-account.ts index b307611cf5..117c45e612 100644 --- a/packages/backend/src/services/delete-account.ts +++ b/packages/backend/src/services/delete-account.ts @@ -1,6 +1,6 @@ import { Users } from "@/models/index.js"; import { createDeleteAccountJob } from "@/queue/index.js"; -import { publishUserEvent } from "@/services/stream.js"; +import { publishToUserStream, UserEvent } from "backend-rs"; import { doPostSuspend } from "@/services/suspend-user.js"; export async function deleteAccount(user: { @@ -19,5 +19,5 @@ export async function deleteAccount(user: { }); // Terminate streaming - publishUserEvent(user.id, "terminate", {}); + await publishToUserStream(user.id, UserEvent.Disconnect, {}); } diff --git a/packages/backend/src/services/following/create.ts b/packages/backend/src/services/following/create.ts index 1c962c9e9e..aea5bd48a5 100644 --- a/packages/backend/src/services/following/create.ts +++ b/packages/backend/src/services/following/create.ts @@ -1,4 +1,3 @@ -import { publishUserEvent } from "@/services/stream.js"; import { renderActivity } from "@/remote/activitypub/renderer/index.js"; import renderFollow from "@/remote/activitypub/renderer/follow.js"; import renderAccept from "@/remote/activitypub/renderer/accept.js"; @@ -22,6 +21,8 @@ import { genIdAt, isSilencedServer, publishToMainStream, + publishToUserStream, + UserEvent, } from "backend-rs"; import { createNotification } from "@/services/create-notification.js"; import { isDuplicateKeyValueError } from "@/misc/is-duplicate-key-value-error.js"; @@ -126,12 +127,12 @@ export async function insertFollowingDoc( Users.pack(followee.id, follower, { detail: true, }).then(async (packed) => { - publishUserEvent( + await publishToUserStream( follower.id, - "follow", + UserEvent.Follow, packed as Packed<"UserDetailedNotMe">, ); - publishToMainStream( + await publishToMainStream( follower.id, Event.Follow, packed as Packed<"UserDetailedNotMe">, diff --git a/packages/backend/src/services/following/delete.ts b/packages/backend/src/services/following/delete.ts index 65d409f8bf..9d0135b81f 100644 --- a/packages/backend/src/services/following/delete.ts +++ b/packages/backend/src/services/following/delete.ts @@ -1,5 +1,9 @@ -import { publishUserEvent } from "@/services/stream.js"; -import { Event, publishToMainStream } from "backend-rs"; +import { + Event, + publishToMainStream, + publishToUserStream, + UserEvent, +} from "backend-rs"; import { renderActivity } from "@/remote/activitypub/renderer/index.js"; import renderFollow from "@/remote/activitypub/renderer/follow.js"; import { renderUndo } from "@/remote/activitypub/renderer/undo.js"; @@ -51,8 +55,8 @@ export default async function ( Users.pack(followee.id, follower, { detail: true, }).then(async (packed) => { - publishUserEvent(follower.id, "unfollow", packed); - publishToMainStream(follower.id, Event.Unfollow, packed); + await publishToUserStream(follower.id, UserEvent.Unfollow, packed); + await publishToMainStream(follower.id, Event.Unfollow, packed); const webhooks = (await getActiveWebhooks()).filter( (x) => x.userId === follower.id && x.on.includes("unfollow"), diff --git a/packages/backend/src/services/following/reject.ts b/packages/backend/src/services/following/reject.ts index 71f0220b14..1523573bf9 100644 --- a/packages/backend/src/services/following/reject.ts +++ b/packages/backend/src/services/following/reject.ts @@ -2,8 +2,12 @@ import { renderActivity } from "@/remote/activitypub/renderer/index.js"; import renderFollow from "@/remote/activitypub/renderer/follow.js"; import renderReject from "@/remote/activitypub/renderer/reject.js"; import { deliver, webhookDeliver } from "@/queue/index.js"; -import { publishUserEvent } from "@/services/stream.js"; -import { Event, publishToMainStream } from "backend-rs"; +import { + Event, + publishToMainStream, + publishToUserStream, + UserEvent, +} from "backend-rs"; import type { ILocalUser, IRemoteUser } from "@/models/entities/user.js"; import { Users, FollowRequests, Followings } from "@/models/index.js"; import { decrementFollowing } from "./delete.js"; @@ -120,7 +124,7 @@ async function publishUnfollow(followee: Both, follower: Local) { detail: true, }); - publishUserEvent(follower.id, "unfollow", packedFollowee); + publishToUserStream(follower.id, UserEvent.Unfollow, packedFollowee); publishToMainStream(follower.id, Event.Unfollow, packedFollowee); const webhooks = (await getActiveWebhooks()).filter( diff --git a/packages/backend/src/services/i/update.ts b/packages/backend/src/services/i/update.ts index f2752f069b..fcdf2f4f57 100644 --- a/packages/backend/src/services/i/update.ts +++ b/packages/backend/src/services/i/update.ts @@ -11,8 +11,12 @@ import mfm from "mfm-js"; import { extractHashtags } from "@/misc/extract-hashtags.js"; import { normalizeForSearch } from "@/misc/normalize-for-search.js"; import { updateUsertags } from "@/services/update-hashtag.js"; -import { publishUserEvent } from "@/services/stream.js"; -import { Event, publishToMainStream } from "backend-rs"; +import { + Event, + publishToMainStream, + publishToUserStream, + UserEvent, +} from "backend-rs"; import acceptAllFollowRequests from "@/services/following/requests/accept-all.js"; import { promiseEarlyReturn } from "@/prelude/promise.js"; @@ -80,15 +84,15 @@ export async function updateUserProfileData( includeSecrets: isSecure, }); - publishToMainStream(user.id, Event.Me, iObj); - publishUserEvent( + await publishToMainStream(user.id, Event.Me, iObj); + await publishToUserStream( user.id, - "updateUserProfile", + UserEvent.UpdateProfile, await UserProfiles.findOneByOrFail({ userId: user.id }), ); if (user.isLocked && updates.isLocked === false) { - acceptAllFollowRequests(user); + await acceptAllFollowRequests(user); } await promiseEarlyReturn( diff --git a/packages/backend/src/services/stream.ts b/packages/backend/src/services/stream.ts index 00f5dc92fa..0d13b8f5bb 100644 --- a/packages/backend/src/services/stream.ts +++ b/packages/backend/src/services/stream.ts @@ -1,5 +1,5 @@ import { redisClient } from "@/db/redis.js"; -import type { User } from "@/models/entities/user.js"; +// import type { User } from "@/models/entities/user.js"; // import type { Note } from "@/models/entities/note.js"; // import type { UserList } from "@/models/entities/user-list.js"; // import type { UserGroup } from "@/models/entities/user-group.js"; @@ -19,7 +19,7 @@ import type { // MessagingIndexStreamTypes, // MessagingStreamTypes, // NoteStreamTypes, - UserStreamTypes, + // UserStreamTypes, // NoteUpdatesStreamTypes, } from "@/server/api/stream/types.js"; @@ -53,17 +53,18 @@ class Publisher { // this.publish("internal", type, typeof value === "undefined" ? null : value); // }; - public publishUserEvent = ( - userId: User["id"], - type: K, - value?: UserStreamTypes[K], - ): void => { - this.publish( - `user:${userId}`, - type, - typeof value === "undefined" ? null : value, - ); - }; + /* ported to backend-rs */ + // public publishUserEvent = ( + // userId: User["id"], + // type: K, + // value?: UserStreamTypes[K], + // ): void => { + // this.publish( + // `user:${userId}`, + // type, + // typeof value === "undefined" ? null : value, + // ); + // }; /* ported to backend-rs */ // public publishBroadcastStream = ( @@ -217,7 +218,7 @@ const publisher = new Publisher(); export default publisher; // export const publishInternalEvent = publisher.publishInternalEvent; -export const publishUserEvent = publisher.publishUserEvent; +// export const publishUserEvent = publisher.publishUserEvent; // export const publishBroadcastStream = publisher.publishBroadcastStream; // export const publishMainStream = publisher.publishMainStream; // export const publishDriveStream = publisher.publishDriveStream;