-
Notifications
You must be signed in to change notification settings - Fork 3.4k
fix(persistence): chunk block and edge inserts to prevent sql variabl… #3428
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -407,9 +407,9 @@ export async function loadWorkflowFromNormalizedTables( | |||||||||||||||||||
| if (subflow.type === SUBFLOW_TYPES.LOOP) { | ||||||||||||||||||||
| const loopType = | ||||||||||||||||||||
| (config as Loop).loopType === 'for' || | ||||||||||||||||||||
| (config as Loop).loopType === 'forEach' || | ||||||||||||||||||||
| (config as Loop).loopType === 'while' || | ||||||||||||||||||||
| (config as Loop).loopType === 'doWhile' | ||||||||||||||||||||
| (config as Loop).loopType === 'forEach' || | ||||||||||||||||||||
| (config as Loop).loopType === 'while' || | ||||||||||||||||||||
| (config as Loop).loopType === 'doWhile' | ||||||||||||||||||||
| ? (config as Loop).loopType | ||||||||||||||||||||
| : 'for' | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
@@ -446,7 +446,7 @@ export async function loadWorkflowFromNormalizedTables( | |||||||||||||||||||
| distribution: (config as Parallel).distribution ?? '', | ||||||||||||||||||||
| parallelType: | ||||||||||||||||||||
| (config as Parallel).parallelType === 'count' || | ||||||||||||||||||||
| (config as Parallel).parallelType === 'collection' | ||||||||||||||||||||
| (config as Parallel).parallelType === 'collection' | ||||||||||||||||||||
| ? (config as Parallel).parallelType | ||||||||||||||||||||
| : 'count', | ||||||||||||||||||||
| enabled: credMigratedBlocks[subflow.id]?.enabled ?? true, | ||||||||||||||||||||
|
|
@@ -490,6 +490,8 @@ export async function saveWorkflowToNormalizedTables( | |||||||||||||||||||
| tx.delete(workflowSubflows).where(eq(workflowSubflows.workflowId, workflowId)), | ||||||||||||||||||||
| ]) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| const CHUNK_SIZE = 50 | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // Insert blocks | ||||||||||||||||||||
| if (Object.keys(state.blocks).length > 0) { | ||||||||||||||||||||
| const blockInserts = Object.values(state.blocks).map((block) => ({ | ||||||||||||||||||||
|
|
@@ -512,7 +514,12 @@ export async function saveWorkflowToNormalizedTables( | |||||||||||||||||||
| locked: block.locked ?? false, | ||||||||||||||||||||
| })) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| await tx.insert(workflowBlocks).values(blockInserts) | ||||||||||||||||||||
| // SQLite limits bound parameters to 999 per statement. | ||||||||||||||||||||
| // workflowBlocks has 17 fields -> max safe chunk = floor(999/17) = 58. | ||||||||||||||||||||
| // Using 50 for a conservative margin. | ||||||||||||||||||||
| for (let i = 0; i < blockInserts.length; i += CHUNK_SIZE) { | ||||||||||||||||||||
| await tx.insert(workflowBlocks).values(blockInserts.slice(i, i + CHUNK_SIZE)) | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
Comment on lines
+520
to
+522
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The SQLite hard limit of 999 bound parameters per statement is implicit in the choice of
Suggested change
|
||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // Insert edges | ||||||||||||||||||||
|
|
@@ -526,7 +533,9 @@ export async function saveWorkflowToNormalizedTables( | |||||||||||||||||||
| targetHandle: edge.targetHandle || null, | ||||||||||||||||||||
| })) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| await tx.insert(workflowEdges).values(edgeInserts) | ||||||||||||||||||||
| for (let i = 0; i < edgeInserts.length; i += CHUNK_SIZE) { | ||||||||||||||||||||
| await tx.insert(workflowEdges).values(edgeInserts.slice(i, i + CHUNK_SIZE)) | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // Insert subflows (loops and parallels) | ||||||||||||||||||||
|
|
@@ -553,7 +562,9 @@ export async function saveWorkflowToNormalizedTables( | |||||||||||||||||||
| }) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| if (subflowInserts.length > 0) { | ||||||||||||||||||||
| await tx.insert(workflowSubflows).values(subflowInserts) | ||||||||||||||||||||
| for (let i = 0; i < subflowInserts.length; i += CHUNK_SIZE) { | ||||||||||||||||||||
| await tx.insert(workflowSubflows).values(subflowInserts.slice(i, i + CHUNK_SIZE)) | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
| }) | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
@@ -749,11 +760,11 @@ export function regenerateWorkflowStateIds(state: RegenerateStateInput): Regener | |||||||||||||||||||
| blockIdMapping.set(oldId, crypto.randomUUID()) | ||||||||||||||||||||
| }) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // Map edge IDs | ||||||||||||||||||||
| // Map edge IDs | ||||||||||||||||||||
|
|
||||||||||||||||||||
| ;(state.edges || []).forEach((edge: Edge) => { | ||||||||||||||||||||
| edgeIdMapping.set(edge.id, crypto.randomUUID()) | ||||||||||||||||||||
| }) | ||||||||||||||||||||
| ; (state.edges || []).forEach((edge: Edge) => { | ||||||||||||||||||||
| edgeIdMapping.set(edge.id, crypto.randomUUID()) | ||||||||||||||||||||
| }) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // Map loop IDs | ||||||||||||||||||||
| Object.keys(state.loops || {}).forEach((oldId) => { | ||||||||||||||||||||
|
|
@@ -807,20 +818,20 @@ export function regenerateWorkflowStateIds(state: RegenerateStateInput): Regener | |||||||||||||||||||
| newBlocks[newId] = newBlock | ||||||||||||||||||||
| }) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // Regenerate edges with updated source/target references | ||||||||||||||||||||
| // Regenerate edges with updated source/target references | ||||||||||||||||||||
|
|
||||||||||||||||||||
| ;(state.edges || []).forEach((edge: Edge) => { | ||||||||||||||||||||
| const newId = edgeIdMapping.get(edge.id)! | ||||||||||||||||||||
| const newSource = blockIdMapping.get(edge.source) || edge.source | ||||||||||||||||||||
| const newTarget = blockIdMapping.get(edge.target) || edge.target | ||||||||||||||||||||
| ; (state.edges || []).forEach((edge: Edge) => { | ||||||||||||||||||||
| const newId = edgeIdMapping.get(edge.id)! | ||||||||||||||||||||
| const newSource = blockIdMapping.get(edge.source) || edge.source | ||||||||||||||||||||
| const newTarget = blockIdMapping.get(edge.target) || edge.target | ||||||||||||||||||||
|
|
||||||||||||||||||||
| newEdges.push({ | ||||||||||||||||||||
| ...edge, | ||||||||||||||||||||
| id: newId, | ||||||||||||||||||||
| source: newSource, | ||||||||||||||||||||
| target: newTarget, | ||||||||||||||||||||
| newEdges.push({ | ||||||||||||||||||||
| ...edge, | ||||||||||||||||||||
| id: newId, | ||||||||||||||||||||
| source: newSource, | ||||||||||||||||||||
| target: newTarget, | ||||||||||||||||||||
| }) | ||||||||||||||||||||
| }) | ||||||||||||||||||||
| }) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // Regenerate loops with updated node references | ||||||||||||||||||||
| Object.entries(state.loops || {}).forEach(([oldId, loop]) => { | ||||||||||||||||||||
|
|
||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CHUNK_SIZE is defined inside the
db.transactionasync callback, which means it's re-created on every call tosaveWorkflowToNormalizedTables. Since it's a fixed, never-changing value, consider hoisting it to module scope. This makes the intent clearer and avoids unnecessary re-allocation.Place this near the top of the file alongside other module-level constants (e.g., after the logger definition on line 22).
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!