store notifications to scylla

This commit is contained in:
Namekuji 2023-08-19 05:01:25 -04:00
parent 6b156d792a
commit d315c80fd1
No known key found for this signature in database
GPG key ID: 1D62332C07FBA532
28 changed files with 530 additions and 213 deletions

View file

@ -76,8 +76,12 @@ db:
# # 15 days after signing up. In this case, the user's first post does not
# # appear in their home timeline.
# #
# # (default: 14)
# # (default 14)
# sparseTimelineDays: 14
#
# # <queryLimit> determines the number of maximum rows read within one pagination.
# # (default 1000, max 5000, min 1)
# queryLimit: 1000
# ┌─────────────────────┐
#───┘ Redis configuration └─────────────────────────────────────

View file

@ -1,22 +1,22 @@
DROP TABLE IF EXISTS poll_vote;
DROP MATERIALIZED VIEW IF EXISTS reaction_by_id;
DROP MATERIALIZED VIEW IF EXISTS reaction_by_user_id;
DROP INDEX IF EXISTS reaction_by_id;
DROP TABLE IF EXISTS reaction;
DROP INDEX IF EXISTS home_by_id;
DROP TABLE IF EXISTS home_timeline;
DROP MATERIALIZED VIEW IF EXISTS local_timeline;
DROP MATERIALIZED VIEW IF EXISTS global_timeline;
DROP MATERIALIZED VIEW IF EXISTS note_by_channel_id;
DROP MATERIALIZED VIEW IF EXISTS note_by_renote_id_and_user_id;
DROP MATERIALIZED VIEW IF EXISTS note_by_renote_id;
DROP MATERIALIZED VIEW IF EXISTS note_by_user_id;
DROP MATERIALIZED VIEW IF EXISTS note_by_id;
DROP INDEX IF EXISTS note_by_reply_id;
DROP INDEX IF EXISTS note_by_uri;
DROP INDEX IF EXISTS note_by_url;
DROP TABLE IF EXISTS note;
DROP TYPE IF EXISTS poll;
DROP TYPE IF EXISTS emoji;
DROP TYPE IF EXISTS note_edit_history;
DROP TYPE IF EXISTS drive_file;
DROP TABLE poll_vote;
DROP MATERIALIZED VIEW reaction_by_id;
DROP MATERIALIZED VIEW reaction_by_user_id;
DROP INDEX reaction_by_id;
DROP TABLE reaction;
DROP INDEX home_by_id;
DROP TABLE home_timeline;
DROP MATERIALIZED VIEW local_timeline;
DROP MATERIALIZED VIEW global_timeline;
DROP MATERIALIZED VIEW note_by_channel_id;
DROP MATERIALIZED VIEW note_by_renote_id_and_user_id;
DROP MATERIALIZED VIEW note_by_renote_id;
DROP MATERIALIZED VIEW note_by_user_id;
DROP MATERIALIZED VIEW note_by_id;
DROP INDEX note_by_reply_id;
DROP INDEX note_by_uri;
DROP INDEX note_by_url;
DROP TABLE note;
DROP TYPE poll;
DROP TYPE emoji;
DROP TYPE note_edit_history;
DROP TYPE drive_file;

View file

