refactor (backend): port publishMainStream to backend-rs

This commit is contained in:
naskya 2024-07-27 20:16:13 +09:00
parent 08e009ebad
commit 6d62cd4276
No known key found for this signature in database
GPG key ID: 712D413B3A9FED5C
43 changed files with 280 additions and 137 deletions

View file

@ -371,6 +371,40 @@ export interface Emoji {
height: number | null
}
export declare enum Event {
Notification = 0,
NewNotification = 1,
Mention = 2,
NewMention = 3,
Chat = 4,
NewChat = 5,
NewDm = 6,
Reply = 7,
Renote = 8,
Follow = 9,
Followed = 10,
Unfollow = 11,
NewFollowRequest = 12,
Page = 13,
ReadAllNotifications = 14,
ReadAllMentions = 15,
ReadNotifications = 16,
ReadAllDms = 17,
ReadAllChats = 18,
ReadAntenna = 19,
ReadAllAntennaPosts = 20,
NewAntennaPost = 21,
ReadAllAnnouncements = 22,
ReadAllChannelPosts = 23,
NewChannelPost = 24,
DriveFile = 25,
UrlUploadFinished = 26,
Me = 27,
RegenerateMyToken = 28,
Signin = 29,
Registry = 30
}
export declare function extractHost(uri: string): string
export declare function fetchMeta(): Promise<Meta>
@ -1130,6 +1164,8 @@ export declare function publishToDriveFolderStream(userId: string, kind: DriveFo
export declare function publishToGroupChatStream(groupId: string, kind: ChatEvent, object: any): Promise<void>
export declare function publishToMainStream(userId: string, kind: Event, object: any): Promise<void>
export declare function publishToModerationStream(moderatorId: string, report: AbuseUserReportLike): Promise<void>
export declare function publishToNotesStream(note: Note): Promise<void>

View file

@ -375,6 +375,7 @@ module.exports.decodeReaction = nativeBinding.decodeReaction
module.exports.DriveFileEvent = nativeBinding.DriveFileEvent
module.exports.DriveFileUsageHint = nativeBinding.DriveFileUsageHint
module.exports.DriveFolderEvent = nativeBinding.DriveFolderEvent
module.exports.Event = nativeBinding.Event
module.exports.extractHost = nativeBinding.extractHost
module.exports.fetchMeta = nativeBinding.fetchMeta
module.exports.fetchNodeinfo = nativeBinding.fetchNodeinfo
@ -424,6 +425,7 @@ module.exports.publishToChatStream = nativeBinding.publishToChatStream
module.exports.publishToDriveFileStream = nativeBinding.publishToDriveFileStream
module.exports.publishToDriveFolderStream = nativeBinding.publishToDriveFolderStream
module.exports.publishToGroupChatStream = nativeBinding.publishToGroupChatStream
module.exports.publishToMainStream = nativeBinding.publishToMainStream
module.exports.publishToModerationStream = nativeBinding.publishToModerationStream
module.exports.publishToNotesStream = nativeBinding.publishToNotesStream
module.exports.publishToNoteStream = nativeBinding.publishToNoteStream

View file

@ -5,6 +5,7 @@ pub mod chat_index;
pub mod custom_emoji;
pub mod drive;
pub mod group_chat;
pub mod main;
pub mod moderation;
pub mod note;
pub mod note_edit;

View file

@ -0,0 +1,87 @@
use crate::service::stream::{publish_to_stream, Error, Stream};
#[macros::export]
pub enum Event {
Notification,
NewNotification,
Mention,
NewMention,
Chat,
NewChat,
NewDm,
Reply,
Renote,
Follow,
Followed,
Unfollow,
NewFollowRequest,
Page,
ReadAllNotifications,
ReadAllMentions,
ReadNotifications,
ReadAllDms,
ReadAllChats,
ReadAntenna,
ReadAllAntennaPosts,
NewAntennaPost,
ReadAllAnnouncements,
ReadAllChannelPosts,
NewChannelPost,
DriveFile,
UrlUploadFinished,
Me,
RegenerateMyToken,
Signin,
Registry,
}
// We want to merge `kind` and `object` into a single enum
// https://github.com/napi-rs/napi-rs/issues/2036
#[macros::export(js_name = "publishToMainStream")]
pub async fn publish(
user_id: String,
kind: Event,
object: &serde_json::Value,
) -> Result<(), Error> {
let kind = match kind {
Event::Notification => "notification",
Event::Mention => "mention",
Event::Reply => "reply",
Event::Renote => "renote",
Event::Follow => "follow",
Event::Followed => "followed",
Event::Unfollow => "unfollow",
Event::Me => "meUpdated",
Event::Page => "pageEvent",
Event::UrlUploadFinished => "urlUploadFinished",
Event::ReadAllNotifications => "readAllNotifications",
Event::ReadNotifications => "readNotifications",
Event::NewNotification => "unreadNotification",
Event::NewMention => "unreadMention",
Event::ReadAllMentions => "readAllUnreadMentions",
Event::ReadAllDms => "readAllUnreadSpecifiedNotes",
Event::NewDm => "unreadSpecifiedNote",
Event::ReadAllChats => "readAllMessagingMessages",
Event::Chat => "messagingMessage",
Event::NewChat => "unreadMessagingMessage",
Event::ReadAllAntennaPosts => "readAllAntennas",
Event::NewAntennaPost => "unreadAntenna",
Event::ReadAllAnnouncements => "readAllAnnouncements",
Event::ReadAllChannelPosts => "readAllChannels",
Event::NewChannelPost => "unreadChannel",
Event::RegenerateMyToken => "myTokenRegenerated",
Event::Signin => "signin",
Event::Registry => "registryUpdated",
Event::DriveFile => "driveFileCreated",
Event::ReadAntenna => "readAntenna",
Event::NewFollowRequest => "receiveFollowRequest",
};
publish_to_stream(
&Stream::Main { user_id },
Some(kind),
Some(serde_json::to_string(&object)?),
)
.await
}

View file

@ -1,9 +1,10 @@
import { publishMainStream } from "@/services/stream.js";
import {
publishToChatStream,
publishToGroupChatStream,
publishToChatIndexStream,
sendPushNotification,
publishToMainStream,
Event,
} from "backend-rs";
import type { User, IRemoteUser } from "@/models/entities/user.js";
import type { MessagingMessage } from "@/models/entities/messaging-message.js";
@ -61,7 +62,7 @@ export async function readUserMessagingMessage(
if (!(await Users.getHasUnreadMessagingMessage(userId))) {
// 全ての(いままで未読だった)自分宛てのメッセージを(これで)読みましたよというイベントを発行
publishMainStream(userId, "readAllMessagingMessages");
await publishToMainStream(userId, Event.ReadAllChats, {});
await sendPushNotification(userId, "readAllChats", {});
} else {
// そのユーザーとのメッセージで未読がなければイベント発行
@ -135,7 +136,7 @@ export async function readGroupMessagingMessage(
if (!(await Users.getHasUnreadMessagingMessage(userId))) {
// 全ての(いままで未読だった)自分宛てのメッセージを(これで)読みましたよというイベントを発行
publishMainStream(userId, "readAllMessagingMessages");
await publishToMainStream(userId, Event.ReadAllChats, {});
await sendPushNotification(userId, "readAllChats", {});
} else {
// そのグループにおいて未読がなければイベント発行

View file

@ -1,6 +1,5 @@
import { In } from "typeorm";
import { publishMainStream } from "@/services/stream.js";
import { sendPushNotification } from "backend-rs";
import { Event, publishToMainStream, sendPushNotification } from "backend-rs";
import type { User } from "@/models/entities/user.js";
import type { Notification } from "@/models/entities/notification.js";
import { Notifications, Users } from "@/models/index.js";
@ -46,7 +45,7 @@ export async function readNotificationByQuery(
}
function postReadAllNotifications(userId: User["id"]) {
publishMainStream(userId, "readAllNotifications");
publishToMainStream(userId, Event.ReadAllNotifications, {});
return sendPushNotification(userId, "readAllNotifications", {});
}
@ -54,7 +53,7 @@ function postReadNotifications(
userId: User["id"],
notificationIds: Notification["id"][],
) {
publishMainStream(userId, "readNotifications", notificationIds);
publishToMainStream(userId, Event.ReadNotifications, notificationIds);
return sendPushNotification(userId, "readNotifications", {
notificationIds,
});

View file

@ -3,8 +3,7 @@ import type Koa from "koa";
import { config } from "@/config.js";
import type { ILocalUser } from "@/models/entities/user.js";
import { Signins } from "@/models/index.js";
import { genIdAt } from "backend-rs";
import { publishMainStream } from "@/services/stream.js";
import { Event, genIdAt, publishToMainStream } from "backend-rs";
export default function (ctx: Koa.Context, user: ILocalUser, redirect = false) {
if (redirect) {
@ -40,6 +39,6 @@ export default function (ctx: Koa.Context, user: ILocalUser, redirect = false) {
}).then((x) => Signins.findOneByOrFail(x.identifiers[0]));
// Publish signin event
publishMainStream(user.id, "signin", await Signins.pack(record));
publishToMainStream(user.id, Event.Signin, await Signins.pack(record));
})();
}

View file

@ -1,5 +1,5 @@
import { Users, UserProfiles } from "@/models/index.js";
import { publishMainStream } from "@/services/stream.js";
import { Event, publishToMainStream } from "backend-rs";
import define from "@/server/api/define.js";
export const meta = {
@ -36,5 +36,5 @@ export default define(meta, paramDef, async (ps) => {
includeSecrets: true,
});
publishMainStream(user.id, "meUpdated", iObj);
publishToMainStream(user.id, Event.Me, iObj);
});

View file

@ -1,5 +1,5 @@
import { Users, UserProfiles, UserSecurityKeys } from "@/models/index.js";
import { publishMainStream } from "@/services/stream.js";
import { Event, publishToMainStream } from "backend-rs";
import define from "@/server/api/define.js";
export const meta = {
@ -38,5 +38,5 @@ export default define(meta, paramDef, async (ps) => {
includeSecrets: true,
});
publishMainStream(user.id, "meUpdated", iObj);
publishToMainStream(user.id, Event.Me, iObj);
});

View file

@ -1,7 +1,7 @@
import { uploadFromUrl } from "@/services/drive/upload-from-url.js";
import define from "@/server/api/define.js";
import { DriveFiles } from "@/models/index.js";
import { publishMainStream } from "@/services/stream.js";
import { Event, publishToMainStream } from "backend-rs";
import { HOUR } from "@/const.js";
export const meta = {
@ -50,7 +50,7 @@ export default define(meta, paramDef, async (ps, user) => {
comment: ps.comment,
}).then((file) => {
DriveFiles.pack(file, { self: true }).then((packedFile) => {
publishMainStream(user.id, "urlUploadFinished", {
publishToMainStream(user.id, Event.UrlUploadFinished, {
marker: ps.marker,
file: packedFile,
});

View file

@ -1,4 +1,4 @@
import { publishMainStream } from "@/services/stream.js";
import { Event, publishToMainStream } from "backend-rs";
import * as OTPAuth from "otpauth";
import define from "@/server/api/define.js";
import { Users, UserProfiles } from "@/models/index.js";
@ -47,5 +47,5 @@ export default define(meta, paramDef, async (ps, user) => {
includeSecrets: true,
});
publishMainStream(user.id, "meUpdated", iObj);
publishToMainStream(user.id, Event.Me, iObj);
});

View file

@ -8,8 +8,7 @@ import {
} from "@/models/index.js";
import { config } from "@/config.js";
import { procedures, hash } from "@/server/api/2fa.js";
import { publishMainStream } from "@/services/stream.js";
import { verifyPassword } from "backend-rs";
import { Event, publishToMainStream, verifyPassword } from "backend-rs";
const rpIdHashReal = hash(Buffer.from(config.hostname, "utf-8"));
@ -132,9 +131,9 @@ export default define(meta, paramDef, async (ps, user) => {
});
// Publish meUpdated event
publishMainStream(
publishToMainStream(
user.id,
"meUpdated",
Event.Me,
await Users.pack(user.id, user, {
detail: true,
includeSecrets: true,

View file

@ -1,6 +1,6 @@
import define from "@/server/api/define.js";
import { Users, UserProfiles, UserSecurityKeys } from "@/models/index.js";
import { publishMainStream } from "@/services/stream.js";
import { Event, publishToMainStream } from "backend-rs";
import { ApiError } from "@/server/api/error.js";
export const meta = {
@ -57,5 +57,5 @@ export default define(meta, paramDef, async (ps, user) => {
includeSecrets: true,
});
publishMainStream(user.id, "meUpdated", iObj);
publishToMainStream(user.id, Event.Me, iObj);
});

View file

@ -1,7 +1,6 @@
import { verifyPassword } from "backend-rs";
import { Event, publishToMainStream, verifyPassword } from "backend-rs";
import define from "@/server/api/define.js";
import { UserProfiles, UserSecurityKeys, Users } from "@/models/index.js";
import { publishMainStream } from "@/services/stream.js";
export const meta = {
requireCredential: true,
@ -54,9 +53,9 @@ export default define(meta, paramDef, async (ps, user) => {
}
// Publish meUpdated event
publishMainStream(
publishToMainStream(
user.id,
"meUpdated",
Event.Me,
await Users.pack(user.id, user, {
detail: true,
includeSecrets: true,

View file

@ -1,7 +1,6 @@
import { publishMainStream } from "@/services/stream.js";
import define from "@/server/api/define.js";
import { Users, UserProfiles } from "@/models/index.js";
import { verifyPassword } from "backend-rs";
import { Event, publishToMainStream, verifyPassword } from "backend-rs";
export const meta = {
requireCredential: true,
@ -38,5 +37,5 @@ export default define(meta, paramDef, async (ps, user) => {
includeSecrets: true,
});
publishMainStream(user.id, "meUpdated", iObj);
publishToMainStream(user.id, Event.Me, iObj);
});

View file

@ -1,7 +1,7 @@
import { publishMainStream } from "@/services/stream.js";
import define from "@/server/api/define.js";
import { Users, UserSecurityKeys } from "@/models/index.js";
import { ApiError } from "@/server/api/error.js";
import { Event, publishToMainStream } from "backend-rs";
export const meta = {
requireCredential: true,
@ -54,5 +54,5 @@ export default define(meta, paramDef, async (ps, user) => {
includeSecrets: true,
});
publishMainStream(user.id, "meUpdated", iObj);
publishToMainStream(user.id, Event.Me, iObj);
});

View file

@ -3,8 +3,7 @@ import { Users } from "@/models/index.js";
import { resolveUser } from "@/remote/resolve-user.js";
import acceptAllFollowRequests from "@/services/following/requests/accept-all.js";
import { publishToFollowers } from "@/services/i/update.js";
import { publishMainStream } from "@/services/stream.js";
import { stringToAcct } from "backend-rs";
import { Event, publishToMainStream, stringToAcct } from "backend-rs";
import { DAY } from "@/const.js";
import { apiLogger } from "@/server/api/logger.js";
import define from "@/server/api/define.js";
@ -97,7 +96,7 @@ export default define(meta, paramDef, async (ps, user) => {
});
// Publish meUpdated event
publishMainStream(user.id, "meUpdated", iObj);
publishToMainStream(user.id, Event.Me, iObj);
if (user.isLocked === false) {
acceptAllFollowRequests(user);

View file

@ -1,6 +1,6 @@
import type { User } from "@/models/entities/user.js";
import { resolveUser } from "@/remote/resolve-user.js";
import { stringToAcct } from "backend-rs";
import { Event, publishToMainStream, stringToAcct } from "backend-rs";
import { DAY } from "@/const.js";
import DeliverManager from "@/remote/activitypub/deliver-manager.js";
import { renderActivity } from "@/remote/activitypub/renderer/index.js";
@ -12,7 +12,6 @@ import create from "@/services/following/create.js";
import { getUser } from "@/server/api/common/getters.js";
import { Followings, Users } from "@/models/index.js";
import { config } from "@/config.js";
import { publishMainStream } from "@/services/stream.js";
import { inspect } from "node:util";
export const meta = {
@ -134,7 +133,7 @@ export default define(meta, paramDef, async (ps, user) => {
dm.execute();
// Publish meUpdated event
publishMainStream(user.id, "meUpdated", iObj);
publishToMainStream(user.id, Event.Me, iObj);
const followings = await Followings.findBy({
followeeId: user.id,

View file

@ -1,4 +1,4 @@
import { publishMainStream } from "@/services/stream.js";
import { Event, publishToMainStream } from "backend-rs";
import define from "@/server/api/define.js";
import { MessagingMessages, UserGroupJoinings } from "@/models/index.js";
@ -16,7 +16,7 @@ export const paramDef = {
required: [],
} as const;
export default define(meta, paramDef, async (ps, user) => {
export default define(meta, paramDef, async (_ps, user) => {
// Update documents
await MessagingMessages.update(
{
@ -44,5 +44,5 @@ export default define(meta, paramDef, async (ps, user) => {
),
);
publishMainStream(user.id, "readAllMessagingMessages");
publishToMainStream(user.id, Event.ReadAllChats, {});
});

View file

@ -1,4 +1,4 @@
import { publishMainStream } from "@/services/stream.js";
import { Event, publishToMainStream } from "backend-rs";
import define from "@/server/api/define.js";
import { NoteUnreads } from "@/models/index.js";
@ -23,6 +23,6 @@ export default define(meta, paramDef, async (ps, user) => {
});
// 全て既読になったイベントを発行
publishMainStream(user.id, "readAllUnreadMentions");
publishMainStream(user.id, "readAllUnreadSpecifiedNotes");
publishToMainStream(user.id, Event.ReadAllMentions, {});
publishToMainStream(user.id, Event.ReadAllDms, {});
});

View file

@ -1,8 +1,7 @@
import define from "@/server/api/define.js";
import { ApiError } from "@/server/api/error.js";
import { genIdAt } from "backend-rs";
import { Event, genIdAt, publishToMainStream } from "backend-rs";
import { AnnouncementReads, Announcements, Users } from "@/models/index.js";
import { publishMainStream } from "@/services/stream.js";
export const meta = {
tags: ["account"],
@ -59,6 +58,6 @@ export default define(meta, paramDef, async (ps, user) => {
});
if (!(await Users.getHasUnreadAnnouncement(user.id))) {
publishMainStream(user.id, "readAllAnnouncements");
publishToMainStream(user.id, Event.ReadAllAnnouncements, {});
}
});

View file

@ -1,11 +1,12 @@
import {
publishInternalEvent,
publishMainStream,
publishUserEvent,
} from "@/services/stream.js";
import { publishInternalEvent, publishUserEvent } from "@/services/stream.js";
import define from "@/server/api/define.js";
import { Users, UserProfiles } from "@/models/index.js";
import { generateUserToken, verifyPassword } from "backend-rs";
import {
Event,
generateUserToken,
publishToMainStream,
verifyPassword,
} from "backend-rs";
export const meta = {
requireCredential: true,
@ -46,7 +47,7 @@ export default define(meta, paramDef, async (ps, user) => {
oldToken,
newToken,
});
publishMainStream(user.id, "myTokenRegenerated");
publishToMainStream(user.id, Event.RegenerateMyToken, {});
// Terminate streaming
setTimeout(() => {

View file

@ -1,7 +1,6 @@
import { publishMainStream } from "@/services/stream.js";
import define from "@/server/api/define.js";
import { RegistryItems } from "@/models/index.js";
import { genIdAt } from "backend-rs";
import { Event, genIdAt, publishToMainStream } from "backend-rs";
export const meta = {
requireCredential: true,
@ -55,7 +54,7 @@ export default define(meta, paramDef, async (ps, user) => {
}
// TODO: サードパーティアプリが傍受出来てしまうのでどうにかする
publishMainStream(user.id, "registryUpdated", {
publishToMainStream(user.id, Event.Registry, {
scope: ps.scope,
key: ps.key,
value: ps.value,

View file

@ -1,4 +1,3 @@
import { publishMainStream } from "@/services/stream.js";
import define from "@/server/api/define.js";
import rndstr from "rndstr";
import { config } from "@/config.js";
@ -6,7 +5,7 @@ import { Users, UserProfiles } from "@/models/index.js";
import { sendEmail } from "@/services/send-email.js";
import { ApiError } from "@/server/api/error.js";
import { validateEmailForAccount } from "@/services/validate-email-for-account.js";
import { verifyPassword } from "backend-rs";
import { Event, publishToMainStream, verifyPassword } from "backend-rs";
import { HOUR } from "@/const.js";
export const meta = {
@ -72,7 +71,7 @@ export default define(meta, paramDef, async (ps, user) => {
});
// Publish meUpdated event
publishMainStream(user.id, "meUpdated", iObj);
publishToMainStream(user.id, Event.Me, iObj);
if (ps.email != null) {
const code = rndstr("a-z0-9", 16);

View file

@ -1,5 +1,6 @@
import * as mfm from "mfm-js";
import { publishMainStream, publishUserEvent } from "@/services/stream.js";
import { publishUserEvent } from "@/services/stream.js";
import { Event, publishToMainStream } from "backend-rs";
import acceptAllFollowRequests from "@/services/following/requests/accept-all.js";
import { publishToFollowers } from "@/services/i/update.js";
import { extractCustomEmojisFromMfm } from "@/misc/extract-custom-emojis-from-mfm.js";
@ -345,7 +346,7 @@ export default define(meta, paramDef, async (ps, _user, token) => {
});
// Publish meUpdated event
publishMainStream(user.id, "meUpdated", iObj);
publishToMainStream(user.id, Event.Me, iObj);
publishUserEvent(
user.id,
"updateUserProfile",

View file

@ -1,5 +1,4 @@
import { publishMainStream } from "@/services/stream.js";
import { sendPushNotification } from "backend-rs";
import { Event, publishToMainStream, sendPushNotification } from "backend-rs";
import { Notifications } from "@/models/index.js";
import define from "@/server/api/define.js";
@ -30,6 +29,6 @@ export default define(meta, paramDef, async (_, user) => {
);
// 全ての通知を読みましたよというイベントを発行
publishMainStream(user.id, "readAllNotifications");
await publishToMainStream(user.id, Event.ReadAllNotifications, {});
await sendPushNotification(user.id, "readAllNotifications", {});
});

View file

@ -1,4 +1,4 @@
import { publishMainStream } from "@/services/stream.js";
import { Event, publishToMainStream } from "backend-rs";
import { Users, Pages } from "@/models/index.js";
import define from "@/server/api/define.js";
import { ApiError } from "@/server/api/error.js";
@ -32,7 +32,7 @@ export default define(meta, paramDef, async (ps, user) => {
throw new ApiError(meta.errors.noSuchPage);
}
publishMainStream(page.userId, "pageEvent", {
publishToMainStream(page.userId, Event.Page, {
pageId: ps.pageId,
event: ps.event,
var: ps.var,

View file

@ -1,6 +1,6 @@
import type Koa from "koa";
import { Users, UserProfiles } from "@/models/index.js";
import { publishMainStream } from "@/services/stream.js";
import { Event, publishToMainStream } from "backend-rs";
export default async (ctx: Koa.Context) => {
const body = ctx.request.body;
@ -20,9 +20,9 @@ export default async (ctx: Koa.Context) => {
},
);
publishMainStream(
publishToMainStream(
profile.userId,
"meUpdated",
Event.Me,
await Users.pack(
profile.userId,
{ id: profile.userId },

View file

@ -1,4 +1,4 @@
import { publishMainStream, publishUserEvent } from "@/services/stream.js";
import { publishUserEvent } from "@/services/stream.js";
import { renderActivity } from "@/remote/activitypub/renderer/index.js";
import renderFollow from "@/remote/activitypub/renderer/follow.js";
import { renderUndo } from "@/remote/activitypub/renderer/undo.js";
@ -15,7 +15,7 @@ import {
UserListJoinings,
UserLists,
} from "@/models/index.js";
import { genIdAt } from "backend-rs";
import { Event, genIdAt, publishToMainStream } from "backend-rs";
import { getActiveWebhooks } from "@/misc/webhook-cache.js";
import { webhookDeliver } from "@/queue/index.js";
@ -65,7 +65,7 @@ async function cancelRequest(follower: User, followee: User) {
if (Users.isLocalUser(followee)) {
Users.pack(followee, followee, {
detail: true,
}).then((packed) => publishMainStream(followee.id, "meUpdated", packed));
}).then((packed) => publishToMainStream(followee.id, Event.Me, packed));
}
if (Users.isLocalUser(follower)) {
@ -73,7 +73,7 @@ async function cancelRequest(follower: User, followee: User) {
detail: true,
}).then(async (packed) => {
publishUserEvent(follower.id, "unfollow", packed);
publishMainStream(follower.id, "unfollow", packed);
publishToMainStream(follower.id, Event.Unfollow, packed);
const webhooks = (await getActiveWebhooks()).filter(
(x) => x.userId === follower.id && x.on.includes("unfollow"),
@ -128,7 +128,7 @@ async function unFollow(follower: User, followee: User) {
detail: true,
}).then(async (packed) => {
publishUserEvent(follower.id, "unfollow", packed);
publishMainStream(follower.id, "unfollow", packed);
publishToMainStream(follower.id, Event.Unfollow, packed);
const webhooks = (await getActiveWebhooks()).filter(
(x) => x.userId === follower.id && x.on.includes("unfollow"),

View file

@ -1,4 +1,3 @@
import { publishMainStream } from "@/services/stream.js";
import {
Notifications,
Mutings,
@ -7,7 +6,13 @@ import {
Users,
Followings,
} from "@/models/index.js";
import { genIdAt, isSilencedServer, sendPushNotification } from "backend-rs";
import {
Event,
genIdAt,
isSilencedServer,
publishToMainStream,
sendPushNotification,
} from "backend-rs";
import type { User } from "@/models/entities/user.js";
import type { Notification } from "@/models/entities/notification.js";
import { sendEmailNotification } from "./send-email-notification.js";
@ -76,7 +81,7 @@ export async function createNotification(
const packed = await Notifications.pack(notification, {});
// Publish notification event
publishMainStream(notifieeId, "notification", packed);
publishToMainStream(notifieeId, Event.Notification, packed);
// 2秒経っても(今回作成した)通知が既読にならなかったら「未読の通知がありますよ」イベントを発行する
setTimeout(async () => {
@ -111,7 +116,7 @@ export async function createNotification(
}
//#endregion
publishMainStream(notifieeId, "unreadNotification", packed);
publishToMainStream(notifieeId, Event.NewNotification, packed);
if (type === "follow")
sendEmailNotification.follow(

View file

@ -5,8 +5,13 @@ import { v4 as uuid } from "uuid";
import type S3 from "aws-sdk/clients/s3.js"; // TODO: migrate to SDK v3
import sharp from "sharp";
import { IsNull } from "typeorm";
import { publishMainStream } from "@/services/stream.js";
import { fetchMeta, genId, publishToDriveFileStream } from "backend-rs";
import {
Event,
fetchMeta,
genId,
publishToDriveFileStream,
publishToMainStream,
} from "backend-rs";
import { FILE_TYPE_BROWSERSAFE } from "@/const.js";
import { contentDisposition } from "@/misc/content-disposition.js";
import { getFileInfo } from "@/misc/get-file-info.js";
@ -667,7 +672,7 @@ export async function addFile({
if (user != null) {
DriveFiles.pack(file, { self: true }).then((packedFile) => {
// Publish driveFileCreated event
publishMainStream(user.id, "driveFileCreated", packedFile);
publishToMainStream(user.id, Event.DriveFile, packedFile);
publishToDriveFileStream(user.id, "create", toRustObject(file));
});
}

View file

@ -1,4 +1,4 @@
import { publishMainStream, publishUserEvent } from "@/services/stream.js";
import { publishUserEvent } from "@/services/stream.js";
import { renderActivity } from "@/remote/activitypub/renderer/index.js";
import renderFollow from "@/remote/activitypub/renderer/follow.js";
import renderAccept from "@/remote/activitypub/renderer/accept.js";
@ -17,7 +17,12 @@ import {
Instances,
UserProfiles,
} from "@/models/index.js";
import { genIdAt, isSilencedServer } from "backend-rs";
import {
Event,
genIdAt,
isSilencedServer,
publishToMainStream,
} from "backend-rs";
import { createNotification } from "@/services/create-notification.js";
import { isDuplicateKeyValueError } from "@/misc/is-duplicate-key-value-error.js";
import type { Packed } from "@/misc/schema.js";
@ -126,9 +131,9 @@ export async function insertFollowingDoc(
"follow",
packed as Packed<"UserDetailedNotMe">,
);
publishMainStream(
publishToMainStream(
follower.id,
"follow",
Event.Follow,
packed as Packed<"UserDetailedNotMe">,
);
@ -146,7 +151,7 @@ export async function insertFollowingDoc(
// Publish followed event
if (Users.isLocalUser(followee)) {
Users.pack(follower.id, followee).then(async (packed) => {
publishMainStream(followee.id, "followed", packed);
publishToMainStream(followee.id, Event.Followed, packed);
const webhooks = (await getActiveWebhooks()).filter(
(x) => x.userId === followee.id && x.on.includes("followed"),

View file

@ -1,4 +1,5 @@
import { publishMainStream, publishUserEvent } from "@/services/stream.js";
import { publishUserEvent } from "@/services/stream.js";
import { Event, publishToMainStream } from "backend-rs";
import { renderActivity } from "@/remote/activitypub/renderer/index.js";
import renderFollow from "@/remote/activitypub/renderer/follow.js";
import { renderUndo } from "@/remote/activitypub/renderer/undo.js";
@ -51,7 +52,7 @@ export default async function (
detail: true,
}).then(async (packed) => {
publishUserEvent(follower.id, "unfollow", packed);
publishMainStream(follower.id, "unfollow", packed);
publishToMainStream(follower.id, Event.Unfollow, packed);
const webhooks = (await getActiveWebhooks()).filter(
(x) => x.userId === follower.id && x.on.includes("unfollow"),

View file

@ -2,7 +2,8 @@ import { renderActivity } from "@/remote/activitypub/renderer/index.js";
import renderFollow from "@/remote/activitypub/renderer/follow.js";
import renderReject from "@/remote/activitypub/renderer/reject.js";
import { deliver, webhookDeliver } from "@/queue/index.js";
import { publishMainStream, publishUserEvent } from "@/services/stream.js";
import { publishUserEvent } from "@/services/stream.js";
import { Event, publishToMainStream } from "backend-rs";
import type { ILocalUser, IRemoteUser } from "@/models/entities/user.js";
import { Users, FollowRequests, Followings } from "@/models/index.js";
import { decrementFollowing } from "./delete.js";
@ -120,7 +121,7 @@ async function publishUnfollow(followee: Both, follower: Local) {
});
publishUserEvent(follower.id, "unfollow", packedFollowee);
publishMainStream(follower.id, "unfollow", packedFollowee);
publishToMainStream(follower.id, Event.Unfollow, packedFollowee);
const webhooks = (await getActiveWebhooks()).filter(
(x) => x.userId === follower.id && x.on.includes("unfollow"),

View file

@ -2,7 +2,7 @@ import { renderActivity } from "@/remote/activitypub/renderer/index.js";
import renderFollow from "@/remote/activitypub/renderer/follow.js";
import renderAccept from "@/remote/activitypub/renderer/accept.js";
import { deliver } from "@/queue/index.js";
import { publishMainStream } from "@/services/stream.js";
import { Event, publishToMainStream } from "backend-rs";
import { insertFollowingDoc } from "../create.js";
import type { User, CacheableUser } from "@/models/entities/user.js";
import { FollowRequests, Users } from "@/models/index.js";
@ -44,5 +44,5 @@ export default async function (
Users.pack(followee.id, followee, {
detail: true,
}).then((packed) => publishMainStream(followee.id, "meUpdated", packed));
}).then((packed) => publishToMainStream(followee.id, Event.Me, packed));
}

View file

@ -2,7 +2,7 @@ import { renderActivity } from "@/remote/activitypub/renderer/index.js";
import renderFollow from "@/remote/activitypub/renderer/follow.js";
import { renderUndo } from "@/remote/activitypub/renderer/undo.js";
import { deliver } from "@/queue/index.js";
import { publishMainStream } from "@/services/stream.js";
import { Event, publishToMainStream } from "backend-rs";
import { IdentifiableError } from "@/misc/identifiable-error.js";
import type { User } from "@/models/entities/user.js";
import { Users, FollowRequests } from "@/models/index.js";
@ -46,5 +46,5 @@ export default async function (
Users.pack(followee.id, followee, {
detail: true,
}).then((packed) => publishMainStream(followee.id, "meUpdated", packed));
}).then((packed) => publishToMainStream(followee.id, Event.Me, packed));
}

View file

@ -1,10 +1,9 @@
import { publishMainStream } from "@/services/stream.js";
import { renderActivity } from "@/remote/activitypub/renderer/index.js";
import renderFollow from "@/remote/activitypub/renderer/follow.js";
import { deliver } from "@/queue/index.js";
import type { User } from "@/models/entities/user.js";
import { Blockings, FollowRequests, Users } from "@/models/index.js";
import { genIdAt } from "backend-rs";
import { Event, genIdAt, publishToMainStream } from "backend-rs";
import { createNotification } from "@/services/create-notification.js";
import { config } from "@/config.js";
@ -67,12 +66,12 @@ export default async function (
// Publish receiveRequest event
if (Users.isLocalUser(followee)) {
Users.pack(follower.id, followee).then((packed) =>
publishMainStream(followee.id, "receiveFollowRequest", packed),
publishToMainStream(followee.id, Event.NewFollowRequest, packed),
);
Users.pack(followee.id, followee, {
detail: true,
}).then((packed) => publishMainStream(followee.id, "meUpdated", packed));
}).then((packed) => publishToMainStream(followee.id, Event.Me, packed));
// 通知を作成
createNotification(followee.id, "receiveFollowRequest", {

View file

@ -11,7 +11,8 @@ import mfm from "mfm-js";
import { extractHashtags } from "@/misc/extract-hashtags.js";
import { normalizeForSearch } from "@/misc/normalize-for-search.js";
import { updateUsertags } from "@/services/update-hashtag.js";
import { publishMainStream, publishUserEvent } from "@/services/stream.js";
import { publishUserEvent } from "@/services/stream.js";
import { Event, publishToMainStream } from "backend-rs";
import acceptAllFollowRequests from "@/services/following/requests/accept-all.js";
import { promiseEarlyReturn } from "@/prelude/promise.js";
@ -79,7 +80,7 @@ export async function updateUserProfileData(
includeSecrets: isSecure,
});
publishMainStream(user.id, "meUpdated", iObj);
publishToMainStream(user.id, Event.Me, iObj);
publishUserEvent(
user.id,
"updateUserProfile",

View file

@ -14,9 +14,10 @@ import {
publishToGroupChatStream,
publishToChatIndexStream,
toPuny,
publishToMainStream,
Event,
} from "backend-rs";
import type { MessagingMessage } from "@/models/entities/messaging-message.js";
import { publishMainStream } from "@/services/stream.js";
import { Not } from "typeorm";
import type { Note } from "@/models/entities/note.js";
import renderNote from "@/remote/activitypub/renderer/note.js";
@ -62,8 +63,8 @@ export async function createMessage(
messageObj,
),
publishToChatIndexStream(message.userId, "message", messageObj),
publishToMainStream(message.userId, Event.Chat, messageObj),
]);
publishMainStream(message.userId, "messagingMessage", messageObj);
}
if (Users.isLocalUser(recipientUser)) {
@ -76,8 +77,8 @@ export async function createMessage(
messageObj,
),
publishToChatIndexStream(recipientUser.id, "message", messageObj),
publishToMainStream(recipientUser.id, Event.Chat, messageObj),
]);
publishMainStream(recipientUser.id, "messagingMessage", messageObj);
}
} else if (recipientGroup != null) {
// group's stream
@ -88,8 +89,10 @@ export async function createMessage(
userGroupId: recipientGroup.id,
});
for await (const joining of joinings) {
await publishToChatIndexStream(joining.userId, "message", messageObj);
publishMainStream(joining.userId, "messagingMessage", messageObj);
await Promise.all([
publishToChatIndexStream(joining.userId, "message", messageObj),
publishToMainStream(joining.userId, Event.Chat, messageObj),
]);
}
}
@ -108,8 +111,10 @@ export async function createMessage(
if (mute.map((m) => m.muteeId).includes(user.id)) return;
//#endregion
publishMainStream(recipientUser.id, "unreadMessagingMessage", messageObj);
await sendPushNotification(recipientUser.id, "chat", messageObj);
await Promise.all([
publishToMainStream(recipientUser.id, Event.NewChat, messageObj),
sendPushNotification(recipientUser.id, "chat", messageObj),
]);
} else if (recipientGroup) {
const joinings = await UserGroupJoinings.findBy({
userGroupId: recipientGroup.id,
@ -117,8 +122,10 @@ export async function createMessage(
});
for await (const joining of joinings) {
if (freshMessage.reads.includes(joining.userId)) return; // 既読
publishMainStream(joining.userId, "unreadMessagingMessage", messageObj);
await sendPushNotification(joining.userId, "chat", messageObj);
await Promise.all([
publishToMainStream(joining.userId, Event.NewChat, messageObj),
sendPushNotification(joining.userId, "chat", messageObj),
]);
}
}
}, 2000);

View file

@ -1,5 +1,4 @@
import * as mfm from "mfm-js";
import { publishMainStream } from "@/services/stream.js";
import DeliverManager from "@/remote/activitypub/deliver-manager.js";
import renderNote from "@/remote/activitypub/renderer/note.js";
import renderCreate from "@/remote/activitypub/renderer/create.js";
@ -48,6 +47,8 @@ import {
publishToNotesStream,
publishToNoteStream,
NoteEvent,
publishToMainStream,
Event,
} from "backend-rs";
import { countSameRenotes } from "@/misc/count-same-renotes.js";
import { deliverToRelays, getCachedRelays } from "../relay.js";
@ -508,7 +509,7 @@ export default async (
const packedReply = await Notes.pack(note, {
id: data.reply.userId,
});
publishMainStream(data.reply.userId, "reply", packedReply);
publishToMainStream(data.reply.userId, Event.Reply, packedReply);
const webhooks = (await getActiveWebhooks()).filter(
(x) =>
@ -548,7 +549,7 @@ export default async (
const packedRenote = await Notes.pack(note, {
id: data.renote.userId,
});
publishMainStream(data.renote.userId, "renote", packedRenote);
publishToMainStream(data.renote.userId, Event.Renote, packedRenote);
const renote = data.renote;
const webhooks = (await getActiveWebhooks()).filter(
@ -874,7 +875,7 @@ async function createMentionedEvents(
detail: true,
});
publishMainStream(u.id, "mention", detailPackedNote);
publishToMainStream(u.id, Event.Mention, detailPackedNote);
const webhooks = (await getActiveWebhooks()).filter(
(x) => x.userId === u.id && x.on.includes("mention"),

View file

@ -1,4 +1,4 @@
import { publishMainStream } from "@/services/stream.js";
import { Event, publishToMainStream } from "backend-rs";
import type { Note } from "@/models/entities/note.js";
import type { User } from "@/models/entities/user.js";
import { NoteUnreads, Followings, ChannelFollowings } from "@/models/index.js";
@ -84,7 +84,7 @@ export default async function (
}).then((mentionsCount) => {
if (mentionsCount === 0) {
// 全て既読になったイベントを発行
publishMainStream(userId, "readAllUnreadMentions");
publishToMainStream(userId, Event.ReadAllMentions, {});
}
});
@ -94,7 +94,7 @@ export default async function (
}).then((specifiedCount) => {
if (specifiedCount === 0) {
// 全て既読になったイベントを発行
publishMainStream(userId, "readAllUnreadSpecifiedNotes");
publishToMainStream(userId, Event.ReadAllDms, {});
}
});
@ -104,7 +104,7 @@ export default async function (
}).then((channelNoteCount) => {
if (channelNoteCount === 0) {
// 全て既読になったイベントを発行
publishMainStream(userId, "readAllChannels");
publishToMainStream(userId, Event.ReadAllChannelPosts, {});
}
});

View file

@ -1,8 +1,7 @@
import type { Note } from "@/models/entities/note.js";
import { publishMainStream } from "@/services/stream.js";
import type { User } from "@/models/entities/user.js";
import { Mutings, NoteThreadMutings, NoteUnreads } from "@/models/index.js";
import { genId } from "backend-rs";
import { Event, genId, publishToMainStream } from "backend-rs";
export async function insertNoteUnread(
userId: User["id"],
@ -47,13 +46,13 @@ export async function insertNoteUnread(
if (!exists) return;
if (params.isMentioned) {
publishMainStream(userId, "unreadMention", note.id);
publishToMainStream(userId, Event.NewMention, note.id);
}
if (params.isSpecified) {
publishMainStream(userId, "unreadSpecifiedNote", note.id);
publishToMainStream(userId, Event.NewDm, note.id);
}
if (note.channelId) {
publishMainStream(userId, "unreadChannel", note.id);
publishToMainStream(userId, Event.NewChannelPost, note.id);
}
}, 2000);
}

View file

@ -15,7 +15,7 @@ import type {
// DriveStreamTypes,
// GroupMessagingStreamTypes,
InternalStreamTypes,
MainStreamTypes,
// MainStreamTypes,
// MessagingIndexStreamTypes,
// MessagingStreamTypes,
// NoteStreamTypes,
@ -76,17 +76,18 @@ class Publisher {
// );
// };
public publishMainStream = <K extends keyof MainStreamTypes>(
userId: User["id"],
type: K,
value?: MainStreamTypes[K],
): void => {
this.publish(
`mainStream:${userId}`,
type,
typeof value === "undefined" ? null : value,
);
};
/* ported to backend-rs */
// public publishMainStream = <K extends keyof MainStreamTypes>(
// userId: User["id"],
// type: K,
// value?: MainStreamTypes[K],
// ): void => {
// this.publish(
// `mainStream:${userId}`,
// type,
// typeof value === "undefined" ? null : value,
// );
// };
/* ported to backend-rs */
// public publishDriveStream = <K extends keyof DriveStreamTypes>(
@ -217,7 +218,7 @@ export default publisher;
export const publishInternalEvent = publisher.publishInternalEvent;
export const publishUserEvent = publisher.publishUserEvent;
// export const publishBroadcastStream = publisher.publishBroadcastStream;
export const publishMainStream = publisher.publishMainStream;
// export const publishMainStream = publisher.publishMainStream;
// export const publishDriveStream = publisher.publishDriveStream;
// export const publishNoteStream = publisher.publishNoteStream;
// export const publishNotesStream = publisher.publishNotesStream;