fix (backend): fix scheduled reply/quote behavior

This commit is contained in:
naskya 2024-06-07 07:35:56 +09:00
parent d6bd3af8a9
commit 5daa113928
No known key found for this signature in database
GPG key ID: 712D413B3A9FED5C
2 changed files with 298 additions and 284 deletions

View file

@ -171,6 +171,9 @@ export default async (
const dontFederateInitially = const dontFederateInitially =
data.visibility?.startsWith("hidden") === true; data.visibility?.startsWith("hidden") === true;
// Whether this is a scheduled "draft" post (yet to be published)
const isDraft = data.scheduledAt != null;
// If you reply outside the channel, match the scope of the target. // If you reply outside the channel, match the scope of the target.
// TODO (I think it's a process that could be done on the client side, but it's server side for now.) // TODO (I think it's a process that could be done on the client side, but it's server side for now.)
if ( if (
@ -270,7 +273,7 @@ export default async (
data.text = data.text?.trim() ?? null; data.text = data.text?.trim() ?? null;
if (data.lang) { if (data.lang != null) {
if (!Object.keys(langmap).includes(data.lang.toLowerCase())) if (!Object.keys(langmap).includes(data.lang.toLowerCase()))
throw new Error("invalid param"); throw new Error("invalid param");
data.lang = data.lang.toLowerCase(); data.lang = data.lang.toLowerCase();
@ -314,7 +317,7 @@ export default async (
); );
} }
if (data.visibility === "specified") { if (!isDraft && data.visibility === "specified") {
if (data.visibleUsers == null) throw new Error("invalid param"); if (data.visibleUsers == null) throw new Error("invalid param");
for (const u of data.visibleUsers) { for (const u of data.visibleUsers) {
@ -352,310 +355,317 @@ export default async (
}); });
} }
// ハッシュタグ更新 if (!isDraft) {
if (data.visibility === "public" || data.visibility === "home") { // ハッシュタグ更新
updateHashtags(user, tags); if (data.visibility === "public" || data.visibility === "home") {
} updateHashtags(user, tags);
}
// Increment notes count (user) // Increment notes count (user)
incNotesCountOfUser(user); incNotesCountOfUser(user);
// Word mutes & antenna // Word mutes & antenna
const thisNoteIsMutedBy: string[] = []; const thisNoteIsMutedBy: string[] = [];
await hardMutesCache await hardMutesCache
.fetch(null, () => .fetch(null, () =>
UserProfiles.find({ UserProfiles.find({
where: { where: {
enableWordMute: true, enableWordMute: true,
},
select: ["userId", "mutedWords", "mutedPatterns"],
}),
)
.then(async (us) => {
for (const u of us) {
if (u.userId === user.id) return;
await checkWordMute(note, u.mutedWords, u.mutedPatterns).then(
(shouldMute: boolean) => {
if (shouldMute) {
thisNoteIsMutedBy.push(u.userId);
MutedNotes.insert({
id: genId(),
userId: u.userId,
noteId: note.id,
reason: "word",
});
}
}, },
); select: ["userId", "mutedWords", "mutedPatterns"],
} }),
}); )
.then(async (us) => {
for (const u of us) {
if (u.userId === user.id) return;
await checkWordMute(note, u.mutedWords, u.mutedPatterns).then(
(shouldMute: boolean) => {
if (shouldMute) {
thisNoteIsMutedBy.push(u.userId);
MutedNotes.insert({
id: genId(),
userId: u.userId,
noteId: note.id,
reason: "word",
});
}
},
);
}
});
// type errors will be resolved by https://github.com/napi-rs/napi-rs/pull/2054 // type errors will be resolved by https://github.com/napi-rs/napi-rs/pull/2054
const _note = toRustObject(note); const _note = toRustObject(note);
if (note.renoteId == null || isQuote(_note)) { if (note.renoteId == null || isQuote(_note)) {
await updateAntennasOnNewNote(_note, user, thisNoteIsMutedBy); await updateAntennasOnNewNote(_note, user, thisNoteIsMutedBy);
} }
// Channel // Channel
if (note.channelId) { if (note.channelId != null) {
ChannelFollowings.findBy({ followeeId: note.channelId }).then( ChannelFollowings.findBy({ followeeId: note.channelId }).then(
(followings) => { (followings) => {
for (const following of followings) { for (const following of followings) {
insertNoteUnread(following.followerId, note, { insertNoteUnread(following.followerId, note, {
isSpecified: false, isSpecified: false,
isMentioned: false,
});
}
},
);
}
if (data.reply) {
saveReply(data.reply, note);
}
// この投稿を除く指定したユーザーによる指定したノートのリノートが存在しないとき
if (
data.renote &&
!user.isBot &&
(await countSameRenotes(user.id, data.renote.id, note.id)) === 0
) {
incRenoteCount(data.renote);
}
if (data.poll?.expiresAt) {
const delay = data.poll.expiresAt.getTime() - Date.now();
endedPollNotificationQueue.add(
{
noteId: note.id,
},
{
delay,
removeOnComplete: true,
},
);
}
if (!silent) {
if (Users.isLocalUser(user)) activeUsersChart.write(user);
// 未読通知を作成
if (data.visibility === "specified") {
if (data.visibleUsers == null) throw new Error("invalid param");
for (const u of data.visibleUsers) {
// ローカルユーザーのみ
if (!Users.isLocalUser(u)) continue;
insertNoteUnread(u.id, note, {
isSpecified: true,
isMentioned: false, isMentioned: false,
}); });
} }
},
);
}
if (data.reply) {
saveReply(data.reply, note);
}
// この投稿を除く指定したユーザーによる指定したノートのリノートが存在しないとき
if (
data.renote &&
!user.isBot &&
(await countSameRenotes(user.id, data.renote.id, note.id)) === 0
) {
incRenoteCount(data.renote);
}
if (data.poll?.expiresAt) {
const delay = data.poll.expiresAt.getTime() - Date.now();
endedPollNotificationQueue.add(
{
noteId: note.id,
},
{
delay,
removeOnComplete: true,
},
);
}
if (!silent) {
if (Users.isLocalUser(user)) activeUsersChart.write(user);
// 未読通知を作成
if (data.visibility === "specified") {
if (data.visibleUsers == null) throw new Error("invalid param");
for (const u of data.visibleUsers) {
// ローカルユーザーのみ
if (!Users.isLocalUser(u)) continue;
insertNoteUnread(u.id, note, {
isSpecified: true,
isMentioned: false,
});
}
} else {
for (const u of mentionedUsers) {
// ローカルユーザーのみ
if (!Users.isLocalUser(u)) continue;
insertNoteUnread(u.id, note, {
isSpecified: false,
isMentioned: true,
});
}
}
if (!dontFederateInitially) {
let publishKey: string;
let noteToPublish: Note;
const relays = await getCachedRelays();
// Some relays (e.g., aode-relay) deliver posts by boosting them as
// Announce activities. In that case, user is the relay's actor.
const boostedByRelay =
!!user.inbox &&
relays.map((relay) => relay.inbox).includes(user.inbox);
if (boostedByRelay && data.renote && data.renote.userHost) {
publishKey = `publishedNote:${data.renote.id}`;
noteToPublish = data.renote;
} else { } else {
publishKey = `publishedNote:${note.id}`; for (const u of mentionedUsers) {
noteToPublish = note; // ローカルユーザーのみ
} if (!Users.isLocalUser(u)) continue;
const lock = new Mutex(redisClient, "publishedNote"); insertNoteUnread(u.id, note, {
await lock.acquire(); isSpecified: false,
try { isMentioned: true,
const published = (await redisClient.get(publishKey)) != null;
if (!published) {
await redisClient.set(publishKey, "done", "EX", 30);
if (noteToPublish.renoteId) {
// Prevents other threads from publishing the boosting post
await redisClient.set(
`publishedNote:${noteToPublish.renoteId}`,
"done",
"EX",
30,
);
}
publishNotesStream(noteToPublish);
}
} finally {
await lock.release();
}
}
if (note.replyId != null) {
// Only provide the reply note id here as the recipient may not be authorized to see the note.
publishNoteStream(note.replyId, "replied", {
id: note.id,
});
}
const webhooks = await getActiveWebhooks().then((webhooks) =>
webhooks.filter((x) => x.userId === user.id && x.on.includes("note")),
);
for (const webhook of webhooks) {
webhookDeliver(webhook, "note", {
note: await Notes.pack(note, user),
});
}
const nm = new NotificationManager(user, note);
const nmRelatedPromises = [];
await createMentionedEvents(mentionedUsers, note, nm);
// If has in reply to note
if (data.reply) {
// Fetch watchers
nmRelatedPromises.push(notifyToWatchersOfReplyee(data.reply, user, nm));
// 通知
if (data.reply.userHost === null) {
const threadMuted = await NoteThreadMutings.findOneBy({
userId: data.reply.userId,
threadId: data.reply.threadId || data.reply.id,
});
if (!threadMuted) {
nm.push(data.reply.userId, "reply");
const packedReply = await Notes.pack(note, {
id: data.reply.userId,
}); });
publishMainStream(data.reply.userId, "reply", packedReply); }
}
if (!dontFederateInitially) {
let publishKey: string;
let noteToPublish: Note;
const relays = await getCachedRelays();
// Some relays (e.g., aode-relay) deliver posts by boosting them as
// Announce activities. In that case, user is the relay's actor.
const boostedByRelay =
!!user.inbox &&
relays.map((relay) => relay.inbox).includes(user.inbox);
if (boostedByRelay && data.renote && data.renote.userHost) {
publishKey = `publishedNote:${data.renote.id}`;
noteToPublish = data.renote;
} else {
publishKey = `publishedNote:${note.id}`;
noteToPublish = note;
}
const lock = new Mutex(redisClient, "publishedNote");
await lock.acquire();
try {
const published = (await redisClient.get(publishKey)) != null;
if (!published) {
await redisClient.set(publishKey, "done", "EX", 30);
if (noteToPublish.renoteId) {
// Prevents other threads from publishing the boosting post
await redisClient.set(
`publishedNote:${noteToPublish.renoteId}`,
"done",
"EX",
30,
);
}
publishNotesStream(noteToPublish);
}
} finally {
await lock.release();
}
}
if (note.replyId != null) {
// Only provide the reply note id here as the recipient may not be authorized to see the note.
publishNoteStream(note.replyId, "replied", {
id: note.id,
});
}
const webhooks = await getActiveWebhooks().then((webhooks) =>
webhooks.filter((x) => x.userId === user.id && x.on.includes("note")),
);
for (const webhook of webhooks) {
webhookDeliver(webhook, "note", {
note: await Notes.pack(note, user),
});
}
const nm = new NotificationManager(user, note);
const nmRelatedPromises = [];
await createMentionedEvents(mentionedUsers, note, nm);
// If has in reply to note
if (data.reply != null) {
// Fetch watchers
nmRelatedPromises.push(
notifyToWatchersOfReplyee(data.reply, user, nm),
);
// 通知
if (data.reply.userHost === null) {
const threadMuted = await NoteThreadMutings.findOneBy({
userId: data.reply.userId,
threadId: data.reply.threadId || data.reply.id,
});
if (!threadMuted) {
nm.push(data.reply.userId, "reply");
const packedReply = await Notes.pack(note, {
id: data.reply.userId,
});
publishMainStream(data.reply.userId, "reply", packedReply);
const webhooks = (await getActiveWebhooks()).filter(
(x) =>
x.userId === data.reply?.userId && x.on.includes("reply"),
);
for (const webhook of webhooks) {
webhookDeliver(webhook, "reply", {
note: packedReply,
});
}
}
}
}
// If it is renote
if (data.renote != null) {
const type = data.text ? "quote" : "renote";
// Notify
if (data.renote.userHost === null) {
const threadMuted = await NoteThreadMutings.findOneBy({
userId: data.renote.userId,
threadId: data.renote.threadId || data.renote.id,
});
if (!threadMuted) {
nm.push(data.renote.userId, type);
}
}
// Fetch watchers
nmRelatedPromises.push(
notifyToWatchersOfRenotee(data.renote, user, nm, type),
);
// Publish event
if (user.id !== data.renote.userId && data.renote.userHost === null) {
const packedRenote = await Notes.pack(note, {
id: data.renote.userId,
});
publishMainStream(data.renote.userId, "renote", packedRenote);
const renote = data.renote;
const webhooks = (await getActiveWebhooks()).filter( const webhooks = (await getActiveWebhooks()).filter(
(x) => x.userId === data.reply?.userId && x.on.includes("reply"), (x) => x.userId === renote.userId && x.on.includes("renote"),
); );
for (const webhook of webhooks) { for (const webhook of webhooks) {
webhookDeliver(webhook, "reply", { webhookDeliver(webhook, "renote", {
note: packedReply, note: packedRenote,
}); });
} }
} }
} }
Promise.all(nmRelatedPromises).then(() => {
nm.deliver();
});
//#region AP deliver
if (Users.isLocalUser(user) && !dontFederateInitially) {
(async () => {
const noteActivity = await renderNoteOrRenoteActivity(data, note);
const dm = new DeliverManager(user, noteActivity);
// メンションされたリモートユーザーに配送
for (const u of mentionedUsers.filter((u) =>
Users.isRemoteUser(u),
)) {
dm.addDirectRecipe(u as IRemoteUser);
}
// 投稿がリプライかつ投稿者がローカルユーザーかつリプライ先の投稿の投稿者がリモートユーザーなら配送
if (data.reply?.userHost != null) {
const u = await Users.findOneBy({ id: data.reply.userId });
if (u && Users.isRemoteUser(u)) dm.addDirectRecipe(u);
}
// 投稿がRenoteかつ投稿者がローカルユーザーかつRenote元の投稿の投稿者がリモートユーザーなら配送
if (data.renote?.userHost != null) {
const u = await Users.findOneBy({ id: data.renote.userId });
if (u && Users.isRemoteUser(u)) dm.addDirectRecipe(u);
}
// フォロワーに配送
if (["public", "home", "followers"].includes(note.visibility)) {
dm.addFollowersRecipe();
}
if (["public"].includes(note.visibility)) {
deliverToRelays(user, noteActivity);
}
dm.execute();
})();
}
//#endregion
} }
// If it is renote if (data.channel) {
if (data.renote) { Channels.increment({ id: data.channel.id }, "notesCount", 1);
const type = data.text ? "quote" : "renote"; Channels.update(data.channel.id, {
lastNotedAt: new Date(),
});
// Notify await Notes.countBy({
if (data.renote.userHost === null) { userId: user.id,
const threadMuted = await NoteThreadMutings.findOneBy({ channelId: data.channel.id,
userId: data.renote.userId, }).then((count) => {
threadId: data.renote.threadId || data.renote.id, // この処理が行われるのはノート作成後なので、ノートが一つしかなかったら最初の投稿だと判断できる
}); // TODO: とはいえノートを削除して何回も投稿すればその分だけインクリメントされる雑さもあるのでどうにかしたい
if (count === 1 && data.channel != null) {
if (!threadMuted) { Channels.increment({ id: data.channel.id }, "usersCount", 1);
nm.push(data.renote.userId, type);
} }
} });
// Fetch watchers
nmRelatedPromises.push(
notifyToWatchersOfRenotee(data.renote, user, nm, type),
);
// Publish event
if (user.id !== data.renote.userId && data.renote.userHost === null) {
const packedRenote = await Notes.pack(note, {
id: data.renote.userId,
});
publishMainStream(data.renote.userId, "renote", packedRenote);
const renote = data.renote;
const webhooks = (await getActiveWebhooks()).filter(
(x) => x.userId === renote.userId && x.on.includes("renote"),
);
for (const webhook of webhooks) {
webhookDeliver(webhook, "renote", {
note: packedRenote,
});
}
}
} }
Promise.all(nmRelatedPromises).then(() => {
nm.deliver();
});
//#region AP deliver
if (Users.isLocalUser(user) && !dontFederateInitially) {
(async () => {
const noteActivity = await renderNoteOrRenoteActivity(data, note);
const dm = new DeliverManager(user, noteActivity);
// メンションされたリモートユーザーに配送
for (const u of mentionedUsers.filter((u) => Users.isRemoteUser(u))) {
dm.addDirectRecipe(u as IRemoteUser);
}
// 投稿がリプライかつ投稿者がローカルユーザーかつリプライ先の投稿の投稿者がリモートユーザーなら配送
if (data.reply?.userHost != null) {
const u = await Users.findOneBy({ id: data.reply.userId });
if (u && Users.isRemoteUser(u)) dm.addDirectRecipe(u);
}
// 投稿がRenoteかつ投稿者がローカルユーザーかつRenote元の投稿の投稿者がリモートユーザーなら配送
if (data.renote?.userHost != null) {
const u = await Users.findOneBy({ id: data.renote.userId });
if (u && Users.isRemoteUser(u)) dm.addDirectRecipe(u);
}
// フォロワーに配送
if (["public", "home", "followers"].includes(note.visibility)) {
dm.addFollowersRecipe();
}
if (["public"].includes(note.visibility)) {
deliverToRelays(user, noteActivity);
}
dm.execute();
})();
}
//#endregion
}
if (data.channel) {
Channels.increment({ id: data.channel.id }, "notesCount", 1);
Channels.update(data.channel.id, {
lastNotedAt: new Date(),
});
await Notes.countBy({
userId: user.id,
channelId: data.channel.id,
}).then((count) => {
// この処理が行われるのはノート作成後なので、ノートが一つしかなかったら最初の投稿だと判断できる
// TODO: とはいえノートを削除して何回も投稿すればその分だけインクリメントされる雑さもあるのでどうにかしたい
if (count === 1 && data.channel != null) {
Channels.increment({ id: data.channel.id }, "usersCount", 1);
}
});
} }
}); });

