Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import { TimelineCalendarEventModule } from 'src/engine/core-modules/calendar/ti
import { CaptchaModule } from 'src/engine/core-modules/captcha/captcha.module';
import { captchaModuleFactory } from 'src/engine/core-modules/captcha/captcha.module-factory';
import { CloudflareModule } from 'src/engine/core-modules/cloudflare/cloudflare.module';
import { codeInterpreterModuleFactory } from 'src/engine/core-modules/code-interpreter/code-interpreter-module.factory';
import { CodeInterpreterModule } from 'src/engine/core-modules/code-interpreter/code-interpreter.module';
import { DnsManagerModule } from 'src/engine/core-modules/dns-manager/dns-manager.module';
import { EmailModule } from 'src/engine/core-modules/email/email.module';
import { EmailingDomainModule } from 'src/engine/core-modules/emailing-domain/emailing-domain.module';
Expand All @@ -36,14 +38,14 @@ import { loggerModuleFactory } from 'src/engine/core-modules/logger/logger.modul
import { MessageQueueModule } from 'src/engine/core-modules/message-queue/message-queue.module';
import { messageQueueModuleFactory } from 'src/engine/core-modules/message-queue/message-queue.module-factory';
import { TimelineMessagingModule } from 'src/engine/core-modules/messaging/timeline-messaging.module';
import { MetricsModule } from 'src/engine/core-modules/metrics/metrics.module';
import { MetricsService } from 'src/engine/core-modules/metrics/metrics.service';
import { OpenApiModule } from 'src/engine/core-modules/open-api/open-api.module';
import { PostgresCredentialsModule } from 'src/engine/core-modules/postgres-credentials/postgres-credentials.module';
import { PublicDomainModule } from 'src/engine/core-modules/public-domain/public-domain.module';
import { RedisClientModule } from 'src/engine/core-modules/redis-client/redis-client.module';
import { RedisClientService } from 'src/engine/core-modules/redis-client/redis-client.service';
import { SearchModule } from 'src/engine/core-modules/search/search.module';
import { codeInterpreterModuleFactory } from 'src/engine/core-modules/code-interpreter/code-interpreter-module.factory';
import { CodeInterpreterModule } from 'src/engine/core-modules/code-interpreter/code-interpreter.module';
import { serverlessModuleFactory } from 'src/engine/core-modules/serverless/serverless-module.factory';
import { ServerlessModule } from 'src/engine/core-modules/serverless/serverless.module';
import { WorkspaceSSOModule } from 'src/engine/core-modules/sso/sso.module';
Expand Down Expand Up @@ -117,9 +119,10 @@ import { FileModule } from './file/file.module';
useFactory: loggerModuleFactory,
inject: [TwentyConfigService],
}),
MetricsModule,
MessageQueueModule.registerAsync({
useFactory: messageQueueModuleFactory,
inject: [TwentyConfigService, RedisClientService],
inject: [TwentyConfigService, RedisClientService, MetricsService],
}),
ExceptionHandlerModule.forRootAsync({
useFactory: exceptionHandlerModuleFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ import { type MessageQueueJob } from 'src/engine/core-modules/message-queue/inte
import { type MessageQueueWorkerOptions } from 'src/engine/core-modules/message-queue/interfaces/message-queue-worker-options.interface';

import { QUEUE_RETENTION } from 'src/engine/core-modules/message-queue/constants/queue-retention.constants';
import { MESSAGE_QUEUE_PRIORITY } from 'src/engine/core-modules/message-queue/message-queue-priority.constant';
import { type MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { getJobKey } from 'src/engine/core-modules/message-queue/utils/get-job-key.util';
import { MESSAGE_QUEUE_PRIORITY } from 'src/engine/core-modules/message-queue/message-queue-priority.constant';
import { type MetricsService } from 'src/engine/core-modules/metrics/metrics.service';
import { MetricsKeys } from 'src/engine/core-modules/metrics/types/metrics-keys.type';

export type BullMQDriverOptions = QueueOptions;

Expand All @@ -38,7 +40,10 @@ export class BullMQDriver implements MessageQueueDriver, OnModuleDestroy {
Worker
>;

constructor(private options: BullMQDriverOptions) {}
constructor(
private options: BullMQDriverOptions,
private metricsService: MetricsService,
) {}

register(queueName: MessageQueue): void {
this.queueMap[queueName] = new Queue(queueName, this.options);
Expand Down Expand Up @@ -89,6 +94,30 @@ export class BullMQDriver implements MessageQueueDriver, OnModuleDestroy {
},
workerOptions,
);

this.workerMap[queueName].on('completed', (job) => {
this.metricsService.incrementCounter({
key: MetricsKeys.JobCompleted,
attributes: { queue: queueName, job_name: job?.name ?? '' },
shouldStoreInCache: false,
});
});

this.workerMap[queueName].on('failed', (job, error) => {
if (!isDefined(job) || !isDefined(error)) {
return;
}

this.metricsService.incrementCounter({
key: MetricsKeys.JobFailed,
attributes: {
queue: queueName,
job_name: job.name,
error_type: error.name,
},
shouldStoreInCache: false,
});
});
}

async addCron<T>({
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { type BullMQDriverOptions } from 'src/engine/core-modules/message-queue/drivers/bullmq.driver';
import { type MetricsService } from 'src/engine/core-modules/metrics/metrics.service';

export enum MessageQueueDriverType {
BullMQ = 'bull-mq',
Expand All @@ -8,6 +9,7 @@ export enum MessageQueueDriverType {
export interface BullMQDriverFactoryOptions {
type: MessageQueueDriverType.BullMQ;
options: BullMQDriverOptions;
metricsService: MetricsService;
}

export interface SyncDriverFactoryOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
} from 'src/engine/core-modules/message-queue/message-queue.module-definition';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { getQueueToken } from 'src/engine/core-modules/message-queue/utils/get-queue-token.util';
import { MetricsModule } from 'src/engine/core-modules/metrics/metrics.module';

@Global()
@Module({})
Expand Down Expand Up @@ -77,6 +78,7 @@ export class MessageQueueCoreModule extends ConfigurableModuleClass {

return {
...dynamicModule,
imports: [...(dynamicModule.imports ?? []), MetricsModule],
providers: [
...(dynamicModule.providers ?? []),
driverProvider,
Expand All @@ -91,17 +93,17 @@ export class MessageQueueCoreModule extends ConfigurableModuleClass {
};
}

static async createDriver({ type, options }: typeof OPTIONS_TYPE) {
switch (type) {
static async createDriver(config: typeof OPTIONS_TYPE) {
switch (config.type) {
case MessageQueueDriverType.BullMQ: {
return new BullMQDriver(options);
return new BullMQDriver(config.options, config.metricsService);
}
case MessageQueueDriverType.Sync: {
return new SyncDriver();
}
default: {
this.logger.warn(
`Unsupported message queue driver type: ${type}. Using SyncDriver by default.`,
`Unsupported message queue driver type: ${(config as { type: string })?.type}. Using SyncDriver by default.`,
);

return new SyncDriver();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@ import {
MessageQueueDriverType,
type MessageQueueModuleOptions,
} from 'src/engine/core-modules/message-queue/interfaces';
import { type MetricsService } from 'src/engine/core-modules/metrics/metrics.service';
import { type RedisClientService } from 'src/engine/core-modules/redis-client/redis-client.service';
import { type TwentyConfigService } from 'src/engine/core-modules/twenty-config/twenty-config.service';

/**
* MessageQueue Module factory
* @returns MessageQueueModuleOptions
* @param twentyConfigService
* @param redisClientService
* @param metricsService
*/
export const messageQueueModuleFactory = async (
_twentyConfigService: TwentyConfigService,
redisClientService: RedisClientService,
metricsService: MetricsService,
): Promise<MessageQueueModuleOptions> => {
const driverType = MessageQueueDriverType.BullMQ;

Expand All @@ -24,6 +28,7 @@ export const messageQueueModuleFactory = async (
options: {
connection: redisClientService.getQueueClient(),
},
metricsService,
} satisfies BullMQDriverFactoryOptions;
}
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ export enum MetricsKeys {
JobWebhookCallCompleted = 'job/webhook-call-completed',
SignUpSuccess = 'sign-up/success',
CommonApiQueryRateLimited = 'common-api-query/rate-limited',
JobCompleted = 'job/completed',
JobFailed = 'job/failed',
}