
refactor(cloud-agent-next): extract failExecution to deduplicate failure cleanup#916

Merged
eshurakov merged 9 commits into eshurakov/cloud-agent-next-wrapper-fix from
eshurakov/cloud-agent-next-wrapper-fix-2
Mar 9, 2026

Conversation

@eshurakov eshurakov (Contributor) commented Mar 7, 2026

Summary

Extracts the repeated execution-failure sequence (update status → clear active execution → clear interrupt → broadcast stream event) into a single idempotent failExecution() private method on CloudAgentSession. Four call sites that previously duplicated this 20+ line pattern now delegate to it: wrapper disconnect, reaper stale-running timeout, reaper pending-start timeout, and the orchestrator catch block.

A public failExecutionRpc() wrapper is added so external callers (the interrupt handler in session-management.ts) can perform a full failure with cleanup in a single DO call, replacing three separate withDORetry calls (updateExecutionStatus, clearActiveExecution, emitExecutionError) with one.

Two new integration tests verify reaper idempotency (second alarm after failure produces no duplicate events) and interrupt-flag cleanup (reaper clears the interrupt flag when failing a stale execution).

Visual Changes

N/A

Reviewer Notes

  • The wrapper_disconnected stream event payload shape changed slightly: old { reason, wsCloseCode, wsCloseReason } → new { error, fatal: true, wsCloseCode, wsCloseReason }. No consumer in the codebase reads the reason field (frontend checks only the streamEventType discriminator), so this is a safe change.
  • failExecutionRpc uses an as ExecutionId cast — consistent with the existing codebase convention at other RPC/WebSocket boundaries (14 occurrences in src/).
  • onExecutionComplete was intentionally not refactored to use failExecution because it handles all terminal statuses (including completed) and has different active-execution clearing logic (conditional on matching ID rather than unconditional).

…ution failure cleanup

Deduplicate the status-update → clear-active → clear-interrupt → broadcast
sequence into a single idempotent failExecution() method. Exposes it via
failExecutionRpc() so the interrupt handler can use it too, replacing three
separate DO calls with one. Adds integration tests for reaper idempotency
and interrupt-flag cleanup.
…ackoff

Transient WebSocket drops no longer kill the execution. On unexpected
close the wrapper retries up to 5 times (1s, 2s, 4s, 8s, 16s) while
the SSE consumer stays alive and events buffer automatically. Heartbeat
pauses during reconnection and resumes on success. SSE health checks
are skipped while reconnecting to avoid false inactivity timeouts.

Close code 4000 (execution done) and explicit close() calls bypass
reconnection. onDisconnect only fires after all attempts are exhausted.
Add 3 integration tests covering the failExecutionRpc public RPC method:
- Basic cleanup: status, active exec, interrupt flag, error event
- Idempotency: returns false for already-terminal executions
- Custom streamEventType propagation (e.g. wrapper_disconnected)
@eshurakov eshurakov marked this pull request as ready for review March 8, 2026 19:33

// 2. Clear active execution (idempotent safety net — updateStatus clears it
// internally for the active execution, but another may have been set)
await this.executionQueries.clearActiveExecution();

WARNING: Unconditional cleanup can erase a newer active execution

updateExecutionStatus() already clears active_execution_id only when it still matches executionId. This extra clearActiveExecution() runs after multiple awaits, so if another execution starts in between, we'll delete the new active marker instead. The clearInterrupt() call right below has the same problem and can drop the next execution's interrupt request.
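One way to close the race described above is to re-check ownership immediately before each cleanup step instead of clearing unconditionally. A minimal sketch — `getActiveExecutionId` and the helper below are hypothetical stand-ins for the session's query helpers, not a proposed diff:

```typescript
// Illustrative ownership-checked cleanup: only clear markers that still
// belong to this execution. If another execution became active between
// awaits, its active marker and interrupt flag are left untouched.

interface ExecutionQueries {
  getActiveExecutionId(): Promise<string | null>;
  clearActiveExecution(): Promise<void>;
  clearInterrupt(): Promise<void>;
}

async function clearIfStillOwned(q: ExecutionQueries, executionId: string): Promise<void> {
  if ((await q.getActiveExecutionId()) === executionId) {
    await q.clearActiveExecution();
    await q.clearInterrupt();
  }
}
```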


kilo-code-bot bot commented Mar 8, 2026

Code Review Summary

Status: 2 Issues Found | Recommendation: Address before merge

Overview

Severity Count
CRITICAL 0
WARNING 2
SUGGESTION 0


Issue Details

WARNING

File                                                   Line  Issue
cloud-agent-next/src/persistence/CloudAgentSession.ts  1381  wrapper_disconnected now emits a different payload shape than before, which is a breaking stream-contract change.
cloud-agent-next/src/persistence/CloudAgentSession.ts  2110  clearInterrupt() can still erase a newer execution's interrupt flag after ownership changes.
Other Observations (not in diff)

N/A

Files Reviewed (11 files)
  • cloud-agent-next/src/persistence/CloudAgentSession.ts - 2 issues
  • cloud-agent-next/src/router/handlers/session-management.ts
  • cloud-agent-next/src/session/ingest-handlers/execution-lifecycle.ts
  • cloud-agent-next/src/websocket/ingest.ts
  • cloud-agent-next/src/websocket/ingest.test.ts
  • cloud-agent-next/test/integration/session/disconnect-and-reaper.test.ts
  • cloud-agent-next/test/unit/wrapper/lifecycle.test.ts
  • cloud-agent-next/test/unit/wrapper/reconnection.test.ts
  • cloud-agent-next/wrapper/src/connection.ts
  • cloud-agent-next/wrapper/src/lifecycle.ts
  • cloud-agent-next/wrapper/src/main.ts

…on race conditions

Address PR review feedback:

- Add 35s disconnect grace period in webSocketClose before failing execution,
  giving the wrapper time to reconnect via exponential backoff
- Add generation counter in wrapper reconnect loop to prevent stale in-flight
  openIngestWs() from installing zombie sockets after close()
- Reset closedByUs flag in close() to prevent it leaking into next execution
- Extract cancelDisconnectGrace, startDisconnectGrace, handleDisconnectGraceExpired,
  completeReconnect, and discardStaleReconnect for readability
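The generation-counter fix in this commit can be sketched in isolation. The class and method names below are illustrative, not the wrapper's actual API — the point is only the capture-and-compare pattern:

```typescript
// Minimal sketch of a generation counter guarding a reconnect loop.
// Each reconnect captures the current generation before awaiting;
// close() bumps the counter, so a stale in-flight open() cannot
// install a zombie socket after the connection was closed.

class ReconnectGuard {
  private generation = 0;
  private socket: { url: string } | null = null;

  close(): void {
    this.generation++; // invalidate any in-flight reconnect
    this.socket = null;
  }

  async reconnect(open: () => Promise<{ url: string }>): Promise<boolean> {
    const gen = this.generation; // capture before awaiting
    const ws = await open();
    if (gen !== this.generation) {
      // close() ran while we were opening — discard the stale socket
      return false;
    }
    this.socket = ws;
    return true;
  }

  isConnected(): boolean {
    return this.socket !== null;
  }
}
```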
…getWebSockets()

The in-memory activeConnections Map was lossy across DO hibernation —
after wake, the map was empty but hibernated sockets still existed.
This meant handleIngestRequest couldn't find/close old sockets on
reconnection, potentially leaving stale hibernated sockets alive.

Replace all activeConnections usage with state.getWebSockets() which
is the Cloudflare hibernation API's authoritative source — it survives
hibernation and excludes already-disconnected sockets. This matches
how stream.ts already works (no in-memory tracking).

- handleIngestRequest: use getWebSockets to close all existing sockets
  including hibernated ones
- handleIngestClose: check remaining sockets instead of identity compare
- hasActiveConnection: delegate to getWebSockets
- Remove unused getActiveConnectionCount and getConnection methods
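The delegation described above can be sketched as follows. `getWebSockets()` is the real Cloudflare Durable Object hibernation API (it accepts an optional tag); the surrounding handler shape and names are illustrative:

```typescript
// Sketch of tracking connections via the hibernation API instead of an
// in-memory Map. getWebSockets() survives hibernation and excludes
// already-disconnected sockets, so hibernated sockets are found too.

interface SocketLike { close(code: number, reason: string): void }
interface StateLike { getWebSockets(tag?: string): SocketLike[] }

// Close every existing socket (including hibernated ones) before
// accepting a new connection. The Map-based version missed hibernated
// sockets because the Map is empty after the DO wakes.
function closeExistingSockets(state: StateLike, tag: string): number {
  const sockets = state.getWebSockets(tag);
  for (const ws of sockets) {
    ws.close(1000, "replaced by new connection");
  }
  return sockets.length;
}

function hasActiveConnection(state: StateLike, tag: string): boolean {
  return state.getWebSockets(tag).length > 0;
}
```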
…to survive hibernation

Replace setTimeout-based grace timer with DO storage + alarm approach.
The in-memory timer was lost during DO hibernation, causing the grace
period to silently expire and leave executions stuck in running state.

- Persist grace state under 'disconnect_grace' storage key
- Reschedule alarm to fire at grace deadline (10s)
- Reduce grace period from 35s to 10s, wrapper attempts from 5 to 3
- Add 3 integration tests for grace period scenarios
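The storage-plus-alarm pattern this commit describes can be sketched as below. The 'disconnect_grace' key and 10s period come from the commit message; the interfaces mirror the Durable Object storage API shape, but the function names are illustrative, not the session's real methods:

```typescript
// Hedged sketch: persist grace state and arm an alarm at the deadline.
// Both live in DO storage, so unlike setTimeout they survive hibernation.

const GRACE_MS = 10_000;

interface StorageLike {
  put(key: string, value: unknown): Promise<void>;
  get<T>(key: string): Promise<T | undefined>;
  delete(key: string): Promise<boolean>;
  setAlarm(time: number): Promise<void>;
}

interface GraceState { executionId: string; deadline: number }

// On disconnect: record the grace deadline and schedule the alarm.
async function startDisconnectGrace(
  storage: StorageLike, executionId: string, now: number,
): Promise<void> {
  const deadline = now + GRACE_MS;
  const state: GraceState = { executionId, deadline };
  await storage.put("disconnect_grace", state);
  await storage.setAlarm(deadline);
}

// In the alarm handler: fail only if the deadline passed and the wrapper
// never reconnected (reconnection would delete the grace state first).
async function handleAlarm(
  storage: StorageLike, now: number, fail: (id: string) => Promise<void>,
): Promise<void> {
  const grace = await storage.get<GraceState>("disconnect_grace");
  if (grace && now >= grace.deadline) {
    await storage.delete("disconnect_grace");
    await fail(grace.executionId);
  }
}
```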

@jeanduplessis jeanduplessis left a comment


Approving with the condition that this seems worth attending to before merge: #916 (comment)

…ackoff (#917)

## Summary

Adds WebSocket reconnection with exponential backoff for the wrapper's
ingest WS connection. Previously, any transient WS drop would kill the
execution immediately. Now the wrapper retries up to 5 times (1s, 2s,
4s, 8s, 16s backoff) while keeping the SSE consumer alive and buffering
events automatically.

Key design decisions:
- **Event buffering**: SSE events buffer in memory during disconnection
and flush (with a `wrapper_resumed` marker) on reconnect. Buffer is
capped at 1000 events with an overflow flag.
- **Heartbeat lifecycle**: Heartbeat pauses on unexpected close and
resumes after successful reconnect.
- **SSE health check suppression**: Lifecycle manager skips SSE
inactivity checks while `isReconnecting()` is true, preventing false
timeout aborts.
- **Non-reconnectable close codes**: Close code `4000` (execution done
from DO) and explicit `close()` calls bypass reconnection entirely.
- **State sync**: After reconnect, `state.setConnections()` is called to
keep the state's WS reference consistent.
- `onDisconnect` only fires after all reconnection attempts are
exhausted or for non-reconnectable close codes.
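The retry schedule above (5 attempts at 1s, 2s, 4s, 8s, 16s) can be sketched as follows. The constants mirror the PR text; the function names and callback shapes are illustrative, not the wrapper's actual API:

```typescript
// Exponential backoff sketch for the reconnect loop described above.

const MAX_RECONNECT_ATTEMPTS = 5;
const BASE_DELAY_MS = 1000;

// Delay before attempt N (1-based): 1s, 2s, 4s, 8s, 16s.
function reconnectDelayMs(attempt: number): number {
  return BASE_DELAY_MS * 2 ** (attempt - 1);
}

// Returns true on successful reconnect (heartbeat resumes, buffered SSE
// events flush); returns false once all attempts are exhausted, at which
// point the caller fires onDisconnect.
async function attemptReconnect(
  connect: () => Promise<boolean>,
  sleep: (ms: number) => Promise<void>,
): Promise<boolean> {
  for (let attempt = 1; attempt <= MAX_RECONNECT_ATTEMPTS; attempt++) {
    await sleep(reconnectDelayMs(attempt));
    if (await connect()) return true;
  }
  return false;
}
```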

## Verification

- [x] `pnpm run typecheck` — passed (tsgo + wrapper tsc)
- [x] Manually tested everything in parent branches too

## Visual Changes

N/A

## Reviewer Notes

- This is Phase 3 of the reaper plan — Phase 4 (timer simplification)
builds on this to tighten the stale threshold from 10min to 2min, since
reconnection now handles transient drops.
- No DO-side changes needed — the existing ingest handler replaces old
connections for the same `executionId`, and the `pending → running`
transition is idempotent.
- The `attemptReconnect()` function has a re-entrancy guard (`if
(reconnecting) return`) as a defensive measure against hypothetical
double `onclose` firings.
if (event.code === CLOSE_CODE_EXECUTION_DONE) {
// DO says execution is done — don't reconnect
logToFile('ingest WS closed by DO (execution done) — not reconnecting');
callbacks.onDisconnect('ingest websocket closed (execution done)');

WARNING: Expected completion is routed through the disconnect failure path

onDisconnect in cloud-agent-next/wrapper/src/main.ts aborts the Kilo session, marks the run aborted, and suppresses the final complete event. Calling it for the "execution done" close code will therefore turn a normal shutdown into a failure path as soon as the DO starts using code 4000.
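One way to keep the "execution done" close out of the failure path is to split the callback, as sketched below. The close-code constant follows the snippet above; the `onComplete`/`onDisconnect` split is a hypothetical fix for illustration, not code from this PR:

```typescript
// Illustrative routing: code 4000 is a deliberate, clean shutdown by the
// DO and should not go through the abort/failure path.

const CLOSE_CODE_EXECUTION_DONE = 4000;

interface LifecycleCallbacks {
  onComplete(reason: string): void;   // clean shutdown: keep the final complete event
  onDisconnect(reason: string): void; // failure: abort session, mark run aborted
}

function handleIngestClose(code: number, callbacks: LifecycleCallbacks): void {
  if (code === CLOSE_CODE_EXECUTION_DONE) {
    // DO closed us deliberately — a normal end, not a connection failure
    callbacks.onComplete("execution done");
    return;
  }
  callbacks.onDisconnect(`ingest websocket closed (code ${code})`);
}
```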

@eshurakov eshurakov merged commit 0bd8b24 into eshurakov/cloud-agent-next-wrapper-fix Mar 9, 2026
1 check passed
@eshurakov eshurakov deleted the eshurakov/cloud-agent-next-wrapper-fix-2 branch March 9, 2026 09:40
sessionId,
streamEventType,
payload: JSON.stringify({
error,

WARNING: wrapper_disconnected changes its payload contract

Before this refactor, /stream clients received { reason, wsCloseCode, wsCloseReason } for wrapper_disconnected. Routing this event through the generic failExecution() payload builder replaces reason with error and adds fatal, which is a breaking change for any consumer that reads the old shape while the event type stays the same.

if (activeExecutionId === executionId) {
await this.executionQueries.clearActiveExecution();
}
await this.executionQueries.clearInterrupt();

WARNING: Interrupt cleanup can still clear the next execution's request

wasActive only tells us this execution owned the slot before updateExecutionStatus(). If another execution becomes active before this branch finishes, the unconditional clearInterrupt() will delete that new execution's interrupt flag even though the activeExecutionId === executionId guard already told us ownership changed.

