Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/twenty-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
"@opentelemetry/api": "^1.9.0",
"@opentelemetry/auto-instrumentations-node": "^0.60.0",
"@opentelemetry/exporter-metrics-otlp-http": "^0.200.0",
"@opentelemetry/exporter-prometheus": "^0.211.0",
"@opentelemetry/sdk-metrics": "^2.0.0",
"@opentelemetry/sdk-node": "^0.202.0",
"@ptc-org/nestjs-query-core": "4.4.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,38 @@ export class CacheStorageService {
} while (cursor !== 0);
}

async scanAndCountSetMembers(scanPattern: string): Promise<number> {
if (!this.isRedisCache()) {
throw new Error(
'scanAndCountSetMembers is only supported with Redis cache',
);
}

const redisClient = (this.cache as RedisCache).store.client;
let cursor = 0;
let totalCount = 0;

do {
const result = await redisClient.scan(cursor, {
MATCH: `${this.namespace}:${scanPattern}`,
COUNT: 100,
});

cursor = result.cursor;
const keys = result.keys;

if (keys.length > 0) {
const counts = await Promise.all(
keys.map((key) => redisClient.sCard(key)),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be painful (N network calls), can we use Redis pipeline for that instead?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably even better with LUA

);

totalCount += counts.reduce((sum, count) => sum + count, 0);
}
} while (cursor !== 0);

return totalCount;
}