View file

@ -42,8 +42,12 @@ export default async function (
) { ) {
const deletedAt = new Date(); const deletedAt = new Date();
// Whether this is a scheduled "draft" post
const isDraft = note.scheduledAt != null;
// この投稿を除く指定したユーザーによる指定したノートのリノートが存在しないとき // この投稿を除く指定したユーザーによる指定したノートのリノートが存在しないとき
if ( if (
!isDraft &&
note.renoteId && note.renoteId &&
(await countSameRenotes(user.id, note.renoteId, note.id)) === 0 && (await countSameRenotes(user.id, note.renoteId, note.id)) === 0 &&
deleteFromDb deleteFromDb
@ -52,7 +56,7 @@ export default async function (
Notes.decrement({ id: note.renoteId }, "score", 1); Notes.decrement({ id: note.renoteId }, "score", 1);
} }
if (note.replyId && deleteFromDb) { if (!isDraft && note.replyId != null && deleteFromDb) {
await Notes.decrement({ id: note.replyId }, "repliesCount", 1); await Notes.decrement({ id: note.replyId }, "repliesCount", 1);
} }
@ -67,14 +71,14 @@ export default async function (
const instanceNotesCountDecreasement: Record<string, number> = {}; const instanceNotesCountDecreasement: Record<string, number> = {};
// Only broadcast "deleted" to local if the note is deleted from db // Only broadcast "deleted" to local if the note is deleted from db
if (deleteFromDb) { if (!isDraft && deleteFromDb) {
publishNoteStream(note.id, "deleted", { publishNoteStream(note.id, "deleted", {
deletedAt: deletedAt, deletedAt: deletedAt,
}); });
} }
//#region ローカルの投稿なら削除アクティビティを配送 //#region ローカルの投稿なら削除アクティビティを配送
if (Users.isLocalUser(user) && !note.localOnly) { if (!isDraft && Users.isLocalUser(user) && !note.localOnly) {
let renote: Note | null = null; let renote: Note | null = null;
// if deletd note is renote // if deletd note is renote