diff --git a/graph/src/components/store/entity_cache.rs b/graph/src/components/store/entity_cache.rs index 8e6a16bc7a1..ca5493d78e7 100644 --- a/graph/src/components/store/entity_cache.rs +++ b/graph/src/components/store/entity_cache.rs @@ -457,6 +457,9 @@ impl EntityCache { for (key, op) in other.updates { self.entity_op(key, op); } + // Carry forward vid_seq to prevent VID collisions when the caller + // continues writing entities after merging. + self.vid_seq = self.vid_seq.max(other.vid_seq); } /// Generate an id. diff --git a/runtime/wasm/src/host_exports.rs b/runtime/wasm/src/host_exports.rs index 0d9c0d2b3bc..d72dd21e99c 100644 --- a/runtime/wasm/src/host_exports.rs +++ b/runtime/wasm/src/host_exports.rs @@ -537,7 +537,7 @@ impl HostExports { let host_metrics = wasm_ctx.host_metrics.clone(); let valid_module = wasm_ctx.valid_module.clone(); - let ctx = wasm_ctx.ctx.derive_with_empty_block_state(); + let mut ctx = wasm_ctx.ctx.derive_with_empty_block_state(); let callback = callback.to_owned(); // Create a base error message to avoid borrowing headaches let errmsg = format!( @@ -560,9 +560,14 @@ impl HostExports { let mut v = Vec::new(); while let Some(sv) = stream.next().await { let sv = sv?; + let mut derived = ctx.derive_with_empty_block_state(); + // Continue the vid sequence from the previous callback so + // each iteration doesn't reset to RESERVED_VIDS and + // produce duplicate VIDs. + derived.state.entity_cache.vid_seq = ctx.state.entity_cache.vid_seq; let module = WasmInstance::from_valid_module_with_ctx_boxed( valid_module.clone(), - ctx.derive_with_empty_block_state(), + derived, host_metrics.clone(), wasm_ctx.experimental_features, ) @@ -570,6 +575,9 @@ impl HostExports { let result = module .handle_json_callback(&callback, &sv.value, &user_data) .await?; + // Carry forward vid_seq so the next iteration continues + // the sequence. + ctx.state.entity_cache.vid_seq = result.entity_cache.vid_seq; // Log progress every 15s if last_log.elapsed() > Duration::from_secs(15) { debug!(