Skip to content

Commit 6043edd

Browse files
authored
Add metrics for completed and failed jobs (#16969)
Add a counter for completed and failed jobs
1 parent 2c5a757 commit 6043edd

File tree

6 files changed

+52
-9
lines changed

6 files changed

+52
-9
lines changed

packages/twenty-server/src/engine/core-modules/core-engine.module.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import { TimelineCalendarEventModule } from 'src/engine/core-modules/calendar/ti
1818
import { CaptchaModule } from 'src/engine/core-modules/captcha/captcha.module';
1919
import { captchaModuleFactory } from 'src/engine/core-modules/captcha/captcha.module-factory';
2020
import { CloudflareModule } from 'src/engine/core-modules/cloudflare/cloudflare.module';
21+
import { codeInterpreterModuleFactory } from 'src/engine/core-modules/code-interpreter/code-interpreter-module.factory';
22+
import { CodeInterpreterModule } from 'src/engine/core-modules/code-interpreter/code-interpreter.module';
2123
import { DnsManagerModule } from 'src/engine/core-modules/dns-manager/dns-manager.module';
2224
import { EmailModule } from 'src/engine/core-modules/email/email.module';
2325
import { EmailingDomainModule } from 'src/engine/core-modules/emailing-domain/emailing-domain.module';
@@ -36,14 +38,14 @@ import { loggerModuleFactory } from 'src/engine/core-modules/logger/logger.modul
3638
import { MessageQueueModule } from 'src/engine/core-modules/message-queue/message-queue.module';
3739
import { messageQueueModuleFactory } from 'src/engine/core-modules/message-queue/message-queue.module-factory';
3840
import { TimelineMessagingModule } from 'src/engine/core-modules/messaging/timeline-messaging.module';
41+
import { MetricsModule } from 'src/engine/core-modules/metrics/metrics.module';
42+
import { MetricsService } from 'src/engine/core-modules/metrics/metrics.service';
3943
import { OpenApiModule } from 'src/engine/core-modules/open-api/open-api.module';
4044
import { PostgresCredentialsModule } from 'src/engine/core-modules/postgres-credentials/postgres-credentials.module';
4145
import { PublicDomainModule } from 'src/engine/core-modules/public-domain/public-domain.module';
4246
import { RedisClientModule } from 'src/engine/core-modules/redis-client/redis-client.module';
4347
import { RedisClientService } from 'src/engine/core-modules/redis-client/redis-client.service';
4448
import { SearchModule } from 'src/engine/core-modules/search/search.module';
45-
import { codeInterpreterModuleFactory } from 'src/engine/core-modules/code-interpreter/code-interpreter-module.factory';
46-
import { CodeInterpreterModule } from 'src/engine/core-modules/code-interpreter/code-interpreter.module';
4749
import { serverlessModuleFactory } from 'src/engine/core-modules/serverless/serverless-module.factory';
4850
import { ServerlessModule } from 'src/engine/core-modules/serverless/serverless.module';
4951
import { WorkspaceSSOModule } from 'src/engine/core-modules/sso/sso.module';
@@ -117,9 +119,10 @@ import { FileModule } from './file/file.module';
117119
useFactory: loggerModuleFactory,
118120
inject: [TwentyConfigService],
119121
}),
122+
MetricsModule,
120123
MessageQueueModule.registerAsync({
121124
useFactory: messageQueueModuleFactory,
122-
inject: [TwentyConfigService, RedisClientService],
125+
inject: [TwentyConfigService, RedisClientService, MetricsService],
123126
}),
124127
ExceptionHandlerModule.forRootAsync({
125128
useFactory: exceptionHandlerModuleFactory,

packages/twenty-server/src/engine/core-modules/message-queue/drivers/bullmq.driver.ts

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@ import { type MessageQueueJob } from 'src/engine/core-modules/message-queue/inte
1919
import { type MessageQueueWorkerOptions } from 'src/engine/core-modules/message-queue/interfaces/message-queue-worker-options.interface';
2020

2121
import { QUEUE_RETENTION } from 'src/engine/core-modules/message-queue/constants/queue-retention.constants';
22+
import { MESSAGE_QUEUE_PRIORITY } from 'src/engine/core-modules/message-queue/message-queue-priority.constant';
2223
import { type MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
2324
import { getJobKey } from 'src/engine/core-modules/message-queue/utils/get-job-key.util';
24-
import { MESSAGE_QUEUE_PRIORITY } from 'src/engine/core-modules/message-queue/message-queue-priority.constant';
25+
import { type MetricsService } from 'src/engine/core-modules/metrics/metrics.service';
26+
import { MetricsKeys } from 'src/engine/core-modules/metrics/types/metrics-keys.type';
2527

2628
export type BullMQDriverOptions = QueueOptions;
2729

@@ -38,7 +40,10 @@ export class BullMQDriver implements MessageQueueDriver, OnModuleDestroy {
3840
Worker
3941
>;
4042

41-
constructor(private options: BullMQDriverOptions) {}
43+
constructor(
44+
private options: BullMQDriverOptions,
45+
private metricsService: MetricsService,
46+
) {}
4247

4348
register(queueName: MessageQueue): void {
4449
this.queueMap[queueName] = new Queue(queueName, this.options);
@@ -89,6 +94,30 @@ export class BullMQDriver implements MessageQueueDriver, OnModuleDestroy {
8994
},
9095
workerOptions,
9196
);
97+
98+
this.workerMap[queueName].on('completed', (job) => {
99+
this.metricsService.incrementCounter({
100+
key: MetricsKeys.JobCompleted,
101+
attributes: { queue: queueName, job_name: job?.name ?? '' },
102+
shouldStoreInCache: false,
103+
});
104+
});
105+
106+
this.workerMap[queueName].on('failed', (job, error) => {
107+
if (!isDefined(job) || !isDefined(error)) {
108+
return;
109+
}
110+
111+
this.metricsService.incrementCounter({
112+
key: MetricsKeys.JobFailed,
113+
attributes: {
114+
queue: queueName,
115+
job_name: job.name,
116+
error_type: error.name,
117+
},
118+
shouldStoreInCache: false,
119+
});
120+
});
92121
}
93122

94123
async addCron<T>({

packages/twenty-server/src/engine/core-modules/message-queue/interfaces/message-queue-module-options.interface.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { type BullMQDriverOptions } from 'src/engine/core-modules/message-queue/drivers/bullmq.driver';
2+
import { type MetricsService } from 'src/engine/core-modules/metrics/metrics.service';
23

34
export enum MessageQueueDriverType {
45
BullMQ = 'bull-mq',
@@ -8,6 +9,7 @@ export enum MessageQueueDriverType {
89
export interface BullMQDriverFactoryOptions {
910
type: MessageQueueDriverType.BullMQ;
1011
options: BullMQDriverOptions;
12+
metricsService: MetricsService;
1113
}
1214

1315
export interface SyncDriverFactoryOptions {

packages/twenty-server/src/engine/core-modules/message-queue/message-queue-core.module.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import {
2222
} from 'src/engine/core-modules/message-queue/message-queue.module-definition';
2323
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
2424
import { getQueueToken } from 'src/engine/core-modules/message-queue/utils/get-queue-token.util';
25+
import { MetricsModule } from 'src/engine/core-modules/metrics/metrics.module';
2526

2627
@Global()
2728
@Module({})
@@ -77,6 +78,7 @@ export class MessageQueueCoreModule extends ConfigurableModuleClass {
7778

7879
return {
7980
...dynamicModule,
81+
imports: [...(dynamicModule.imports ?? []), MetricsModule],
8082
providers: [
8183
...(dynamicModule.providers ?? []),
8284
driverProvider,
@@ -91,17 +93,17 @@ export class MessageQueueCoreModule extends ConfigurableModuleClass {
9193
};
9294
}
9395

94-
static async createDriver({ type, options }: typeof OPTIONS_TYPE) {
95-
switch (type) {
96+
static async createDriver(config: typeof OPTIONS_TYPE) {
97+
switch (config.type) {
9698
case MessageQueueDriverType.BullMQ: {
97-
return new BullMQDriver(options);
99+
return new BullMQDriver(config.options, config.metricsService);
98100
}
99101
case MessageQueueDriverType.Sync: {
100102
return new SyncDriver();
101103
}
102104
default: {
103105
this.logger.warn(
104-
`Unsupported message queue driver type: ${type}. Using SyncDriver by default.`,
106+
`Unsupported message queue driver type: ${(config as { type: string })?.type}. Using SyncDriver by default.`,
105107
);
106108

107109
return new SyncDriver();

packages/twenty-server/src/engine/core-modules/message-queue/message-queue.module-factory.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,21 @@ import {
33
MessageQueueDriverType,
44
type MessageQueueModuleOptions,
55
} from 'src/engine/core-modules/message-queue/interfaces';
6+
import { type MetricsService } from 'src/engine/core-modules/metrics/metrics.service';
67
import { type RedisClientService } from 'src/engine/core-modules/redis-client/redis-client.service';
78
import { type TwentyConfigService } from 'src/engine/core-modules/twenty-config/twenty-config.service';
89

910
/**
1011
* MessageQueue Module factory
1112
* @returns MessageQueueModuleOptions
1213
* @param twentyConfigService
14+
* @param redisClientService
15+
* @param metricsService
1316
*/
1417
export const messageQueueModuleFactory = async (
1518
_twentyConfigService: TwentyConfigService,
1619
redisClientService: RedisClientService,
20+
metricsService: MetricsService,
1721
): Promise<MessageQueueModuleOptions> => {
1822
const driverType = MessageQueueDriverType.BullMQ;
1923

@@ -24,6 +28,7 @@ export const messageQueueModuleFactory = async (
2428
options: {
2529
connection: redisClientService.getQueueClient(),
2630
},
31+
metricsService,
2732
} satisfies BullMQDriverFactoryOptions;
2833
}
2934
default:

packages/twenty-server/src/engine/core-modules/metrics/types/metrics-keys.type.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,6 @@ export enum MetricsKeys {
3030
JobWebhookCallCompleted = 'job/webhook-call-completed',
3131
SignUpSuccess = 'sign-up/success',
3232
CommonApiQueryRateLimited = 'common-api-query/rate-limited',
33+
JobCompleted = 'job/completed',
34+
JobFailed = 'job/failed',
3335
}

0 commit comments

Comments
 (0)