fix: release redis locks

This commit is contained in:
Namekuji 2023-09-02 19:22:13 +00:00 committed by Kainoa Kanter
parent 7cdd6af262
commit fc97614782
7 changed files with 30 additions and 10 deletions

View file

@ -12,29 +12,38 @@ const retryDelay = 100;
* @param timeout Lock timeout (ms), The timeout releases previous lock. * @param timeout Lock timeout (ms), The timeout releases previous lock.
* @returns Unlock function * @returns Unlock function
*/ */
export async function getApLock(uri: string, timeout = 30 * 1000) { export async function getApLock(
uri: string,
timeout = 30 * 1000,
): Promise<Mutex> {
const lock = new Mutex(redisClient, `ap-object:${uri}`, { const lock = new Mutex(redisClient, `ap-object:${uri}`, {
lockTimeout: timeout, lockTimeout: timeout,
retryInterval: retryDelay, retryInterval: retryDelay,
}); });
await lock.acquire(); await lock.acquire();
return lock;
} }
export async function getFetchInstanceMetadataLock( export async function getFetchInstanceMetadataLock(
host: string, host: string,
timeout = 30 * 1000, timeout = 30 * 1000,
) { ): Promise<Mutex> {
const lock = new Mutex(redisClient, `instance:${host}`, { const lock = new Mutex(redisClient, `instance:${host}`, {
lockTimeout: timeout, lockTimeout: timeout,
retryInterval: retryDelay, retryInterval: retryDelay,
}); });
await lock.acquire(); await lock.acquire();
return lock;
} }
export async function getChartInsertLock(lockKey: string, timeout = 30 * 1000) { export async function getChartInsertLock(
lockKey: string,
timeout = 30 * 1000,
): Promise<Mutex> {
const lock = new Mutex(redisClient, `chart-insert:${lockKey}`, { const lock = new Mutex(redisClient, `chart-insert:${lockKey}`, {
lockTimeout: timeout, lockTimeout: timeout,
retryInterval: retryDelay, retryInterval: retryDelay,
}); });
await lock.acquire(); await lock.acquire();
return lock;
} }

View file

@ -32,6 +32,8 @@ export default async function (
// Interrupt if you block the announcement destination // Interrupt if you block the announcement destination
if (await shouldBlockInstance(extractDbHost(uri))) return; if (await shouldBlockInstance(extractDbHost(uri))) return;
const lock = await getApLock(uri);
try { try {
// Check if something with the same URI is already registered // Check if something with the same URI is already registered
const exist = await fetchNote(uri); const exist = await fetchNote(uri);
@ -78,6 +80,6 @@ export default async function (
uri, uri,
}); });
} finally { } finally {
await getApLock(uri); await lock.release();
} }
} }

View file

@ -31,6 +31,8 @@ export default async function (
} }
} }
const lock = await getApLock(uri);
try { try {
const exist = await fetchNote(note); const exist = await fetchNote(note);
if (exist) return "skip: note exists"; if (exist) return "skip: note exists";
@ -44,6 +46,6 @@ export default async function (
throw e; throw e;
} }
} finally { } finally {
await getApLock(uri); await lock.release();
} }
} }

View file

@ -13,6 +13,8 @@ export default async function (
): Promise<string> { ): Promise<string> {
logger.info(`Deleting the Note: ${uri}`); logger.info(`Deleting the Note: ${uri}`);
const lock = await getApLock(uri);
try { try {
const dbResolver = new DbResolver(); const dbResolver = new DbResolver();
const note = await dbResolver.getNoteFromApId(uri); const note = await dbResolver.getNoteFromApId(uri);
@ -37,6 +39,6 @@ export default async function (
await deleteNode(actor, note); await deleteNode(actor, note);
return "ok: note deleted"; return "ok: note deleted";
} finally { } finally {
await getApLock(uri); await lock.release();
} }
} }

View file

@ -415,6 +415,8 @@ export async function resolveNote(
`host ${extractDbHost(uri)} is blocked`, `host ${extractDbHost(uri)} is blocked`,
); );
const lock = await getApLock(uri);
try { try {
//#region Returns if already registered with this server //#region Returns if already registered with this server
const exist = await fetchNote(uri); const exist = await fetchNote(uri);
@ -437,7 +439,7 @@ export async function resolveNote(
// Since the attached Note Object may be disguised, always specify the uri and fetch it from the server. // Since the attached Note Object may be disguised, always specify the uri and fetch it from the server.
return await createNote(uri, resolver, true); return await createNote(uri, resolver, true);
} finally { } finally {
await getApLock(uri); await lock.release();
} }
} }

View file

@ -430,6 +430,7 @@ export default abstract class Chart<T extends Schema> {
? `${this.name}:${date}:${span}:${group}` ? `${this.name}:${date}:${span}:${group}`
: `${this.name}:${date}:${span}`; : `${this.name}:${date}:${span}`;
const lock = await getChartInsertLock(lockKey);
try { try {
// ロック内でもう1回チェックする // ロック内でもう1回チェックする
const currentLog = (await repository.findOneBy({ const currentLog = (await repository.findOneBy({
@ -465,7 +466,7 @@ export default abstract class Chart<T extends Schema> {
return log; return log;
} finally { } finally {
await getChartInsertLock(lockKey); await lock.release();
} }
} }

View file

@ -15,6 +15,8 @@ export async function fetchInstanceMetadata(
instance: Instance, instance: Instance,
force = false, force = false,
): Promise<void> { ): Promise<void> {
const lock = await getFetchInstanceMetadataLock(instance.host);
if (!force) { if (!force) {
const _instance = await Instances.findOneBy({ host: instance.host }); const _instance = await Instances.findOneBy({ host: instance.host });
const now = Date.now(); const now = Date.now();
@ -22,7 +24,7 @@ export async function fetchInstanceMetadata(
_instance?.infoUpdatedAt && _instance?.infoUpdatedAt &&
now - _instance.infoUpdatedAt.getTime() < 1000 * 60 * 60 * 24 now - _instance.infoUpdatedAt.getTime() < 1000 * 60 * 60 * 24
) { ) {
await getFetchInstanceMetadataLock(instance.host); await lock.release();
return; return;
} }
} }
@ -78,7 +80,7 @@ export async function fetchInstanceMetadata(
} catch (e) { } catch (e) {
logger.error(`Failed to update metadata of ${instance.host}: ${e}`); logger.error(`Failed to update metadata of ${instance.host}: ${e}`);
} finally { } finally {
await getFetchInstanceMetadataLock(instance.host); await lock.release();
} }
} }