Improve workflow throttling logic #16260

Merged
thomtrp merged 5 commits into main from tt-workflow-throttling-v2 on Dec 3, 2025

@thomtrp thomtrp commented Dec 2, 2025

  • If more than 5000 workflows run per hour, new ones should fail
  • If more than 100 workflows run per minute, new ones should be set as not started, except for manual triggers
  • When a run is enqueued, we check whether there are not-started workflows that may be queued. If so, we call the associated job
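
The three rules above can be sketched as a single decision function. This is a minimal illustration, not the code from this PR: `decideRunStatus`, `RunCounts` and the constant names are hypothetical, and it assumes the counts describe runs already recorded in the current window, so the new run is rejected once the window is full.

```typescript
// Hypothetical sketch: only the limits (5000/hour, 100/min) and the
// status names come from the PR description; all identifiers are made up.
type RunStatus = 'FAILED' | 'NOT_STARTED' | 'ENQUEUED';

const HARD_LIMIT_PER_HOUR = 5000;
const SOFT_LIMIT_PER_MINUTE = 100;

interface RunCounts {
  runsInLastHour: number; // runs already counted in the current hour
  runsInLastMinute: number; // runs already counted in the current minute
}

function decideRunStatus(
  counts: RunCounts,
  isManualTrigger: boolean,
): RunStatus {
  // The hard limit applies to every trigger type, manual included.
  if (counts.runsInLastHour >= HARD_LIMIT_PER_HOUR) {
    return 'FAILED';
  }

  // The soft limit parks the run for later, unless it was triggered manually.
  if (counts.runsInLastMinute >= SOFT_LIMIT_PER_MINUTE && !isManualTrigger) {
    return 'NOT_STARTED';
  }

  return 'ENQUEUED';
}
```

Checking the hard limit first means a manual trigger can skip the soft limit but never the hourly cap.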

@thomtrp thomtrp force-pushed the tt-workflow-throttling-v2 branch 2 times, most recently from 800feaa to 52db40e Compare December 2, 2025 15:07
greptile-apps bot commented Dec 2, 2025

Greptile Overview

Greptile Summary

This PR implements a two-tier throttling system for workflow execution with hard and soft limits:

Key Changes:

  • Hard throttle (5000 workflows/hour): New workflows are marked as FAILED with error message when exceeded
  • Soft throttle (100 workflows/min): New workflows are marked as NOT_STARTED and queued for later execution when exceeded
  • Manual triggers bypass soft throttle but respect hard throttle
  • New WorkflowNotStartedRunsWorkspaceService manages NOT_STARTED workflows with cache-based counting
  • Enhanced ThrottlerService now returns remaining token count
  • WorkflowEnqueueAwaitingRunsJob processes queued workflows when capacity becomes available
  • Added metrics tracking for throttled workflows (WorkflowRunSoftThrottled, WorkflowRunHardThrottled)
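
To make the "returns remaining token count" change concrete, here is a minimal in-memory token bucket. It is a sketch under assumptions: the real ThrottlerService keeps its state in a cache and its exact signatures are not shown in this thread, so `InMemoryTokenBucket`, `consumeOrThrow` and `getAvailableTokens` are illustrative names. The argument order (key, tokens to consume, max tokens, window in ms) mirrors the calls shown in the sequence diagram.

```typescript
// Hypothetical in-memory stand-in for a cache-backed token bucket.
class ThrottleLimitError extends Error {}

interface BucketState {
  tokens: number; // tokens left at the time of the last update
  updatedAt: number; // timestamp (ms) of the last update
}

class InMemoryTokenBucket {
  private readonly buckets = new Map<string, BucketState>();
  private readonly now: () => number;

  // The clock is injectable so the refill logic can be exercised deterministically.
  constructor(now: () => number = () => Date.now()) {
    this.now = now;
  }

  // Tokens refill linearly: a full window restores maxTokens.
  getAvailableTokens(key: string, maxTokens: number, windowMs: number): number {
    const state = this.buckets.get(key);

    if (state === undefined) {
      return maxTokens;
    }

    const refilled = ((this.now() - state.updatedAt) / windowMs) * maxTokens;

    return Math.min(maxTokens, state.tokens + refilled);
  }

  // Consumes `cost` tokens and returns what remains, or throws when the bucket is too empty.
  consumeOrThrow(
    key: string,
    cost: number,
    maxTokens: number,
    windowMs: number,
  ): number {
    const available = this.getAvailableTokens(key, maxTokens, windowMs);

    if (available < cost) {
      throw new ThrottleLimitError(`Throttle limit reached for ${key}`);
    }

    const remaining = available - cost;

    this.buckets.set(key, { tokens: remaining, updatedAt: this.now() });

    return remaining;
  }
}
```

With this shape, the hard limit would be something like `consumeOrThrow(hardKey, 1, 5000, 3600000)` and the soft limit `consumeOrThrow(softKey, 1, 100, 60000)`; the returned count is what the caller can use to decide whether more not-started runs may be enqueued.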

Issues Found:

  • Potential race condition where remainingRunsToEnqueueCount becomes stale between throttle check and awaiting runs enqueue decision
  • Manual trigger bypass logic may prevent enqueueing awaiting runs even when capacity is available

Confidence Score: 3/5

  • This PR has logical issues that could affect workflow execution reliability under high load
  • The implementation introduces good architectural patterns (two-tier throttling, metrics, cache-based counting) but contains race conditions with stale token counts and logic issues with manual trigger bypass that could prevent proper workflow queue processing
  • packages/twenty-server/src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service.ts requires attention for race condition and manual trigger logic fixes

Important Files Changed

File Analysis

| Filename | Score | Overview |
| --- | --- | --- |
| packages/twenty-server/src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service.ts | 3/5 | Refactored throttling logic with separate hard/soft limits, added metrics tracking, and new NOT_STARTED workflow run status. Contains potential race condition with stale remainingRunsToEnqueueCount and manual trigger bypass logic issue. |
| packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run-queue/workspace-services/workflow-not-started-runs.workspace-service.ts | 4/5 | New service managing NOT_STARTED workflow runs with cache-based counting and throttling integration. Provides methods for tracking, recomputing, and throttling workflow execution with hard/soft limits. |
| packages/twenty-server/src/engine/core-modules/throttler/throttler.service.ts | 4/5 | Enhanced token bucket throttler to return remaining tokens and added getAvailableTokensCount method. Clean refactoring with minor timing precision concern. |
| packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run-queue/workspace-services/workflow-enqueue-awaiting-runs.workspace-service.ts | 4/5 | New service for processing NOT_STARTED workflows and enqueueing them when capacity is available. Includes proper error handling and metrics tracking. |

Sequence Diagram

sequenceDiagram
    participant Client
    participant WorkflowRunner as WorkflowRunnerService
    participant NotStartedSvc as NotStartedRunsService
    participant Throttler as ThrottlerService
    participant Cache as Redis Cache
    participant DB as Database
    participant Queue as Message Queue

    Client->>WorkflowRunner: run(workflowVersionId, payload)
    WorkflowRunner->>WorkflowRunner: Check if manual trigger
    
    WorkflowRunner->>NotStartedSvc: throttleOrThrowIfHardLimitReached()
    NotStartedSvc->>Throttler: tokenBucketThrottleOrThrow(hard-throttle key, 1, 5000, 3600000)
    Throttler->>Cache: get(hard-throttle tokens)
    Cache-->>Throttler: current token state
    alt Hard limit exceeded (>5000/hour)
        Throttler-->>NotStartedSvc: throw ThrottlerException
        NotStartedSvc-->>WorkflowRunner: throw
        WorkflowRunner->>WorkflowRunner: createFailedWorkflowRun()
        WorkflowRunner->>DB: create workflow run (status=FAILED, error="Throttle limit reached")
        WorkflowRunner-->>Client: return {workflowRunId}
    else Hard limit OK
        Throttler->>Cache: set(hard-throttle tokens - 1)
        Throttler-->>NotStartedSvc: return remaining tokens
    end

    WorkflowRunner->>NotStartedSvc: throttleAndReturnRemainingRunsToEnqueueCount()
    NotStartedSvc->>Throttler: tokenBucketThrottleOrThrow(soft-throttle key, 1, 100, 60000)
    Throttler->>Cache: get(soft-throttle tokens)
    Cache-->>Throttler: current token state
    alt Soft limit exceeded (>100/min) AND NOT manual trigger
        Throttler-->>NotStartedSvc: throw ThrottlerException
        NotStartedSvc-->>WorkflowRunner: throw
        WorkflowRunner->>WorkflowRunner: createNotStartedWorkflowRun()
        WorkflowRunner->>DB: create workflow run (status=NOT_STARTED)
        WorkflowRunner->>NotStartedSvc: increaseWorkflowRunNotStartedCount()
        NotStartedSvc->>Cache: increment NOT_STARTED count
        WorkflowRunner-->>Client: return {workflowRunId}
    else Soft limit OK OR manual trigger
        Throttler->>Cache: set(soft-throttle tokens - 1)
        Throttler-->>NotStartedSvc: return remaining tokens
        NotStartedSvc-->>WorkflowRunner: return remainingCount
    end

    WorkflowRunner->>WorkflowRunner: enqueueWorkflowRunAndPotentialAwaitingRuns()
    WorkflowRunner->>DB: create workflow run (status=ENQUEUED)
    WorkflowRunner->>Queue: add(RunWorkflowJob, {workflowRunId})
    
    WorkflowRunner->>NotStartedSvc: getNotStartedRunsCountFromCache()
    NotStartedSvc->>Cache: get(NOT_STARTED count)
    Cache-->>NotStartedSvc: current count
    NotStartedSvc-->>WorkflowRunner: return count
    
    alt remainingCount > 0 AND notStartedCount > 0
        WorkflowRunner->>Queue: add(WorkflowEnqueueAwaitingRunsJob, {workspaceId})
        Note over Queue: Background job will process<br/>NOT_STARTED workflows later
    end
    
    WorkflowRunner-->>Client: return {workflowRunId}
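
The last branch of the diagram (schedule the background job only when there is both spare capacity and something waiting) can be sketched as two small pure functions. This is an illustration under assumptions, not the PR's code: `planAwaitingRunsJob` and `pickRunsToEnqueue` are hypothetical names. The second function reads capacity through a callback at the moment the job runs, which is one possible way to avoid acting on a stale remaining count.

```typescript
// Hypothetical sketch of the follow-up decision shown at the end of the diagram.
type FollowUpJob = { jobName: string; data: { workspaceId: string } };

// Runner side: only schedule the background job when it has work and capacity.
function planAwaitingRunsJob(
  workspaceId: string,
  remainingCount: number,
  notStartedCount: number,
): FollowUpJob | null {
  if (remainingCount > 0 && notStartedCount > 0) {
    return { jobName: 'WorkflowEnqueueAwaitingRunsJob', data: { workspaceId } };
  }

  return null;
}

// Job side (assumption): re-read capacity when the job executes instead of
// trusting the count captured when the job was scheduled.
function pickRunsToEnqueue(
  notStartedRunIds: string[],
  readAvailableTokens: () => number,
): string[] {
  const capacity = Math.max(0, Math.floor(readAvailableTokens()));

  return notStartedRunIds.slice(0, capacity);
}
```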

@greptile-apps greptile-apps bot left a comment

25 files reviewed, 1 comment

github-actions bot commented Dec 2, 2025

🚀 Preview Environment Ready!

Your preview environment is available at: http://bore.pub:55106

This environment will automatically shut down when the PR is closed or after 5 hours.

thomtrp commented Dec 2, 2025

@greptile-ai

@greptile-apps greptile-apps bot left a comment

25 files reviewed, 3 comments

  @Process(WorkflowEnqueueAwaitingRunsJob.name)
  async handle({ workspaceId }: { workspaceId: string }): Promise<void> {
    await this.workflowEnqueueAwaitingRunsWorkspaceService.enqueueRuns({
Member

NotStarted

@@ -0,0 +1,3 @@
export const getWorkflowRunNotStartedCountCacheKey = (
  workspaceId: string,
): string => `workflow-run-not-started-count:${workspaceId}`;
Member

Why not use the workspace cache service?
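
For context on what a per-workspace key like the one above is used for, here is a minimal sketch of a not-started counter. It is an assumption-laden illustration: `NotStartedRunCounter` and its methods are hypothetical, and a plain `Map` stands in for whichever cache service ends up being used.

```typescript
// Key helper copied from the diff above so the sketch is self-contained.
const getWorkflowRunNotStartedCountCacheKey = (workspaceId: string): string =>
  `workflow-run-not-started-count:${workspaceId}`;

// Hypothetical counter; a Map replaces the real cache backend.
class NotStartedRunCounter {
  private readonly cache = new Map<string, number>();

  getCount(workspaceId: string): number {
    return this.cache.get(getWorkflowRunNotStartedCountCacheKey(workspaceId)) ?? 0;
  }

  // Called when a run is parked as NOT_STARTED.
  increase(workspaceId: string, by = 1): number {
    return this.write(workspaceId, this.getCount(workspaceId) + by);
  }

  // Called when parked runs are enqueued; never drops below zero.
  decrease(workspaceId: string, by = 1): number {
    return this.write(workspaceId, Math.max(0, this.getCount(workspaceId) - by));
  }

  // Resets the cached value from a count computed in the database.
  recompute(workspaceId: string, countFromDatabase: number): number {
    return this.write(workspaceId, countFromDatabase);
  }

  private write(workspaceId: string, value: number): number {
    this.cache.set(getWorkflowRunNotStartedCountCacheKey(workspaceId), value);

    return value;
  }
}
```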

-    const remainingWorkflowRunToEnqueueCount =
-      await this.workflowRunQueueWorkspaceService.getRemainingRunsToEnqueueCountFromDatabase(
+    const notStartedRunsCount =
+      await this.workflowNotStartedRunsWorkspaceService.getNotStartedRunsCountFromDatabase(
Member

We should maintain this in the cache.

@@ -50,7 +50,7 @@ export class WorkflowHandleStaledRunsWorkspaceService {
       },
     );

-    await this.workflowRunQueueWorkspaceService.recomputeWorkflowRunQueuedCount(
+    await this.workflowNotStartedRunsWorkspaceService.recomputeWorkflowRunNotStartedCount(
Member

Is this not useful anymore?

import { WorkflowEnqueueAwaitingRunsWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run-queue/workspace-services/workflow-enqueue-awaiting-runs.workspace-service';

@Processor({ queueName: MessageQueue.workflowQueue, scope: Scope.REQUEST })
export class WorkflowEnqueueAwaitingRunsJob {
Member

workflow enqueue not started job
(I would also add an optional workflowRunId?)

@Processor({ queueName: MessageQueue.workflowQueue, scope: Scope.REQUEST })
export class WorkflowEnqueueAwaitingRunsJob {
  constructor(
    private readonly workflowEnqueueAwaitingRunsWorkspaceService: WorkflowEnqueueAwaitingRunsWorkspaceService,
Member

workflowEnqueueRunWorkspaceService

import { getWorkflowRunNotStartedCountCacheKey } from 'src/modules/workflow/workflow-runner/workflow-run-queue/utils/get-workflow-run-not-started-count-cache-key.util';

@Injectable()
export class WorkflowNotStartedRunsWorkspaceService {
Member

rename

Member

workflow throttling

triggerPayload: payload,
});

await this.enqueueWorkflowRun(workspaceId, workflowRunId);
Member

remove this


    if (remainingRunsToEnqueueCount > 0 && currentNotStartedRunsCount > 0) {
      await this.messageQueueService.add<{ workspaceId: string }>(
        WorkflowEnqueueAwaitingRunsJob.name,
Member

Pass the priority workflowRunId here if needed.

@@ -244,4 +253,136 @@ export class WorkflowRunnerWorkspaceService {
       status: newStatus,
     };
   }

   private async checkThrottleLimits(
Member

Check only the hard limit.

Member

No need to handle the soft limit here.

thomtrp and others added 4 commits December 2, 2025 17:48
…rkflow-run-queue/jobs/workflow-enqueue-awaiting-runs.job.ts

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
@thomtrp thomtrp force-pushed the tt-workflow-throttling-v2 branch from 32ab36d to acba2b0 Compare December 2, 2025 17:40
workflowRunId,
});

await this.messageQueueService.add<RunWorkflowJobData>(
Member

I feel this should also go through workflow-run-enqueue job

Contributor Author

The enqueue notion means NOT_STARTED => ENQUEUED. The service only looks for not-started workflows. For a workflow that is already running, like here, we still need to run the job separately. So there are three cases: delays (here), forms, and workflows with too many steps, where we cut the run and re-send the job to the queue.


export type WorkflowRunEnqueueJobData = {
  workspaceId: string;
  workflowRunId?: string;
Member

prioritaryWorkflowRunId
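
One way an optional priority run id could be honoured by the enqueue job is to move that run to the front of the not-started list. This is only a sketch: the thread does not show how the field is consumed, `orderRunsToEnqueue` is a hypothetical helper, and the field keeps the `workflowRunId` name from the diff rather than the rename suggested above.

```typescript
// Job data shape as shown in the diff above (closing brace added for completeness).
type WorkflowRunEnqueueJobData = {
  workspaceId: string;
  workflowRunId?: string;
};

// Hypothetical helper: put the priority run first, keep the rest in order.
function orderRunsToEnqueue(
  notStartedRunIds: string[],
  jobData: WorkflowRunEnqueueJobData,
): string[] {
  const priorityId = jobData.workflowRunId;

  // No priority given, or the run is not waiting anymore: leave the order untouched.
  if (priorityId === undefined || !notStartedRunIds.includes(priorityId)) {
    return [...notStartedRunIds];
  }

  return [priorityId, ...notStartedRunIds.filter((id) => id !== priorityId)];
}
```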

@thomtrp thomtrp enabled auto-merge (squash) December 3, 2025 08:59
@thomtrp thomtrp merged commit a0f196e into main Dec 3, 2025
53 checks passed
@thomtrp thomtrp deleted the tt-workflow-throttling-v2 branch December 3, 2025 09:01
@twenty-eng-sync

Hey @thomtrp! After you've done the QA of your Pull Request, you can mark it as done here. Thank you!
