Skip to content

Commit

Permalink
Fix memory leak by calling punsubscribe instead of unsubscribe
Browse files Browse the repository at this point in the history
Also adding a /metrics endpoint with a few custom metrics and the built-in prom-client ones
  • Loading branch information
ericallam committed Apr 9, 2024
1 parent f148059 commit a5aed0d
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 13 deletions.
19 changes: 19 additions & 0 deletions apps/webapp/app/metrics.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { OpenMetricsContentType, Registry, collectDefaultMetrics, register } from "prom-client";
import { singleton } from "./utils/singleton";
import { env } from "./env.server";

export const metricsRegister = singleton("metricsRegister", initializeMetricsRegister);

function initializeMetricsRegister() {
const registry = new Registry<OpenMetricsContentType>();

register.setDefaultLabels({
serviceName: env.SERVICE_NAME,
});

registry.setContentType("application/openmetrics-text; version=1.0.0; charset=utf-8");

collectDefaultMetrics({ register: registry });

return registry;
}
10 changes: 10 additions & 0 deletions apps/webapp/app/routes/metrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { LoaderFunctionArgs } from "@remix-run/server-runtime";
import { metricsRegister } from "~/metrics.server";

export async function loader({ request }: LoaderFunctionArgs) {
return new Response(await metricsRegister.metrics(), {
headers: {
"Content-Type": metricsRegister.contentType,
},
});
}
9 changes: 6 additions & 3 deletions apps/webapp/app/v3/authenticatedSocketConnection.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ import {
} from "@trigger.dev/core/v3";
import { Evt } from "evt";
import { randomUUID } from "node:crypto";
import type { CloseEvent, ErrorEvent, MessageEvent, WebSocket } from "ws";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { DevQueueConsumer } from "./marqs/devQueueConsumer.server";
import type { WebSocket, MessageEvent, CloseEvent, ErrorEvent } from "ws";
import { env } from "~/env.server";

export class AuthenticatedSocketConnection {
public id: string;
Expand Down Expand Up @@ -93,7 +92,11 @@ export class AuthenticatedSocketConnection {

await this._consumer.stop();

this.onClose.post(ev);
const result = this.onClose.post(ev);

logger.debug("[AuthenticatedSocketConnection] Called onClose", {
result,
});
}

async #handleError(ev: ErrorEvent) {
Expand Down
18 changes: 16 additions & 2 deletions apps/webapp/app/v3/handleWebsockets.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import { authenticateApiKey } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { singleton } from "../utils/singleton";
import { AuthenticatedSocketConnection } from "./authenticatedSocketConnection.server";
import { Gauge } from "prom-client";
import { metricsRegister } from "~/metrics.server";

export const wss = singleton("wss", initalizeWebSocketServer);

Expand All @@ -16,6 +18,15 @@ function initalizeWebSocketServer() {

authenticatedConnections = new Map();

new Gauge({
name: "dev_authenticated_connections",
help: "Number of authenticated dev connections",
collect() {
this.set(authenticatedConnections.size);
},
registers: [metricsRegister],
});

return server;
}

Expand Down Expand Up @@ -47,8 +58,11 @@ async function handleWebSocketConnection(ws: WebSocket, req: IncomingMessage) {

authenticatedConnections.set(authenticatedConnection.id, authenticatedConnection);

authenticatedConnection.onClose.attach((closeEvent) => {
logger.debug("Websocket closed", { closeEvent });
authenticatedConnection.onClose.attachOnce((closeEvent) => {
logger.debug("Websocket closed", {
closeEvent,
authenticatedConnectionId: authenticatedConnection.id,
});

authenticatedConnections.delete(authenticatedConnection.id);
});
Expand Down
15 changes: 14 additions & 1 deletion apps/webapp/app/v3/marqs/devPubSub.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import { z } from "zod";
import { singleton } from "~/utils/singleton";
import { ZodPubSub, ZodSubscriber } from "../utils/zodPubSub.server";
import { env } from "~/env.server";
import { Gauge } from "prom-client";
import { metricsRegister } from "~/metrics.server";

const messageCatalog = {
CANCEL_ATTEMPT: z.object({
Expand All @@ -17,7 +19,7 @@ export type DevSubscriber = ZodSubscriber<typeof messageCatalog>;
export const devPubSub = singleton("devPubSub", initializeDevPubSub);

function initializeDevPubSub() {
return new ZodPubSub({
const pubSub = new ZodPubSub({
redis: {
port: env.REDIS_PORT,
host: env.REDIS_HOST,
Expand All @@ -28,4 +30,15 @@ function initializeDevPubSub() {
},
schema: messageCatalog,
});

new Gauge({
name: "dev_pub_sub_subscribers",
help: "Number of dev pub sub subscribers",
collect() {
this.set(pubSub.subscriberCount);
},
registers: [metricsRegister],
});

return pubSub;
}
22 changes: 21 additions & 1 deletion apps/webapp/app/v3/utils/zodPubSub.server.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Logger } from "@trigger.dev/core-backend";
import { ZodMessageCatalogSchema, ZodMessageHandler, ZodMessageSender } from "@trigger.dev/core/v3";
import { Evt } from "evt";
import Redis, { RedisOptions } from "ioredis";
import { z } from "zod";
import { logger } from "~/services/logger.server";
Expand All @@ -26,6 +27,10 @@ class RedisZodSubscriber<TMessageCatalog extends ZodMessageCatalogSchema>
private _listeners: Map<string, (payload: unknown) => Promise<void>> = new Map();
private _messageHandler: ZodMessageHandler<TMessageCatalog>;

public onUnsubscribed: Evt<{
pattern: string;
}> = new Evt();

constructor(
private readonly _pattern: string,
private readonly _options: ZodPubSubOptions<TMessageCatalog>,
Expand All @@ -51,7 +56,9 @@ class RedisZodSubscriber<TMessageCatalog extends ZodMessageCatalogSchema>

public async stopListening(): Promise<void> {
this._listeners.clear();
await this._subscriber.unsubscribe();
await this._subscriber.punsubscribe();

this.onUnsubscribed.post({ pattern: this._pattern });
}

async #onMessage(pattern: string, channel: string, serializedMessage: string) {
Expand Down Expand Up @@ -90,6 +97,11 @@ class RedisZodSubscriber<TMessageCatalog extends ZodMessageCatalogSchema>
export class ZodPubSub<TMessageCatalog extends ZodMessageCatalogSchema> {
private _publisher: Redis;
private _logger = logger.child({ module: "ZodPubSub" });
private _subscriberCount = 0;

get subscriberCount() {
return this._subscriberCount;
}

constructor(private _options: ZodPubSubOptions<TMessageCatalog>) {
this._publisher = new Redis(_options.redis);
Expand All @@ -112,6 +124,14 @@ export class ZodPubSub<TMessageCatalog extends ZodMessageCatalogSchema> {

await subscriber.initialize();

this._subscriberCount++;

subscriber.onUnsubscribed.attachOnce(({ pattern }) => {
logger.debug("Subscriber unsubscribed", { pattern });

this._subscriberCount--;
});

return subscriber;
}
}
1 change: 1 addition & 0 deletions apps/webapp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
"posthog-node": "^3.1.3",
"prism-react-renderer": "^1.3.5",
"prismjs": "^1.29.0",
"prom-client": "^15.1.0",
"random-words": "^2.0.0",
"react": "^18.2.0",
"react-aria": "^3.31.1",
Expand Down
10 changes: 4 additions & 6 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit a5aed0d

Please sign in to comment.