diff --git a/packages/twenty-server/package.json b/packages/twenty-server/package.json index 388b519bec1b7..743d4eaaae399 100644 --- a/packages/twenty-server/package.json +++ b/packages/twenty-server/package.json @@ -65,6 +65,7 @@ "@opentelemetry/api": "^1.9.0", "@opentelemetry/auto-instrumentations-node": "^0.60.0", "@opentelemetry/exporter-metrics-otlp-http": "^0.200.0", + "@opentelemetry/exporter-prometheus": "^0.211.0", "@opentelemetry/sdk-metrics": "^2.0.0", "@opentelemetry/sdk-node": "^0.202.0", "@ptc-org/nestjs-query-core": "4.4.0", diff --git a/packages/twenty-server/src/engine/core-modules/cache-storage/services/cache-storage.service.ts b/packages/twenty-server/src/engine/core-modules/cache-storage/services/cache-storage.service.ts index 683485363d524..63a3380026521 100644 --- a/packages/twenty-server/src/engine/core-modules/cache-storage/services/cache-storage.service.ts +++ b/packages/twenty-server/src/engine/core-modules/cache-storage/services/cache-storage.service.ts @@ -206,6 +206,38 @@ export class CacheStorageService { } while (cursor !== 0); } + async scanAndCountSetMembers(scanPattern: string): Promise { + if (!this.isRedisCache()) { + throw new Error( + 'scanAndCountSetMembers is only supported with Redis cache', + ); + } + + const redisClient = (this.cache as RedisCache).store.client; + let cursor = 0; + let totalCount = 0; + + do { + const result = await redisClient.scan(cursor, { + MATCH: `${this.namespace}:${scanPattern}`, + COUNT: 100, + }); + + cursor = result.cursor; + const keys = result.keys; + + if (keys.length > 0) { + const counts = await Promise.all( + keys.map((key) => redisClient.sCard(key)), + ); + + totalCount += counts.reduce((sum, count) => sum + count, 0); + } + } while (cursor !== 0); + + return totalCount; + } + async acquireLock(key: string, ttl = 1000): Promise { if (!this.isRedisCache()) { throw new Error('acquireLock is only supported with Redis cache'); diff --git a/packages/twenty-server/src/engine/core-modules/metrics/metrics.service.ts b/packages/twenty-server/src/engine/core-modules/metrics/metrics.service.ts index 07507925b0d5c..b6304f640ae69 100644 --- a/packages/twenty-server/src/engine/core-modules/metrics/metrics.service.ts +++ b/packages/twenty-server/src/engine/core-modules/metrics/metrics.service.ts @@ -1,14 +1,39 @@ import { Injectable } from '@nestjs/common'; -import { metrics, type Attributes } from '@opentelemetry/api'; +import { + metrics, + type Attributes, + type Meter, + type MetricOptions, + type ObservableGauge, + type ObservableResult, +} from '@opentelemetry/api'; import { MetricsCacheService } from 'src/engine/core-modules/metrics/metrics-cache.service'; import { type MetricsKeys } from 'src/engine/core-modules/metrics/types/metrics-keys.type'; +const METER_NAME = 'twenty-server'; + @Injectable() export class MetricsService { constructor(private readonly metricsCacheService: MetricsCacheService) {} + getMeter(): Meter { + return metrics.getMeter(METER_NAME); + } + + createObservableGauge( + name: string, + options: MetricOptions, + callback: (observableResult: ObservableResult) => void | Promise, + ): ObservableGauge { + const gauge = this.getMeter().createObservableGauge(name, options); + + gauge.addCallback(callback); + + return gauge; + } + async incrementCounter({ key, eventId, @@ -20,9 +45,7 @@ export class MetricsService { attributes?: Attributes; shouldStoreInCache?: boolean; }) { - //TODO : Define meter name usage in monitoring - const meter = metrics.getMeter('twenty-server'); - const counter = meter.createCounter(key); + const counter = this.getMeter().createCounter(key); counter.add(1, attributes); @@ -42,9 +65,7 @@ export class MetricsService { attributes?: Attributes; shouldStoreInCache?: boolean; }) { - //TODO : Define meter name usage in monitoring - const meter = metrics.getMeter('twenty-server'); - const counter = meter.createCounter(key); + const counter = this.getMeter().createCounter(key); counter.add(eventIds.length, attributes); diff --git a/packages/twenty-server/src/engine/core-modules/metrics/types/meter-driver.type.ts b/packages/twenty-server/src/engine/core-modules/metrics/types/meter-driver.type.ts index fb8be359bff37..8bcfc641116cf 100644 --- a/packages/twenty-server/src/engine/core-modules/metrics/types/meter-driver.type.ts +++ b/packages/twenty-server/src/engine/core-modules/metrics/types/meter-driver.type.ts @@ -1,4 +1,5 @@ export enum MeterDriver { OpenTelemetry = 'opentelemetry', Console = 'console', + Prometheus = 'prometheus', } diff --git a/packages/twenty-server/src/engine/subscriptions/event-stream.service.ts b/packages/twenty-server/src/engine/subscriptions/event-stream.service.ts index 51c81387f94ba..8e0a5c1b5303b 100644 --- a/packages/twenty-server/src/engine/subscriptions/event-stream.service.ts +++ b/packages/twenty-server/src/engine/subscriptions/event-stream.service.ts @@ -1,4 +1,4 @@ -import { Injectable } from '@nestjs/common'; +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; import { type RecordGqlOperationSignature } from 'twenty-shared/types'; import { isDefined } from 'twenty-shared/utils'; @@ -9,6 +9,7 @@ import { WithLock } from 'src/engine/core-modules/cache-lock/with-lock.decorator import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decorators/cache-storage.decorator'; import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service'; import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum'; +import { MetricsService } from 'src/engine/core-modules/metrics/metrics.service'; import { EVENT_STREAM_TTL_MS } from 'src/engine/subscriptions/constants/event-stream-ttl.constant'; import { EventStreamException, @@ -17,13 +18,38 @@ import { import { type EventStreamData } from 'src/engine/subscriptions/types/event-stream-data.type'; @Injectable() -export class EventStreamService { +export class EventStreamService implements OnModuleInit { + private readonly logger = new Logger(EventStreamService.name); + constructor( @InjectCacheStorage(CacheStorageNamespace.EngineSubscriptions) private readonly cacheStorageService: CacheStorageService, private readonly cacheLockService: CacheLockService, + private readonly metricsService: MetricsService, ) {} + onModuleInit() { + this.metricsService.createObservableGauge( + 'twenty_event_streams_live_total', + { description: 'Current number of live event streams' }, + async (observableResult) => { + try { + const count = await this.getTotalActiveStreamCount(); + + observableResult.observe(count); + } catch (error) { + this.logger.error('Failed to collect event streams metrics', error); + } + }, + ); + } + + async getTotalActiveStreamCount(): Promise { + return this.cacheStorageService.scanAndCountSetMembers( + 'workspace:*:activeStreams', + ); + } + async createEventStream({ workspaceId, eventStreamChannelId, diff --git a/packages/twenty-server/src/engine/subscriptions/subscriptions.module.ts b/packages/twenty-server/src/engine/subscriptions/subscriptions.module.ts index 6204ce820da76..fddd8a010ed43 100644 --- a/packages/twenty-server/src/engine/subscriptions/subscriptions.module.ts +++ b/packages/twenty-server/src/engine/subscriptions/subscriptions.module.ts @@ -3,6 +3,7 @@ import { TypeOrmModule } from '@nestjs/typeorm'; import { CacheLockModule } from 'src/engine/core-modules/cache-lock/cache-lock.module'; import { CacheStorageModule } from 'src/engine/core-modules/cache-storage/cache-storage.module'; +import { MetricsModule } from 'src/engine/core-modules/metrics/metrics.module'; import { RedisClientModule } from 'src/engine/core-modules/redis-client/redis-client.module'; import { WorkspaceEntity } from 'src/engine/core-modules/workspace/workspace.entity'; import { EventStreamService } from 'src/engine/subscriptions/event-stream.service'; @@ -13,6 +14,7 @@ import { SubscriptionService } from 'src/engine/subscriptions/subscription.servi RedisClientModule, CacheStorageModule, CacheLockModule, + MetricsModule, TypeOrmModule.forFeature([WorkspaceEntity]), ], providers: [SubscriptionService, EventStreamService], diff --git a/packages/twenty-server/src/instrument.ts b/packages/twenty-server/src/instrument.ts index dd9c5de4ace5d..217d1231399e0 100644 --- a/packages/twenty-server/src/instrument.ts +++ b/packages/twenty-server/src/instrument.ts @@ -2,6 +2,7 @@ import process from 'process'; import opentelemetry from '@opentelemetry/api'; import { OTLPMetricExporter } from '@opentelemetry/exporter-metrics-otlp-http'; +import { PrometheusExporter } from '@opentelemetry/exporter-prometheus'; import { AggregationTemporality, ConsoleMetricExporter, @@ -49,6 +50,10 @@ if (process.env.EXCEPTION_HANDLER_DRIVER === ExceptionHandlerDriver.SENTRY) { // Meter setup +const prometheusExporter = meterDrivers.includes(MeterDriver.Prometheus) + ? new PrometheusExporter({ port: 9464 }) + : null; + const meterProvider = new MeterProvider({ readers: [ ...(meterDrivers.includes(MeterDriver.Console) @@ -70,6 +75,7 @@ const meterProvider = new MeterProvider({ }), ] : []), + ...(prometheusExporter ? [prometheusExporter] : []), ], }); diff --git a/yarn.lock b/yarn.lock index 184b437d080bb..98a962d6b5de4 100644 --- a/yarn.lock +++ b/yarn.lock @@ -14136,6 +14136,19 @@ __metadata: languageName: node linkType: hard +"@opentelemetry/exporter-prometheus@npm:^0.211.0": + version: 0.211.0 + resolution: "@opentelemetry/exporter-prometheus@npm:0.211.0" + dependencies: + "@opentelemetry/core": "npm:2.5.0" + "@opentelemetry/resources": "npm:2.5.0" + "@opentelemetry/sdk-metrics": "npm:2.5.0" + peerDependencies: + "@opentelemetry/api": ^1.3.0 + checksum: 10c0/3e8c62d62a9dd336b6cce1b7073e747f912ec1182414773be6cf4c491ffca92e59b9b1144e2b10cc5f0fdf98ae70c15f8a64b8051ef944507542aa87c5be153d + languageName: node + linkType: hard + "@opentelemetry/exporter-trace-otlp-grpc@npm:0.202.0": version: 0.202.0 resolution: "@opentelemetry/exporter-trace-otlp-grpc@npm:0.202.0" @@ -15608,6 +15621,18 @@ __metadata: languageName: node linkType: hard +"@opentelemetry/sdk-metrics@npm:2.5.0": + version: 2.5.0 + resolution: "@opentelemetry/sdk-metrics@npm:2.5.0" + dependencies: + "@opentelemetry/core": "npm:2.5.0" + "@opentelemetry/resources": "npm:2.5.0" + peerDependencies: + "@opentelemetry/api": ">=1.9.0 <1.10.0" + checksum: 10c0/8e85c14705d700d7fdd1ab9649ad786fb004663a04e0ebca15f2cbc5cbe31ac898f871ded6339bce8c998dded00c7d876ff5e749d10d5d49455af9afe73656d6 + languageName: node + linkType: hard + "@opentelemetry/sdk-node@npm:^0.202.0": version: 0.202.0 resolution: "@opentelemetry/sdk-node@npm:0.202.0" @@ -57559,6 +57584,7 @@ __metadata: "@opentelemetry/api": "npm:^1.9.0" "@opentelemetry/auto-instrumentations-node": "npm:^0.60.0" "@opentelemetry/exporter-metrics-otlp-http": "npm:^0.200.0" + "@opentelemetry/exporter-prometheus": "npm:^0.211.0" "@opentelemetry/sdk-metrics": "npm:^2.0.0" "@opentelemetry/sdk-node": "npm:^0.202.0" "@ptc-org/nestjs-query-core": "npm:4.4.0"