wip: read row timeline from scylla
This commit is contained in:
parent
b8d8e2e5f5
commit
2937c7fa43
3 changed files with 69 additions and 58 deletions
|
@ -183,7 +183,14 @@ export const NoteRepository = db.getRepository(Note).extend({
|
||||||
|
|
||||||
const meId = me ? me.id : null;
|
const meId = me ? me.id : null;
|
||||||
let note: Note | null = null;
|
let note: Note | null = null;
|
||||||
const noteId = typeof src === "object" ? src.id : src;
|
let foundScyllaNote = false;
|
||||||
|
const isSrcNote = typeof src === "object";
|
||||||
|
|
||||||
|
// Always lookup from ScyllaDB if enabled
|
||||||
|
if (isSrcNote && !scyllaClient) {
|
||||||
|
note = src;
|
||||||
|
} else {
|
||||||
|
const noteId = isSrcNote ? src.id : src;
|
||||||
if (scyllaClient) {
|
if (scyllaClient) {
|
||||||
const result = await scyllaClient.execute(
|
const result = await scyllaClient.execute(
|
||||||
prepared.note.select.byId,
|
prepared.note.select.byId,
|
||||||
|
@ -192,15 +199,16 @@ export const NoteRepository = db.getRepository(Note).extend({
|
||||||
);
|
);
|
||||||
if (result.rowLength > 0) {
|
if (result.rowLength > 0) {
|
||||||
note = parseScyllaNote(result.first());
|
note = parseScyllaNote(result.first());
|
||||||
|
foundScyllaNote = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!foundScyllaNote) {
|
||||||
|
// Fallback to Postgres
|
||||||
|
note = await this.findOneBy({ id: noteId });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!note) {
|
if (!note) {
|
||||||
// Fallback to Postgres
|
|
||||||
note = await this.findOneBy({ id: noteId });
|
|
||||||
}
|
|
||||||
|
|
||||||
if (note === null) {
|
|
||||||
throw new IdentifiableError(
|
throw new IdentifiableError(
|
||||||
"9725d0ce-ba28-4dde-95a7-2cbb2c15de24",
|
"9725d0ce-ba28-4dde-95a7-2cbb2c15de24",
|
||||||
"No such note.",
|
"No such note.",
|
||||||
|
@ -260,7 +268,8 @@ export const NoteRepository = db.getRepository(Note).extend({
|
||||||
emojis: noteEmoji,
|
emojis: noteEmoji,
|
||||||
tags: note.tags.length > 0 ? note.tags : undefined,
|
tags: note.tags.length > 0 ? note.tags : undefined,
|
||||||
fileIds: note.fileIds,
|
fileIds: note.fileIds,
|
||||||
files: scyllaClient
|
files:
|
||||||
|
scyllaClient && foundScyllaNote
|
||||||
? (note as ScyllaNote).files.map((file) => ({
|
? (note as ScyllaNote).files.map((file) => ({
|
||||||
...file,
|
...file,
|
||||||
createdAt: file.createdAt.toISOString(),
|
createdAt: file.createdAt.toISOString(),
|
||||||
|
|
|
@ -32,7 +32,7 @@ export async function getNote(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// For legacy notes
|
// Fallback to Postgres
|
||||||
if (!note) {
|
if (!note) {
|
||||||
const query = Notes.createQueryBuilder("note").where("note.id = :id", {
|
const query = Notes.createQueryBuilder("note").where("note.id = :id", {
|
||||||
id: noteId,
|
id: noteId,
|
||||||
|
|
|
@ -11,7 +11,12 @@ import { generateChannelQuery } from "../../common/generate-channel-query.js";
|
||||||
import { generateBlockedUserQuery } from "../../common/generate-block-query.js";
|
import { generateBlockedUserQuery } from "../../common/generate-block-query.js";
|
||||||
import { generateMutedUserRenotesQueryForNotes } from "../../common/generated-muted-renote-query.js";
|
import { generateMutedUserRenotesQueryForNotes } from "../../common/generated-muted-renote-query.js";
|
||||||
import { ApiError } from "../../error.js";
|
import { ApiError } from "../../error.js";
|
||||||
import { parseScyllaNote, prepared, scyllaClient } from "@/db/scylla.js";
|
import {
|
||||||
|
type ScyllaNote,
|
||||||
|
parseScyllaNote,
|
||||||
|
prepared,
|
||||||
|
scyllaClient,
|
||||||
|
} from "@/db/scylla.js";
|
||||||
import { LocalFollowingsCache } from "@/misc/cache.js";
|
import { LocalFollowingsCache } from "@/misc/cache.js";
|
||||||
|
|
||||||
export const meta = {
|
export const meta = {
|
||||||
|
@ -69,27 +74,35 @@ export default define(meta, paramDef, async (ps, user) => {
|
||||||
const followingsCache = await LocalFollowingsCache.init(user.id);
|
const followingsCache = await LocalFollowingsCache.init(user.id);
|
||||||
|
|
||||||
if (scyllaClient) {
|
if (scyllaClient) {
|
||||||
const untilDate = ps.untilDate ? new Date(ps.untilDate) : new Date();
|
let untilDate = new Date();
|
||||||
const query = [`${prepared.note.select.byDate} AND "createdAt" <= ?`];
|
const foundNotes: ScyllaNote[] = [];
|
||||||
|
const validIds = [user.id].concat(await followingsCache.getAll());
|
||||||
|
|
||||||
|
while (foundNotes.length < ps.limit) {
|
||||||
|
const query = [`${prepared.note.select.byDate} AND "createdAt" < ?`];
|
||||||
const params: (Date | string | string[])[] = [untilDate, untilDate];
|
const params: (Date | string | string[])[] = [untilDate, untilDate];
|
||||||
if (ps.sinceDate) {
|
|
||||||
query.push(`AND "createdAt" >= ?`);
|
|
||||||
params.push(new Date(ps.sinceDate));
|
|
||||||
}
|
|
||||||
if (ps.untilId) {
|
if (ps.untilId) {
|
||||||
query.push(`AND "id" <= ?`);
|
query.push(`AND "id" < ?`);
|
||||||
params.push(ps.untilId);
|
params.push(ps.untilId);
|
||||||
}
|
}
|
||||||
if (ps.sinceId) {
|
query.push("LIMIT 50"); // Hardcoded to enable prepared query for performance
|
||||||
query.push(`AND "id" >= ?`);
|
|
||||||
params.push(ps.sinceId);
|
|
||||||
}
|
|
||||||
|
|
||||||
const result = await scyllaClient.execute(query.join(" "), params, {
|
const result = await scyllaClient.execute(query.join(" "), params, {
|
||||||
prepare: true,
|
prepare: true,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if (result.rowLength === 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
const notes = result.rows.map(parseScyllaNote);
|
const notes = result.rows.map(parseScyllaNote);
|
||||||
return Notes.packMany(notes, user);
|
const filtered = notes.filter((note) => validIds.includes(note.userId));
|
||||||
|
foundNotes.push(...filtered);
|
||||||
|
|
||||||
|
untilDate = notes[notes.length - 1].createdAt;
|
||||||
|
}
|
||||||
|
|
||||||
|
return Notes.packMany(foundNotes, user);
|
||||||
}
|
}
|
||||||
|
|
||||||
const hasFollowing = await followingsCache.hasFollowing();
|
const hasFollowing = await followingsCache.hasFollowing();
|
||||||
|
@ -113,17 +126,6 @@ export default define(meta, paramDef, async (ps, user) => {
|
||||||
qb.orWhere(`note.userId IN (${followingQuery.getQuery()})`);
|
qb.orWhere(`note.userId IN (${followingQuery.getQuery()})`);
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
.innerJoinAndSelect("note.user", "user")
|
|
||||||
.leftJoinAndSelect("user.avatar", "avatar")
|
|
||||||
.leftJoinAndSelect("user.banner", "banner")
|
|
||||||
.leftJoinAndSelect("note.reply", "reply")
|
|
||||||
.leftJoinAndSelect("note.renote", "renote")
|
|
||||||
.leftJoinAndSelect("reply.user", "replyUser")
|
|
||||||
.leftJoinAndSelect("replyUser.avatar", "replyUserAvatar")
|
|
||||||
.leftJoinAndSelect("replyUser.banner", "replyUserBanner")
|
|
||||||
.leftJoinAndSelect("renote.user", "renoteUser")
|
|
||||||
.leftJoinAndSelect("renoteUser.avatar", "renoteUserAvatar")
|
|
||||||
.leftJoinAndSelect("renoteUser.banner", "renoteUserBanner")
|
|
||||||
.setParameters(followingQuery.getParameters());
|
.setParameters(followingQuery.getParameters());
|
||||||
|
|
||||||
generateChannelQuery(query, user);
|
generateChannelQuery(query, user);
|
||||||
|
|
Loading…
Reference in a new issue