Conversation
Force-pushed from 8bb9879 to 1d88ad3 (Compare)
Greptile Summary: This PR improves workflow cron job performance by introducing a lite workspace context (skipping feature flags, permissions, indexes, and RLS) that cuts workspace loading time from ~200–300ms to <100ms, and by parallelising per-workspace checks with `Promise.allSettled`.

Key changes:
Confidence Score: 4/5
Flowchart

```mermaid
%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[Cron Job fires] --> B[Load all active workspaces]
    B --> C{Batch workspaces<br/>in groups of 50}
    C --> D[Promise.allSettled per batch]
    D --> E{checkAndEnqueue<br/>per workspace}
    E -->|lite context| F[loadLiteWorkspaceContext<br/>skip flags/perms/indexes]
    F --> G{hasRuns/hasStaledRuns<br/>hasNotStartedRuns?}
    G -->|yes| H[Enqueue job]
    G -->|no| I[Skip]
    H --> J[WorkflowCleanWorkflowRunsJob]
    J --> K[deleteOldRuns loop<br/>DELETE … RETURNING, BATCH=200]
    K -->|deletedCount > 0| K
    K -->|done| L[deleteExcessRunsPerWorkflow loop<br/>CTE + DELETE … RETURNING, BATCH=200]
    L -->|deletedCount > 0| L
    L -->|done| M[Log total deleted]
```
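The two delete loops in the flowchart (steps K and L) can be sketched as follows. This is a minimal, self-contained version where `deleteBatch` is a hypothetical stand-in for the raw `DELETE … RETURNING id` query, draining an in-memory array instead of a table:

```typescript
// Minimal sketch of the batched cleanup loop (steps K/L above).
// deleteBatch is a hypothetical stand-in for the raw
// `DELETE … LIMIT $n RETURNING id` query.
const BATCH_SIZE = 200;

async function deleteBatch(rows: string[], limit: number): Promise<string[]> {
  // Simulates the DELETE: removes up to `limit` rows and
  // returns their ids, like a RETURNING clause would.
  return rows.splice(0, limit);
}

async function deleteOldRuns(rows: string[]): Promise<number> {
  let totalDeleted = 0;
  let deletedCount: number;

  do {
    const returnedIds = await deleteBatch(rows, BATCH_SIZE);
    // Count the RETURNING rows to decide whether another batch remains.
    deletedCount = returnedIds.length;
    totalDeleted += deletedCount;
  } while (deletedCount > 0);

  return totalDeleted;
}
```

The key point is that `deletedCount` comes from the length of the rows returned by `RETURNING`, so the loop keeps going until a batch comes back empty.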
Last reviewed commit: 4a7995a
```typescript
  `,
);

deletedCount = result[0].length;
```
Crash bug: `result[0].length` always causes a TypeError or incorrect batching

`dataSource.query(DELETE ... RETURNING id)` returns an array of row objects, e.g. `[{id: 'uuid1'}, {id: 'uuid2'}, ...]`. Therefore:

- `result[0]` is the first row object `{id: 'uuid1'}`, not the rows array
- `result[0].length` is `undefined` on a row object, making `deletedCount = undefined` and the `while (undefined > 0)` check always `false`

This has two consequences:

- The batch loop always exits after one iteration, so only the first 200 rows are ever deleted per job run
- When there are no rows to delete (e.g. when only `deleteExcessRunsPerWorkflow` has work but `deleteOldRuns` does not), `result` is `[]`, `result[0]` is `undefined`, and accessing `undefined.length` throws a TypeError, crashing the job

The same issue exists at the equivalent line in `deleteExcessRunsPerWorkflow` (line 129).

The fix is to use `result.length` in both places:

```diff
- deletedCount = result[0].length;
+ deletedCount = result.length;
```
TypeORM's dataSource.query() returns a tuple [rows, affectedCount]
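Since the two comments above disagree on the return shape, a shape-tolerant counter is one defensive option. A sketch, with a hypothetical helper name that is not part of the PR:

```typescript
// Sketch: count returned rows whether the driver yields a plain rows
// array (as for SELECT) or a [rows, affectedCount] tuple (as TypeORM's
// Postgres driver does for DELETE/UPDATE with RETURNING).
function countReturnedRows(result: unknown[]): number {
  // Tuple shape: [rows, affectedCount]
  if (
    result.length === 2 &&
    Array.isArray(result[0]) &&
    typeof result[1] === "number"
  ) {
    return (result[0] as unknown[]).length;
  }

  // Plain rows-array shape: each element is a row object, never an
  // array, so the tuple branch above cannot match by accident.
  return result.length;
}
```

Either way, pinning the expected shape down with a quick log or test against the real driver would settle the disagreement for good.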
```typescript
  await workflowRunRepository.delete(workflowRunToDelete.id);
}
deletedCount = result[0].length;
totalDeleted += deletedCount;
```
Same `result[0].length` bug in `deleteExcessRunsPerWorkflow`

Same issue as in `deleteOldRuns` above: `result` is the array of returned row objects from TypeORM, so `result[0]` is a single row object `{id: '...'}` with no `.length`. The batch loop will always exit after the first iteration.

```diff
- deletedCount = result[0].length;
+ deletedCount = result.length;
```
Resolved threads:

- packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts
- ...orkflow/workflow-runner/workflow-run-queue/cron/jobs/workflow-handle-staled-runs.cron.job.ts (outdated)
- ...les/workflow/workflow-runner/workflow-run-queue/utils/get-runs-to-clean-find-options.util.ts
- ...modules/workflow/workflow-runner/workflow-run-queue/jobs/workflow-clean-workflow-runs.job.ts
2 issues found across 12 files
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run-queue/jobs/workflow-clean-workflow-runs.job.ts">
<violation number="1" location="packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run-queue/jobs/workflow-clean-workflow-runs.job.ts:88">
P2: `dataSource.query` results are row arrays; `result[0].length` is undefined and breaks the batching logic. Count rows with `result.length` instead.</violation>
</file>
<file name="packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts">
<violation number="1" location="packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts:165">
P1: The new `hasStepsToSkipOrFailSafely` check uses `isDefined` instead of array length, so empty skip/fail lists are treated as work to do and can prevent terminal workflow runs from being marked COMPLETED.</violation>
</file>
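The P1 issue above hinges on `isDefined` treating an empty array as work left to do. A minimal illustration, where this `isDefined` is a simplified stand-in for the project's helper:

```typescript
// Simplified stand-in for the project's isDefined helper: it only
// rules out null/undefined, it says nothing about emptiness.
const isDefined = <T>(value: T | null | undefined): value is T =>
  value !== null && value !== undefined;

const stepsToSkip: string[] = [];

// Buggy check: an empty array is still "defined", so this is true
// even when there is nothing left to skip or fail safely.
const buggy = isDefined(stepsToSkip);

// Correct check: only a non-empty list means there is work remaining,
// so a terminal run can be marked COMPLETED when the list is empty.
const fixed = isDefined(stepsToSkip) && stepsToSkip.length > 0;
```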
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
Resolved threads:

- packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts
- ...modules/workflow/workflow-runner/workflow-run-queue/jobs/workflow-clean-workflow-runs.job.ts
@greptile-ai
Force-pushed from 9412aed to 4a7995a (Compare)
Additional Comments (1)
Consider adding an explicit assertion to lock in this behavior.

Context Used: Rule from
```typescript
let deletedCount: number;

do {
  const result = await this.dataSource.query(
```
If you really can't avoid raw queries, can we at least write it like this:

```typescript
const result = await this.dataSource.query(
  `
    DELETE FROM ${schemaName}."workflowRun"
    WHERE id IN (
      SELECT id FROM ${schemaName}."workflowRun"
      WHERE status IN ($1, $2)
        AND "createdAt" < NOW() - MAKE_INTERVAL(days => $3)
      LIMIT $4
    )
    RETURNING id;
  `,
  [
    WorkflowRunStatus.COMPLETED,
    WorkflowRunStatus.FAILED,
    RUNS_TO_CLEAN_THRESHOLD_DAYS,
    batchSize,
  ],
);
```

```diff
@@ -81,6 +87,21 @@ export class WorkflowRunEnqueueCronJob {
   );
 }

+private async checkAndEnqueue(workspaceId: string): Promise<boolean> {
+  const hasNotStartedRuns = await this.hasNotStartedRuns(workspaceId);
```
I'm wondering if we actually want to run 50 times executeInWorkspaceContext concurrently (as we know this is quite heavy). Or if we really want to do that I'd start with something lower like 10 🤔
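For reference, the batching pattern under discussion can be sketched like this, with `checkAndEnqueue` mocked and the batch size pulled into a constant so dropping it from 50 to 10 is a one-line change:

```typescript
// Sketch: process workspaces in chunks of CONCURRENCY with
// Promise.allSettled, so one failing workspace never blocks the rest.
const CONCURRENCY = 10; // the review suggests starting lower than 50

async function checkAndEnqueue(workspaceId: string): Promise<boolean> {
  // Mocked here: pretend workspaces ending in 1 or 3 have runs to enqueue.
  return workspaceId.endsWith("1") || workspaceId.endsWith("3");
}

async function processWorkspaces(workspaceIds: string[]): Promise<number> {
  let enqueued = 0;

  for (let i = 0; i < workspaceIds.length; i += CONCURRENCY) {
    const batch = workspaceIds.slice(i, i + CONCURRENCY);
    const results = await Promise.allSettled(
      batch.map((id) => checkAndEnqueue(id)),
    );

    for (const result of results) {
      if (result.status === "fulfilled" && result.value) {
        enqueued += 1;
      }
    }
  }

  return enqueued;
}
```

Since `executeInWorkspaceContext` is heavy, `CONCURRENCY` directly bounds how many of those contexts are live at once.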
```typescript
  authContext,
  flatObjectMetadataMaps,
  flatFieldMetadataMaps,
  flatIndexMaps: {
```
note: Ideally those maps should throw if they are accessed in the ORM in "lite" mode. We should allow this new mode only in controlled paths where we know a limited subset of the cache is accessed; otherwise some things could fail silently with empty maps.
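One way to make those maps fail loudly rather than silently, along the lines suggested above, is a Proxy that throws on any property access. A sketch, not the project's implementation:

```typescript
// Sketch: wrap lite-mode placeholder maps in a Proxy so any access
// throws instead of silently returning empty data.
function liteModeGuard<T extends object>(name: string): T {
  return new Proxy({} as T, {
    get(_target, prop) {
      throw new Error(
        `${name}.${String(prop)} was accessed in lite workspace context; this cache is not loaded in lite mode`,
      );
    },
  });
}

// Any read of this map now fails fast with an explicit error.
const flatIndexMaps = liteModeGuard<Record<string, unknown>>("flatIndexMaps");
```

This keeps the lite context cheap to build while turning accidental ORM reads of unloaded caches into an immediate, debuggable error.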
Asked AI quickly and it looks a bit complicated. Let me know if you want us to dive into this.

> Asked AI quickly and it looks a bit complicated. Let me know if you want us to dive into this.

Ah @charlesBochet already merged it. Ok @thomtrp I'll show you what I had in mind tmr. 👍
Workflow crons take a few minutes to run. Loading each workspace context takes ~200 to 300ms locally; adding a lite mode brings it under 100ms.

Also batching the per-workspace promises.

Finally, cleaning runs times out when there are too many, so the deletes are batched as well.