refactor (backend): port publishUserEvent to backend-rs

This commit is contained in:
naskya 2024-07-27 21:02:48 +09:00
parent 7a54dc8b87
commit b6498a12e8
No known key found for this signature in database
GPG key ID: 712D413B3A9FED5C
28 changed files with 155 additions and 76 deletions

View file

@ -1191,6 +1191,8 @@ export declare function publishToNoteStream(noteId: string, kind: NoteEvent, obj
export declare function publishToNoteUpdatesStream(note: Note): Promise<void>
export declare function publishToUserStream(userId: string, kind: UserEvent, object: any): Promise<void>
export interface PugArgs {
img: string | null
title: string
@ -1482,6 +1484,17 @@ export type UserEmojiModPerm = 'add'|
'mod'|
'unauthorized';
export declare enum UserEvent {
Disconnect = 0,
FollowChannel = 1,
UnfollowChannel = 2,
UpdateProfile = 3,
Mute = 4,
Unmute = 5,
Follow = 6,
Unfollow = 7
}
export interface UserGroup {
id: string
createdAt: DateTimeWithTimeZone

View file

@ -432,6 +432,7 @@ module.exports.publishToModerationStream = nativeBinding.publishToModerationStre
module.exports.publishToNotesStream = nativeBinding.publishToNotesStream
module.exports.publishToNoteStream = nativeBinding.publishToNoteStream
module.exports.publishToNoteUpdatesStream = nativeBinding.publishToNoteUpdatesStream
module.exports.publishToUserStream = nativeBinding.publishToUserStream
module.exports.PushNotificationKind = nativeBinding.PushNotificationKind
module.exports.PushSubscriptionType = nativeBinding.PushSubscriptionType
module.exports.RelayStatus = nativeBinding.RelayStatus
@ -454,6 +455,7 @@ module.exports.updateAntennasOnNewNote = nativeBinding.updateAntennasOnNewNote
module.exports.updateMetaCache = nativeBinding.updateMetaCache
module.exports.updateNodeinfoCache = nativeBinding.updateNodeinfoCache
module.exports.UserEmojiModPerm = nativeBinding.UserEmojiModPerm
module.exports.UserEvent = nativeBinding.UserEvent
module.exports.UserProfileFfvisibility = nativeBinding.UserProfileFfvisibility
module.exports.UserProfileMutingNotificationTypes = nativeBinding.UserProfileMutingNotificationTypes
module.exports.verifyPassword = nativeBinding.verifyPassword

View file

@ -11,6 +11,7 @@ pub mod moderation;
pub mod note;
pub mod note_edit;
pub mod notes;
pub mod user;
use crate::{
config::CONFIG,

View file

@ -0,0 +1,41 @@
use crate::service::stream::{publish_to_stream, Error, Stream};
#[macros::export]
pub enum UserEvent {
Disconnect,
FollowChannel,
UnfollowChannel,
UpdateProfile,
Mute,
Unmute,
Follow,
Unfollow,
}
// We want to merge `kind` and `object` into a single enum
// https://github.com/napi-rs/napi-rs/issues/2036
#[macros::export(js_name = "publishToUserStream")]
pub async fn publish(
user_id: String,
kind: UserEvent,
object: &serde_json::Value,
) -> Result<(), Error> {
let kind = match kind {
UserEvent::Disconnect => "terminate",
UserEvent::FollowChannel => "followChannel",
UserEvent::UnfollowChannel => "unfollowChannel",
UserEvent::UpdateProfile => "updateUserProfile",
UserEvent::Mute => "mute",
UserEvent::Unmute => "unmute",
UserEvent::Follow => "follow",
UserEvent::Unfollow => "unfollow",
};
publish_to_stream(
&Stream::User { user_id },
Some(kind),
Some(serde_json::to_string(&object)?),
)
.await
}

View file

@ -2,7 +2,7 @@ import type Bull from "bull";
import { In } from "typeorm";
import { Mutings } from "@/models/index.js";
import { queueLogger } from "../../logger.js";
import { publishUserEvent } from "@/services/stream.js";
import { publishToUserStream, UserEvent } from "backend-rs";
const logger = queueLogger.createSubLogger("check-expired-mutings");
@ -23,9 +23,11 @@ export async function checkExpiredMutings(
id: In(expired.map((m) => m.id)),
});
for (const m of expired) {
publishUserEvent(m.muterId, "unmute", m.mutee!);
}
await Promise.all(
expired.map((m) =>
publishToUserStream(m.muterId, UserEvent.Unmute, m.mutee),
),
);
}
logger.info("All expired mutings checked.");

View file

@ -1,7 +1,7 @@
import define from "@/server/api/define.js";
import { Users } from "@/models/index.js";
import { doPostSuspend } from "@/services/suspend-user.js";
import { publishUserEvent } from "@/services/stream.js";
import { publishToUserStream, UserEvent } from "backend-rs";
import { createDeleteAccountJob } from "@/queue/index.js";
export const meta = {
@ -53,6 +53,6 @@ export default define(meta, paramDef, async (ps, me) => {
if (Users.isLocalUser(user)) {
// Terminate streaming
publishUserEvent(user.id, "terminate", {});
await publishToUserStream(user.id, UserEvent.Disconnect, {});
}
});

View file

@ -4,7 +4,7 @@ import { Users, Followings, Notifications } from "@/models/index.js";
import type { User } from "@/models/entities/user.js";
import { insertModerationLog } from "@/services/insert-moderation-log.js";
import { doPostSuspend } from "@/services/suspend-user.js";
import { publishUserEvent } from "@/services/stream.js";
import { publishToUserStream, UserEvent } from "backend-rs";
export const meta = {
tags: ["admin"],
@ -46,7 +46,7 @@ export default define(meta, paramDef, async (ps, me) => {
// Terminate streaming
if (Users.isLocalUser(user)) {
publishUserEvent(user.id, "terminate", {});
await publishToUserStream(user.id, UserEvent.Disconnect, {});
}
(async () => {

View file

@ -1,8 +1,7 @@
import define from "@/server/api/define.js";
import { ApiError } from "@/server/api/error.js";
import { Channels, ChannelFollowings } from "@/models/index.js";
import { genIdAt } from "backend-rs";
import { publishUserEvent } from "@/services/stream.js";
import { genIdAt, publishToUserStream, UserEvent } from "backend-rs";
export const meta = {
tags: ["channels"],
@ -46,5 +45,5 @@ export default define(meta, paramDef, async (ps, user) => {
followeeId: channel.id,
});
publishUserEvent(user.id, "followChannel", channel);
await publishToUserStream(user.id, UserEvent.FollowChannel, channel);
});

View file

@ -1,7 +1,7 @@
import define from "@/server/api/define.js";
import { ApiError } from "@/server/api/error.js";
import { Channels, ChannelFollowings } from "@/models/index.js";
import { publishUserEvent } from "@/services/stream.js";
import { publishToUserStream, UserEvent } from "backend-rs";
export const meta = {
tags: ["channels"],
@ -41,5 +41,5 @@ export default define(meta, paramDef, async (ps, user) => {
followeeId: channel.id,
});
publishUserEvent(user.id, "unfollowChannel", channel);
await publishToUserStream(user.id, UserEvent.UnfollowChannel, channel);
});

View file

@ -1,4 +1,3 @@
import { publishUserEvent } from "@/services/stream.js";
import define from "@/server/api/define.js";
import { Users, UserProfiles } from "@/models/index.js";
import {
@ -7,6 +6,8 @@ import {
InternalEvent,
publishToInternalStream,
publishToMainStream,
publishToUserStream,
UserEvent,
verifyPassword,
} from "backend-rs";
@ -53,6 +54,6 @@ export default define(meta, paramDef, async (ps, user) => {
// Terminate streaming
setTimeout(() => {
publishUserEvent(user.id, "terminate", {});
publishToUserStream(user.id, UserEvent.Disconnect, {});
}, 5000);
});

View file

@ -1,6 +1,6 @@
import define from "@/server/api/define.js";
import { AccessTokens } from "@/models/index.js";
import { publishUserEvent } from "@/services/stream.js";
import { publishToUserStream, UserEvent } from "backend-rs";
export const meta = {
requireCredential: true,
@ -26,6 +26,6 @@ export default define(meta, paramDef, async (ps, user) => {
});
// Terminate streaming
publishUserEvent(user.id, "terminate");
await publishToUserStream(user.id, UserEvent.Disconnect, {});
}
});

View file

@ -1,6 +1,10 @@
import * as mfm from "mfm-js";
import { publishUserEvent } from "@/services/stream.js";
import { Event, publishToMainStream } from "backend-rs";
import {
Event,
publishToMainStream,
publishToUserStream,
UserEvent,
} from "backend-rs";
import acceptAllFollowRequests from "@/services/following/requests/accept-all.js";
import { publishToFollowers } from "@/services/i/update.js";
import { extractCustomEmojisFromMfm } from "@/misc/extract-custom-emojis-from-mfm.js";
@ -346,10 +350,10 @@ export default define(meta, paramDef, async (ps, _user, token) => {
});
// Publish meUpdated event
publishToMainStream(user.id, Event.Me, iObj);
publishUserEvent(
await publishToMainStream(user.id, Event.Me, iObj);
await publishToUserStream(
user.id,
"updateUserProfile",
UserEvent.UpdateProfile,
await UserProfiles.findOneBy({ userId: user.id }),
);

View file

@ -1,10 +1,9 @@
import define from "@/server/api/define.js";
import { ApiError } from "@/server/api/error.js";
import { getUser } from "@/server/api/common/getters.js";
import { genIdAt } from "backend-rs";
import { genIdAt, publishToUserStream, UserEvent } from "backend-rs";
import { Mutings, NoteWatchings } from "@/models/index.js";
import type { Muting } from "@/models/entities/muting.js";
import { publishUserEvent } from "@/services/stream.js";
export const meta = {
tags: ["account"],
@ -88,7 +87,7 @@ export default define(meta, paramDef, async (ps, user) => {
muteeId: mutee.id,
} as Muting);
publishUserEvent(user.id, "mute", mutee);
await publishToUserStream(user.id, UserEvent.Mute, mutee);
NoteWatchings.delete({
userId: muter.id,

View file

@ -2,7 +2,7 @@ import define from "@/server/api/define.js";
import { ApiError } from "@/server/api/error.js";
import { getUser } from "@/server/api/common/getters.js";
import { Mutings } from "@/models/index.js";
import { publishUserEvent } from "@/services/stream.js";
import { publishToUserStream, UserEvent } from "backend-rs";
export const meta = {
tags: ["account"],
@ -70,5 +70,5 @@ export default define(meta, paramDef, async (ps, user) => {
id: muting.id,
});
publishUserEvent(user.id, "unmute", mutee);
await publishToUserStream(user.id, UserEvent.Unmute, mutee);
});

View file

@ -66,5 +66,5 @@ export default define(meta, paramDef, async (ps, user) => {
muteeId: mutee.id,
} as RenoteMuting);
// publishUserEvent(user.id, "mute", mutee);
// await publishToUserStream(user.id, UserEvent.RenoteMute, mutee);
});

View file

@ -59,5 +59,5 @@ export default define(meta, paramDef, async (ps, user) => {
id: muting.id,
});
// publishUserEvent(user.id, "unmute", mutee);
// await publishToUserStream(user.id, UserEvent.RenoteUnMute, mutee);
});

View file

@ -66,5 +66,5 @@ export default define(meta, paramDef, async (ps, user) => {
muteeId: mutee.id,
} as ReplyMuting);
// publishUserEvent(user.id, "mute", mutee);
// await publishToUserStream(user.id, UserEvent.ReplyMute, mutee);
});

View file

@ -59,5 +59,5 @@ export default define(meta, paramDef, async (ps, user) => {
id: record.id,
});
// publishUserEvent(user.id, "unmute", mutee);
// await publishToUserStream(user.id, UserEvent.ReplyUnmute, mutee);
});