@ -1,4 +1,4 @@
CREATE TYPE IF NOT EXISTS drive_file (
CREATE TYPE drive_file (
"id" ascii,
"type" ascii,
"createdAt" timestamp,
@ -15,27 +15,27 @@ CREATE TYPE IF NOT EXISTS drive_file (
"height" int,
);
CREATE TYPE IF NOT EXISTS note_edit_history (
CREATE TYPE note_edit_history (
"content" text,
"cw" text,
"files" set<frozen<drive_file>>,
"updatedAt" timestamp,
);
CREATE TYPE IF NOT EXISTS emoji (
CREATE TYPE emoji (
"name" text,
"url" text,
"width" int,
"height" int,
);
CREATE TYPE IF NOT EXISTS poll (
CREATE TYPE poll (
"expiresAt" timestamp,
"multiple" boolean,
"choices" map<int, text>,
);
CREATE TABLE IF NOT EXISTS note ( -- Store all posts
CREATE TABLE note ( -- Store all posts
"createdAtDate" date, -- For partitioning
"createdAt" timestamp,
"id" ascii, -- Post
@ -79,11 +79,11 @@ CREATE TABLE IF NOT EXISTS note ( -- Store all posts
PRIMARY KEY ("createdAtDate", "createdAt", "userId", "userHost", "visibility")
) WITH CLUSTERING ORDER BY ("createdAt" DESC);
CREATE INDEX IF NOT EXISTS note_by_uri ON note ("uri");
CREATE INDEX IF NOT EXISTS note_by_url ON note ("url");
CREATE INDEX IF NOT EXISTS note_by_reply_id ON note ("replyId");
CREATE INDEX note_by_uri ON note ("uri");
CREATE INDEX note_by_url ON note ("url");
CREATE INDEX note_by_reply_id ON note ("replyId");
CREATE MATERIALIZED VIEW IF NOT EXISTS note_by_id AS
CREATE MATERIALIZED VIEW note_by_id AS
SELECT * FROM note
WHERE "id" IS NOT NULL
AND "createdAt" IS NOT NULL
@ -94,7 +94,7 @@ CREATE MATERIALIZED VIEW IF NOT EXISTS note_by_id AS
PRIMARY KEY ("id", "createdAt", "createdAtDate", "userId", "userHost", "visibility")
WITH CLUSTERING ORDER BY ("createdAt" DESC);
CREATE MATERIALIZED VIEW IF NOT EXISTS note_by_user_id AS
CREATE MATERIALIZED VIEW note_by_user_id AS
SELECT * FROM note
WHERE "userId" IS NOT NULL
AND "createdAt" IS NOT NULL
@ -104,7 +104,7 @@ CREATE MATERIALIZED VIEW IF NOT EXISTS note_by_user_id AS
PRIMARY KEY ("userId", "createdAt", "createdAtDate", "userHost", "visibility")
WITH CLUSTERING ORDER BY ("createdAt" DESC);
CREATE MATERIALIZED VIEW IF NOT EXISTS note_by_renote_id AS
CREATE MATERIALIZED VIEW note_by_renote_id AS
SELECT * FROM note
WHERE "renoteId" IS NOT NULL
AND "createdAt" IS NOT NULL
@ -115,7 +115,7 @@ CREATE MATERIALIZED VIEW IF NOT EXISTS note_by_renote_id AS
PRIMARY KEY ("renoteId", "createdAt", "createdAtDate", "userId", "userHost", "visibility")
WITH CLUSTERING ORDER BY ("createdAt" DESC);
CREATE MATERIALIZED VIEW IF NOT EXISTS note_by_renote_id_and_user_id AS
CREATE MATERIALIZED VIEW note_by_renote_id_and_user_id AS
SELECT "renoteId", "userId", "createdAt", "createdAtDate", "userHost", "visibility", "id" FROM note
WHERE "renoteId" IS NOT NULL
AND "createdAt" IS NOT NULL
@ -126,7 +126,7 @@ CREATE MATERIALIZED VIEW IF NOT EXISTS note_by_renote_id_and_user_id AS
PRIMARY KEY (("renoteId", "userId"), "createdAt", "createdAtDate", "userHost", "visibility")
WITH CLUSTERING ORDER BY ("createdAt" DESC);
CREATE MATERIALIZED VIEW IF NOT EXISTS note_by_channel_id AS
CREATE MATERIALIZED VIEW note_by_channel_id AS
SELECT * FROM note
WHERE "channelId" IS NOT NULL
AND "createdAt" IS NOT NULL
@ -137,7 +137,7 @@ CREATE MATERIALIZED VIEW IF NOT EXISTS note_by_channel_id AS
PRIMARY KEY ("channelId", "createdAt", "createdAtDate", "userId", "userHost", "visibility")
WITH CLUSTERING ORDER BY ("createdAt" DESC);
CREATE MATERIALIZED VIEW IF NOT EXISTS global_timeline AS
CREATE MATERIALIZED VIEW global_timeline AS
SELECT * FROM note
WHERE "createdAtDate" IS NOT NULL
AND "createdAt" IS NOT NULL
@ -147,7 +147,7 @@ CREATE MATERIALIZED VIEW IF NOT EXISTS global_timeline AS
PRIMARY KEY ("createdAtDate", "createdAt", "userId", "userHost", "visibility")
WITH CLUSTERING ORDER BY ("createdAt" DESC);
CREATE MATERIALIZED VIEW IF NOT EXISTS local_timeline AS
CREATE MATERIALIZED VIEW local_timeline AS
SELECT * FROM note
WHERE "createdAtDate" IS NOT NULL
AND "createdAt" IS NOT NULL
@ -157,7 +157,7 @@ CREATE MATERIALIZED VIEW IF NOT EXISTS local_timeline AS
PRIMARY KEY ("createdAtDate", "createdAt", "userId", "userHost", "visibility")
WITH CLUSTERING ORDER BY ("createdAt" DESC);
CREATE TABLE IF NOT EXISTS home_timeline (
CREATE TABLE home_timeline (
"feedUserId" ascii, -- For partitioning
"createdAtDate" date, -- For partitioning
"createdAt" timestamp,
@ -202,9 +202,9 @@ CREATE TABLE IF NOT EXISTS home_timeline (
PRIMARY KEY (("feedUserId", "createdAtDate"), "createdAt", "userId")
) WITH CLUSTERING ORDER BY ("createdAt" DESC);
CREATE INDEX IF NOT EXISTS home_by_id ON home_timeline ("id");
CREATE INDEX home_by_id ON home_timeline ("id");
CREATE TABLE IF NOT EXISTS reaction (
CREATE TABLE reaction (
"id" text,
"noteId" ascii,
"userId" ascii,
@ -214,7 +214,7 @@ CREATE TABLE IF NOT EXISTS reaction (
PRIMARY KEY ("noteId", "userId") -- this key constraints one reaction per user for the same post
);
CREATE MATERIALIZED VIEW IF NOT EXISTS reaction_by_user_id AS
CREATE MATERIALIZED VIEW reaction_by_user_id AS
SELECT * FROM reaction
WHERE "userId" IS NOT NULL
AND "createdAt" IS NOT NULL
@ -222,14 +222,14 @@ CREATE MATERIALIZED VIEW IF NOT EXISTS reaction_by_user_id AS
PRIMARY KEY ("userId", "createdAt", "noteId")
WITH CLUSTERING ORDER BY ("createdAt" DESC);
CREATE MATERIALIZED VIEW IF NOT EXISTS reaction_by_id AS
CREATE MATERIALIZED VIEW reaction_by_id AS
SELECT * FROM reaction
WHERE "noteId" IS NOT NULL
AND "reaction" IS NOT NULL
AND "userId" IS NOT NULL
PRIMARY KEY ("noteId", "reaction", "userId");
CREATE TABLE IF NOT EXISTS poll_vote (
CREATE TABLE poll_vote (
"noteId" ascii,
"userId" ascii,
"choice" set<int>,

View file

@ -0,0 +1,2 @@
DROP INDEX notification_by_id;
DROP TABLE notification;

View file

@ -0,0 +1,18 @@
CREATE TABLE notification (
"targetId" ascii,
"createdAtDate" date,
"createdAt" timestamp,
"id" ascii,
"notifierId" ascii,
"notifierHost" text,
"type" ascii,
"entityId" ascii,
"reaction" text,
"choice" int,
"customBody" text,
"customHeader" text,
"customIcon" text,
PRIMARY KEY (("targetId", "createdAtDate"), "createdAt")
) WITH CLUSTERING ORDER BY ("createdAt" DESC);
CREATE INDEX notification_by_id ON notification ("id");

View file

@ -38,28 +38,32 @@ impl Initializer {
}
pub(crate) async fn setup(&self) -> Result<(), Error> {
let pairs = vec![
let mut conn = PgConnection::connect(&self.postgres_url).await?;
let fk_pairs = vec![
("channel_note_pining", "FK_10b19ef67d297ea9de325cd4502"),
("clip_note", "FK_a012eaf5c87c65da1deb5fdbfa3"),
("muted_note", "FK_70ab9786313d78e4201d81cdb89"),
("note_edit", "FK_702ad5ae993a672e4fbffbcd38c"),
("note_favorite", "FK_0e00498f180193423c992bc4370"),
("note_unread", "FK_e637cba4dc4410218c4251260e4"),
("note_watching", "FK_03e7028ab8388a3f5e3ce2a8619"),
("promo_note", "FK_e263909ca4fe5d57f8d4230dd5c"),
("promo_read", "FK_a46a1a603ecee695d7db26da5f4"),
("user_note_pining", "FK_68881008f7c3588ad7ecae471cf"),
("notification", "FK_769cb6b73a1efe22ddf733ac453"),
];
let mut conn = PgConnection::connect(&self.postgres_url).await?;
for (table, fk) in pairs {
for (table, fk) in fk_pairs {
sqlx::query(&format!("ALTER TABLE {} DROP CONSTRAINT \"{}\"", table, fk))
.execute(&mut conn)
.await?;
}
let tables = vec!["note", "note_edit", "poll", "poll_vote", "notification"];
for table in tables {
sqlx::query(&format!("DROP TABLE {}", table))
.execute(&mut conn)
.await?;
}
Ok(())
}
}

View file

@ -23,6 +23,7 @@ export type Source = {
replicationFactor: number;
localDataCentre: string;
sparseTimelineDays?: number;
queryLimit?: number;
};
redis: {
host: string;

View file

@ -159,4 +159,13 @@ export const scyllaQueries = {
select: `SELECT * FROM poll_vote WHERE "noteId" = ?`,
insert: `INSERT INTO poll_vote ("noteId", "userId", "choice", "createdAt") VALUES (?, ?, ?, ?)`,
},
notification: {
insert: `INSERT INTO notification
("targetId", "createdAtDate", "createdAt", "id", "notifierId", "notifierHost", "type", "entityId", "reaction", "choice", "customBody", "customHeader", "customIcon")
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
select: {
byTargetId: `SELECT * FROM notification WHERE "targetId" = ? AND "createdAtDate" = ?`,
byId: `SELECT * FROM notification WHERE "id" = ?`,
},
},
};

View file

@ -40,7 +40,7 @@ import { Signin } from "@/models/entities/signin.js";
import { AuthSession } from "@/models/entities/auth-session.js";
import { FollowRequest } from "@/models/entities/follow-request.js";
import { Emoji } from "@/models/entities/emoji.js";
import { UserNotePining, UserNotePiningScylla } from "@/models/entities/user-note-pining.js";
import { UserNotePining } from "@/models/entities/user-note-pining.js";
import { Poll } from "@/models/entities/poll.js";
import { UserKeypair } from "@/models/entities/user-keypair.js";
import { UserPublickey } from "@/models/entities/user-publickey.js";
@ -130,7 +130,7 @@ export const entities = [
UserGroup,
UserGroupJoining,
UserGroupInvitation,
config.scylla ? UserNotePiningScylla : UserNotePining,
UserNotePining,
UserSecurityKey,
UsedUsername,
AttestationChallenge,

View file

@ -21,6 +21,7 @@ import { UserProfiles } from "@/models/index.js";
import { getWordHardMute } from "@/misc/check-word-mute.js";
import type { UserProfile } from "@/models/entities/user-profile.js";
import { scyllaQueries } from "@/db/cql.js";
import { notificationTypes } from "@/types.js";
function newClient(): Client | null {
if (!config.scylla) {
@ -66,6 +67,40 @@ export const scyllaClient = newClient();
export const prepared = scyllaQueries;
export interface ScyllaNotification {
targetId: string;
createdAtDate: Date;
createdAt: Date;
id: string;
notifierId: string | null;
notifierHost: string | null;
type: typeof notificationTypes[number];
entityId: string | null;
reaction: string | null;
choice: number | null;
customBody: string | null;
customHeader: string | null;
customIcon: string | null;
}
export function parseScyllaNotification(row: types.Row): ScyllaNotification {
return {
targetId: row.get("targetId"),
createdAtDate: row.get("createdAt"),
createdAt: row.get("createdAt"),
id: row.get("id"),
type: row.get("type"),
notifierId: row.get("notifierId") ?? null,
notifierHost: row.get("notifierHost") ?? null,
entityId: row.get("entityId") ?? null,
reaction: row.get("reaction") ?? null,
choice: row.get("choice") ?? null,
customBody: row.get("customBody") ?? null,
customHeader: row.get("customHeader") ?? null,
customIcon: row.get("customIcon") ?? null,
};
}
export interface ScyllaDriveFile {
id: string;
type: string;
@ -114,14 +149,14 @@ export interface ScyllaNoteEditHistory {
export interface ScyllaPoll {
expiresAt: Date | null;
multiple: boolean;
choices: Record<number, string>,
choices: Record<number, string>;
}
export interface ScyllaPollVote {
noteId: string,
userId: string,
choice: Set<number>,
createdAt: Date,
noteId: string;
userId: string;
choice: Set<number>;
createdAt: Date;
}
export function parseScyllaPollVote(row: types.Row): ScyllaPollVote {
@ -130,7 +165,7 @@ export function parseScyllaPollVote(row: types.Row): ScyllaPollVote {
userId: row.get("userId"),
choice: new Set(row.get("choice") ?? []),
createdAt: row.get("createdAt"),
}
};
}
export type ScyllaNote = Note & {
@ -151,7 +186,7 @@ export function parseScyllaNote(row: types.Row): ScyllaNote {
const userHost = row.get("userHost");
return {
createdAtDate: row.get("createdAtDate"),
createdAtDate: row.get("createdAt"),
createdAt: row.get("createdAt"),
id: row.get("id"),
visibility: row.get("visibility"),
@ -216,8 +251,6 @@ export interface ScyllaNoteReaction extends NoteReaction {
emoji: PopulatedEmoji;
}
const QUERY_LIMIT = 1000; // TODO: should this be configurable?
export type FeedType =
| "home"
| "local"
@ -225,7 +258,8 @@ export type FeedType =
| "global"
| "renotes"
| "user"
| "channel";
| "channel"
| "notification";
export function parseScyllaReaction(row: types.Row): ScyllaNoteReaction {
return {
@ -238,7 +272,7 @@ export function parseScyllaReaction(row: types.Row): ScyllaNoteReaction {
};
}
export function prepareNoteQuery(
export function preparePaginationQuery(
kind: FeedType,
ps: {
untilId?: string;
@ -269,6 +303,9 @@ export function prepareNoteQuery(
case "channel":
queryParts.push(prepared.note.select.byChannelId);
break;
case "notification":
queryParts.push(prepared.notification.select.byTargetId);
break;
default:
queryParts.push(prepared.note.select.byDate);
}
@ -293,7 +330,8 @@ export function prepareNoteQuery(
queryParts.push(`AND "createdAt" > ?`);
}
queryParts.push(`LIMIT ${QUERY_LIMIT}`);
const queryLimit = config.scylla?.queryLimit ?? 1000;
queryParts.push(`LIMIT ${queryLimit}`);
const query = queryParts.join(" ");
@ -304,7 +342,7 @@ export function prepareNoteQuery(
};
}
export async function execNotePaginationQuery(
export async function execPaginationQuery(
kind: FeedType,
ps: {
limit: number;
@ -315,34 +353,37 @@ export async function execNotePaginationQuery(
noteId?: string;
channelId?: string;
},
filter?: (_: ScyllaNote[]) => Promise<ScyllaNote[]>,
filter?: {
note?: (_: ScyllaNote[]) => Promise<ScyllaNote[]>;
notification?: (_: ScyllaNotification[]) => ScyllaNotification[];
},
userId?: User["id"],
maxPartitions = config.scylla?.sparseTimelineDays ?? 14,
): Promise<ScyllaNote[]> {
): Promise<ScyllaNote[] | ScyllaNotification[]> {
if (!scyllaClient) return [];
switch (kind) {
case "home":
case "user":
if (!userId)
throw new Error("Query of home and user timelines needs userId");
case "notification":
if (!userId) throw new Error(`Feed ${kind} needs userId`);
break;
case "renotes":
if (!ps.noteId) throw new Error("Query of renotes needs noteId");
if (!ps.noteId) throw new Error(`Feed ${kind} needs noteId`);
break;
case "channel":
if (!ps.channelId)
throw new Error("Query of channel timeline needs channelId");
if (!ps.channelId) throw new Error(`Feed ${kind} needs channelId`);
break;
}
let { query, untilDate, sinceDate } = prepareNoteQuery(kind, ps);
let { query, untilDate, sinceDate } = preparePaginationQuery(kind, ps);
let scannedPartitions = 0;
const foundNotes: ScyllaNote[] = [];
const found: (ScyllaNote | ScyllaNotification)[] = [];
const queryLimit = config.scylla?.queryLimit ?? 1000;
// Try to get posts of at most <maxPartitions> in the single request
while (foundNotes.length < ps.limit && scannedPartitions < maxPartitions) {
while (found.length < ps.limit && scannedPartitions < maxPartitions) {
const params: (Date | string | string[] | number)[] = [];
if (kind === "home" && userId) {
params.push(userId, untilDate, untilDate);
@ -352,6 +393,8 @@ export async function execNotePaginationQuery(
params.push(ps.noteId, untilDate);
} else if (kind === "channel" && ps.channelId) {
params.push(ps.channelId, untilDate);
} else if (kind === "notification" && userId) {
params.push(userId, untilDate, untilDate);
} else {
params.push(untilDate, untilDate);
}
@ -365,12 +408,22 @@ export async function execNotePaginationQuery(
});
if (result.rowLength > 0) {
if (kind === "notification") {
const notifications = result.rows.map(parseScyllaNotification);
found.push(
...(filter?.notification
? filter.notification(notifications)
: notifications),
);
untilDate = notifications[notifications.length - 1].createdAt;
} else {
const notes = result.rows.map(parseScyllaNote);
foundNotes.push(...(filter ? await filter(notes) : notes));
found.push(...(filter?.note ? await filter.note(notes) : notes));
untilDate = notes[notes.length - 1].createdAt;
}
}
if (result.rowLength < QUERY_LIMIT) {
if (result.rowLength < queryLimit) {
// Reached the end of partition. Queries posts created one day before.
scannedPartitions++;
const yesterday = new Date(untilDate.getTime() - 86400000);
@ -380,7 +433,11 @@ export async function execNotePaginationQuery(
}
}
return foundNotes;
if (kind === "notification") {
return found as ScyllaNotification[];
}
return found as ScyllaNote[];
}
export async function filterVisibility(

View file

@ -65,8 +65,8 @@ export class Notification {
* reply - A post that a user made (or was watching) has been replied to.
* renote - A post that a user made (or was watching) has been renoted.
* quote - A post that a user made (or was watching) has been quoted and renoted.
* reaction - (Watchしている)稿
* pollVote - (Watchしている)稿
* reaction - Someone reacted my post or one I'm wathing
* pollVote - Someone voted to my poll or one I'm wathing
* pollEnded -
* receiveFollowRequest -
* followRequestAccepted - A follow request has been accepted.

View file

@ -10,8 +10,9 @@ import { Note } from "./note.js";
import { User } from "./user.js";
import { id } from "../id.js";
@Entity()
@Index(["userId", "noteId"], { unique: true })
class UserNotePiningBase {
export class UserNotePining {
@PrimaryColumn(id())
public id: string;
@ -32,16 +33,10 @@ class UserNotePiningBase {
@Column(id())
public noteId: Note["id"];
}
@Entity()
export class UserNotePining extends UserNotePiningBase {
@ManyToOne((type) => Note, {
onDelete: "CASCADE",
})
@JoinColumn()
public note: Note | null;
}
@Entity({ name: "user_note_pining" })
export class UserNotePiningScylla extends UserNotePiningBase {}

View file

@ -17,7 +17,7 @@ import { NoteRepository } from "./repositories/note.js";
import { DriveFileRepository } from "./repositories/drive-file.js";
import { DriveFolderRepository } from "./repositories/drive-folder.js";
import { AccessToken } from "./entities/access-token.js";
import { UserNotePining, UserNotePiningScylla } from "./entities/user-note-pining.js";
import { UserNotePining } from "./entities/user-note-pining.js";
import { SigninRepository } from "./repositories/signin.js";
import { MessagingMessageRepository } from "./repositories/messaging-message.js";
import { UserListRepository } from "./repositories/user-list.js";
@ -93,7 +93,7 @@ export const UserListJoinings = db.getRepository(UserListJoining);
export const UserGroups = UserGroupRepository;
export const UserGroupJoinings = db.getRepository(UserGroupJoining);
export const UserGroupInvitations = UserGroupInvitationRepository;
export const UserNotePinings = db.getRepository(config.scylla ? UserNotePiningScylla : UserNotePining);
export const UserNotePinings = db.getRepository(UserNotePining);
export const UserIps = db.getRepository(UserIp);
export const UsedUsernames = db.getRepository(UsedUsername);
export const Followings = FollowingRepository;

View file

@ -231,7 +231,6 @@ export const NoteRepository = db.getRepository(Note).extend({
me?: { id: User["id"] } | null | undefined,
options?: {
detail?: boolean;
scyllaNote?: boolean;
_hint_?: {
myReactions: Map<Note["id"], NoteReaction | null>;
};
@ -246,24 +245,21 @@ export const NoteRepository = db.getRepository(Note).extend({
const meId = me ? me.id : null;
let note: Note | null = null;
const isSrcNote = typeof src === "object";
// Always lookup from ScyllaDB if enabled
if (isSrcNote && (!scyllaClient || options?.scyllaNote)) {
if (typeof src === "object") {
note = src;
} else {
const noteId = isSrcNote ? src.id : src;
if (scyllaClient) {
const result = await scyllaClient.execute(
prepared.note.select.byId,
[noteId],
[src],
{ prepare: true },
);
if (result.rowLength > 0) {
note = parseScyllaNote(result.first());
}
} else {
note = await this.findOneBy({ id: noteId });
note = await this.findOneBy({ id: src });
}
}
@ -405,7 +401,6 @@ export const NoteRepository = db.getRepository(Note).extend({
me?: { id: User["id"] } | null | undefined,
options?: {
detail?: boolean;
scyllaNote?: boolean;
},
) {
if (notes.length === 0) return [];

View file

@ -15,18 +15,99 @@ import {
AccessTokens,
NoteReactions,
} from "../index.js";
import {
parseScyllaNote,
parseScyllaNotification,
parseScyllaReaction,
prepared,
scyllaClient,
type ScyllaNotification,
} from "@/db/scylla.js";
export const NotificationRepository = db.getRepository(Notification).extend({
async pack(
src: Notification["id"] | Notification,
src: Notification["id"] | Notification | ScyllaNotification,
options: {
_hintForEachNotes_?: {
myReactions: Map<Note["id"], NoteReaction | null>;
};
},
): Promise<Packed<"Notification">> {
if (scyllaClient) {
let notification: ScyllaNotification;
if (typeof src === "object") {
notification = src as ScyllaNotification;
} else {
const result = await scyllaClient.execute(
prepared.notification.select.byId,
[src],
{ prepare: true },
);
if (result.rowLength === 0) {
throw new Error("notification not found");
}
notification = parseScyllaNotification(result.first());
}
const token =
notification.type === "app" && notification.entityId
? await AccessTokens.findOneByOrFail({
id: notification.entityId,
})
: null;
let data = null;
if (notification.entityId) {
switch (notification.type) {
case "mention":
case "reply":
case "renote":
case "quote":
case "reaction":
case "pollVote":
case "pollEnded":
data = {
note: Notes.pack(
notification.entityId,
{ id: notification.targetId },
{ detail: true, _hint_: options._hintForEachNotes_ },
),
reaction: notification.reaction,
choice: notification.choice,
};
break;
case "groupInvited":
data = {
invitation: UserGroupInvitations.pack(notification.entityId),
};
break;
case "app":
data = {
body: notification.customBody,
header: notification.customHeader || token?.name,
icon: notification.customIcon || token?.iconUrl,
};
break;
}
}
return await awaitAll({
id: notification.id,
createdAt: notification.createdAt.toISOString(),
type: notification.type,
isRead: true, // FIXME: Implement read checker on DragonflyDB
userId: notification.notifierId,
user: notification.notifierId
? Users.pack(notification.notifierId)
: null,
...data,
});
}
const notification =
typeof src === "object" ? src : await this.findOneByOrFail({ id: src });
typeof src === "object"
? (src as Notification)
: await this.findOneByOrFail({ id: src });
const token = notification.appAccessTokenId
? await AccessTokens.findOneByOrFail({
id: notification.appAccessTokenId,
@ -145,22 +226,52 @@ export const NotificationRepository = db.getRepository(Notification).extend({
});
},
async packMany(notifications: Notification[], meId: User["id"]) {
async packMany(
notifications: Notification[] | ScyllaNotification[],
meId: User["id"],
) {
if (notifications.length === 0) return [];
const notes = notifications
.filter((x) => x.note != null)
.map((x) => x.note!);
const noteIds = notes.map((n) => n.id);
let notes: Note[] = [];
let noteIds: Note["id"][] = [];
let renoteIds: Note["id"][] = [];
const myReactionsMap = new Map<Note["id"], NoteReaction | null>();
const renoteIds = notes
.filter((n) => n.renoteId != null)
.map((n) => n.renoteId!);
if (scyllaClient) {
noteIds = (notifications as ScyllaNotification[])
.filter((n) => !["groupInvited", "app"].includes(n.type) && n.entityId)
.map(({ entityId }) => entityId as string);
notes = await scyllaClient
.execute(prepared.note.select.byIds, [noteIds], { prepare: true })
.then((result) => result.rows.map(parseScyllaNote));
renoteIds = notes
.filter((note) => !!note.renoteId)
.map(({ renoteId }) => renoteId as string);
} else {
const notes = (notifications as Notification[])
.filter((x) => !!x.note)
.map((x) => x.note as Note);
noteIds = notes.map((n) => n.id);
renoteIds = notes
.filter((n) => !!n.renoteId)
.map((n) => n.renoteId as string);
}
const targets = [...noteIds, ...renoteIds];
const myReactions = await NoteReactions.findBy({
let myReactions: NoteReaction[] = [];
if (scyllaClient) {
const result = await scyllaClient.execute(
prepared.reaction.select.byNoteAndUser,
[targets, [meId]],
{ prepare: true },
);
myReactions = result.rows.map(parseScyllaReaction);
} else {
myReactions = await NoteReactions.findBy({
userId: meId,
noteId: In(targets),
});
}
for (const target of targets) {
myReactionsMap.set(
@ -177,7 +288,7 @@ export const NotificationRepository = db.getRepository(Notification).extend({
_hintForEachNotes_: {
myReactions: myReactionsMap,
},
}).catch((e) => null),
}).catch((_) => null),
),
);
return results.filter((x) => x != null);

View file

@ -528,7 +528,6 @@ export const UserRepository = db.getRepository(User).extend({
pinnedNoteIds,
pinnedNotes: Notes.packMany(pinnedNotes, me, {
detail: true,
scyllaNote: !!scyllaClient,
}),
pinnedPageId: profile!.pinnedPageId,
pinnedPage: profile!.pinnedPageId

View file

@ -4,12 +4,11 @@ import { Notes, Channels, UserProfiles } from "@/models/index.js";
import { makePaginationQuery } from "../../common/make-pagination-query.js";
import { activeUsersChart } from "@/services/chart/index.js";
import {
ScyllaNote,
execNotePaginationQuery,
type ScyllaNote,
execPaginationQuery,
filterBlockUser,
filterMutedNote,
filterMutedUser,
filterVisibility,
scyllaClient,
} from "@/db/scylla.js";
import {
@ -103,11 +102,11 @@ export default define(meta, paramDef, async (ps, user) => {
const foundPacked = [];
while (foundPacked.length < ps.limit) {
const foundNotes = (
await execNotePaginationQuery("channel", ps, filter)
(await execPaginationQuery("channel", ps, {
note: filter,
})) as ScyllaNote[]
).slice(0, ps.limit * 1.5); // Some may filtered out by Notes.packMany, thus we take more than ps.limit.
foundPacked.push(
...(await Notes.packMany(foundNotes, user, { scyllaNote: true })),
);
foundPacked.push(...(await Notes.packMany(foundNotes, user)));
if (foundNotes.length < ps.limit) break;
ps.untilDate = foundNotes[foundNotes.length - 1].createdAt.getTime();
}

View file

@ -11,6 +11,20 @@ import read from "@/services/note/read.js";
import { readNotification } from "../../common/read-notification.js";
import define from "../../define.js";
import { makePaginationQuery } from "../../common/make-pagination-query.js";
import {
ScyllaNotification,
execPaginationQuery,
filterMutedUser,
scyllaClient,
} from "@/db/scylla.js";
import {
InstanceMutingsCache,
LocalFollowingsCache,
UserBlockedCache,
UserBlockingCache,
UserMutingsCache,
userWordMuteCache,
} from "@/misc/cache.js";
export const meta = {
tags: ["account", "notifications"],
@ -66,13 +80,75 @@ export const paramDef = {
export default define(meta, paramDef, async (ps, user) => {
// includeTypes が空の場合はクエリしない
if (ps.includeTypes && ps.includeTypes.length === 0) {
return [];
}
// excludeTypes に全指定されている場合はクエリしない
if (notificationTypes.every((type) => ps.excludeTypes?.includes(type))) {
return [];
if (
(ps.includeTypes && ps.includeTypes.length === 0) ||
notificationTypes.every((type) => ps.excludeTypes?.includes(type))
) {
return await Notifications.packMany([], user.id);
}
if (scyllaClient) {
const [
followingUserIds,
mutedUserIds,
mutedInstances,
blockerIds,
blockingIds,
] = await Promise.all([
LocalFollowingsCache.init(user.id).then((cache) => cache.getAll()),
UserMutingsCache.init(user.id).then((cache) => cache.getAll()),
InstanceMutingsCache.init(user.id).then((cache) => cache.getAll()),
UserBlockedCache.init(user.id).then((cache) => cache.getAll()),
UserBlockingCache.init(user.id).then((cache) => cache.getAll()),
]);
const validUserIds = [user.id, ...followingUserIds];
const filter = (notifications: ScyllaNotification[]) => {
let filtered = notifications;
if (ps.unreadOnly) {
// FIXME: isRead is always true at the moment
filtered = [];
}
if (ps.following) {
filtered = filtered.filter(
(n) => n.notifierId && validUserIds.includes(n.notifierId),
);
}
if (ps.includeTypes && ps.includeTypes.length > 0) {
filtered = filtered.filter((n) => ps.includeTypes?.includes(n.type));
} else if (ps.excludeTypes && ps.excludeTypes.length > 0) {
filtered = filtered.filter(
(n) => ps.excludeTypes && !ps.excludeTypes.includes(n.type),
);
}
filtered = filtered.filter(
(n) => !(n.notifierHost && mutedInstances.includes(n.notifierHost)),
);
filtered = filtered.filter(
(n) =>
!(
n.notifierId &&
(mutedUserIds.includes(n.notifierId) ||
blockingIds.includes(n.notifierId) ||
blockerIds.includes(n.notifierId))
),
);
return filtered;
};
const foundNotifications = (
(await execPaginationQuery(
"notification",
ps,
{ notification: filter },
user.id,
30,
)) as ScyllaNotification[]
).slice(0, ps.limit);
return await Notifications.packMany(foundNotifications, user.id);
}
const followingQuery = Followings.createQueryBuilder("following")
.select("following.followeeId")
.where("following.followerId = :followerId", { followerId: user.id });
@ -97,19 +173,11 @@ export default define(meta, paramDef, async (ps, user) => {
.andWhere("notification.notifieeId = :meId", { meId: user.id })
.leftJoinAndSelect("notification.notifier", "notifier")
.leftJoinAndSelect("notification.note", "note")
.leftJoinAndSelect("notifier.avatar", "notifierAvatar")
.leftJoinAndSelect("notifier.banner", "notifierBanner")
.leftJoinAndSelect("note.user", "user")
.leftJoinAndSelect("user.avatar", "avatar")
.leftJoinAndSelect("user.banner", "banner")
.leftJoinAndSelect("note.reply", "reply")
.leftJoinAndSelect("note.renote", "renote")
.leftJoinAndSelect("reply.user", "replyUser")
.leftJoinAndSelect("replyUser.avatar", "replyUserAvatar")
.leftJoinAndSelect("replyUser.banner", "replyUserBanner")
.leftJoinAndSelect("renote.user", "renoteUser")
.leftJoinAndSelect("renoteUser.avatar", "renoteUserAvatar")
.leftJoinAndSelect("renoteUser.banner", "renoteUserBanner");
.leftJoinAndSelect("renote.user", "renoteUser");
// muted users
query.andWhere(

View file

@ -151,7 +151,6 @@ export default define(meta, paramDef, async (ps, user) => {
return await Notes.packMany(foundNotes, user, {
detail: false,
scyllaNote: true,
});
}

View file

@ -10,8 +10,8 @@ import { generateMutedNoteQuery } from "../../common/generate-muted-note-query.j
import { generateBlockedUserQuery } from "../../common/generate-block-query.js";
import { generateMutedUserRenotesQueryForNotes } from "../../common/generated-muted-renote-query.js";
import {
ScyllaNote,
execNotePaginationQuery,
type ScyllaNote,
execPaginationQuery,
filterBlockUser,
filterMutedNote,
filterMutedRenotes,
@ -157,11 +157,11 @@ export default define(meta, paramDef, async (ps, user) => {
const foundPacked = [];
while (foundPacked.length < ps.limit) {
const foundNotes = (
await execNotePaginationQuery("global", ps, filter)
(await execPaginationQuery("global", ps, {
note: filter,
})) as ScyllaNote[]
).slice(0, ps.limit * 1.5); // Some may filtered out by Notes.packMany, thus we take more than ps.limit.
foundPacked.push(
...(await Notes.packMany(foundNotes, user, { scyllaNote: true })),
);
foundPacked.push(...(await Notes.packMany(foundNotes, user)));
if (foundNotes.length < ps.limit) break;
ps.untilDate = foundNotes[foundNotes.length - 1].createdAt.getTime();
}

View file

@ -13,8 +13,8 @@ import { generateChannelQuery } from "../../common/generate-channel-query.js";
import { generateBlockedUserQuery } from "../../common/generate-block-query.js";
import { generateMutedUserRenotesQueryForNotes } from "../../common/generated-muted-renote-query.js";
import {
ScyllaNote,
execNotePaginationQuery,
type ScyllaNote,
execPaginationQuery,
filterBlockUser,
filterChannel,
filterMutedNote,
@ -174,22 +174,23 @@ export default define(meta, paramDef, async (ps, user) => {
const foundPacked = [];
while (foundPacked.length < ps.limit) {
const [homeFoundNotes, localFoundNotes] = await Promise.all([
execNotePaginationQuery(
execPaginationQuery(
"home",
ps,
(notes) => commonFilter(homeFilter(notes)),
{ note: (notes) => commonFilter(homeFilter(notes)) },
user.id,
),
execNotePaginationQuery("local", ps, (notes) =>
commonFilter(localFilter(notes)),
),
execPaginationQuery("local", ps, {
note: (notes) => commonFilter(localFilter(notes)),
}),
]);
const foundNotes = [...homeFoundNotes, ...localFoundNotes]
const foundNotes = [
...(homeFoundNotes as ScyllaNote[]),
...(localFoundNotes as ScyllaNote[]),
]
.sort((a, b) => b.createdAt.getTime() - a.createdAt.getTime()) // Descendent
.slice(0, ps.limit * 1.5); // Some may be filtered out by Notes.packMany, thus we take more than ps.limit.
foundPacked.push(
...(await Notes.packMany(foundNotes, user, { scyllaNote: true })),
);
foundPacked.push(...(await Notes.packMany(foundNotes, user)));
if (foundNotes.length < ps.limit) break;
ps.untilDate = foundNotes[foundNotes.length - 1].createdAt.getTime();
}

View file

@ -13,8 +13,8 @@ import { generateChannelQuery } from "../../common/generate-channel-query.js";
import { generateBlockedUserQuery } from "../../common/generate-block-query.js";
import { generateMutedUserRenotesQueryForNotes } from "../../common/generated-muted-renote-query.js";
import {
ScyllaNote,
execNotePaginationQuery,
type ScyllaNote,
execPaginationQuery,
filterBlockUser,
filterChannel,
filterMutedNote,
@ -187,11 +187,11 @@ export default define(meta, paramDef, async (ps, user) => {
const foundPacked = [];
while (foundPacked.length < ps.limit) {
const foundNotes = (
await execNotePaginationQuery("local", ps, filter)
(await execPaginationQuery("local", ps, {
note: filter,
})) as ScyllaNote[]
).slice(0, ps.limit * 1.5); // Some may filtered out by Notes.packMany, thus we take more than ps.limit.
foundPacked.push(
...(await Notes.packMany(foundNotes, user, { scyllaNote: true })),
);
foundPacked.push(...(await Notes.packMany(foundNotes, user)));
if (foundNotes.length < ps.limit) break;
ps.untilDate = foundNotes[foundNotes.length - 1].createdAt.getTime();
}

View file

@ -13,8 +13,8 @@ import { generateChannelQuery } from "../../common/generate-channel-query.js";
import { generateBlockedUserQuery } from "../../common/generate-block-query.js";
import { generateMutedUserRenotesQueryForNotes } from "../../common/generated-muted-renote-query.js";
import {
ScyllaNote,
execNotePaginationQuery,
type ScyllaNote,
execPaginationQuery,
filterBlockUser,
filterMutedNote,
filterMutedRenotes,
@ -184,10 +184,10 @@ export default define(meta, paramDef, async (ps, user) => {
return filtered;
};
const foundNotes = await execNotePaginationQuery("recommended", ps, filter);
return await Notes.packMany(foundNotes.slice(0, ps.limit), user, {
scyllaNote: true,
});
const foundNotes = (await execPaginationQuery("recommended", ps, {
note: filter,
})) as ScyllaNote[];
return await Notes.packMany(foundNotes.slice(0, ps.limit), user);
}
//#region Construct query

View file

@ -7,8 +7,8 @@ import { generateMutedUserQuery } from "../../common/generate-muted-user-query.j
import { makePaginationQuery } from "../../common/make-pagination-query.js";
import { generateBlockedUserQuery } from "../../common/generate-block-query.js";
import {
ScyllaNote,
execNotePaginationQuery,
type ScyllaNote,
execPaginationQuery,
filterBlockUser,
filterMutedUser,
filterVisibility,
@ -113,17 +113,15 @@ export default define(meta, paramDef, async (ps, user) => {
let untilDate: number | undefined;
while (foundPacked.length < ps.limit) {
const foundNotes = (
await execNotePaginationQuery(
(await execPaginationQuery(
"renotes",
{ ...ps, untilDate },
filter,
{ note: filter },
user?.id,
1,
)
)) as ScyllaNote[]
).slice(0, ps.limit * 1.5); // Some may filtered out by Notes.packMany, thus we take more than ps.limit.
foundPacked.push(
...(await Notes.packMany(foundNotes, user, { scyllaNote: true })),
);
foundPacked.push(...(await Notes.packMany(foundNotes, user)));
if (foundNotes.length < ps.limit) break;
untilDate = foundNotes[foundNotes.length - 1].createdAt.getTime();
}

View file

@ -45,7 +45,6 @@ export default define(meta, paramDef, async (ps, user) => {
return await Notes.pack(note, user, {
// FIXME: packing with detail may throw an error if the reply or renote is not visible (#8774)
detail: true,
scyllaNote: !!scyllaClient
}).catch((err) => {
if (err.id === "9725d0ce-ba28-4dde-95a7-2cbb2c15de24")
throw new ApiError(meta.errors.noSuchNote);

View file

@ -17,7 +17,7 @@ import {
filterChannel,
filterReply,
filterVisibility,
execNotePaginationQuery,
execPaginationQuery,
filterMutedUser,
filterMutedNote,
filterBlockUser,
@ -161,11 +161,14 @@ export default define(meta, paramDef, async (ps, user) => {
const foundPacked = [];
while (foundPacked.length < ps.limit) {
const foundNotes = (
await execNotePaginationQuery("home", ps, filter, user.id)
(await execPaginationQuery(
"home",
ps,
{ note: filter },
user.id,
)) as ScyllaNote[]
).slice(0, ps.limit * 1.5); // Some may filtered out by Notes.packMany, thus we take more than ps.limit.
foundPacked.push(
...(await Notes.packMany(foundNotes, user, { scyllaNote: true })),
);
foundPacked.push(...(await Notes.packMany(foundNotes, user)));
if (foundNotes.length < ps.limit) break;
ps.untilDate = foundNotes[foundNotes.length - 1].createdAt.getTime();
}

View file

@ -8,8 +8,8 @@ import { generateVisibilityQuery } from "../../common/generate-visibility-query.
import { generateMutedUserQuery } from "../../common/generate-muted-user-query.js";
import { generateBlockedUserQuery } from "../../common/generate-block-query.js";
import {
ScyllaNote,
execNotePaginationQuery,
type ScyllaNote,
execPaginationQuery,
filterBlockUser,
filterMutedNote,
filterMutedUser,
@ -158,19 +158,15 @@ export default define(meta, paramDef, async (ps, me) => {
return filtered;
};
const foundPacked = [];
while (foundPacked.length < ps.limit) {
const foundNotes = (
await execNotePaginationQuery("user", ps, filter, user.id)
).slice(0, ps.limit * 1.5); // Some may filtered out by Notes.packMany, thus we take more than ps.limit.
foundPacked.push(
...(await Notes.packMany(foundNotes, user, { scyllaNote: true })),
);
if (foundNotes.length < ps.limit) break;
ps.untilDate = foundNotes[foundNotes.length - 1].createdAt.getTime();
}
return foundPacked.slice(0, ps.limit);
(await execPaginationQuery(
"user",
ps,
{ note: filter },
user.id,
)) as ScyllaNote[]
).slice(0, ps.limit);
return await Notes.packMany(foundNotes, user);
}
//#region Construct query

View file

@ -12,8 +12,13 @@ import type { User } from "@/models/entities/user.js";
import type { Notification } from "@/models/entities/notification.js";
import { sendEmailNotification } from "./send-email-notification.js";
import { shouldSilenceInstance } from "@/misc/should-block-instance.js";
import { UserMutingsCache } from "@/misc/cache.js";
import { LocalFollowingsCache, UserMutingsCache } from "@/misc/cache.js";
import { userByIdCache } from "./user-cache.js";
import {
type ScyllaNotification,
prepared,
scyllaClient,
} from "@/db/scylla.js";
export async function createNotification(
notifieeId: User["id"],
@ -24,22 +29,31 @@ export async function createNotification(
return null;
}
let notifierHost: string | null = null;
if (
data.notifierId &&
["mention", "reply", "renote", "quote", "reaction"].includes(type)
) {
const notifier = await Users.findOneBy({ id: data.notifierId });
const notifier = await userByIdCache.fetchMaybe(data.notifierId, () =>
Users.findOneBy({ id: data.notifierId ?? "" }).then(
(user) => user ?? undefined,
),
);
// suppress if the notifier does not exist or is silenced.
if (!notifier) return null;
// suppress if the notifier is silenced or in a silenced instance, and not followed by the notifiee.
notifierHost = notifier.host;
// suppress if the notifier is silenced, suspended, or in a silenced instance, and not followed by the notifiee.
if (
(notifier.isSilenced ||
notifier.isSuspended ||
(Users.isRemoteUser(notifier) &&
(await shouldSilenceInstance(notifier.host)))) &&
!(await Followings.exist({
where: { followerId: notifieeId, followeeId: data.notifierId },
}))
!(await LocalFollowingsCache.init(notifieeId).then((cache) =>
cache.has(notifier.id),
))
)
return null;
}
@ -62,7 +76,51 @@ export async function createNotification(
}
// Create notification
const notification = await Notifications.insert({
let notification: Notification | ScyllaNotification;
if (scyllaClient) {
const entityId =
data.noteId ||
data.followRequestId ||
data.userGroupInvitationId ||
data.appAccessTokenId ||
null;
const now = new Date();
notification = {
id: genId(),
createdAtDate: now,
createdAt: now,
targetId: notifieeId,
notifierId: data.notifierId ?? null,
notifierHost,
entityId,
type,
choice: data.choice ?? null,
customBody: data.customBody ?? null,
customHeader: data.customHeader ?? null,
customIcon: data.customIcon ?? null,
reaction: data.reaction ?? null,
};
await scyllaClient.execute(
prepared.notification.insert,
[
notification.targetId,
notification.createdAtDate,
notification.createdAt,
notification.id,
notification.notifierId,
notification.notifierHost,
notification.type,
notification.entityId,
notification.reaction,
notification.choice,
notification.customBody,
notification.customHeader,
notification.customIcon,
],
{ prepare: true },
);
} else {
notification = await Notifications.insert({
id: genId(),
createdAt: new Date(),
notifieeId: notifieeId,
@ -73,6 +131,7 @@ export async function createNotification(
} as Partial<Notification>).then((x) =>
Notifications.findOneByOrFail(x.identifiers[0]),
);
}
const packed = await Notifications.pack(notification, {});