refactor: export timeline query
This commit is contained in:
parent
f8387be81a
commit
a26553dc18
3 changed files with 164 additions and 81 deletions
|
@ -1,10 +1,12 @@
|
||||||
import config from "@/config/index.js";
|
import config from "@/config/index.js";
|
||||||
import type { PopulatedEmoji } from "@/misc/populate-emojis.js";
|
import type { PopulatedEmoji } from "@/misc/populate-emojis.js";
|
||||||
|
import type { Channel } from "@/models/entities/channel.js";
|
||||||
import type { Note } from "@/models/entities/note.js";
|
import type { Note } from "@/models/entities/note.js";
|
||||||
import type { NoteReaction } from "@/models/entities/note-reaction.js";
|
import type { NoteReaction } from "@/models/entities/note-reaction.js";
|
||||||
import { Client, types } from "cassandra-driver";
|
import { Client, types } from "cassandra-driver";
|
||||||
import type { User } from "@/models/entities/user.js";
|
import type { User } from "@/models/entities/user.js";
|
||||||
import { ChannelFollowingsCache, LocalFollowingsCache } from "@/misc/cache.js";
|
import { ChannelFollowingsCache, LocalFollowingsCache } from "@/misc/cache.js";
|
||||||
|
import { getTimestamp } from "@/misc/gen-id";
|
||||||
|
|
||||||
function newClient(): Client | null {
|
function newClient(): Client | null {
|
||||||
if (!config.scylla) {
|
if (!config.scylla) {
|
||||||
|
@ -185,51 +187,162 @@ export function parseScyllaReaction(row: types.Row): ScyllaNoteReaction {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function isVisible(
|
export function prepareTimelineQuery(
|
||||||
note: ScyllaNote,
|
untilId?: string,
|
||||||
user: { id: User["id"] } | null,
|
untilDate?: number,
|
||||||
): Promise<boolean> {
|
sinceId?: string,
|
||||||
let visible = false;
|
sinceDate?: number,
|
||||||
|
): { query: string; untilDate: Date; sinceDate: Date | null } {
|
||||||
|
const queryParts = [`${prepared.note.select.byDate} AND "createdAt" < ?`];
|
||||||
|
|
||||||
if (
|
let _untilDate = new Date();
|
||||||
["public", "home"].includes(note.visibility) // public post
|
if (untilId) {
|
||||||
) {
|
_untilDate = new Date(getTimestamp(untilId));
|
||||||
visible = true;
|
}
|
||||||
} else if (user) {
|
if (untilDate && untilDate < _untilDate.getTime()) {
|
||||||
const cache = await LocalFollowingsCache.init(user.id);
|
_untilDate = new Date(untilDate);
|
||||||
|
}
|
||||||
visible =
|
let _sinceDate: Date | null = null;
|
||||||
note.userId === user.id || // my own post
|
if (sinceId) {
|
||||||
note.visibleUserIds.includes(user.id) || // visible to me
|
_sinceDate = new Date(getTimestamp(sinceId));
|
||||||
note.mentions.includes(user.id) || // mentioned me
|
}
|
||||||
(note.visibility === "followers" &&
|
if (sinceDate && (!_sinceDate || sinceDate > _sinceDate.getTime())) {
|
||||||
(await cache.isFollowing(note.userId))) || // following
|
_sinceDate = new Date(sinceDate);
|
||||||
note.replyUserId === user.id; // replied to myself
|
}
|
||||||
|
if (_sinceDate !== null) {
|
||||||
|
queryParts.push(`AND "createdAt" > ?`);
|
||||||
}
|
}
|
||||||
|
|
||||||
return visible;
|
queryParts.push("LIMIT 50"); // Hardcoded to issue a prepared query
|
||||||
|
const query = queryParts.join(" ");
|
||||||
|
|
||||||
|
return {
|
||||||
|
query,
|
||||||
|
untilDate: _untilDate,
|
||||||
|
sinceDate: _sinceDate,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function execTimelineQuery(
|
||||||
|
ps: {
|
||||||
|
limit: number;
|
||||||
|
untilId?: string;
|
||||||
|
untilDate?: number;
|
||||||
|
sinceId?: string;
|
||||||
|
sinceDate?: number;
|
||||||
|
},
|
||||||
|
maxDays = 30,
|
||||||
|
filter?: (_: ScyllaNote[]) => Promise<ScyllaNote[]>,
|
||||||
|
): Promise<ScyllaNote[]> {
|
||||||
|
if (!scyllaClient) return [];
|
||||||
|
|
||||||
|
let { query, untilDate, sinceDate } = prepareTimelineQuery(
|
||||||
|
ps.untilId,
|
||||||
|
ps.untilDate,
|
||||||
|
ps.sinceId,
|
||||||
|
ps.sinceDate,
|
||||||
|
);
|
||||||
|
|
||||||
|
let scannedPartitions = 0;
|
||||||
|
const foundNotes: ScyllaNote[] = [];
|
||||||
|
|
||||||
|
// Try to get posts of at most <maxDays> in the single request
|
||||||
|
while (foundNotes.length < ps.limit && scannedPartitions < maxDays) {
|
||||||
|
const params: (Date | string | string[])[] = [untilDate, untilDate];
|
||||||
|
|
||||||
|
if (sinceDate) {
|
||||||
|
params.push(sinceDate);
|
||||||
|
}
|
||||||
|
|
||||||
|
const result = await scyllaClient.execute(query, params, {
|
||||||
|
prepare: true,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (result.rowLength === 0) {
|
||||||
|
// Reached the end of partition. Queries posts created one day before.
|
||||||
|
scannedPartitions++;
|
||||||
|
untilDate = new Date(
|
||||||
|
untilDate.getUTCFullYear(),
|
||||||
|
untilDate.getUTCMonth(),
|
||||||
|
untilDate.getUTCDate() - 1,
|
||||||
|
23,
|
||||||
|
59,
|
||||||
|
59,
|
||||||
|
999,
|
||||||
|
);
|
||||||
|
if (sinceDate && untilDate < sinceDate) break;
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const notes = result.rows.map(parseScyllaNote);
|
||||||
|
foundNotes.push(...(filter ? await filter(notes) : notes));
|
||||||
|
untilDate = notes[notes.length - 1].createdAt;
|
||||||
|
}
|
||||||
|
|
||||||
|
return foundNotes;
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function filterVisibility(
|
||||||
|
notes: ScyllaNote[],
|
||||||
|
user: { id: User["id"] } | null,
|
||||||
|
followingIds?: User["id"][],
|
||||||
|
): Promise<ScyllaNote[]> {
|
||||||
|
let filtered = notes;
|
||||||
|
|
||||||
|
if (!user) {
|
||||||
|
filtered = filtered.filter((note) =>
|
||||||
|
["public", "home"].includes(note.visibility),
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
let followings: User["id"][];
|
||||||
|
if (followingIds) {
|
||||||
|
followings = followingIds;
|
||||||
|
} else {
|
||||||
|
const cache = await LocalFollowingsCache.init(user.id);
|
||||||
|
followings = await cache.getAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
filtered = filtered.filter(
|
||||||
|
(note) =>
|
||||||
|
["public", "home"].includes(note.visibility) ||
|
||||||
|
note.userId === user.id ||
|
||||||
|
note.visibleUserIds.includes(user.id) ||
|
||||||
|
note.mentions.includes(user.id) ||
|
||||||
|
(note.visibility === "followers" &&
|
||||||
|
(followings.includes(note.userId) || note.replyUserId === user.id)),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
return filtered;
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function filterChannel(
|
export async function filterChannel(
|
||||||
notes: ScyllaNote[],
|
notes: ScyllaNote[],
|
||||||
user: { id: User["id"] } | null,
|
user: { id: User["id"] } | null,
|
||||||
|
followingIds?: Channel["id"][],
|
||||||
): Promise<ScyllaNote[]> {
|
): Promise<ScyllaNote[]> {
|
||||||
let foundNotes = notes;
|
let filtered = notes;
|
||||||
|
|
||||||
if (!user) {
|
if (!user) {
|
||||||
foundNotes = foundNotes.filter((note) => !note.channelId);
|
filtered = filtered.filter((note) => !note.channelId);
|
||||||
} else {
|
} else {
|
||||||
const channelNotes = foundNotes.filter((note) => !!note.channelId);
|
const channelNotes = filtered.filter((note) => !!note.channelId);
|
||||||
if (channelNotes.length > 0) {
|
if (channelNotes.length > 0) {
|
||||||
const cache = await ChannelFollowingsCache.init(user.id);
|
let followings: Channel["id"][];
|
||||||
const followingIds = await cache.getAll();
|
if (followingIds) {
|
||||||
foundNotes = foundNotes.filter(
|
followings = followingIds;
|
||||||
(note) => !note.channelId || followingIds.includes(note.channelId),
|
} else {
|
||||||
|
const cache = await ChannelFollowingsCache.init(user.id);
|
||||||
|
followings = await cache.getAll();
|
||||||
|
}
|
||||||
|
filtered = filtered.filter(
|
||||||
|
(note) => !note.channelId || followings.includes(note.channelId),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return foundNotes;
|
return filtered;
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function filterReply(
|
export async function filterReply(
|
||||||
|
@ -237,14 +350,14 @@ export async function filterReply(
|
||||||
withReplies: boolean,
|
withReplies: boolean,
|
||||||
user: { id: User["id"] } | null,
|
user: { id: User["id"] } | null,
|
||||||
): Promise<ScyllaNote[]> {
|
): Promise<ScyllaNote[]> {
|
||||||
let foundNotes = notes;
|
let filtered = notes;
|
||||||
|
|
||||||
if (!user) {
|
if (!user) {
|
||||||
foundNotes = foundNotes.filter(
|
filtered = filtered.filter(
|
||||||
(note) => !note.replyId || note.replyUserId === note.userId,
|
(note) => !note.replyId || note.replyUserId === note.userId,
|
||||||
);
|
);
|
||||||
} else if (!withReplies) {
|
} else if (!withReplies) {
|
||||||
foundNotes = foundNotes.filter(
|
filtered = filtered.filter(
|
||||||
(note) =>
|
(note) =>
|
||||||
!note.replyId ||
|
!note.replyId ||
|
||||||
note.replyUserId === user.id ||
|
note.replyUserId === user.id ||
|
||||||
|
@ -253,5 +366,5 @@ export async function filterReply(
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
return foundNotes;
|
return filtered;
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,7 +4,7 @@ import type { Note } from "@/models/entities/note.js";
|
||||||
import { Notes, Users } from "@/models/index.js";
|
import { Notes, Users } from "@/models/index.js";
|
||||||
import { generateVisibilityQuery } from "./generate-visibility-query.js";
|
import { generateVisibilityQuery } from "./generate-visibility-query.js";
|
||||||
import {
|
import {
|
||||||
isVisible,
|
filterVisibility,
|
||||||
parseScyllaNote,
|
parseScyllaNote,
|
||||||
prepared,
|
prepared,
|
||||||
scyllaClient,
|
scyllaClient,
|
||||||
|
@ -26,8 +26,9 @@ export async function getNote(
|
||||||
);
|
);
|
||||||
if (result.rowLength > 0) {
|
if (result.rowLength > 0) {
|
||||||
const candidate = parseScyllaNote(result.first());
|
const candidate = parseScyllaNote(result.first());
|
||||||
if (await isVisible(candidate, me)) {
|
const filtered = await filterVisibility([candidate], me);
|
||||||
note = candidate;
|
if (filtered.length > 0) {
|
||||||
|
note = filtered[0];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,14 +13,13 @@ import { generateMutedUserRenotesQueryForNotes } from "../../common/generated-mu
|
||||||
import { ApiError } from "../../error.js";
|
import { ApiError } from "../../error.js";
|
||||||
import {
|
import {
|
||||||
type ScyllaNote,
|
type ScyllaNote,
|
||||||
parseScyllaNote,
|
|
||||||
prepared,
|
|
||||||
scyllaClient,
|
scyllaClient,
|
||||||
filterChannel,
|
filterChannel,
|
||||||
filterReply,
|
filterReply,
|
||||||
|
filterVisibility,
|
||||||
|
execTimelineQuery,
|
||||||
} from "@/db/scylla.js";
|
} from "@/db/scylla.js";
|
||||||
import { LocalFollowingsCache } from "@/misc/cache.js";
|
import { ChannelFollowingsCache, LocalFollowingsCache } from "@/misc/cache.js";
|
||||||
import { getTimestamp } from "@/misc/gen-id.js";
|
|
||||||
|
|
||||||
export const meta = {
|
export const meta = {
|
||||||
tags: ["notes"],
|
tags: ["notes"],
|
||||||
|
@ -77,50 +76,20 @@ export default define(meta, paramDef, async (ps, user) => {
|
||||||
const followingsCache = await LocalFollowingsCache.init(user.id);
|
const followingsCache = await LocalFollowingsCache.init(user.id);
|
||||||
|
|
||||||
if (scyllaClient) {
|
if (scyllaClient) {
|
||||||
let untilDate = new Date();
|
const channelCache = await ChannelFollowingsCache.init(user.id);
|
||||||
const foundNotes: ScyllaNote[] = [];
|
const followingChannelIds = await channelCache.getAll();
|
||||||
const validIds = [user.id].concat(await followingsCache.getAll());
|
const followingUserIds = await followingsCache.getAll();
|
||||||
const query = `${prepared.note.select.byDate} AND "createdAt" < ? LIMIT 50`; // LIMIT is hardcoded to prepare
|
const validUserIds = [user.id].concat(followingUserIds);
|
||||||
|
|
||||||
if (ps.untilId) {
|
const filter = async (notes: ScyllaNote[]) => {
|
||||||
untilDate = new Date(getTimestamp(ps.untilId));
|
let found = notes.filter((note) => validUserIds.includes(note.userId));
|
||||||
}
|
found = await filterChannel(found, user, followingChannelIds);
|
||||||
|
found = await filterReply(found, ps.withReplies, user);
|
||||||
|
found = await filterVisibility(found, user, followingUserIds);
|
||||||
|
return found;
|
||||||
|
};
|
||||||
|
|
||||||
let scanned_partitions = 0;
|
const foundNotes = await execTimelineQuery(ps, 30, filter);
|
||||||
|
|
||||||
// Try to get posts of at most 30 days in the single request
|
|
||||||
while (foundNotes.length < ps.limit && scanned_partitions < 30) {
|
|
||||||
const params: (Date | string | string[])[] = [untilDate, untilDate];
|
|
||||||
|
|
||||||
const result = await scyllaClient.execute(query, params, {
|
|
||||||
prepare: true,
|
|
||||||
});
|
|
||||||
|
|
||||||
if (result.rowLength === 0) {
|
|
||||||
// Reached the end of partition. Queries posts created one day before.
|
|
||||||
scanned_partitions++;
|
|
||||||
untilDate = new Date(
|
|
||||||
untilDate.getUTCFullYear(),
|
|
||||||
untilDate.getUTCMonth(),
|
|
||||||
untilDate.getUTCDate() - 1,
|
|
||||||
23,
|
|
||||||
59,
|
|
||||||
59,
|
|
||||||
999,
|
|
||||||
);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
const notes = result.rows.map(parseScyllaNote);
|
|
||||||
let filtered = notes.filter((note) => validIds.includes(note.userId));
|
|
||||||
|
|
||||||
filtered = await filterChannel(filtered, user);
|
|
||||||
filtered = await filterReply(filtered, ps.withReplies, user);
|
|
||||||
|
|
||||||
foundNotes.push(...filtered);
|
|
||||||
|
|
||||||
untilDate = notes[notes.length - 1].createdAt;
|
|
||||||
}
|
|
||||||
|
|
||||||
return Notes.packMany(foundNotes.slice(0, ps.limit), user);
|
return Notes.packMany(foundNotes.slice(0, ps.limit), user);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue