From ccbd6178e4336f801adae8a5d49c3f3f9f93525a Mon Sep 17 00:00:00 2001
From: naskya <m@naskya.net>
Date: Sat, 20 Apr 2024 06:54:06 +0900
Subject: [PATCH] refactor (backend): port add-note-to-antenna to backend-rs

I hit this bug: https://github.com/napi-rs/napi-rs/issues/2060
---
 Cargo.lock                                    | 31 ++++++-
 Cargo.toml                                    |  1 +
 packages/backend-rs/Cargo.toml                |  1 +
 packages/backend-rs/index.d.ts                |  1 +
 packages/backend-rs/index.js                  |  3 +-
 packages/backend-rs/src/lib.rs                |  1 +
 .../src/service/add_note_to_antenna.rs        | 20 +++++
 packages/backend-rs/src/service/mod.rs        |  2 +
 packages/backend-rs/src/service/stream.rs     | 86 +++++++++++++++++++
 packages/backend/package.json                 |  1 +
 .../backend/src/prelude/undefined-to-null.ts  | 76 ++++++++++++++++
 .../src/services/add-note-to-antenna.ts       | 24 ------
 packages/backend/src/services/note/create.ts  |  7 +-
 packages/backend/src/services/stream.ts       | 28 +++---
 pnpm-lock.yaml                                |  8 ++
 15 files changed, 247 insertions(+), 43 deletions(-)
 create mode 100644 packages/backend-rs/src/service/add_note_to_antenna.rs
 create mode 100644 packages/backend-rs/src/service/mod.rs
 create mode 100644 packages/backend-rs/src/service/stream.rs
 create mode 100644 packages/backend/src/prelude/undefined-to-null.ts
 delete mode 100644 packages/backend/src/services/add-note-to-antenna.ts