View file

@ -195,9 +195,9 @@ export class ListHelpers {
if (exclusive !== undefined) {
UserListJoinings.findBy({ userListId: list.id }).then((members) => {
for (const member of members) {
publishUserEvent(
await publishToUserStream(
list.userId,
exclusive ? "userHidden" : "userUnhidden",
exclusive ? UserEvent.Hidden : UserEvent.Unhidden,
member.userId,
);
}

View file

@ -27,9 +27,8 @@ import deleteFollowing from "@/services/following/delete.js";
import cancelFollowRequest from "@/services/following/requests/cancel.js";
import createBlocking from "@/services/blocking/create.js";
import deleteBlocking from "@/services/blocking/delete.js";
import { genId } from "backend-rs";
import { genId, publishToUserStream, UserEvent } from "backend-rs";
import type { Muting } from "@/models/entities/muting.js";
import { publishUserEvent } from "@/services/stream.js";
import { UserConverter } from "@/server/api/mastodon/converters/user.js";
import acceptFollowRequest from "@/services/following/requests/accept.js";
import { rejectFollowRequest } from "@/services/following/reject.js";
@ -152,7 +151,7 @@ export class UserHelpers {
muteeId: target.id,
} as Muting);
publishUserEvent(localUser.id, "mute", target);
await publishToUserStream(localUser.id, UserEvent.Mute, target);
NoteWatchings.delete({
userId: localUser.id,
@ -177,7 +176,7 @@ export class UserHelpers {
id: muting.id,
});
publishUserEvent(localUser.id, "unmute", target);
await publishToUserStream(localUser.id, UserEvent.Unmute, target);
}
return this.getUserRelationshipTo(target.id, localUser.id);

View file

@ -63,7 +63,6 @@ export interface UserStreamTypes {
unmute: User;
follow: Packed<"UserDetailedNotMe">;
unfollow: Packed<"User">;
userAdded: Packed<"User">;
}
export interface MainStreamTypes {

View file

@ -1,4 +1,3 @@
import { publishUserEvent } from "@/services/stream.js";
import { renderActivity } from "@/remote/activitypub/renderer/index.js";
import renderFollow from "@/remote/activitypub/renderer/follow.js";
import { renderUndo } from "@/remote/activitypub/renderer/undo.js";
@ -15,7 +14,13 @@ import {
UserListJoinings,
UserLists,
} from "@/models/index.js";
import { Event, genIdAt, publishToMainStream } from "backend-rs";
import {
Event,
genIdAt,
publishToMainStream,
publishToUserStream,
UserEvent,
} from "backend-rs";
import { getActiveWebhooks } from "@/misc/webhook-cache.js";
import { webhookDeliver } from "@/queue/index.js";
@ -72,8 +77,8 @@ async function cancelRequest(follower: User, followee: User) {
Users.pack(followee, follower, {
detail: true,
}).then(async (packed) => {
publishUserEvent(follower.id, "unfollow", packed);
publishToMainStream(follower.id, Event.Unfollow, packed);
await publishToUserStream(follower.id, UserEvent.Unfollow, packed);
await publishToMainStream(follower.id, Event.Unfollow, packed);
const webhooks = (await getActiveWebhooks()).filter(
(x) => x.userId === follower.id && x.on.includes("unfollow"),
@ -127,7 +132,7 @@ async function unFollow(follower: User, followee: User) {
Users.pack(followee, follower, {
detail: true,
}).then(async (packed) => {
publishUserEvent(follower.id, "unfollow", packed);
publishToUserStream(follower.id, UserEvent.Unfollow, packed);
publishToMainStream(follower.id, Event.Unfollow, packed);
const webhooks = (await getActiveWebhooks()).filter(

View file

@ -1,6 +1,6 @@
import { Users } from "@/models/index.js";
import { createDeleteAccountJob } from "@/queue/index.js";
import { publishUserEvent } from "@/services/stream.js";
import { publishToUserStream, UserEvent } from "backend-rs";
import { doPostSuspend } from "@/services/suspend-user.js";
export async function deleteAccount(user: {
@ -19,5 +19,5 @@ export async function deleteAccount(user: {
});
// Terminate streaming
publishUserEvent(user.id, "terminate", {});
await publishToUserStream(user.id, UserEvent.Disconnect, {});
}

View file

@ -1,4 +1,3 @@
import { publishUserEvent } from "@/services/stream.js";
import { renderActivity } from "@/remote/activitypub/renderer/index.js";
import renderFollow from "@/remote/activitypub/renderer/follow.js";
import renderAccept from "@/remote/activitypub/renderer/accept.js";
@ -22,6 +21,8 @@ import {
genIdAt,
isSilencedServer,
publishToMainStream,
publishToUserStream,
UserEvent,
} from "backend-rs";
import { createNotification } from "@/services/create-notification.js";
import { isDuplicateKeyValueError } from "@/misc/is-duplicate-key-value-error.js";
@ -126,12 +127,12 @@ export async function insertFollowingDoc(
Users.pack(followee.id, follower, {
detail: true,
}).then(async (packed) => {
publishUserEvent(
await publishToUserStream(
follower.id,
"follow",
UserEvent.Follow,
packed as Packed<"UserDetailedNotMe">,
);
publishToMainStream(
await publishToMainStream(
follower.id,
Event.Follow,
packed as Packed<"UserDetailedNotMe">,

View file

@ -1,5 +1,9 @@
import { publishUserEvent } from "@/services/stream.js";
import { Event, publishToMainStream } from "backend-rs";
import {
Event,
publishToMainStream,
publishToUserStream,
UserEvent,
} from "backend-rs";
import { renderActivity } from "@/remote/activitypub/renderer/index.js";
import renderFollow from "@/remote/activitypub/renderer/follow.js";
import { renderUndo } from "@/remote/activitypub/renderer/undo.js";
@ -51,8 +55,8 @@ export default async function (
Users.pack(followee.id, follower, {
detail: true,
}).then(async (packed) => {
publishUserEvent(follower.id, "unfollow", packed);
publishToMainStream(follower.id, Event.Unfollow, packed);
await publishToUserStream(follower.id, UserEvent.Unfollow, packed);
await publishToMainStream(follower.id, Event.Unfollow, packed);
const webhooks = (await getActiveWebhooks()).filter(
(x) => x.userId === follower.id && x.on.includes("unfollow"),

View file

@ -2,8 +2,12 @@ import { renderActivity } from "@/remote/activitypub/renderer/index.js";
import renderFollow from "@/remote/activitypub/renderer/follow.js";
import renderReject from "@/remote/activitypub/renderer/reject.js";
import { deliver, webhookDeliver } from "@/queue/index.js";
import { publishUserEvent } from "@/services/stream.js";
import { Event, publishToMainStream } from "backend-rs";
import {
Event,
publishToMainStream,
publishToUserStream,
UserEvent,
} from "backend-rs";
import type { ILocalUser, IRemoteUser } from "@/models/entities/user.js";
import { Users, FollowRequests, Followings } from "@/models/index.js";
import { decrementFollowing } from "./delete.js";
@ -120,7 +124,7 @@ async function publishUnfollow(followee: Both, follower: Local) {
detail: true,
});
publishUserEvent(follower.id, "unfollow", packedFollowee);
publishToUserStream(follower.id, UserEvent.Unfollow, packedFollowee);
publishToMainStream(follower.id, Event.Unfollow, packedFollowee);
const webhooks = (await getActiveWebhooks()).filter(

View file

@ -11,8 +11,12 @@ import mfm from "mfm-js";
import { extractHashtags } from "@/misc/extract-hashtags.js";
import { normalizeForSearch } from "@/misc/normalize-for-search.js";
import { updateUsertags } from "@/services/update-hashtag.js";
import { publishUserEvent } from "@/services/stream.js";
import { Event, publishToMainStream } from "backend-rs";
import {
Event,
publishToMainStream,
publishToUserStream,
UserEvent,
} from "backend-rs";
import acceptAllFollowRequests from "@/services/following/requests/accept-all.js";
import { promiseEarlyReturn } from "@/prelude/promise.js";
@ -80,15 +84,15 @@ export async function updateUserProfileData(
includeSecrets: isSecure,
});
publishToMainStream(user.id, Event.Me, iObj);
publishUserEvent(
await publishToMainStream(user.id, Event.Me, iObj);
await publishToUserStream(
user.id,
"updateUserProfile",
UserEvent.UpdateProfile,
await UserProfiles.findOneByOrFail({ userId: user.id }),
);
if (user.isLocked && updates.isLocked === false) {
acceptAllFollowRequests(user);
await acceptAllFollowRequests(user);
}
await promiseEarlyReturn(

View file

@ -1,5 +1,5 @@
import { redisClient } from "@/db/redis.js";
import type { User } from "@/models/entities/user.js";
// import type { User } from "@/models/entities/user.js";
// import type { Note } from "@/models/entities/note.js";
// import type { UserList } from "@/models/entities/user-list.js";
// import type { UserGroup } from "@/models/entities/user-group.js";
@ -19,7 +19,7 @@ import type {
// MessagingIndexStreamTypes,
// MessagingStreamTypes,
// NoteStreamTypes,
UserStreamTypes,
// UserStreamTypes,
// NoteUpdatesStreamTypes,
} from "@/server/api/stream/types.js";
@ -53,17 +53,18 @@ class Publisher {
// this.publish("internal", type, typeof value === "undefined" ? null : value);
// };
public publishUserEvent = <K extends keyof UserStreamTypes>(
userId: User["id"],
type: K,
value?: UserStreamTypes[K],
): void => {
this.publish(
`user:${userId}`,
type,
typeof value === "undefined" ? null : value,
);
};
/* ported to backend-rs */
// public publishUserEvent = <K extends keyof UserStreamTypes>(
// userId: User["id"],
// type: K,
// value?: UserStreamTypes[K],
// ): void => {
// this.publish(
// `user:${userId}`,
// type,
// typeof value === "undefined" ? null : value,
// );
// };
/* ported to backend-rs */
// public publishBroadcastStream = <K extends keyof BroadcastTypes>(
@ -217,7 +218,7 @@ const publisher = new Publisher();
export default publisher;
// export const publishInternalEvent = publisher.publishInternalEvent;
export const publishUserEvent = publisher.publishUserEvent;
// export const publishUserEvent = publisher.publishUserEvent;
// export const publishBroadcastStream = publisher.publishBroadcastStream;
// export const publishMainStream = publisher.publishMainStream;
// export const publishDriveStream = publisher.publishDriveStream;