refactor (backend): port publishNoteStream to backend-rs

This commit is contained in:
naskya 2024-07-27 17:41:50 +09:00
parent 7186af04e0
commit 5a74760252
No known key found for this signature in database
GPG key ID: 712D413B3A9FED5C
16 changed files with 109 additions and 44 deletions

View file

@ -840,6 +840,15 @@ export interface NoteEdit {
emojis: Array<string>
}
export declare enum NoteEvent {
Delete = 0,
React = 1,
Unreact = 2,
Reply = 3,
Update = 4,
Vote = 5
}
export interface NoteFavorite {
id: string
createdAt: DateTimeWithTimeZone
@ -1125,6 +1134,8 @@ export declare function publishToModerationStream(moderatorId: string, report: A
export declare function publishToNotesStream(note: Note): Promise<void>
export declare function publishToNoteStream(noteId: string, kind: NoteEvent, object: any): Promise<void>
export declare function publishToNoteUpdatesStream(note: Note): Promise<void>
export interface PugArgs {

View file

@ -409,6 +409,7 @@ module.exports.metaToPugArgs = nativeBinding.metaToPugArgs
module.exports.MutedNoteReason = nativeBinding.MutedNoteReason
module.exports.nodeinfo_2_0 = nativeBinding.nodeinfo_2_0
module.exports.nodeinfo_2_1 = nativeBinding.nodeinfo_2_1
module.exports.NoteEvent = nativeBinding.NoteEvent
module.exports.NoteVisibility = nativeBinding.NoteVisibility
module.exports.NotificationType = nativeBinding.NotificationType
module.exports.nyaify = nativeBinding.nyaify
@ -425,6 +426,7 @@ module.exports.publishToDriveFolderStream = nativeBinding.publishToDriveFolderSt
module.exports.publishToGroupChatStream = nativeBinding.publishToGroupChatStream
module.exports.publishToModerationStream = nativeBinding.publishToModerationStream
module.exports.publishToNotesStream = nativeBinding.publishToNotesStream
module.exports.publishToNoteStream = nativeBinding.publishToNoteStream
module.exports.publishToNoteUpdatesStream = nativeBinding.publishToNoteUpdatesStream
module.exports.PushNotificationKind = nativeBinding.PushNotificationKind
module.exports.PushSubscriptionType = nativeBinding.PushSubscriptionType

View file

@ -6,6 +6,7 @@ pub mod custom_emoji;
pub mod drive;
pub mod group_chat;
pub mod moderation;
pub mod note;
pub mod note_edit;
pub mod notes;

View file

@ -0,0 +1,43 @@
use crate::service::stream::{publish_to_stream, Error, Stream};
use serde_json::json;
#[macros::export]
pub enum NoteEvent {
Delete,
React,
Unreact,
Reply,
Update,
Vote,
}
// We want to merge `kind` and `object` into a single enum
// https://github.com/napi-rs/napi-rs/issues/2036
#[macros::export(js_name = "publishToNoteStream")]
pub async fn publish(
note_id: String,
kind: NoteEvent,
object: &serde_json::Value,
) -> Result<(), Error> {
let kind = match kind {
NoteEvent::Delete => "deleted",
NoteEvent::React => "reacted",
NoteEvent::Unreact => "unreacted",
NoteEvent::Reply => "replied",
NoteEvent::Update => "updated",
NoteEvent::Vote => "pollVoted",
};
let value = json!({
"id": note_id.clone(),
"body": object,
});
publish_to_stream(
&Stream::Note { note_id },
Some(kind),
Some(serde_json::to_string(&value)?),
)
.await
}

View file

@ -15,11 +15,13 @@ import { apLogger } from "../logger.js";
import type { DriveFile } from "@/models/entities/drive-file.js";
import {
type ImageSize,
NoteEvent,
extractHost,
genId,
getImageSizeFromUrl,
isBlockedServer,
isSameOrigin,
publishToNoteStream,
toPuny,
} from "backend-rs";
import {
@ -47,7 +49,6 @@ import { parseAudience } from "../audience.js";
import { extractApMentions } from "./mention.js";
import DbResolver from "../db-resolver.js";
import { StatusError } from "@/misc/fetch.js";
import { publishNoteStream } from "@/services/stream.js";
import { extractHashtags } from "@/misc/extract-hashtags.js";
import { UserProfiles } from "@/models/index.js";
import { In } from "typeorm";
@ -795,7 +796,7 @@ export async function updateNote(value: string | IObject, resolver?: Resolver) {
if (publishing) {
// Publish update event for the updated note details
publishNoteStream(note.id, "updated", {
publishToNoteStream(note.id, NoteEvent.Update, {
updatedAt: update.updatedAt,
});
}

View file

@ -18,7 +18,7 @@ import { config } from "@/config.js";
import { noteVisibilities } from "@/types.js";
import { ApiError } from "@/server/api/error.js";
import define from "@/server/api/define.js";
import { genId } from "backend-rs";
import { genId, NoteEvent, publishToNoteStream } from "backend-rs";
import { HOUR } from "@/const.js";
import { getNote } from "@/server/api/common/getters.js";
import { Poll } from "@/models/entities/poll.js";
@ -27,7 +27,6 @@ import { concat } from "@/prelude/array.js";
import { extractHashtags } from "@/misc/extract-hashtags.js";
import { extractCustomEmojisFromMfm } from "@/misc/extract-custom-emojis-from-mfm.js";
import { extractMentionedUsers } from "@/services/note/create.js";
import { publishNoteStream } from "@/services/stream.js";
import DeliverManager from "@/remote/activitypub/deliver-manager.js";
import { renderActivity } from "@/remote/activitypub/renderer/index.js";
import renderNote from "@/remote/activitypub/renderer/note.js";
@ -634,7 +633,7 @@ export default define(meta, paramDef, async (ps, user) => {
if (publishing && user.isIndexable) {
// Publish update event for the updated note details
publishNoteStream(note.id, "updated", {
publishToNoteStream(note.id, NoteEvent.Update, {
updatedAt: update.updatedAt,
});

View file

@ -4,7 +4,7 @@ import define from "@/server/api/define.js";
import { getNote } from "@/server/api/common/getters.js";
import { ApiError } from "@/server/api/error.js";
import { SECOND, HOUR } from "@/const.js";
import { publishNoteStream } from "@/services/stream.js";
import { NoteEvent, publishToNoteStream } from "backend-rs";
export const meta = {
tags: ["notes"],
@ -61,7 +61,7 @@ export default define(meta, paramDef, async (ps, user) => {
// Publish update event for the updated note details
// TODO: Send "deleted" to other users?
publishNoteStream(note.id, "updated", {
publishToNoteStream(note.id, NoteEvent.Update, {
updatedAt: new Date(),
});
});

View file

@ -1,5 +1,4 @@
import { Not } from "typeorm";
import { publishNoteStream } from "@/services/stream.js";
import { createNotification } from "@/services/create-notification.js";
import { deliver } from "@/queue/index.js";
import { renderActivity } from "@/remote/activitypub/renderer/index.js";
@ -12,7 +11,7 @@ import {
Blockings,
} from "@/models/index.js";
import type { IRemoteUser } from "@/models/entities/user.js";
import { genId } from "backend-rs";
import { genId, NoteEvent, publishToNoteStream } from "backend-rs";
import { getNote } from "@/server/api/common/getters.js";
import { ApiError } from "@/server/api/error.js";
import define from "@/server/api/define.js";
@ -139,7 +138,7 @@ export default define(meta, paramDef, async (ps, user) => {
`UPDATE poll SET votes[${index}] = votes[${index}] + 1 WHERE "noteId" = '${poll.noteId}'`,
);
publishNoteStream(note.id, "pollVoted", {
publishToNoteStream(note.id, NoteEvent.Vote, {
choice: ps.choice,
userId: user.id,
});

View file

@ -10,8 +10,7 @@ import {
PollVotes,
Users,
} from "@/models/index.js";
import { genId } from "backend-rs";
import { publishNoteStream } from "@/services/stream.js";
import { genId, NoteEvent, publishToNoteStream } from "backend-rs";
import { createNotification } from "@/services/create-notification.js";
import { deliver } from "@/queue/index.js";
import { renderActivity } from "@/remote/activitypub/renderer/index.js";
@ -108,7 +107,7 @@ export class PollHelpers {
`UPDATE poll SET votes[${index}] = votes[${index}] + 1 WHERE "noteId" = '${poll.noteId}'`,
);
publishNoteStream(note.id, "pollVoted", {
publishToNoteStream(note.id, NoteEvent.Vote, {
choice: choice,
userId: user.id,
});

View file

@ -1,5 +1,5 @@
import * as mfm from "mfm-js";
import { publishMainStream, publishNoteStream } from "@/services/stream.js";
import { publishMainStream } from "@/services/stream.js";
import DeliverManager from "@/remote/activitypub/deliver-manager.js";
import renderNote from "@/remote/activitypub/renderer/note.js";
import renderCreate from "@/remote/activitypub/renderer/create.js";
@ -46,6 +46,8 @@ import {
isQuote,
isSilencedServer,
publishToNotesStream,
publishToNoteStream,
NoteEvent,
} from "backend-rs";
import { countSameRenotes } from "@/misc/count-same-renotes.js";
import { deliverToRelays, getCachedRelays } from "../relay.js";
@ -466,7 +468,7 @@ export default async (
if (note.replyId != null) {
// Only provide the reply note id here as the recipient may not be authorized to see the note.
publishNoteStream(note.replyId, "replied", {
publishToNoteStream(note.replyId, NoteEvent.Reply, {
id: note.id,
});
}

View file

@ -1,5 +1,4 @@
import { Brackets, In } from "typeorm";
import { publishNoteStream } from "@/services/stream.js";
import renderDelete from "@/remote/activitypub/renderer/delete.js";
import renderAnnounce from "@/remote/activitypub/renderer/announce.js";
import { renderUndo } from "@/remote/activitypub/renderer/undo.js";
@ -17,6 +16,7 @@ import { countSameRenotes } from "@/misc/count-same-renotes.js";
import { registerOrFetchInstanceDoc } from "@/services/register-or-fetch-instance-doc.js";
import { deliverToRelays } from "@/services/relay.js";
import type { IActivity } from "@/remote/activitypub/type.js";
import { NoteEvent, publishToNoteStream } from "backend-rs";
async function recalculateNotesCountOfLocalUser(user: {
id: User["id"];
@ -72,7 +72,7 @@ export default async function (
// Only broadcast "deleted" to local if the note is deleted from db
if (deleteFromDb) {
publishNoteStream(note.id, "deleted", {
publishToNoteStream(note.id, NoteEvent.Delete, {
deletedAt: deletedAt,
});
}
@ -112,7 +112,7 @@ export default async function (
for (const cascadingNote of cascadingNotes) {
if (deleteFromDb) {
// For other notes, publishNoteStream is also required.
publishNoteStream(cascadingNote.id, "deleted", {
publishToNoteStream(cascadingNote.id, NoteEvent.Delete, {
deletedAt: deletedAt,
});
}

View file

@ -1,5 +1,4 @@
import * as mfm from "mfm-js";
import { publishNoteStream } from "@/services/stream.js";
import DeliverManager from "@/remote/activitypub/deliver-manager.js";
import renderNote from "@/remote/activitypub/renderer/note.js";
import { renderActivity } from "@/remote/activitypub/renderer/index.js";
@ -18,7 +17,12 @@ import {
import type { DriveFile } from "@/models/entities/drive-file.js";
import { In } from "typeorm";
import type { ILocalUser, IRemoteUser } from "@/models/entities/user.js";
import { genId, publishToNoteUpdatesStream } from "backend-rs";
import {
genId,
NoteEvent,
publishToNoteStream,
publishToNoteUpdatesStream,
} from "backend-rs";
import type { IPoll } from "@/models/entities/poll.js";
import { deliverToRelays } from "../relay.js";
import renderUpdate from "@/remote/activitypub/renderer/update.js";
@ -187,7 +191,7 @@ export default async function (
if (publishing) {
// Publish update event for the updated note details
publishNoteStream(note.id, "updated", {
publishToNoteStream(note.id, NoteEvent.Update, {
updatedAt: update.updatedAt,
});

View file

@ -1,9 +1,8 @@
import { publishNoteStream } from "@/services/stream.js";
import type { CacheableUser } from "@/models/entities/user.js";
import type { Note } from "@/models/entities/note.js";
import { PollVotes, NoteWatchings, Polls, Blockings } from "@/models/index.js";
import { Not } from "typeorm";
import { genIdAt } from "backend-rs";
import { genIdAt, NoteEvent, publishToNoteStream } from "backend-rs";
import { createNotification } from "@/services/create-notification.js";
export default async function (
@ -60,7 +59,7 @@ export default async function (
`UPDATE poll SET votes[${index}] = votes[${index}] + 1 WHERE "noteId" = '${poll.noteId}'`,
);
publishNoteStream(note.id, "pollVoted", {
publishToNoteStream(note.id, NoteEvent.Vote, {
choice: choice,
userId: user.id,
});

View file

@ -1,4 +1,3 @@
import { publishNoteStream } from "@/services/stream.js";
import { renderLike } from "@/remote/activitypub/renderer/like.js";
import DeliverManager from "@/remote/activitypub/deliver-manager.js";
import { renderActivity } from "@/remote/activitypub/renderer/index.js";
@ -13,7 +12,13 @@ import {
Blockings,
} from "@/models/index.js";
import { IsNull, Not } from "typeorm";
import { decodeReaction, genIdAt, toDbReaction } from "backend-rs";
import {
decodeReaction,
genIdAt,
NoteEvent,
publishToNoteStream,
toDbReaction,
} from "backend-rs";
import { createNotification } from "@/services/create-notification.js";
import deleteReaction from "./delete.js";
import { isDuplicateKeyValueError } from "@/misc/is-duplicate-key-value-error.js";
@ -102,7 +107,7 @@ export default async (
select: ["name", "host", "originalUrl", "publicUrl"],
});
publishNoteStream(note.id, "reacted", {
publishToNoteStream(note.id, NoteEvent.React, {
reaction: decodedReaction.reaction,
emoji:
emoji != null

View file

@ -1,4 +1,3 @@
import { publishNoteStream } from "@/services/stream.js";
import { renderLike } from "@/remote/activitypub/renderer/like.js";
import { renderUndo } from "@/remote/activitypub/renderer/undo.js";
import { renderActivity } from "@/remote/activitypub/renderer/index.js";
@ -7,7 +6,7 @@ import { IdentifiableError } from "@/misc/identifiable-error.js";
import type { User, IRemoteUser } from "@/models/entities/user.js";
import type { Note } from "@/models/entities/note.js";
import { NoteReactions, Users, Notes } from "@/models/index.js";
import { decodeReaction } from "backend-rs";
import { decodeReaction, NoteEvent, publishToNoteStream } from "backend-rs";
export default async (
user: { id: User["id"]; host: User["host"] },
@ -48,7 +47,7 @@ export default async (
Notes.decrement({ id: note.id }, "score", 1);
publishNoteStream(note.id, "unreacted", {
publishToNoteStream(note.id, NoteEvent.Unreact, {
reaction: decodeReaction(reaction.reaction).reaction,
userId: user.id,
});

View file

@ -1,7 +1,7 @@
import { redisClient } from "@/db/redis.js";
import type { User } from "@/models/entities/user.js";
import type { Note } from "@/models/entities/note.js";
import type { UserList } from "@/models/entities/user-list.js";
// import type { Note } from "@/models/entities/note.js";
// import type { UserList } from "@/models/entities/user-list.js";
// import type { UserGroup } from "@/models/entities/user-group.js";
import { config } from "@/config.js";
// import type { Antenna } from "@/models/entities/antenna.js";
@ -18,7 +18,7 @@ import type {
MainStreamTypes,
// MessagingIndexStreamTypes,
// MessagingStreamTypes,
NoteStreamTypes,
// NoteStreamTypes,
UserStreamTypes,
// NoteUpdatesStreamTypes,
} from "@/server/api/stream/types.js";
@ -101,16 +101,17 @@ class Publisher {
// );
// };
public publishNoteStream = <K extends keyof NoteStreamTypes>(
noteId: Note["id"],
type: K,
value?: NoteStreamTypes[K],
): void => {
this.publish(`noteStream:${noteId}`, type, {
id: noteId,
body: value,
});
};
/* ported to backend-rs */
// public publishNoteStream = <K extends keyof NoteStreamTypes>(
// noteId: Note["id"],
// type: K,
// value?: NoteStreamTypes[K],
// ): void => {
// this.publish(`noteStream:${noteId}`, type, {
// id: noteId,
// body: value,
// });
// };
/* ported to backend-rs */
// public publishNoteUpdatesStream = <K extends keyof NoteUpdatesStreamTypes>(
@ -218,7 +219,7 @@ export const publishUserEvent = publisher.publishUserEvent;
// export const publishBroadcastStream = publisher.publishBroadcastStream;
export const publishMainStream = publisher.publishMainStream;
// export const publishDriveStream = publisher.publishDriveStream;
export const publishNoteStream = publisher.publishNoteStream;
// export const publishNoteStream = publisher.publishNoteStream;
// export const publishNotesStream = publisher.publishNotesStream;
// export const publishNoteUpdatesStream = publisher.publishNoteUpdatesStream;
// export const publishChannelStream = publisher.publishChannelStream;