Move query matching before publication#17121
Conversation
There was a problem hiding this comment.
Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.
There was a problem hiding this comment.
3 issues found across 21 files
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="packages/twenty-server/src/engine/workspace-event-emitter/utils/wrap-async-iterator-with-cleanup.ts">
<violation number="1" location="packages/twenty-server/src/engine/workspace-event-emitter/utils/wrap-async-iterator-with-cleanup.ts:7">
P2: If `onClose()` throws an error, `iterator.return()` will never be called, potentially causing a resource leak. Consider wrapping `onClose()` in a try-finally block to ensure the underlying iterator is always closed.</violation>
</file>
<file name="packages/twenty-server/src/engine/subscriptions/event-stream.service.ts">
<violation number="1" location="packages/twenty-server/src/engine/subscriptions/event-stream.service.ts:74">
P1: This method will always return an empty array in Redis environments. The data is stored using `setAdd()` (which uses Redis SADD command for SET data type), but retrieved using `get()` (which uses GET for STRING data type). These are incompatible Redis data types. Consider adding a `setMembers` method to `CacheStorageService` that uses `SMEMBERS` command, or use a different storage approach.</violation>
</file>
<file name="packages/twenty-server/src/engine/workspace-event-emitter/workspace-event-emitter.resolver.ts">
<violation number="1" location="packages/twenty-server/src/engine/workspace-event-emitter/workspace-event-emitter.resolver.ts:101">
P2: Resource leak: if `subscribeToEventStream` throws after `createEventStream` succeeds, the event stream will never be cleaned up. Wrap the subscription in a try-catch to ensure cleanup on failure.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
...s/twenty-server/src/engine/workspace-event-emitter/utils/wrap-async-iterator-with-cleanup.ts
Show resolved
Hide resolved
packages/twenty-server/src/engine/workspace-event-emitter/workspace-event-emitter.resolver.ts
Outdated
Show resolved
Hide resolved
ff93b25 to
ce4c6db
Compare
|
🚀 Preview Environment Ready! Your preview environment is available at: http://bore.pub:23159 This environment will automatically shut down when the PR is closed or after 5 hours. |
📊 API Changes ReportGraphQL Schema ChangesGraphQL Schema Changes[log] [log] ✖ Field onSubscriptionMatch was removed from object type Subscription GraphQL Metadata Schema ChangesGraphQL Metadata Schema Changes[log] [log] ✖ Field onSubscriptionMatch was removed from object type Subscription
|
ce4c6db to
4dd30ee
Compare
|
Leaving it to you @thomtrp as tests are red :) |
| }): Promise<void> { | ||
| const key = this.getEventStreamKey(workspaceId, eventStreamChannelId); | ||
| const streamData: EventStreamData = { | ||
| userId, |
There was a problem hiding this comment.
What about api keys / agents mode (that are not running behind a user). Should we use an authContext here instead?
| ): Promise<void> { | ||
| const workspaceId = workspaceEventBatch.workspaceId; | ||
|
|
||
| const activeStreamIds = |
There was a problem hiding this comment.
This might not get cleared and have stale data due to missing TTL
| return; | ||
| } | ||
|
|
||
| const streamsData = await this.eventStreamService.getStreamsData( |
There was a problem hiding this comment.
This however would be cleared, meaning you might iterate below hover dead steam data with your
if (!isDefined(streamData)) {
continue;
}for no reason and this would add up.
|
|
||
| for (const [streamChannelId, streamData] of streamsData) { | ||
| if (!isDefined(streamData)) { | ||
| continue; |
There was a problem hiding this comment.
instead of continue, you could also add some "pruning" logic here for the issue I've mentioned above
0e63cd9 to
6b7444c
Compare
| iterator = await this.subscriptionService.subscribeToEventStream({ | ||
| workspaceId: workspace.id, | ||
| eventStreamChannelId, | ||
| }); |
There was a problem hiding this comment.
Bug: A client disconnect before a GraphQL subscription starts iterating can cause the cleanup logic in wrapAsyncIteratorWithCleanup to be skipped, orphaning the event stream in Redis.
Severity: HIGH
Suggested Fix
The cleanup mechanism relies on the GraphQL async iterator's return() method, which is not guaranteed to be called if the client disconnects before iteration begins. Implement a more robust cleanup mechanism, for example by using a try...finally block around the subscription processing logic or handling the disconnect event on the server to trigger the destroyEventStream call explicitly, ensuring resources are always released.
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.
Location:
packages/twenty-server/src/engine/workspace-event-emitter/workspace-event-emitter.resolver.ts#L110-L113
Potential issue: The cleanup logic for the `onEventSubscription` resolver, which calls
`destroyEventStream`, is wrapped in `wrapAsyncIteratorWithCleanup`. This cleanup relies
on the async iterator's `return()` method. If a client disconnects after the resolver
returns the iterator but before the GraphQL engine begins iterating over it, the
`return()` method is never called. This orphans the event stream data and its ID in
Redis until the 30-minute TTL expires. Consequently, this leads to a memory leak in
Redis and causes the server to waste CPU cycles attempting to publish events to
disconnected clients.
Did we get this right? 👍 / 👎 to inform future reviews.
1 similar comment
- new channel EVENT_STREAM_CHANNEL based on event stream id - on event, perform the matching and publish only to the right streams - store a list of active streams per workspace - store the user id along with the queries for each stream Bonus: - remove onSubscriptionMatch
Bonus: