From 7a54dc8b87b96c6e1a069902af7382d120e7566b Mon Sep 17 00:00:00 2001 From: naskya Date: Sat, 27 Jul 2024 20:39:02 +0900 Subject: [PATCH] refactor (backend): port publishInternalEvent to backend-rs --- packages/backend-rs/index.d.ts | 17 +++++++ packages/backend-rs/index.js | 2 + packages/backend-rs/src/service/stream.rs | 1 + .../backend-rs/src/service/stream/internal.rs | 45 +++++++++++++++++++ .../src/remote/activitypub/models/person.ts | 12 +++-- .../admin/drive-capacity-override.ts | 4 +- .../api/endpoints/admin/moderators/add.ts | 4 +- .../api/endpoints/admin/moderators/remove.ts | 4 +- .../endpoints/admin/set-emoji-moderator.ts | 6 --- .../api/endpoints/admin/silence-user.ts | 4 +- .../api/endpoints/admin/unsilence-user.ts | 4 +- .../server/api/endpoints/antennas/create.ts | 11 +++-- .../server/api/endpoints/antennas/delete.ts | 9 ++-- .../server/api/endpoints/antennas/update.ts | 11 +++-- .../api/endpoints/i/regenerate-token.ts | 6 ++- .../server/api/endpoints/i/webhooks/create.ts | 5 +-- .../server/api/endpoints/i/webhooks/delete.ts | 4 +- .../server/api/endpoints/i/webhooks/update.ts | 4 +- packages/backend/src/services/stream.ts | 17 +++---- packages/backend/src/services/suspend-user.ts | 4 +- .../backend/src/services/unsuspend-user.ts | 4 +- 21 files changed, 128 insertions(+), 50 deletions(-) create mode 100644 packages/backend-rs/src/service/stream/internal.rs diff --git a/packages/backend-rs/index.d.ts b/packages/backend-rs/index.d.ts index b67d586e8f..8139084116 100644 --- a/packages/backend-rs/index.d.ts +++ b/packages/backend-rs/index.d.ts @@ -574,6 +574,21 @@ export interface Instance { faviconUrl: string | null } +export declare enum InternalEvent { + Suspend = 0, + Silence = 1, + Moderator = 2, + Token = 3, + LocalUser = 4, + RemoteUser = 5, + WebhookCreated = 6, + WebhookUpdated = 7, + WebhookDeleted = 8, + AntennaCreated = 9, + AntennaUpdated = 10, + AntennaDeleted = 11 +} + /** * Checks if a server is allowlisted. * Returns `Ok(true)` if private mode is disabled. @@ -1164,6 +1179,8 @@ export declare function publishToDriveFolderStream(userId: string, kind: DriveFo export declare function publishToGroupChatStream(groupId: string, kind: ChatEvent, object: any): Promise +export declare function publishToInternalStream(kind: InternalEvent, object: any): Promise + export declare function publishToMainStream(userId: string, kind: Event, object: any): Promise export declare function publishToModerationStream(moderatorId: string, report: AbuseUserReportLike): Promise diff --git a/packages/backend-rs/index.js b/packages/backend-rs/index.js index f62cbb3626..9d17a05028 100644 --- a/packages/backend-rs/index.js +++ b/packages/backend-rs/index.js @@ -394,6 +394,7 @@ module.exports.greet = nativeBinding.greet module.exports.hashPassword = nativeBinding.hashPassword module.exports.Inbound = nativeBinding.Inbound module.exports.initializeRustLogger = nativeBinding.initializeRustLogger +module.exports.InternalEvent = nativeBinding.InternalEvent module.exports.isAllowedServer = nativeBinding.isAllowedServer module.exports.isBlockedServer = nativeBinding.isBlockedServer module.exports.isOldPasswordAlgorithm = nativeBinding.isOldPasswordAlgorithm @@ -425,6 +426,7 @@ module.exports.publishToChatStream = nativeBinding.publishToChatStream module.exports.publishToDriveFileStream = nativeBinding.publishToDriveFileStream module.exports.publishToDriveFolderStream = nativeBinding.publishToDriveFolderStream module.exports.publishToGroupChatStream = nativeBinding.publishToGroupChatStream +module.exports.publishToInternalStream = nativeBinding.publishToInternalStream module.exports.publishToMainStream = nativeBinding.publishToMainStream module.exports.publishToModerationStream = nativeBinding.publishToModerationStream module.exports.publishToNotesStream = nativeBinding.publishToNotesStream diff --git a/packages/backend-rs/src/service/stream.rs b/packages/backend-rs/src/service/stream.rs index 8998ee9a1f..47b3891f63 100644 --- a/packages/backend-rs/src/service/stream.rs +++ b/packages/backend-rs/src/service/stream.rs @@ -5,6 +5,7 @@ pub mod chat_index; pub mod custom_emoji; pub mod drive; pub mod group_chat; +pub mod internal; pub mod main; pub mod moderation; pub mod note; diff --git a/packages/backend-rs/src/service/stream/internal.rs b/packages/backend-rs/src/service/stream/internal.rs new file mode 100644 index 0000000000..b78f40e4c9 --- /dev/null +++ b/packages/backend-rs/src/service/stream/internal.rs @@ -0,0 +1,45 @@ +use crate::service::stream::{publish_to_stream, Error, Stream}; + +#[macros::export] +pub enum InternalEvent { + Suspend, + Silence, + Moderator, + Token, + LocalUser, + RemoteUser, + WebhookCreated, + WebhookUpdated, + WebhookDeleted, + AntennaCreated, + AntennaUpdated, + AntennaDeleted, +} + +// We want to merge `kind` and `object` into a single enum +// https://github.com/napi-rs/napi-rs/issues/2036 + +#[macros::export(js_name = "publishToInternalStream")] +pub async fn publish(kind: InternalEvent, object: &serde_json::Value) -> Result<(), Error> { + let kind = match kind { + InternalEvent::Suspend => "userChangeSuspendedState", + InternalEvent::Silence => "userChangeSilencedState", + InternalEvent::Moderator => "userChangeModeratorState", + InternalEvent::Token => "userTokenRegenerated", + InternalEvent::LocalUser => "localUserUpdated", + InternalEvent::RemoteUser => "remoteUserUpdated", + InternalEvent::WebhookCreated => "webhookCreated", + InternalEvent::WebhookUpdated => "webhookUpdated", + InternalEvent::WebhookDeleted => "webhookDeleted", + InternalEvent::AntennaCreated => "antennaCreated", + InternalEvent::AntennaUpdated => "antennaUpdated", + InternalEvent::AntennaDeleted => "antennaDeleted", + }; + + publish_to_stream( + &Stream::Internal, + Some(kind), + Some(serde_json::to_string(&object)?), + ) + .await +} diff --git a/packages/backend/src/remote/activitypub/models/person.ts b/packages/backend/src/remote/activitypub/models/person.ts index e861553e97..8a94af4bed 100644 --- a/packages/backend/src/remote/activitypub/models/person.ts +++ b/packages/backend/src/remote/activitypub/models/person.ts @@ -16,7 +16,14 @@ import type { IRemoteUser, CacheableUser } from "@/models/entities/user.js"; import { User } from "@/models/entities/user.js"; import type { Emoji } from "@/models/entities/emoji.js"; import { UserNotePining } from "@/models/entities/user-note-pining.js"; -import { genId, genIdAt, isSameOrigin, toPuny } from "backend-rs"; +import { + genId, + genIdAt, + InternalEvent, + isSameOrigin, + publishToInternalStream, + toPuny, +} from "backend-rs"; import { UserPublickey } from "@/models/entities/user-publickey.js"; import { isDuplicateKeyValueError } from "@/misc/is-duplicate-key-value-error.js"; import { UserProfile } from "@/models/entities/user-profile.js"; @@ -26,7 +33,6 @@ import { normalizeForSearch } from "@/misc/normalize-for-search.js"; import { truncate } from "@/misc/truncate.js"; import { StatusError } from "@/misc/fetch.js"; import { uriPersonCache } from "@/services/user-cache.js"; -import { publishInternalEvent } from "@/services/stream.js"; import { db } from "@/db/postgre.js"; import { apLogger } from "../logger.js"; import { htmlToMfm } from "../misc/html-to-mfm.js"; @@ -611,7 +617,7 @@ export async function updatePerson( }, ); - publishInternalEvent("remoteUserUpdated", { id: user.id }); + publishToInternalStream(InternalEvent.RemoteUser, { id: user.id }); // Hashtag Update updateUsertags(user, tags); diff --git a/packages/backend/src/server/api/endpoints/admin/drive-capacity-override.ts b/packages/backend/src/server/api/endpoints/admin/drive-capacity-override.ts index d4bb3045c6..26abdb890e 100644 --- a/packages/backend/src/server/api/endpoints/admin/drive-capacity-override.ts +++ b/packages/backend/src/server/api/endpoints/admin/drive-capacity-override.ts @@ -1,7 +1,7 @@ import define from "@/server/api/define.js"; import { Users } from "@/models/index.js"; import { insertModerationLog } from "@/services/insert-moderation-log.js"; -import { publishInternalEvent } from "@/services/stream.js"; +import { InternalEvent, publishToInternalStream } from "backend-rs"; export const meta = { tags: ["admin"], @@ -33,7 +33,7 @@ export default define(meta, paramDef, async (ps, me) => { driveCapacityOverrideMb: ps.overrideMb, }); - publishInternalEvent("localUserUpdated", { + publishToInternalStream(InternalEvent.LocalUser, { id: user.id, }); diff --git a/packages/backend/src/server/api/endpoints/admin/moderators/add.ts b/packages/backend/src/server/api/endpoints/admin/moderators/add.ts index c8b53088cd..a5fefaaaca 100644 --- a/packages/backend/src/server/api/endpoints/admin/moderators/add.ts +++ b/packages/backend/src/server/api/endpoints/admin/moderators/add.ts @@ -1,6 +1,6 @@ import define from "@/server/api/define.js"; import { Users } from "@/models/index.js"; -import { publishInternalEvent } from "@/services/stream.js"; +import { InternalEvent, publishToInternalStream } from "backend-rs"; export const meta = { tags: ["admin"], @@ -32,7 +32,7 @@ export default define(meta, paramDef, async (ps) => { isModerator: true, }); - publishInternalEvent("userChangeModeratorState", { + publishToInternalStream(InternalEvent.Moderator, { id: user.id, isModerator: true, }); diff --git a/packages/backend/src/server/api/endpoints/admin/moderators/remove.ts b/packages/backend/src/server/api/endpoints/admin/moderators/remove.ts index e2af1ca374..5adcae1e3f 100644 --- a/packages/backend/src/server/api/endpoints/admin/moderators/remove.ts +++ b/packages/backend/src/server/api/endpoints/admin/moderators/remove.ts @@ -1,6 +1,6 @@ import define from "@/server/api/define.js"; import { Users } from "@/models/index.js"; -import { publishInternalEvent } from "@/services/stream.js"; +import { InternalEvent, publishToInternalStream } from "backend-rs"; export const meta = { tags: ["admin"], @@ -28,7 +28,7 @@ export default define(meta, paramDef, async (ps) => { isModerator: false, }); - publishInternalEvent("userChangeModeratorState", { + publishToInternalStream(InternalEvent.Moderator, { id: user.id, isModerator: false, }); diff --git a/packages/backend/src/server/api/endpoints/admin/set-emoji-moderator.ts b/packages/backend/src/server/api/endpoints/admin/set-emoji-moderator.ts index 6633edbc8c..f7871bd9b2 100644 --- a/packages/backend/src/server/api/endpoints/admin/set-emoji-moderator.ts +++ b/packages/backend/src/server/api/endpoints/admin/set-emoji-moderator.ts @@ -1,6 +1,5 @@ import define from "@/server/api/define.js"; import { Users } from "@/models/index.js"; -import { publishInternalEvent } from "@/services/stream.js"; import type { EmojiModPerm } from "@/models/entities/user.js"; import { unsafeCast } from "@/prelude/unsafe-cast.js"; @@ -36,9 +35,4 @@ export default define(meta, paramDef, async (ps) => { await Users.update(user.id, { emojiModPerm: unsafeCast(ps.emojiModPerm), }); - - publishInternalEvent("userChangeModeratorState", { - id: user.id, - isModerator: true, - }); }); diff --git a/packages/backend/src/server/api/endpoints/admin/silence-user.ts b/packages/backend/src/server/api/endpoints/admin/silence-user.ts index 8d9dcac681..7cf116b788 100644 --- a/packages/backend/src/server/api/endpoints/admin/silence-user.ts +++ b/packages/backend/src/server/api/endpoints/admin/silence-user.ts @@ -1,7 +1,7 @@ import define from "@/server/api/define.js"; import { Users } from "@/models/index.js"; import { insertModerationLog } from "@/services/insert-moderation-log.js"; -import { publishInternalEvent } from "@/services/stream.js"; +import { InternalEvent, publishToInternalStream } from "backend-rs"; export const meta = { tags: ["admin"], @@ -33,7 +33,7 @@ export default define(meta, paramDef, async (ps, me) => { isSilenced: true, }); - publishInternalEvent("userChangeSilencedState", { + publishToInternalStream(InternalEvent.Silence, { id: user.id, isSilenced: true, }); diff --git a/packages/backend/src/server/api/endpoints/admin/unsilence-user.ts b/packages/backend/src/server/api/endpoints/admin/unsilence-user.ts index 6592bf443c..d376e35b72 100644 --- a/packages/backend/src/server/api/endpoints/admin/unsilence-user.ts +++ b/packages/backend/src/server/api/endpoints/admin/unsilence-user.ts @@ -1,7 +1,7 @@ import define from "@/server/api/define.js"; import { Users } from "@/models/index.js"; import { insertModerationLog } from "@/services/insert-moderation-log.js"; -import { publishInternalEvent } from "@/services/stream.js"; +import { InternalEvent, publishToInternalStream } from "backend-rs"; export const meta = { tags: ["admin"], @@ -29,7 +29,7 @@ export default define(meta, paramDef, async (ps, me) => { isSilenced: false, }); - publishInternalEvent("userChangeSilencedState", { + publishToInternalStream(InternalEvent.Silence, { id: user.id, isSilenced: false, }); diff --git a/packages/backend/src/server/api/endpoints/antennas/create.ts b/packages/backend/src/server/api/endpoints/antennas/create.ts index 332cff3ab6..0a19558f19 100644 --- a/packages/backend/src/server/api/endpoints/antennas/create.ts +++ b/packages/backend/src/server/api/endpoints/antennas/create.ts @@ -1,8 +1,13 @@ import define from "@/server/api/define.js"; -import { fetchMeta, genIdAt, updateAntennaCache } from "backend-rs"; +import { + fetchMeta, + genIdAt, + InternalEvent, + publishToInternalStream, + updateAntennaCache, +} from "backend-rs"; import { Antennas, UserLists, UserGroupJoinings } from "@/models/index.js"; import { ApiError } from "@/server/api/error.js"; -import { publishInternalEvent } from "@/services/stream.js"; export const meta = { tags: ["antennas"], @@ -172,7 +177,7 @@ export default define(meta, paramDef, async (ps, user) => { notify: ps.notify, }).then((x) => Antennas.findOneByOrFail(x.identifiers[0])); - publishInternalEvent("antennaCreated", antenna); + await publishToInternalStream(InternalEvent.AntennaCreated, antenna); await updateAntennaCache(); return await Antennas.pack(antenna); diff --git a/packages/backend/src/server/api/endpoints/antennas/delete.ts b/packages/backend/src/server/api/endpoints/antennas/delete.ts index 64ba1c2e1f..23bbc7c1f9 100644 --- a/packages/backend/src/server/api/endpoints/antennas/delete.ts +++ b/packages/backend/src/server/api/endpoints/antennas/delete.ts @@ -1,8 +1,11 @@ import define from "@/server/api/define.js"; import { ApiError } from "@/server/api/error.js"; import { Antennas } from "@/models/index.js"; -import { publishInternalEvent } from "@/services/stream.js"; -import { updateAntennaCache } from "backend-rs"; +import { + InternalEvent, + publishToInternalStream, + updateAntennaCache, +} from "backend-rs"; export const meta = { tags: ["antennas"], @@ -40,6 +43,6 @@ export default define(meta, paramDef, async (ps, user) => { await Antennas.delete(antenna.id); - publishInternalEvent("antennaDeleted", antenna); + await publishToInternalStream(InternalEvent.AntennaDeleted, antenna); await updateAntennaCache(); }); diff --git a/packages/backend/src/server/api/endpoints/antennas/update.ts b/packages/backend/src/server/api/endpoints/antennas/update.ts index 24d659f47a..5f4544ca2c 100644 --- a/packages/backend/src/server/api/endpoints/antennas/update.ts +++ b/packages/backend/src/server/api/endpoints/antennas/update.ts @@ -1,8 +1,11 @@ import define from "@/server/api/define.js"; import { ApiError } from "@/server/api/error.js"; import { Antennas, UserLists, UserGroupJoinings } from "@/models/index.js"; -import { publishInternalEvent } from "@/services/stream.js"; -import { updateAntennaCache } from "backend-rs"; +import { + InternalEvent, + publishToInternalStream, + updateAntennaCache, +} from "backend-rs"; export const meta = { tags: ["antennas"], @@ -163,8 +166,8 @@ export default define(meta, paramDef, async (ps, user) => { notify: ps.notify, }); - publishInternalEvent( - "antennaUpdated", + await publishToInternalStream( + InternalEvent.AntennaUpdated, await Antennas.findOneByOrFail({ id: antenna.id }), ); await updateAntennaCache(); diff --git a/packages/backend/src/server/api/endpoints/i/regenerate-token.ts b/packages/backend/src/server/api/endpoints/i/regenerate-token.ts index 0f0f79c35e..5666a0011f 100644 --- a/packages/backend/src/server/api/endpoints/i/regenerate-token.ts +++ b/packages/backend/src/server/api/endpoints/i/regenerate-token.ts @@ -1,9 +1,11 @@ -import { publishInternalEvent, publishUserEvent } from "@/services/stream.js"; +import { publishUserEvent } from "@/services/stream.js"; import define from "@/server/api/define.js"; import { Users, UserProfiles } from "@/models/index.js"; import { Event, generateUserToken, + InternalEvent, + publishToInternalStream, publishToMainStream, verifyPassword, } from "backend-rs"; @@ -42,7 +44,7 @@ export default define(meta, paramDef, async (ps, user) => { }); // Publish event - publishInternalEvent("userTokenRegenerated", { + publishToInternalStream(InternalEvent.Token, { id: user.id, oldToken, newToken, diff --git a/packages/backend/src/server/api/endpoints/i/webhooks/create.ts b/packages/backend/src/server/api/endpoints/i/webhooks/create.ts index 031f0e4eab..cf10b6b37b 100644 --- a/packages/backend/src/server/api/endpoints/i/webhooks/create.ts +++ b/packages/backend/src/server/api/endpoints/i/webhooks/create.ts @@ -1,7 +1,6 @@ import define from "@/server/api/define.js"; -import { genIdAt } from "backend-rs"; +import { genIdAt, InternalEvent, publishToInternalStream } from "backend-rs"; import { Webhooks } from "@/models/index.js"; -import { publishInternalEvent } from "@/services/stream.js"; import { webhookEventTypes } from "@/models/entities/webhook.js"; export const meta = { @@ -41,7 +40,7 @@ export default define(meta, paramDef, async (ps, user) => { on: ps.on, }).then((x) => Webhooks.findOneByOrFail(x.identifiers[0])); - publishInternalEvent("webhookCreated", webhook); + publishToInternalStream(InternalEvent.WebhookCreated, webhook); return webhook; }); diff --git a/packages/backend/src/server/api/endpoints/i/webhooks/delete.ts b/packages/backend/src/server/api/endpoints/i/webhooks/delete.ts index a92ed179d0..4c3b0e7849 100644 --- a/packages/backend/src/server/api/endpoints/i/webhooks/delete.ts +++ b/packages/backend/src/server/api/endpoints/i/webhooks/delete.ts @@ -1,7 +1,7 @@ import define from "@/server/api/define.js"; import { ApiError } from "@/server/api/error.js"; import { Webhooks } from "@/models/index.js"; -import { publishInternalEvent } from "@/services/stream.js"; +import { InternalEvent, publishToInternalStream } from "backend-rs"; export const meta = { tags: ["webhooks"], @@ -39,5 +39,5 @@ export default define(meta, paramDef, async (ps, user) => { await Webhooks.delete(webhook.id); - publishInternalEvent("webhookDeleted", webhook); + publishToInternalStream(InternalEvent.WebhookDeleted, webhook); }); diff --git a/packages/backend/src/server/api/endpoints/i/webhooks/update.ts b/packages/backend/src/server/api/endpoints/i/webhooks/update.ts index 4a211b915f..f65690e142 100644 --- a/packages/backend/src/server/api/endpoints/i/webhooks/update.ts +++ b/packages/backend/src/server/api/endpoints/i/webhooks/update.ts @@ -1,7 +1,7 @@ import define from "@/server/api/define.js"; import { ApiError } from "@/server/api/error.js"; import { Webhooks } from "@/models/index.js"; -import { publishInternalEvent } from "@/services/stream.js"; +import { InternalEvent, publishToInternalStream } from "backend-rs"; import { webhookEventTypes } from "@/models/entities/webhook.js"; export const meta = { @@ -57,5 +57,5 @@ export default define(meta, paramDef, async (ps, user) => { active: ps.active, }); - publishInternalEvent("webhookUpdated", webhook); + publishToInternalStream(InternalEvent.WebhookUpdated, webhook); }); diff --git a/packages/backend/src/services/stream.ts b/packages/backend/src/services/stream.ts index 71c6a3cc65..00f5dc92fa 100644 --- a/packages/backend/src/services/stream.ts +++ b/packages/backend/src/services/stream.ts @@ -14,7 +14,7 @@ import type { // ChannelStreamTypes, // DriveStreamTypes, // GroupMessagingStreamTypes, - InternalStreamTypes, + // InternalStreamTypes, // MainStreamTypes, // MessagingIndexStreamTypes, // MessagingStreamTypes, @@ -45,12 +45,13 @@ class Publisher { ); }; - public publishInternalEvent = ( - type: K, - value?: InternalStreamTypes[K], - ): void => { - this.publish("internal", type, typeof value === "undefined" ? null : value); - }; + /* ported to backend-rs */ + // public publishInternalEvent = ( + // type: K, + // value?: InternalStreamTypes[K], + // ): void => { + // this.publish("internal", type, typeof value === "undefined" ? null : value); + // }; public publishUserEvent = ( userId: User["id"], @@ -215,7 +216,7 @@ const publisher = new Publisher(); export default publisher; -export const publishInternalEvent = publisher.publishInternalEvent; +// export const publishInternalEvent = publisher.publishInternalEvent; export const publishUserEvent = publisher.publishUserEvent; // export const publishBroadcastStream = publisher.publishBroadcastStream; // export const publishMainStream = publisher.publishMainStream; diff --git a/packages/backend/src/services/suspend-user.ts b/packages/backend/src/services/suspend-user.ts index 22b83d3fdb..6bf271ba7f 100644 --- a/packages/backend/src/services/suspend-user.ts +++ b/packages/backend/src/services/suspend-user.ts @@ -5,13 +5,13 @@ import { config } from "@/config.js"; import type { User } from "@/models/entities/user.js"; import { Users, Followings } from "@/models/index.js"; import { Not, IsNull } from "typeorm"; -import { publishInternalEvent } from "@/services/stream.js"; +import { InternalEvent, publishToInternalStream } from "backend-rs"; export async function doPostSuspend(user: { id: User["id"]; host: User["host"]; }) { - publishInternalEvent("userChangeSuspendedState", { + await publishToInternalStream(InternalEvent.Suspend, { id: user.id, isSuspended: true, }); diff --git a/packages/backend/src/services/unsuspend-user.ts b/packages/backend/src/services/unsuspend-user.ts index b5e83a4151..33db77b0a3 100644 --- a/packages/backend/src/services/unsuspend-user.ts +++ b/packages/backend/src/services/unsuspend-user.ts @@ -6,10 +6,10 @@ import { config } from "@/config.js"; import type { User } from "@/models/entities/user.js"; import { Users, Followings } from "@/models/index.js"; import { Not, IsNull } from "typeorm"; -import { publishInternalEvent } from "@/services/stream.js"; +import { InternalEvent, publishToInternalStream } from "backend-rs"; export async function doPostUnsuspend(user: User) { - publishInternalEvent("userChangeSuspendedState", { + publishToInternalStream(InternalEvent.Suspend, { id: user.id, isSuspended: false, });