Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/workflows/test-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ jobs:
echo "✅ All feature flags are properly configured"
- name: Check subblock ID stability
run: |
if [ "${{ github.event_name }}" = "pull_request" ]; then
BASE_REF="origin/${{ github.base_ref }}"
git fetch --depth=1 origin "${{ github.base_ref }}" 2>/dev/null || true
else
BASE_REF="HEAD~1"
fi
bun run apps/sim/scripts/check-subblock-id-stability.ts "$BASE_REF"
- name: Lint code
run: bun run lint:check

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import {
TriggerUtils,
} from '@/lib/workflows/triggers/triggers'
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
import { updateActiveBlockRefCount } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils'
import {
markOutgoingEdgesFromOutput,
updateActiveBlockRefCount,
} from '@/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils'
import { getBlock } from '@/blocks'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type {
Expand Down Expand Up @@ -63,7 +66,7 @@ interface DebugValidationResult {
interface BlockEventHandlerConfig {
workflowId?: string
executionIdRef: { current: string }
workflowEdges: Array<{ id: string; target: string; sourceHandle?: string | null }>
workflowEdges: Array<{ id: string; source: string; target: string; sourceHandle?: string | null }>
activeBlocksSet: Set<string>
activeBlockRefCounts: Map<string, number>
accumulatedBlockLogs: BlockLog[]
Expand Down Expand Up @@ -335,13 +338,9 @@ export function useWorkflowExecution() {
setActiveBlocks(workflowId, new Set(activeBlocksSet))
}

const markIncomingEdges = (blockId: string) => {
const markOutgoingEdges = (blockId: string, output: Record<string, any> | undefined) => {
if (!workflowId) return
const incomingEdges = workflowEdges.filter((edge) => edge.target === blockId)
incomingEdges.forEach((edge) => {
const status = edge.sourceHandle === 'error' ? 'error' : 'success'
setEdgeRunStatus(workflowId, edge.id, status)
})
markOutgoingEdgesFromOutput(blockId, output, workflowEdges, workflowId, setEdgeRunStatus)
}

const isContainerBlockType = (blockType?: string) => {
Expand Down Expand Up @@ -460,7 +459,6 @@ export function useWorkflowExecution() {
const onBlockStarted = (data: BlockStartedData) => {
if (isStaleExecution()) return
updateActiveBlocks(data.blockId, true)
markIncomingEdges(data.blockId)

if (!includeStartConsoleEntry || !workflowId) return

Expand All @@ -487,6 +485,7 @@ export function useWorkflowExecution() {
if (isStaleExecution()) return
updateActiveBlocks(data.blockId, false)
if (workflowId) setBlockRunStatus(workflowId, data.blockId, 'success')
markOutgoingEdges(data.blockId, data.output as Record<string, any> | undefined)
executedBlockIds.add(data.blockId)
accumulatedBlockStates.set(data.blockId, {
output: data.output,
Expand All @@ -505,7 +504,9 @@ export function useWorkflowExecution() {
}

if (isContainerBlockType(data.blockType) && !data.iterationContainerId) {
return
const output = data.output as Record<string, any> | undefined
const isEmptySubflow = Array.isArray(output?.results) && output.results.length === 0
if (!isEmptySubflow) return
}

accumulatedBlockLogs.push(createBlockLogEntry(data, { success: true, output: data.output }))
Expand All @@ -527,6 +528,7 @@ export function useWorkflowExecution() {
if (isStaleExecution()) return
updateActiveBlocks(data.blockId, false)
if (workflowId) setBlockRunStatus(workflowId, data.blockId, 'error')
markOutgoingEdges(data.blockId, { error: data.error })

executedBlockIds.add(data.blockId)
accumulatedBlockStates.set(data.blockId, {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,62 @@ export function updateActiveBlockRefCount(
}
}

/**
* Determines if a workflow edge should be marked as active based on its handle and the block output.
* Mirrors the executor's EdgeManager.shouldActivateEdge logic on the client side.
* Exclude sentinel handles here
*/
function shouldActivateEdgeClient(
handle: string | null | undefined,
output: Record<string, any> | undefined
): boolean {
if (!handle) return true

if (handle.startsWith('condition-')) {
return output?.selectedOption === handle.substring('condition-'.length)
}

if (handle.startsWith('router-')) {
return output?.selectedRoute === handle.substring('router-'.length)
}

switch (handle) {
case 'error':
return !!output?.error
case 'source':
return !output?.error
case 'loop-start-source':
case 'loop-end-source':
case 'parallel-start-source':
case 'parallel-end-source':
return true
default:
return true
}
}

export function markOutgoingEdgesFromOutput(
blockId: string,
output: Record<string, any> | undefined,
workflowEdges: Array<{
id: string
source: string
target: string
sourceHandle?: string | null
}>,
workflowId: string,
setEdgeRunStatus: (wfId: string, edgeId: string, status: 'success' | 'error') => void
): void {
const outgoing = workflowEdges.filter((edge) => edge.source === blockId)
for (const edge of outgoing) {
const handle = edge.sourceHandle
if (shouldActivateEdgeClient(handle, output)) {
const status = handle === 'error' ? 'error' : output?.error ? 'error' : 'success'
setEdgeRunStatus(workflowId, edge.id, status)
}
}
}

export interface WorkflowExecutionOptions {
workflowInput?: any
onStream?: (se: StreamingExecution) => Promise<void>
Expand Down Expand Up @@ -135,13 +191,6 @@ export async function executeWorkflowWithFullLogging(
true
)
setActiveBlocks(wfId, new Set(activeBlocksSet))

const incomingEdges = workflowEdges.filter(
(edge) => edge.target === event.data.blockId
)
incomingEdges.forEach((edge) => {
setEdgeRunStatus(wfId, edge.id, 'success')
})
break
}

Expand All @@ -155,6 +204,13 @@ export async function executeWorkflowWithFullLogging(
setActiveBlocks(wfId, new Set(activeBlocksSet))

setBlockRunStatus(wfId, event.data.blockId, 'success')
markOutgoingEdgesFromOutput(
event.data.blockId,
event.data.output,
workflowEdges,
wfId,
setEdgeRunStatus
)

addConsole({
input: event.data.input || {},
Expand Down Expand Up @@ -194,6 +250,13 @@ export async function executeWorkflowWithFullLogging(
setActiveBlocks(wfId, new Set(activeBlocksSet))

setBlockRunStatus(wfId, event.data.blockId, 'error')
markOutgoingEdgesFromOutput(
event.data.blockId,
{ error: event.data.error },
workflowEdges,
wfId,
setEdgeRunStatus
)

addConsole({
input: event.data.input || {},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ interface PreviewWorkflowProps {
/** Cursor style to show when hovering the canvas */
cursorStyle?: 'default' | 'pointer' | 'grab'
/** Map of executed block IDs to their status for highlighting the execution path */
executedBlocks?: Record<string, { status: string }>
executedBlocks?: Record<string, { status: string; output?: unknown }>
/** Currently selected block ID for highlighting */
selectedBlockId?: string | null
/** Skips expensive subblock computations for thumbnails/template previews */
Expand Down Expand Up @@ -274,9 +274,9 @@ export function PreviewWorkflow({

/** Maps base block IDs to execution data, handling parallel iteration variants (blockId₍n₎). */
const blockExecutionMap = useMemo(() => {
if (!executedBlocks) return new Map<string, { status: string }>()
if (!executedBlocks) return new Map<string, { status: string; output?: unknown }>()

const map = new Map<string, { status: string }>()
const map = new Map<string, { status: string; output?: unknown }>()
for (const [key, value] of Object.entries(executedBlocks)) {
// Extract base ID (remove iteration suffix like ₍0₎)
const baseId = key.includes('₍') ? key.split('₍')[0] : key
Expand Down Expand Up @@ -451,7 +451,6 @@ export function PreviewWorkflow({
const edges: Edge[] = useMemo(() => {
if (!isValidWorkflowState) return []

/** Edge is green if target executed and source condition met by edge type. */
const getEdgeExecutionStatus = (edge: {
source: string
target: string
Expand All @@ -463,17 +462,40 @@ export function PreviewWorkflow({
if (!targetStatus?.executed) return 'not-executed'

const sourceStatus = getBlockExecutionStatus(edge.source)
const { sourceHandle } = edge
if (!sourceStatus?.executed) return 'not-executed'

if (sourceHandle === 'error') {
return sourceStatus?.status === 'error' ? 'success' : 'not-executed'
const handle = edge.sourceHandle
if (!handle) {
return sourceStatus.status === 'success' ? 'success' : 'not-executed'
}

if (sourceHandle === 'loop-start-source' || sourceHandle === 'parallel-start-source') {
return 'success'
const sourceOutput = blockExecutionMap.get(edge.source)?.output as
| Record<string, any>
| undefined

if (handle.startsWith('condition-')) {
const conditionValue = handle.substring('condition-'.length)
return sourceOutput?.selectedOption === conditionValue ? 'success' : 'not-executed'
}

if (handle.startsWith('router-')) {
const routeId = handle.substring('router-'.length)
return sourceOutput?.selectedRoute === routeId ? 'success' : 'not-executed'
}

return sourceStatus?.status === 'success' ? 'success' : 'not-executed'
switch (handle) {
case 'error':
return sourceStatus.status === 'error' ? 'error' : 'not-executed'
case 'source':
return sourceStatus.status === 'success' ? 'success' : 'not-executed'
case 'loop-start-source':
case 'loop-end-source':
case 'parallel-start-source':
case 'parallel-end-source':
return 'success'
default:
return sourceStatus.status === 'success' ? 'success' : 'not-executed'
}
}

return (workflowState.edges || []).map((edge) => {
Expand Down
Loading