async acquireLock(key: string, ttl = 1000): Promise<boolean> {
if (!this.isRedisCache()) {
throw new Error('acquireLock is only supported with Redis cache');
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,39 @@
import { Injectable } from '@nestjs/common';

import { metrics, type Attributes } from '@opentelemetry/api';
import {
metrics,
type Attributes,
type Meter,
type MetricOptions,
type ObservableGauge,
type ObservableResult,
} from '@opentelemetry/api';

import { MetricsCacheService } from 'src/engine/core-modules/metrics/metrics-cache.service';
import { type MetricsKeys } from 'src/engine/core-modules/metrics/types/metrics-keys.type';

const METER_NAME = 'twenty-server';

@Injectable()
export class MetricsService {
constructor(private readonly metricsCacheService: MetricsCacheService) {}

getMeter(): Meter {
return metrics.getMeter(METER_NAME);
}

createObservableGauge(
name: string,
options: MetricOptions,
callback: (observableResult: ObservableResult) => void | Promise<void>,
): ObservableGauge {
const gauge = this.getMeter().createObservableGauge(name, options);

gauge.addCallback(callback);

return gauge;
}

async incrementCounter({
key,
eventId,
Expand All @@ -20,9 +45,7 @@ export class MetricsService {
attributes?: Attributes;
shouldStoreInCache?: boolean;
}) {
//TODO : Define meter name usage in monitoring
const meter = metrics.getMeter('twenty-server');
const counter = meter.createCounter(key);
const counter = this.getMeter().createCounter(key);

counter.add(1, attributes);

Expand All @@ -42,9 +65,7 @@ export class MetricsService {
attributes?: Attributes;
shouldStoreInCache?: boolean;
}) {
//TODO : Define meter name usage in monitoring
const meter = metrics.getMeter('twenty-server');
const counter = meter.createCounter(key);
const counter = this.getMeter().createCounter(key);

counter.add(eventIds.length, attributes);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export enum MeterDriver {
OpenTelemetry = 'opentelemetry',
Console = 'console',
Prometheus = 'prometheus',
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Injectable } from '@nestjs/common';
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';

import { type RecordGqlOperationSignature } from 'twenty-shared/types';
import { isDefined } from 'twenty-shared/utils';
Expand All @@ -9,6 +9,7 @@ import { WithLock } from 'src/engine/core-modules/cache-lock/with-lock.decorator
import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decorators/cache-storage.decorator';
import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service';
import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum';
import { MetricsService } from 'src/engine/core-modules/metrics/metrics.service';
import { EVENT_STREAM_TTL_MS } from 'src/engine/subscriptions/constants/event-stream-ttl.constant';
import {
EventStreamException,
Expand All @@ -17,13 +18,38 @@ import {
import { type EventStreamData } from 'src/engine/subscriptions/types/event-stream-data.type';

@Injectable()
export class EventStreamService {
export class EventStreamService implements OnModuleInit {
private readonly logger = new Logger(EventStreamService.name);

constructor(
@InjectCacheStorage(CacheStorageNamespace.EngineSubscriptions)
private readonly cacheStorageService: CacheStorageService,
private readonly cacheLockService: CacheLockService,
private readonly metricsService: MetricsService,
) {}

onModuleInit() {
this.metricsService.createObservableGauge(
'twenty_event_streams_live_total',
{ description: 'Current number of live event streams' },
async (observableResult) => {
try {
const count = await this.getTotalActiveStreamCount();

observableResult.observe(count);
} catch (error) {
this.logger.error('Failed to collect event streams metrics', error);
}
},
);
}

async getTotalActiveStreamCount(): Promise<number> {
return this.cacheStorageService.scanAndCountSetMembers(
'workspace:*:activeStreams',
);
}

async createEventStream({
workspaceId,
eventStreamChannelId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { TypeOrmModule } from '@nestjs/typeorm';

import { CacheLockModule } from 'src/engine/core-modules/cache-lock/cache-lock.module';
import { CacheStorageModule } from 'src/engine/core-modules/cache-storage/cache-storage.module';
import { MetricsModule } from 'src/engine/core-modules/metrics/metrics.module';
import { RedisClientModule } from 'src/engine/core-modules/redis-client/redis-client.module';
import { WorkspaceEntity } from 'src/engine/core-modules/workspace/workspace.entity';
import { EventStreamService } from 'src/engine/subscriptions/event-stream.service';
Expand All @@ -13,6 +14,7 @@ import { SubscriptionService } from 'src/engine/subscriptions/subscription.servi
RedisClientModule,
CacheStorageModule,
CacheLockModule,
MetricsModule,
TypeOrmModule.forFeature([WorkspaceEntity]),
],
providers: [SubscriptionService, EventStreamService],
Expand Down
6 changes: 6 additions & 0 deletions packages/twenty-server/src/instrument.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import process from 'process';

import opentelemetry from '@opentelemetry/api';
import { OTLPMetricExporter } from '@opentelemetry/exporter-metrics-otlp-http';
import { PrometheusExporter } from '@opentelemetry/exporter-prometheus';
import {
AggregationTemporality,
ConsoleMetricExporter,
Expand Down Expand Up @@ -49,6 +50,10 @@ if (process.env.EXCEPTION_HANDLER_DRIVER === ExceptionHandlerDriver.SENTRY) {

// Meter setup

const prometheusExporter = meterDrivers.includes(MeterDriver.Prometheus)
? new PrometheusExporter({ port: 9464 })
: null;

const meterProvider = new MeterProvider({
readers: [
...(meterDrivers.includes(MeterDriver.Console)
Expand All @@ -70,6 +75,7 @@ const meterProvider = new MeterProvider({
}),
]
: []),
...(prometheusExporter ? [prometheusExporter] : []),
],
});

Expand Down
26 changes: 26 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -14136,6 +14136,19 @@ __metadata:
languageName: node
linkType: hard

"@opentelemetry/exporter-prometheus@npm:^0.211.0":
version: 0.211.0
resolution: "@opentelemetry/exporter-prometheus@npm:0.211.0"
dependencies:
"@opentelemetry/core": "npm:2.5.0"
"@opentelemetry/resources": "npm:2.5.0"
"@opentelemetry/sdk-metrics": "npm:2.5.0"
peerDependencies:
"@opentelemetry/api": ^1.3.0
checksum: 10c0/3e8c62d62a9dd336b6cce1b7073e747f912ec1182414773be6cf4c491ffca92e59b9b1144e2b10cc5f0fdf98ae70c15f8a64b8051ef944507542aa87c5be153d
languageName: node
linkType: hard

"@opentelemetry/exporter-trace-otlp-grpc@npm:0.202.0":
version: 0.202.0
resolution: "@opentelemetry/exporter-trace-otlp-grpc@npm:0.202.0"
Expand Down Expand Up @@ -15608,6 +15621,18 @@ __metadata:
languageName: node
linkType: hard

"@opentelemetry/sdk-metrics@npm:2.5.0":
version: 2.5.0
resolution: "@opentelemetry/sdk-metrics@npm:2.5.0"
dependencies:
"@opentelemetry/core": "npm:2.5.0"
"@opentelemetry/resources": "npm:2.5.0"
peerDependencies:
"@opentelemetry/api": ">=1.9.0 <1.10.0"
checksum: 10c0/8e85c14705d700d7fdd1ab9649ad786fb004663a04e0ebca15f2cbc5cbe31ac898f871ded6339bce8c998dded00c7d876ff5e749d10d5d49455af9afe73656d6
languageName: node
linkType: hard

"@opentelemetry/sdk-node@npm:^0.202.0":
version: 0.202.0
resolution: "@opentelemetry/sdk-node@npm:0.202.0"
Expand Down Expand Up @@ -57559,6 +57584,7 @@ __metadata:
"@opentelemetry/api": "npm:^1.9.0"
"@opentelemetry/auto-instrumentations-node": "npm:^0.60.0"
"@opentelemetry/exporter-metrics-otlp-http": "npm:^0.200.0"
"@opentelemetry/exporter-prometheus": "npm:^0.211.0"
"@opentelemetry/sdk-metrics": "npm:^2.0.0"
"@opentelemetry/sdk-node": "npm:^0.202.0"
"@ptc-org/nestjs-query-core": "npm:4.4.0"
Expand Down
Loading