diff --git a/Cargo.lock b/Cargo.lock
index 52ef14933f..6cb32a21d0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -227,6 +227,7 @@ dependencies = [
  "serde",
  "serde_json",
  "serde_yaml",
+ "strum 0.26.2",
  "thiserror",
  "tokio",
  "url",
@@ -2096,6 +2097,12 @@ dependencies = [
  "untrusted",
 ]
 
+[[package]]
+name = "rustversion"
+version = "1.0.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "80af6f9131f277a45a3fba6ce8e2258037bb0477a67e610d3c1fe046ab31de47"
+
 [[package]]
 name = "ryu"
 version = "1.0.17"
@@ -2176,7 +2183,7 @@ dependencies = [
  "serde",
  "serde_json",
  "sqlx",
- "strum",
+ "strum 0.25.0",
  "thiserror",
  "time",
  "tracing",
@@ -2708,6 +2715,28 @@ version = "0.25.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125"
 
+[[package]]
+name = "strum"
+version = "0.26.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5d8cec3501a5194c432b2b7976db6b7d10ec95c253208b45f83f7136aa985e29"
+dependencies = [
+ "strum_macros",
+]
+
+[[package]]
+name = "strum_macros"
+version = "0.26.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c6cf59daf282c0a494ba14fd21610a0325f9f90ec9d1231dea26bcb1d696c946"
+dependencies = [
+ "heck 0.4.1",
+ "proc-macro2",
+ "quote",
+ "rustversion",
+ "syn 2.0.58",
+]
+
 [[package]]
 name = "subtle"
 version = "2.5.0"
diff --git a/Cargo.toml b/Cargo.toml
index b82635d8b4..0bde324447 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -33,6 +33,7 @@ sea-orm = "0.12.15"
 serde = "1.0.197"
 serde_json = "1.0.115"
 serde_yaml = "0.9.34"
+strum = "0.26.2"
 syn = "2.0.58"
 thiserror = "1.0.58"
 tokio = "1.37.0"
diff --git a/packages/backend-rs/Cargo.toml b/packages/backend-rs/Cargo.toml
index d077028e13..93c57dc321 100644
--- a/packages/backend-rs/Cargo.toml
+++ b/packages/backend-rs/Cargo.toml
@@ -37,6 +37,7 @@ sea-orm = { workspace = true, features = ["sqlx-postgres", "runtime-tokio-rustls
 serde = { workspace = true, features = ["derive"] }
 serde_json = { workspace = true }
 serde_yaml = { workspace = true }
+strum = { workspace = true, features = ["derive"] }
 thiserror = { workspace = true }
 tokio = { workspace = true, features = ["full"] }
 url = { workspace = true }
diff --git a/packages/backend-rs/index.d.ts b/packages/backend-rs/index.d.ts
index 0c75def671..f13cb0bc45 100644
--- a/packages/backend-rs/index.d.ts
+++ b/packages/backend-rs/index.d.ts
@@ -1098,6 +1098,7 @@ export interface Webhook {
   latestSentAt: Date | null
   latestStatus: number | null
 }
+export function addNoteToAntenna(antennaId: string, note: Note): void
 /** Initializes Cuid2 generator. Must be called before any [create_id]. */
 export function initIdGenerator(length: number, fingerprint: string): void
 export function getTimestamp(id: string): number
diff --git a/packages/backend-rs/index.js b/packages/backend-rs/index.js
index 5fd1e9d784..fbaf1dc352 100644
--- a/packages/backend-rs/index.js
+++ b/packages/backend-rs/index.js
@@ -310,7 +310,7 @@ if (!nativeBinding) {
   throw new Error(`Failed to load native binding`)
 }
 
-const { loadEnv, loadConfig, stringToAcct, acctToString, checkWordMute, getFullApAccount, isSelfHost, isSameOrigin, extractHost, toPuny, isUnicodeEmoji, sqlLikeEscape, safeForSql, formatMilliseconds, getNoteSummary, toMastodonId, fromMastodonId, fetchMeta, metaToPugArgs, nyaify, hashPassword, verifyPassword, isOldPasswordAlgorithm, decodeReaction, countReactions, toDbReaction, AntennaSrcEnum, MutedNoteReasonEnum, NoteVisibilityEnum, NotificationTypeEnum, PageVisibilityEnum, PollNotevisibilityEnum, RelayStatusEnum, UserEmojimodpermEnum, UserProfileFfvisibilityEnum, UserProfileMutingnotificationtypesEnum, initIdGenerator, getTimestamp, genId, secureRndstr } = nativeBinding
+const { loadEnv, loadConfig, stringToAcct, acctToString, checkWordMute, getFullApAccount, isSelfHost, isSameOrigin, extractHost, toPuny, isUnicodeEmoji, sqlLikeEscape, safeForSql, formatMilliseconds, getNoteSummary, toMastodonId, fromMastodonId, fetchMeta, metaToPugArgs, nyaify, hashPassword, verifyPassword, isOldPasswordAlgorithm, decodeReaction, countReactions, toDbReaction, AntennaSrcEnum, MutedNoteReasonEnum, NoteVisibilityEnum, NotificationTypeEnum, PageVisibilityEnum, PollNotevisibilityEnum, RelayStatusEnum, UserEmojimodpermEnum, UserProfileFfvisibilityEnum, UserProfileMutingnotificationtypesEnum, addNoteToAntenna, initIdGenerator, getTimestamp, genId, secureRndstr } = nativeBinding
 
 module.exports.loadEnv = loadEnv
 module.exports.loadConfig = loadConfig
@@ -348,6 +348,7 @@ module.exports.RelayStatusEnum = RelayStatusEnum
 module.exports.UserEmojimodpermEnum = UserEmojimodpermEnum
 module.exports.UserProfileFfvisibilityEnum = UserProfileFfvisibilityEnum
 module.exports.UserProfileMutingnotificationtypesEnum = UserProfileMutingnotificationtypesEnum
+module.exports.addNoteToAntenna = addNoteToAntenna
 module.exports.initIdGenerator = initIdGenerator
 module.exports.getTimestamp = getTimestamp
 module.exports.genId = genId
diff --git a/packages/backend-rs/src/lib.rs b/packages/backend-rs/src/lib.rs
index bef7a41808..0dcc4e6251 100644
--- a/packages/backend-rs/src/lib.rs
+++ b/packages/backend-rs/src/lib.rs
@@ -4,4 +4,5 @@ pub mod config;
 pub mod database;
 pub mod misc;
 pub mod model;
+pub mod service;
 pub mod util;
diff --git a/packages/backend-rs/src/service/add_note_to_antenna.rs b/packages/backend-rs/src/service/add_note_to_antenna.rs
new file mode 100644
index 0000000000..b919f4e1a0
--- /dev/null
+++ b/packages/backend-rs/src/service/add_note_to_antenna.rs
@@ -0,0 +1,20 @@
+use crate::database::{redis_conn, redis_key};
+use crate::model::entity::note;
+use crate::service::stream::{publish, Error, Stream};
+use crate::util::id::get_timestamp;
+use redis::{streams::StreamMaxlen, Commands};
+
+#[crate::export]
+pub fn add_note_to_antenna(antenna_id: &str, note: &note::Model) -> Result<(), Error> {
+    redis_conn()?.xadd_maxlen(
+        redis_key(format!("antennaTimeline:{}", antenna_id)),
+        StreamMaxlen::Approx(200),
+        format!("{}-*", get_timestamp(&note.id)),
+        &[("note", &note.id)],
+    )?;
+
+    let stream = Stream::Antenna {
+        id: antenna_id.to_string(),
+    };
+    publish(&stream, Some("note"), Some(serde_json::to_value(note)?))
+}
diff --git a/packages/backend-rs/src/service/mod.rs b/packages/backend-rs/src/service/mod.rs
new file mode 100644
index 0000000000..cc239e3f9e
--- /dev/null
+++ b/packages/backend-rs/src/service/mod.rs
@@ -0,0 +1,2 @@
+pub mod add_note_to_antenna;
+pub mod stream;
diff --git a/packages/backend-rs/src/service/stream.rs b/packages/backend-rs/src/service/stream.rs
new file mode 100644
index 0000000000..fc545fa95c
--- /dev/null
+++ b/packages/backend-rs/src/service/stream.rs
@@ -0,0 +1,86 @@
+use crate::database::redis_conn;
+use redis::{Commands, RedisError};
+
+#[derive(strum::Display, serde::Serialize)]
+pub enum Stream {
+    #[strum(serialize = "internal")]
+    Internal,
+    #[strum(serialize = "broadcast")]
+    Broadcast,
+    #[strum(to_string = "adminStream:{id}")]
+    Admin { id: String },
+    #[strum(to_string = "user:{id}")]
+    User { id: String },
+    #[strum(to_string = "channelStream:{id}")]
+    Channel { id: String },
+    #[strum(to_string = "noteStream:{id}")]
+    Note { id: String },
+    #[strum(serialize = "notesStream")]
+    Notes,
+    #[strum(to_string = "userListStream:{id}")]
+    UserList { id: String },
+    #[strum(to_string = "mainStream:{id}")]
+    Main { id: String },
+    #[strum(to_string = "driveStream:{id}")]
+    Drive { id: String },
+    #[strum(to_string = "antennaStream:{id}")]
+    Antenna { id: String },
+    #[strum(to_string = "messagingStream:{id}")]
+    Messaging { id: String },
+    #[strum(to_string = "messagingIndexStream:{id}")]
+    MessagingIndex { id: String },
+}
+
+#[derive(thiserror::Error, Debug)]
+pub enum Error {
+    #[error("Redis error: {0}")]
+    RedisError(#[from] RedisError),
+    #[error("Json serialization error: {0}")]
+    JsonError(#[from] serde_json::Error),
+    #[error("Value error: {0}")]
+    ValueError(String),
+}
+
+pub fn publish(
+    channel: &Stream,
+    kind: Option<&str>,
+    value: Option<serde_json::Value>,
+) -> Result<(), Error> {
+    #[derive(serde::Serialize)]
+    struct Message {
+        r#type: String,
+        body: Option<serde_json::Value>,
+    }
+
+    let message = if let Some(kind) = kind {
+        serde_json::to_value(Message {
+            r#type: kind.to_string(),
+            body: value,
+        })?
+    } else {
+        value.ok_or(Error::ValueError("Invalid streaming message".to_string()))?
+    };
+
+    redis_conn()?.publish(channel.to_string(), message.to_string())?;
+
+    Ok(())
+}
+
+#[cfg(test)]
+mod unit_test {
+    use super::Stream;
+    use pretty_assertions::assert_eq;
+
+    #[test]
+    fn channel_to_string() {
+        assert_eq!(Stream::Internal.to_string(), "internal");
+        assert_eq!(Stream::Broadcast.to_string(), "broadcast");
+        assert_eq!(
+            Stream::Admin {
+                id: "9tb42br63g5apjcq".to_string()
+            }
+            .to_string(),
+            "adminStream:9tb42br63g5apjcq"
+        );
+    }
+}
diff --git a/packages/backend/package.json b/packages/backend/package.json
index 9289c2f7ea..0599a39d4b 100644
--- a/packages/backend/package.json
+++ b/packages/backend/package.json
@@ -178,6 +178,7 @@
 		"ts-loader": "9.5.1",
 		"ts-node": "10.9.2",
 		"tsconfig-paths": "4.2.0",
+		"type-fest": "4.15.0",
 		"typescript": "5.4.5",
 		"webpack": "^5.91.0",
 		"ws": "8.16.0"
diff --git a/packages/backend/src/prelude/undefined-to-null.ts b/packages/backend/src/prelude/undefined-to-null.ts
new file mode 100644
index 0000000000..013be3cf9e
--- /dev/null
+++ b/packages/backend/src/prelude/undefined-to-null.ts
@@ -0,0 +1,76 @@
+// https://gist.github.com/tkrotoff/a6baf96eb6b61b445a9142e5555511a0
+import type { Primitive } from "type-fest";
+
+type NullToUndefined<T> = T extends null
+	? undefined
+	: T extends Primitive | Function | Date | RegExp
+		? T
+		: T extends Array<infer U>
+			? Array<NullToUndefined<U>>
+			: T extends Map<infer K, infer V>
+				? Map<K, NullToUndefined<V>>
+				: T extends Set<infer U>
+					? Set<NullToUndefined<U>>
+					: T extends object
+						? { [K in keyof T]: NullToUndefined<T[K]> }
+						: unknown;
+
+type UndefinedToNull<T> = T extends undefined
+	? null
+	: T extends Primitive | Function | Date | RegExp
+		? T
+		: T extends Array<infer U>
+			? Array<UndefinedToNull<U>>
+			: T extends Map<infer K, infer V>
+				? Map<K, UndefinedToNull<V>>
+				: T extends Set<infer U>
+					? Set<NullToUndefined<U>>
+					: T extends object
+						? { [K in keyof T]: UndefinedToNull<T[K]> }
+						: unknown;
+
+function _nullToUndefined<T>(obj: T): NullToUndefined<T> {
+	if (obj === null) {
+		return undefined as any;
+	}
+
+	if (typeof obj === "object") {
+		if (obj instanceof Map) {
+			obj.forEach((value, key) => obj.set(key, _nullToUndefined(value)));
+		} else {
+			for (const key in obj) {
+				obj[key] = _nullToUndefined(obj[key]) as any;
+			}
+		}
+	}
+
+	return obj as any;
+}
+
+function _undefinedToNull<T>(obj: T): UndefinedToNull<T> {
+	if (obj === undefined) {
+		return null as any;
+	}
+
+	if (typeof obj === "object") {
+		if (obj instanceof Map) {
+			obj.forEach((value, key) => obj.set(key, _undefinedToNull(value)));
+		} else {
+			for (const key in obj) {
+				obj[key] = _undefinedToNull(obj[key]) as any;
+			}
+		}
+	}
+
+	return obj as any;
+}
+
+/**
+ * Recursively converts all undefined values to null.
+ *
+ * @param obj object to convert
+ * @returns a copy of the object with all its undefined values converted to null
+ */
+export function undefinedToNull<T>(obj: T) {
+	return _undefinedToNull(structuredClone(obj));
+}
diff --git a/packages/backend/src/services/add-note-to-antenna.ts b/packages/backend/src/services/add-note-to-antenna.ts
deleted file mode 100644
index 66bc898263..0000000000
--- a/packages/backend/src/services/add-note-to-antenna.ts
+++ /dev/null
@@ -1,24 +0,0 @@
-import type { Antenna } from "@/models/entities/antenna.js";
-import type { Note } from "@/models/entities/note.js";
-import { getTimestamp } from "backend-rs";
-import { redisClient } from "@/db/redis.js";
-import { publishAntennaStream } from "@/services/stream.js";
-import type { User } from "@/models/entities/user.js";
-
-export async function addNoteToAntenna(
-	antenna: Antenna,
-	note: Note,
-	_noteUser: { id: User["id"] },
-) {
-	redisClient.xadd(
-		`antennaTimeline:${antenna.id}`,
-		"MAXLEN",
-		"~",
-		"200",
-		`${getTimestamp(note.id)}-*`,
-		"note",
-		note.id,
-	);
-
-	publishAntennaStream(antenna.id, "note", note);
-}
diff --git a/packages/backend/src/services/note/create.ts b/packages/backend/src/services/note/create.ts
index 662064da57..206f6a50ad 100644
--- a/packages/backend/src/services/note/create.ts
+++ b/packages/backend/src/services/note/create.ts
@@ -44,8 +44,7 @@ import { Poll } from "@/models/entities/poll.js";
 import { createNotification } from "@/services/create-notification.js";
 import { isDuplicateKeyValueError } from "@/misc/is-duplicate-key-value-error.js";
 import { checkHitAntenna } from "@/misc/check-hit-antenna.js";
-import { checkWordMute } from "backend-rs";
-import { addNoteToAntenna } from "@/services/add-note-to-antenna.js";
+import { addNoteToAntenna, checkWordMute } from "backend-rs";
 import { countSameRenotes } from "@/misc/count-same-renotes.js";
 import { deliverToRelays, getCachedRelays } from "../relay.js";
 import type { Channel } from "@/models/entities/channel.js";
@@ -63,6 +62,7 @@ import { Mutex } from "redis-semaphore";
 import { langmap } from "@/misc/langmap.js";
 import Logger from "@/services/logger.js";
 import { inspect } from "node:util";
+import { undefinedToNull } from "@/prelude/undefined-to-null.js";
 
 const logger = new Logger("create-note");
 
@@ -399,7 +399,8 @@ export default async (
 		for (const antenna of await getAntennas()) {
 			checkHitAntenna(antenna, note, user).then((hit) => {
 				if (hit) {
-					addNoteToAntenna(antenna, note, user);
+					// TODO: do this more sanely
+					addNoteToAntenna(antenna.id, undefinedToNull(note) as Note);
 				}
 			});
 		}
diff --git a/packages/backend/src/services/stream.ts b/packages/backend/src/services/stream.ts
index 0e2395e87e..6dff9602cd 100644
--- a/packages/backend/src/services/stream.ts
+++ b/packages/backend/src/services/stream.ts
@@ -4,12 +4,12 @@ import type { Note } from "@/models/entities/note.js";
 import type { UserList } from "@/models/entities/user-list.js";
 import type { UserGroup } from "@/models/entities/user-group.js";
 import { config } from "@/config.js";
-import type { Antenna } from "@/models/entities/antenna.js";
+// import type { Antenna } from "@/models/entities/antenna.js";
 import type { Channel } from "@/models/entities/channel.js";
 import type {
 	StreamChannels,
 	AdminStreamTypes,
-	AntennaStreamTypes,
+	// AntennaStreamTypes,
 	BroadcastTypes,
 	ChannelStreamTypes,
 	DriveStreamTypes,
@@ -134,17 +134,17 @@ class Publisher {
 		);
 	};
 
-	public publishAntennaStream = <K extends keyof AntennaStreamTypes>(
-		antennaId: Antenna["id"],
-		type: K,
-		value?: AntennaStreamTypes[K],
-	): void => {
-		this.publish(
-			`antennaStream:${antennaId}`,
-			type,
-			typeof value === "undefined" ? null : value,
-		);
-	};
+	// public publishAntennaStream = <K extends keyof AntennaStreamTypes>(
+	// 	antennaId: Antenna["id"],
+	// 	type: K,
+	// 	value?: AntennaStreamTypes[K],
+	// ): void => {
+	// 	this.publish(
+	// 		`antennaStream:${antennaId}`,
+	// 		type,
+	// 		typeof value === "undefined" ? null : value,
+	// 	);
+	// };
 
 	public publishMessagingStream = <K extends keyof MessagingStreamTypes>(
 		userId: User["id"],
@@ -217,7 +217,7 @@ export const publishNoteStream = publisher.publishNoteStream;
 export const publishNotesStream = publisher.publishNotesStream;
 export const publishChannelStream = publisher.publishChannelStream;
 export const publishUserListStream = publisher.publishUserListStream;
-export const publishAntennaStream = publisher.publishAntennaStream;
+// export const publishAntennaStream = publisher.publishAntennaStream;
 export const publishMessagingStream = publisher.publishMessagingStream;
 export const publishGroupMessagingStream =
 	publisher.publishGroupMessagingStream;
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index b591ffe9c1..16a4ad6ad4 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -521,6 +521,9 @@ importers:
       tsconfig-paths:
         specifier: 4.2.0
         version: 4.2.0
+      type-fest:
+        specifier: 4.15.0
+        version: 4.15.0
       typescript:
         specifier: 5.4.5
         version: 5.4.5
@@ -16632,6 +16635,11 @@ packages:
     engines: {node: '>=8'}
     dev: true
 
+  /type-fest@4.15.0:
+    resolution: {integrity: sha512-tB9lu0pQpX5KJq54g+oHOLumOx+pMep4RaM6liXh2PKmVRFF+/vAtUP0ZaJ0kOySfVNjF6doBWPHhBhISKdlIA==}
+    engines: {node: '>=16'}
+    dev: true
+
   /type-is@1.6.18:
     resolution: {integrity: sha512-TkRKr9sUTxEH8MdfuCSP7VizJyzRNMjj2J2do2Jr3Kym598JVdEksuzPQCnlFPW4ky9Q+iA+ma9BGm06XQBy8g==}
     engines: {node: '>= 0.6'}