refactor(cloud-agent-next): extract failExecution to deduplicate failure cleanup (#916)
Conversation
…ution failure cleanup

Deduplicate the status-update → clear-active → clear-interrupt → broadcast sequence into a single idempotent failExecution() method. Expose it via failExecutionRpc() so the interrupt handler can use it too, replacing three separate DO calls with one. Add integration tests for reaper idempotency and interrupt-flag cleanup.
…ackoff

Transient WebSocket drops no longer kill the execution. On unexpected close, the wrapper retries up to 5 times (1s, 2s, 4s, 8s, 16s) while the SSE consumer stays alive and events buffer automatically. Heartbeat pauses during reconnection and resumes on success. SSE health checks are skipped while reconnecting to avoid false inactivity timeouts. Close code 4000 (execution done) and explicit close() calls bypass reconnection. onDisconnect only fires after all attempts are exhausted.
Add 3 integration tests covering the failExecutionRpc public RPC method:
- Basic cleanup: status, active exec, interrupt flag, error event
- Idempotency: returns false for already-terminal executions
- Custom streamEventType propagation (e.g. wrapper_disconnected)
```typescript
// 2. Clear active execution (idempotent safety net — updateStatus clears it
// internally for the active execution, but another may have been set)
await this.executionQueries.clearActiveExecution();
```
WARNING: Unconditional cleanup can erase a newer active execution
updateExecutionStatus() already clears active_execution_id only when it still matches executionId. This extra clearActiveExecution() runs after multiple awaits, so if another execution starts in between, we'll delete the new active marker instead. The clearInterrupt() call right below has the same problem and can drop the next execution's interrupt request.
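One way to address the race, sketched under the assumption of a small async queries interface (the interface and function names here are illustrative, not the actual CloudAgentSession API): re-read the active marker after the earlier awaits and clear it only if this execution still owns it.

```typescript
// Illustrative stand-in for the DO's execution queries.
interface ExecutionQueries {
  getActiveExecutionId(): Promise<string | null>;
  clearActiveExecution(): Promise<void>;
}

// Clear the active-execution marker only if it still belongs to this
// execution. Returns true if the marker was cleared.
async function clearActiveIfStillOwned(
  queries: ExecutionQueries,
  executionId: string,
): Promise<boolean> {
  // Re-read ownership: another execution may have claimed the slot
  // while the earlier awaits in failExecution() were in flight.
  const active = await queries.getActiveExecutionId();
  if (active !== executionId) return false; // a newer execution owns the slot
  await queries.clearActiveExecution();
  return true;
}
```

The same re-check pattern applies to the interrupt flag mentioned in the comment.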
Code Review Summary
Status: 2 Issues Found | Recommendation: Address before merge
Other Observations (not in diff): N/A
Files Reviewed: 11 files
…on race conditions

Address PR review feedback:
- Add 35s disconnect grace period in webSocketClose before failing the execution, giving the wrapper time to reconnect via exponential backoff
- Add generation counter in the wrapper reconnect loop to prevent a stale in-flight openIngestWs() from installing zombie sockets after close()
- Reset closedByUs flag in close() to prevent it leaking into the next execution
- Extract cancelDisconnectGrace, startDisconnectGrace, handleDisconnectGraceExpired, completeReconnect, and discardStaleReconnect for readability
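The generation-counter idea in the second bullet can be sketched as follows (the class and method shapes are illustrative, not the wrapper's actual code): each close() bumps the generation, and an in-flight reconnect only installs its socket if the generation is unchanged.

```typescript
// Minimal socket stand-in for the sketch.
type SocketLike = { send(data: string): void };

class IngestConnection {
  private generation = 0;
  private socket: SocketLike | null = null;

  // Invalidate any in-flight reconnect and drop the current socket.
  close(): void {
    this.generation++;
    this.socket = null;
  }

  // Returns true if the freshly opened socket was installed, false if it
  // was discarded because close() ran while open() was in flight.
  async reconnect(open: () => Promise<SocketLike>): Promise<boolean> {
    const gen = this.generation;
    const ws = await open();
    if (gen !== this.generation) return false; // stale: discard zombie socket
    this.socket = ws;
    return true;
  }

  get current(): SocketLike | null {
    return this.socket;
  }
}
```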
…getWebSockets()

The in-memory activeConnections Map was lossy across DO hibernation — after wake, the map was empty but hibernated sockets still existed. This meant handleIngestRequest couldn't find/close old sockets on reconnection, potentially leaving stale hibernated sockets alive.

Replace all activeConnections usage with state.getWebSockets(), which is the Cloudflare hibernation API's authoritative source — it survives hibernation and excludes already-disconnected sockets. This matches how stream.ts already works (no in-memory tracking).

- handleIngestRequest: use getWebSockets to close all existing sockets, including hibernated ones
- handleIngestClose: check remaining sockets instead of an identity compare
- hasActiveConnection: delegate to getWebSockets
- Remove unused getActiveConnectionCount and getConnection methods
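A minimal sketch of the delegation pattern: `getWebSockets()` is the real Cloudflare Durable Object method, but the surrounding types and helper names below are illustrative stand-ins, not the PR's actual code.

```typescript
// Illustrative socket and DO-state shapes for the sketch.
type SocketLike = { close(code?: number, reason?: string): void };

interface DOStateLike {
  // Stand-in for DurableObjectState#getWebSockets(), which survives
  // hibernation and excludes already-disconnected sockets.
  getWebSockets(): SocketLike[];
}

// Instead of consulting a lossy in-memory map, ask the runtime.
function hasActiveConnection(state: DOStateLike): boolean {
  return state.getWebSockets().length > 0;
}

// Close every existing socket (including hibernated ones) before
// accepting a replacement connection. Returns how many were closed.
function closeExistingSockets(
  state: DOStateLike,
  code: number,
  reason: string,
): number {
  const sockets = state.getWebSockets();
  for (const ws of sockets) ws.close(code, reason);
  return sockets.length;
}
```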
…to survive hibernation

Replace the setTimeout-based grace timer with a DO storage + alarm approach. The in-memory timer was lost during DO hibernation, causing the grace period to silently expire and leave executions stuck in the running state.

- Persist grace state under the 'disconnect_grace' storage key
- Reschedule the alarm to fire at the grace deadline (10s)
- Reduce the grace period from 35s to 10s and wrapper attempts from 5 to 3
- Add 3 integration tests for grace period scenarios
jeanduplessis
left a comment
Approving with the condition that this seems worth attending to before merge: #916 (comment)
…ackoff (#917)

## Summary

Adds WebSocket reconnection with exponential backoff for the wrapper's ingest WS connection. Previously, any transient WS drop would kill the execution immediately. Now the wrapper retries up to 5 times (1s, 2s, 4s, 8s, 16s backoff) while keeping the SSE consumer alive and buffering events automatically.

Key design decisions:

- **Event buffering**: SSE events buffer in memory during disconnection and flush (with a `wrapper_resumed` marker) on reconnect. The buffer is capped at 1000 events with an overflow flag.
- **Heartbeat lifecycle**: Heartbeat pauses on unexpected close and resumes after successful reconnect.
- **SSE health check suppression**: The lifecycle manager skips SSE inactivity checks while `isReconnecting()` is true, preventing false timeout aborts.
- **Non-reconnectable close codes**: Close code `4000` (execution done from DO) and explicit `close()` calls bypass reconnection entirely.
- **State sync**: After reconnect, `state.setConnections()` is called to keep the state's WS reference consistent.
- `onDisconnect` only fires after all reconnection attempts are exhausted or for non-reconnectable close codes.

## Verification

- [x] `pnpm run typecheck` — passed (tsgo + wrapper tsc)
- [x] Manually tested everything in parent branches too

## Visual Changes

N/A

## Reviewer Notes

- This is Phase 3 of the reaper plan — Phase 4 (timer simplification) builds on this to tighten the stale threshold from 10min to 2min, since reconnection now handles transient drops.
- No DO-side changes needed — the existing ingest handler replaces old connections for the same `executionId`, and the `pending → running` transition is idempotent.
- The `attemptReconnect()` function has a re-entrancy guard (`if (reconnecting) return`) as a defensive measure against hypothetical double `onclose` firings.
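The retry schedule above can be sketched as a small loop. This is a simplified stand-in for the real `attemptReconnect()`, with the connect and sleep callbacks injected for testability; it uses the 5-attempt schedule from this PR description (a later commit in the thread reduces attempts to 3).

```typescript
const BASE_DELAY_MS = 1_000;
const MAX_ATTEMPTS = 5;

// 0-based attempt index: 1s, 2s, 4s, 8s, 16s.
function backoffDelayMs(attempt: number): number {
  return BASE_DELAY_MS * 2 ** attempt;
}

// Retry until connect() succeeds or the schedule is exhausted. While this
// loop runs, the SSE consumer stays alive and events buffer elsewhere;
// only an exhausted loop should reach the onDisconnect failure path.
async function reconnectWithBackoff(
  connect: () => Promise<boolean>,
  sleep: (ms: number) => Promise<void>,
): Promise<boolean> {
  for (let attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
    await sleep(backoffDelayMs(attempt));
    if (await connect()) return true; // resumed: flush buffered events here
  }
  return false; // exhausted: caller fires onDisconnect
}
```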
```typescript
if (event.code === CLOSE_CODE_EXECUTION_DONE) {
  // DO says execution is done — don't reconnect
  logToFile('ingest WS closed by DO (execution done) — not reconnecting');
  callbacks.onDisconnect('ingest websocket closed (execution done)');
```
WARNING: Expected completion is routed through the disconnect failure path
onDisconnect in cloud-agent-next/wrapper/src/main.ts aborts the Kilo session, marks the run aborted, and suppresses the final complete event. Calling it for the "execution done" close code will therefore turn a normal shutdown into a failure path as soon as the DO starts using code 4000.
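One hedged way to avoid routing a normal shutdown into the failure path: classify the close before choosing a callback, so code 4000 maps to a graceful-finish action rather than to `onDisconnect`. The `CLOSE_CODE_EXECUTION_DONE` constant is from the diff; the classifier and action names are illustrative.

```typescript
const CLOSE_CODE_EXECUTION_DONE = 4000; // DO signals normal completion

type CloseAction = "finish" | "reconnect" | "fail";

// Decide what a WS close means before invoking any callback:
// - execution-done code or our own close() -> graceful finish
// - otherwise reconnect until attempts are exhausted, then fail.
function classifyClose(
  code: number,
  closedByUs: boolean,
  attemptsExhausted: boolean,
): CloseAction {
  if (code === CLOSE_CODE_EXECUTION_DONE || closedByUs) return "finish";
  return attemptsExhausted ? "fail" : "reconnect";
}
```

With this split, only the `"fail"` branch would call the session-aborting `onDisconnect`.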
…d route completion through failure path (commit 0bd8b24, merged into eshurakov/cloud-agent-next-wrapper-fix)
```typescript
sessionId,
streamEventType,
payload: JSON.stringify({
  error,
```
WARNING: wrapper_disconnected changes its payload contract
Before this refactor, /stream clients received { reason, wsCloseCode, wsCloseReason } for wrapper_disconnected. Routing this event through the generic failExecution() payload builder replaces reason with error and adds fatal, which is a breaking change for any consumer that reads the old shape while the event type stays the same.
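The contract change can be made explicit with types. The two shapes below are taken directly from this review comment; the compatibility mapper is purely illustrative, one option if the old shape needed to be preserved for `/stream` clients.

```typescript
// Old payload shape for wrapper_disconnected, per the review.
type WrapperDisconnectedV1 = {
  reason: string;
  wsCloseCode?: number;
  wsCloseReason?: string;
};

// New shape produced by the generic failExecution() payload builder.
type WrapperDisconnectedV2 = {
  error: string;
  fatal: true;
  wsCloseCode?: number;
  wsCloseReason?: string;
};

// Illustrative compatibility mapper: re-emit the legacy shape so existing
// consumers keep reading `reason` while the event type stays the same.
function toLegacyShape(p: WrapperDisconnectedV2): WrapperDisconnectedV1 {
  const out: WrapperDisconnectedV1 = { reason: p.error };
  if (p.wsCloseCode !== undefined) out.wsCloseCode = p.wsCloseCode;
  if (p.wsCloseReason !== undefined) out.wsCloseReason = p.wsCloseReason;
  return out;
}
```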
```typescript
if (activeExecutionId === executionId) {
  await this.executionQueries.clearActiveExecution();
}
await this.executionQueries.clearInterrupt();
```
WARNING: Interrupt cleanup can still clear the next execution's request
wasActive only tells us this execution owned the slot before updateExecutionStatus(). If another execution becomes active before this branch finishes, the unconditional clearInterrupt() will delete that new execution's interrupt flag even though the activeExecutionId === executionId guard already told us ownership changed.
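One possible fix, sketched with an illustrative key-value interface: scope the interrupt flag to an executionId so a stale cleanup can never touch the next execution's flag. The real code uses a single unconditional clearInterrupt(); the per-execution key scheme here is an assumption, not the PR's implementation.

```typescript
// Illustrative key-value stand-in for DO storage.
interface KVLike {
  get(key: string): Promise<unknown>;
  delete(key: string): Promise<void>;
}

// Hypothetical per-execution key: each execution owns its own flag.
const interruptKey = (executionId: string) => `interrupt:${executionId}`;

// Clears only this execution's interrupt flag; a flag set by a newer
// execution lives under a different key and is untouched.
// Returns true if a flag existed and was cleared.
async function clearInterruptFor(
  storage: KVLike,
  executionId: string,
): Promise<boolean> {
  const key = interruptKey(executionId);
  const had = (await storage.get(key)) !== undefined;
  if (had) await storage.delete(key);
  return had;
}
```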
Summary
Extracts the repeated execution-failure sequence (update status → clear active execution → clear interrupt → broadcast stream event) into a single idempotent `failExecution()` private method on `CloudAgentSession`. Four call sites that previously duplicated this 20+ line pattern now delegate to it: wrapper disconnect, reaper stale-running timeout, reaper pending-start timeout, and the orchestrator catch block.

A public `failExecutionRpc()` wrapper is added so external callers (the interrupt handler in `session-management.ts`) can perform a full failure with cleanup in a single DO call, replacing three separate `withDORetry` calls (`updateExecutionStatus`, `clearActiveExecution`, `emitExecutionError`) with one.

Two new integration tests verify reaper idempotency (a second alarm after failure produces no duplicate events) and interrupt-flag cleanup (the reaper clears the interrupt flag when failing a stale execution).
Visual Changes
N/A
Reviewer Notes
- The `wrapper_disconnected` stream event payload shape changed slightly: old `{ reason, wsCloseCode, wsCloseReason }` → new `{ error, fatal: true, wsCloseCode, wsCloseReason }`. No consumer in the codebase reads the `reason` field (the frontend checks only the `streamEventType` discriminator), so this is a safe change.
- `failExecutionRpc` uses `as ExecutionId` — consistent with the existing codebase convention at other RPC/WebSocket boundaries (14 occurrences in `src/`).
- `onExecutionComplete` was intentionally not refactored to use `failExecution` because it handles all terminal statuses (including `completed`) and has different active-execution clearing logic (conditional on a matching ID rather than unconditional).