wip: pack with scylla

This commit is contained in:
Namekuji 2023-07-30 21:41:45 -04:00
parent 41930bda52
commit 31e4e0b75a
No known key found for this signature in database
GPG key ID: 1D62332C07FBA532
12 changed files with 128 additions and 61 deletions

View file

@ -2,7 +2,7 @@ DROP MATERIALIZED VIEW IF EXISTS reaction_by_userid;
DROP INDEX IF EXISTS reaction_by_id;
DROP TABLE IF EXISTS reaction;
DROP MATERIALIZED VIEW IF EXISTS note_by_userid;
DROP INDEX IF EXISTS note_by_id;
DROP MATERIALIZED VIEW IF EXISTS note_by_id;
DROP INDEX IF EXISTS note_by_uri;
DROP INDEX IF EXISTS note_by_url;
DROP TABLE IF EXISTS note;

View file

@ -10,7 +10,9 @@ CREATE TYPE IF NOT EXISTS drive_file (
"isSensitive" boolean,
"isLink" boolean,
"md5" ascii,
"size" int
"size" int,
"width" int,
"height" int,
);
CREATE TYPE IF NOT EXISTS note_edit_history (
@ -44,6 +46,7 @@ CREATE TABLE IF NOT EXISTS note ( -- Models timeline
"files" set<frozen<drive_file>>,
"visibleUserIds" set<ascii>,
"mentions" set<ascii>,
"mentionedRemoteUsers" text,
"emojis" set<text>,
"tags" set<text>,
"hasPoll" boolean,
@ -63,9 +66,16 @@ CREATE TABLE IF NOT EXISTS note ( -- Models timeline
PRIMARY KEY ("createdAtDate", "createdAt", "id")
) WITH CLUSTERING ORDER BY ("createdAt" DESC);
CREATE INDEX IF NOT EXISTS note_by_id ON note (id);
CREATE INDEX IF NOT EXISTS note_by_uri ON note (uri);
CREATE INDEX IF NOT EXISTS note_by_url ON note (url);
CREATE INDEX IF NOT EXISTS note_by_uri ON note ("uri");
CREATE INDEX IF NOT EXISTS note_by_url ON note ("url");
CREATE MATERIALIZED VIEW IF NOT EXISTS note_by_id AS
SELECT * FROM note
WHERE "id" IS NOT NULL
AND "createdAt" IS NOT NULL
AND "createdAtDate" IS NOT NULL
PRIMARY KEY ("id", "createdAt", "createdAtDate")
WITH CLUSTERING ORDER BY ("createdAt" DESC);
CREATE MATERIALIZED VIEW IF NOT EXISTS note_by_userid AS
SELECT * FROM note
@ -86,7 +96,7 @@ CREATE TABLE IF NOT EXISTS reaction (
PRIMARY KEY ("noteId", "userId")
);
CREATE INDEX IF NOT EXISTS reaction_by_id ON reaction (id);
CREATE INDEX IF NOT EXISTS reaction_by_id ON reaction ("id");
CREATE MATERIALIZED VIEW IF NOT EXISTS reaction_by_userid AS
SELECT * FROM reaction

View file

@ -39,6 +39,7 @@ export const prepared = {
"files",
"visibleUserIds",
"mentions",
"mentionedRemoteUsers",
"emojis",
"tags",
"hasPoll",
@ -57,12 +58,12 @@ export const prepared = {
"updatedAt"
)
VALUES
(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
select: {
byDate: `SELECT * FROM note WHERE "createdAtDate" = ?`,
byId: `SELECT * FROM note WHERE "id" IN ?`,
byUri: `SELECT * FROM note WHERE "uri" IN ?`,
byUrl: `SELECT * FROM note WHERE "url" IN ?`,
byId: `SELECT * FROM note_by_id WHERE "id" IN ?`,
byUserId: `SELECT * FROM note_by_userid WHERE "userId" IN ?`,
},
delete: `DELETE FROM note WHERE "createdAtDate" = ? AND "createdAt" = ? AND "id" = ?`,
@ -105,6 +106,8 @@ export interface ScyllaDriveFile {
isLink: boolean;
md5: string;
size: number;
width: number | null;
height: number | null;
}
export interface ScyllaNoteEditHistory {
@ -121,47 +124,48 @@ export type ScyllaNote = Note & {
};
export function parseScyllaNote(row: types.Row): ScyllaNote {
const files: ScyllaDriveFile[] = row.get("files");
const files: ScyllaDriveFile[] = row.get("files") ?? [];
return {
createdAtDate: row.get("createdAtDate"),
createdAt: row.get("createdAt"),
id: row.get("id"),
visibility: row.get("visibility"),
text: row.get("content"),
name: row.get("name"),
cw: row.get("cw"),
text: row.get("content") ?? null,
name: row.get("name") ?? null,
cw: row.get("cw") ?? null,
localOnly: row.get("localOnly"),
renoteCount: row.get("renoteCount"),
repliesCount: row.get("repliesCount"),
uri: row.get("uri"),
url: row.get("url"),
uri: row.get("uri") ?? null,
url: row.get("url") ?? null,
score: row.get("score"),
files,
fileIds: files.map((file) => file.id),
attachedFileTypes: files.map((file) => file.type),
visibleUserIds: row.get("visibleUserIds"),
mentions: row.get("mentions"),
emojis: row.get("emojis"),
tags: row.get("tags"),
hasPoll: row.get("hasPoll"),
threadId: row.get("threadId"),
channelId: row.get("channelId"),
attachedFileTypes: files.map((file) => file.type) ?? [],
visibleUserIds: row.get("visibleUserIds") ?? [],
mentions: row.get("mentions") ?? [],
emojis: row.get("emojis") ?? [],
tags: row.get("tags") ?? [],
hasPoll: row.get("hasPoll") ?? false,
threadId: row.get("threadId") ?? null,
channelId: row.get("channelId") ?? null,
userId: row.get("userId"),
userHost: row.get("userHost"),
replyId: row.get("replyId"),
replyUserId: row.get("replyUserId"),
replyUserHost: row.get("replyUserHost"),
renoteId: row.get("replyId"),
renoteUserId: row.get("renoteUserId"),
renoteUserHost: row.get("renoteUserHost"),
reactions: row.get("reactions"),
noteEdit: row.get("noteEdit"),
updatedAt: row.get("updatedAt"),
userHost: row.get("userHost") ?? null,
replyId: row.get("replyId") ?? null,
replyUserId: row.get("replyUserId") ?? null,
replyUserHost: row.get("replyUserHost") ?? null,
renoteId: row.get("renoteId") ?? null,
renoteUserId: row.get("renoteUserId") ?? null,
renoteUserHost: row.get("renoteUserHost") ?? null,
reactions: row.get("reactions") ?? {},
noteEdit: row.get("noteEdit") ?? [],
updatedAt: row.get("updatedAt") ?? null,
mentionedRemoteUsers: row.get("mentionedRemoteUsers") ?? "[]",
/* unused postgres denormalization */
channel: null,
renote: null,
reply: null,
mentionedRemoteUsers: "",
user: null,
};
}

View file

@ -2,6 +2,7 @@ import { redisClient } from "@/db/redis.js";
import { encode, decode } from "msgpackr";
import { ChainableCommander } from "ioredis";
import { Followings } from "@/models/index.js";
import { IsNull } from "typeorm";
export class Cache<T> {
private ttl: number;
@ -147,7 +148,7 @@ export class LocalFollowingsCache {
if (!(await cache.hasFollowing())) {
const rel = await Followings.find({
select: { followeeId: true },
where: { followerId: cache.myId },
where: { followerId: cache.myId, followerHost: IsNull() },
});
await cache.follow(...rel.map((r) => r.followeeId));
}
@ -178,6 +179,6 @@ export class LocalFollowingsCache {
}
public async getAll(): Promise<string[]> {
return (await redisClient.smembers(this.key))
return await redisClient.smembers(this.key);
}
}

View file

@ -105,7 +105,9 @@ export async function toDbReaction(
);
if (emoji) {
const emojiName = _reacterHost ? `:${name}@${_reacterHost}:` : `:${name}:`;
const emojiName = _reacterHost
? `:${name}@${_reacterHost}:`
: `:${name}:`;
return { name: emojiName, emoji };
}
}

View file

@ -27,6 +27,14 @@ import {
} from "@/misc/populate-emojis.js";
import { db } from "@/db/postgre.js";
import { IdentifiableError } from "@/misc/identifiable-error.js";
import {
ScyllaNote,
parseScyllaNote,
prepared,
scyllaClient,
} from "@/db/scylla.js";
import { LocalFollowingsCache } from "@/misc/cache.js";
import { userByIdCache } from "@/services/user-cache.js";
export async function populatePoll(note: Note, meId: User["id"] | null) {
const poll = await Polls.findOneByOrFail({ noteId: note.id });
@ -124,16 +132,23 @@ export const NoteRepository = db.getRepository(Note).extend({
return true;
} else {
// フォロワーかどうか
const [following, user] = await Promise.all([
Followings.count({
where: {
followeeId: note.userId,
followerId: meId,
},
take: 1,
}),
const user = await userByIdCache.fetch(meId, () =>
Users.findOneByOrFail({ id: meId }),
]);
);
if (!user.host) {
// user is local
const cache = await LocalFollowingsCache.init(meId);
return await cache.isFollowing(note.userId);
}
const following = await Followings.exist({
where: {
followeeId: note.userId,
followerId: meId,
},
});
/* If we know the following, everyhting is fine.
@ -142,7 +157,7 @@ export const NoteRepository = db.getRepository(Note).extend({
in which case we can never know the following. Instead we have
to assume that the users are following each other.
*/
return following > 0 || (note.userHost != null && user.host != null);
return following || !!note.userHost;
}
}
@ -167,8 +182,31 @@ export const NoteRepository = db.getRepository(Note).extend({
);
const meId = me ? me.id : null;
const note =
typeof src === "object" ? src : await this.findOneByOrFail({ id: src });
let note: Note | null = null;
const noteId = typeof src === "object" ? src.id : src;
if (scyllaClient) {
const result = await scyllaClient.execute(
prepared.note.select.byId,
[[noteId]],
{ prepare: true },
);
if (result.rowLength > 0) {
note = parseScyllaNote(result.first());
}
}
if (!note) {
// Fallback to Postgres
note = await this.findOneBy({ id: noteId });
}
if (note === null) {
throw new IdentifiableError(
"9725d0ce-ba28-4dde-95a7-2cbb2c15de24",
"No such note.",
);
}
const host = note.userHost;
if (!(await this.isVisibleForMe(note, meId))) {
@ -222,7 +260,18 @@ export const NoteRepository = db.getRepository(Note).extend({
emojis: noteEmoji,
tags: note.tags.length > 0 ? note.tags : undefined,
fileIds: note.fileIds,
files: DriveFiles.packMany(note.fileIds),
files: scyllaClient
? (note as ScyllaNote).files.map((file) => ({
...file,
createdAt: file.createdAt.toISOString(),
properties: {
width: file.width ?? undefined,
height: file.height ?? undefined,
},
userId: null,
folderId: null,
}))
: DriveFiles.packMany(note.fileIds),
replyId: note.replyId,
renoteId: note.renoteId,
channelId: note.channelId || undefined,

View file

@ -309,10 +309,7 @@ export default define(meta, paramDef, async (ps, _user, token) => {
if (Object.keys(updates).length > 0) {
await Users.update(user.id, updates);
const data = await Users.findOneByOrFail({ id: user.id });
await userByIdCache.set(
data.id,
data,
);
await userByIdCache.set(data.id, data);
if (data.avatarId) {
data.avatar = await DriveFiles.findOneBy({ id: data.avatarId });
}

View file

@ -84,7 +84,7 @@ export default define(meta, paramDef, async (ps, user) => {
.leftJoinAndSelect("note.reply", "reply")
.leftJoinAndSelect("note.renote", "renote")
.leftJoinAndSelect("reply.user", "replyUser")
.leftJoinAndSelect("renote.user", "renoteUser")
.leftJoinAndSelect("renote.user", "renoteUser");
generateRepliesQuery(query, ps.withReplies, user);
if (user) {

View file

@ -96,7 +96,7 @@ export default define(meta, paramDef, async (ps, user) => {
.leftJoinAndSelect("note.reply", "reply")
.leftJoinAndSelect("note.renote", "renote")
.leftJoinAndSelect("reply.user", "replyUser")
.leftJoinAndSelect("renote.user", "renoteUser")
.leftJoinAndSelect("renote.user", "renoteUser");
generateChannelQuery(query, user);
generateRepliesQuery(query, ps.withReplies, user);

View file

@ -85,7 +85,9 @@ export default define(meta, paramDef, async (ps, user) => {
params.push(ps.sinceId);
}
const result = await scyllaClient.execute(query.join(" "), params, { prepare: true });
const result = await scyllaClient.execute(query.join(" "), params, {
prepare: true,
});
const notes = result.rows.map(parseScyllaNote);
return Notes.packMany(notes, user);
}

View file

@ -787,9 +787,14 @@ async function insertNote(
insert.uri,
insert.url,
insert.score ?? 0,
data.files,
data.files?.map((file) => ({
...file,
width: file.properties.width ?? null,
height: file.properties.height ?? null,
})),
insert.visibleUserIds,
insert.mentions,
insert.mentionedRemoteUsers,
insert.emojis,
insert.tags,
insert.hasPoll,

View file

@ -1,10 +1,7 @@
import { publishMainStream } from "@/services/stream.js";
import type { Note } from "@/models/entities/note.js";
import type { User } from "@/models/entities/user.js";
import {
NoteUnreads,
ChannelFollowings,
} from "@/models/index.js";
import { NoteUnreads, ChannelFollowings } from "@/models/index.js";
import { Not, IsNull, In } from "typeorm";
import type { Channel } from "@/models/entities/channel.js";
import { readNotificationByQuery } from "@/server/api/common/read-notification.js";