From 04709bee289c1f088ab3d72c1744d590e301130e Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 4 Mar 2026 12:22:37 -0800 Subject: [PATCH 1/3] graph, runtime, core: introduce SeqGenerator for shared VID sequences Replace the per-EntityCache vid_seq and seq fields with a shared SeqGenerator backed by Arc<AtomicU32> counters. Created once per block and cheaply cloned across all EntityCache instances, this eliminates VID collisions in ipfs.map() callbacks and offchain triggers without manual sequence threading. --- .../amp_subgraph/runner/data_processing.rs | 33 +++--- core/src/amp_subgraph/runner/mod.rs | 9 +- core/src/subgraph/runner/mod.rs | 32 +++--- graph/src/components/store/entity_cache.rs | 92 ++++++++++------ graph/src/components/store/mod.rs | 4 +- graph/src/components/subgraph/instance.rs | 10 +- graph/src/lib.rs | 4 +- runtime/test/src/common.rs | 3 +- runtime/test/src/test.rs | 3 +- runtime/wasm/src/host_exports.rs | 29 +---- runtime/wasm/src/mapping.rs | 6 +- runtime/wasm/src/module/context.rs | 10 +- store/test-store/src/store.rs | 7 +- store/test-store/tests/graph/entity_cache.rs | 101 +++++++----------- .../test-store/tests/postgres/aggregation.rs | 7 +- 15 files changed, 180 insertions(+), 170 deletions(-) diff --git a/core/src/amp_subgraph/runner/data_processing.rs b/core/src/amp_subgraph/runner/data_processing.rs index 033d4423726..17d0690c0a8 100644 --- a/core/src/amp_subgraph/runner/data_processing.rs +++ b/core/src/amp_subgraph/runner/data_processing.rs @@ -11,7 +11,7 @@ use graph::{ }, blockchain::block_stream::FirehoseCursor, cheap_clone::CheapClone, - components::store::{EntityCache, ModificationsAndCache}, + components::store::{EntityCache, ModificationsAndCache, SeqGenerator}, }; use slog::{debug, trace}; @@ -96,6 +96,8 @@ async fn process_record_batch_group( .stopwatch .start_section("process_record_batch_group"); + entity_cache.seq_gen = SeqGenerator::new(block_number.compat()); + let RecordBatchGroup { record_batches } = record_batch_group; if 
record_batches.is_empty() { @@ -121,7 +123,6 @@ async fn process_record_batch_group( process_record_batch( cx, &mut entity_cache, - block_number, record_batch, stream_table_ptr[stream_index], ) @@ -134,6 +135,7 @@ async fn process_record_batch_group( } let section = cx.metrics.stopwatch.start_section("as_modifications"); + let vid_gen = entity_cache.vid_gen(); let ModificationsAndCache { modifications, entity_lfu_cache, @@ -172,13 +174,13 @@ async fn process_record_batch_group( Ok(EntityCache::with_current( cx.store.cheap_clone(), entity_lfu_cache, + vid_gen, )) } async fn process_record_batch( cx: &mut Context, entity_cache: &mut EntityCache, - block_number: BlockNumber, record_batch: RecordBatch, (i, j): TablePtr, ) -> Result<(), Error> { @@ -209,13 +211,11 @@ async fn process_record_batch( let key = match key { Some(key) => key, None => { - let entity_id = entity_cache - .generate_id(id_type, block_number.compat()) - .map_err(|e| { - Error::Deterministic(e.context(format!( - "failed to generate a new id for an entity of type '{entity_name}'" - ))) - })?; + let entity_id = entity_cache.seq_gen.id(id_type).map_err(|e| { + Error::Deterministic(e.context(format!( + "failed to generate a new id for an entity of type '{entity_name}'" + ))) + })?; entity_data.push(("id".into(), entity_id.clone().into())); entity_type.key(entity_id) @@ -229,14 +229,11 @@ async fn process_record_batch( ))) })?; - entity_cache - .set(key, entity, block_number.compat(), None) - .await - .map_err(|e| { - Error::Deterministic(e.context(format!( - "failed to store a new entity of type '{entity_name}' with id '{entity_id}'" - ))) - })?; + entity_cache.set(key, entity, None).await.map_err(|e| { + Error::Deterministic(e.context(format!( + "failed to store a new entity of type '{entity_name}' with id '{entity_id}'" + ))) + })?; } Ok(()) diff --git a/core/src/amp_subgraph/runner/mod.rs b/core/src/amp_subgraph/runner/mod.rs index 64b10c01f9f..5a85dc093f8 100644 --- 
a/core/src/amp_subgraph/runner/mod.rs +++ b/core/src/amp_subgraph/runner/mod.rs @@ -11,7 +11,9 @@ use std::time::{Duration, Instant}; use anyhow::Result; use futures::StreamExt; use graph::{ - amp::Client, cheap_clone::CheapClone, components::store::EntityCache, + amp::Client, + cheap_clone::CheapClone, + components::store::{EntityCache, SeqGenerator}, data::subgraph::schema::SubgraphError, }; use slog::{debug, error, warn}; @@ -104,7 +106,10 @@ where .update(latest_block.min(cx.end_block())); let mut deployment_is_failed = cx.store.health().await?.is_failed(); - let mut entity_cache = EntityCache::new(cx.store.cheap_clone()); + // The SeqGenerator gets replaced with one for the correct block + // number in `process_record_batch_groups`, so the initial value + // doesn't matter much. + let mut entity_cache = EntityCache::new(cx.store.cheap_clone(), SeqGenerator::new(0)); let mut stream = new_data_stream(cx, latest_block); while let Some(result) = stream.next().await { diff --git a/core/src/subgraph/runner/mod.rs b/core/src/subgraph/runner/mod.rs index a925c652e6c..ef037e6141d 100644 --- a/core/src/subgraph/runner/mod.rs +++ b/core/src/subgraph/runner/mod.rs @@ -17,7 +17,9 @@ use graph::blockchain::{ Block, BlockTime, Blockchain, DataSource as _, SubgraphFilter, Trigger, TriggerFilter as _, TriggerFilterWrapper, }; -use graph::components::store::{EmptyStore, GetScope, ReadStore, StoredDynamicDataSource}; +use graph::components::store::{ + EmptyStore, GetScope, ReadStore, SeqGenerator, StoredDynamicDataSource, +}; use graph::components::subgraph::InstanceDSTemplate; use graph::components::trigger_processor::RunnableTriggers; use graph::components::{ @@ -1023,9 +1025,9 @@ where .ready_offchain_events() .non_deterministic()?; - let onchain_vid_seq = block_state.entity_cache.vid_seq; + let vid_gen = block_state.vid_gen(); let (offchain_mods, processed_offchain_data_sources, persisted_off_chain_data_sources) = - self.handle_offchain_triggers(offchain_events, block, 
onchain_vid_seq) + self.handle_offchain_triggers(offchain_events, block, vid_gen) .await .non_deterministic()?; @@ -1070,9 +1072,11 @@ where // Causality region for onchain triggers. let causality_region = PoICausalityRegion::from_network(&self.inputs.network); + let vid_gen = SeqGenerator::new(block_ptr.number); let mut block_state = BlockState::new( self.inputs.store.clone(), std::mem::take(&mut self.state.entity_lfu_cache), + vid_gen, ); let _section = self @@ -1475,7 +1479,7 @@ where &mut self, triggers: Vec, block: &Arc, - mut next_vid_seq: u32, + vid_gen: SeqGenerator, ) -> Result< ( Vec, @@ -1492,12 +1496,11 @@ where // Using an `EmptyStore` and clearing the cache for each trigger is a makeshift way to // get causality region isolation. let schema = ReadStore::input_schema(&self.inputs.store); - let mut block_state = BlockState::new(EmptyStore::new(schema), LfuCache::new()); - - // Continue the vid sequence from the previous trigger (or from - // onchain processing) so that each offchain trigger does not - // reset to RESERVED_VIDS and produce duplicate VIDs. - block_state.entity_cache.vid_seq = next_vid_seq; + let mut block_state = BlockState::new( + EmptyStore::new(schema), + LfuCache::new(), + vid_gen.cheap_clone(), + ); // PoI ignores offchain events. // See also: poi-ignores-offchain @@ -1564,10 +1567,6 @@ where return Err(anyhow!("{}", err)); } - // Carry forward the vid sequence so the next iteration doesn't - // reset to RESERVED_VIDS and produce duplicate VIDs. 
- next_vid_seq = block_state.entity_cache.vid_seq; - mods.extend( block_state .entity_cache @@ -1695,7 +1694,6 @@ async fn update_proof_of_indexing( key: EntityKey, digest: Bytes, block_time: BlockTime, - block: BlockNumber, ) -> Result<(), Error> { let digest_name = entity_cache.schema.poi_digest(); let mut data = vec![ @@ -1710,12 +1708,11 @@ async fn update_proof_of_indexing( data.push((entity_cache.schema.poi_block_time(), block_time)); } let poi = entity_cache.make_entity(data)?; - entity_cache.set(key, poi, block, None).await + entity_cache.set(key, poi, None).await } let _section_guard = stopwatch.start_section("update_proof_of_indexing"); - let block_number = proof_of_indexing.get_block(); let mut proof_of_indexing = proof_of_indexing.take(); for (causality_region, stream) in proof_of_indexing.drain() { @@ -1752,7 +1749,6 @@ async fn update_proof_of_indexing( entity_key, updated_proof_of_indexing, block_time, - block_number, ) .await?; } diff --git a/graph/src/components/store/entity_cache.rs b/graph/src/components/store/entity_cache.rs index ca5493d78e7..58abbe52f91 100644 --- a/graph/src/components/store/entity_cache.rs +++ b/graph/src/components/store/entity_cache.rs @@ -2,6 +2,7 @@ use anyhow::{anyhow, bail}; use std::borrow::Borrow; use std::collections::HashMap; use std::fmt::{self, Debug}; +use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use crate::cheap_clone::CheapClone; @@ -21,6 +22,52 @@ pub type EntityLfuCache = LfuCache>>; // Currently none is used, but lets reserve a few more. const RESERVED_VIDS: u32 = 100; +/// Shared generator for VID and entity ID sequences within a block. +/// +/// Created once per block and shared (via `Arc`) across all `EntityCache` +/// instances that operate on the same block. This prevents VID collisions +/// when multiple isolated caches (e.g. ipfs.map callbacks, offchain +/// triggers) write entities in the same block. 
+#[derive(Clone, Debug)] +pub struct SeqGenerator { + block: BlockNumber, + vid_seq: Arc, + id_seq: Arc, +} + +impl CheapClone for SeqGenerator { + fn cheap_clone(&self) -> Self { + self.clone() + } +} + +impl SeqGenerator { + pub fn new(block: BlockNumber) -> Self { + SeqGenerator { + block, + vid_seq: Arc::new(AtomicU32::new(RESERVED_VIDS)), + id_seq: Arc::new(AtomicU32::new(0)), + } + } + + /// Return the next VID. The VID encodes the block number in the upper + /// 32 bits and a monotonically increasing sequence in the lower 32 + /// bits. + pub fn vid(&self) -> i64 { + let seq = self.vid_seq.fetch_add(1, Ordering::Relaxed); + ((self.block as i64) << 32) + seq as i64 + } + + /// Generate the next entity ID for the given ID type. The ID encodes + /// the block number in the upper 32 bits and a monotonically + /// increasing sequence in the lower 32 bits. + pub fn id(&self, id_type: IdType) -> anyhow::Result { + let seq = self.id_seq.fetch_add(1, Ordering::Relaxed); + + id_type.generate_id(self.block, seq) + } +} + /// The scope in which the `EntityCache` should perform a `get` operation pub enum GetScope { /// Get from all previously stored entities in the store @@ -103,16 +150,8 @@ pub struct EntityCache { pub schema: InputSchema, - /// A sequence number for generating entity IDs. We use one number for - /// all id's as the id's are scoped by block and a u32 has plenty of - /// room for all changes in one block. To ensure reproducability of - /// generated IDs, the `EntityCache` needs to be newly instantiated for - /// each block - seq: u32, - - // Sequence number of the next VID value for this block. The value written - // in the database consist of a block number and this SEQ number. - pub vid_seq: u32, + /// Shared sequence generator for VIDs and entity IDs within a block. 
+ pub seq_gen: SeqGenerator, } impl Debug for EntityCache { @@ -131,7 +170,7 @@ pub struct ModificationsAndCache { } impl EntityCache { - pub fn new(store: Arc) -> Self { + pub fn new(store: Arc, seq_gen: SeqGenerator) -> Self { Self { current: LfuCache::new(), updates: HashMap::new(), @@ -139,8 +178,7 @@ impl EntityCache { in_handler: false, schema: store.input_schema(), store, - seq: 0, - vid_seq: RESERVED_VIDS, + seq_gen, } } @@ -152,7 +190,11 @@ impl EntityCache { self.schema.make_entity(iter) } - pub fn with_current(store: Arc, current: EntityLfuCache) -> EntityCache { + pub fn with_current( + store: Arc, + current: EntityLfuCache, + vid_gen: SeqGenerator, + ) -> EntityCache { EntityCache { current, updates: HashMap::new(), @@ -160,11 +202,14 @@ impl EntityCache { in_handler: false, schema: store.input_schema(), store, - seq: 0, - vid_seq: RESERVED_VIDS, + seq_gen: vid_gen, } } + pub fn vid_gen(&self) -> SeqGenerator { + self.seq_gen.cheap_clone() + } + pub(crate) fn enter_handler(&mut self) { assert!(!self.in_handler); self.in_handler = true; @@ -368,7 +413,6 @@ impl EntityCache { &mut self, key: EntityKey, entity: Entity, - block: BlockNumber, write_capacity_remaining: Option<&mut usize>, ) -> Result<(), anyhow::Error> { // check the validate for derived fields @@ -386,9 +430,7 @@ impl EntityCache { *write_capacity_remaining -= weight; } - // The next VID is based on a block number and a sequence within the block - let vid = ((block as i64) << 32) + self.vid_seq as i64; - self.vid_seq += 1; + let vid = self.seq_gen.vid(); let mut entity = entity; let old_vid = entity.set_vid(vid).expect("the vid should be set"); // Make sure that there was no VID previously set for this entity. @@ -457,16 +499,6 @@ impl EntityCache { for (key, op) in other.updates { self.entity_op(key, op); } - // Carry forward vid_seq to prevent VID collisions when the caller - // continues writing entities after merging. 
- self.vid_seq = self.vid_seq.max(other.vid_seq); - } - - /// Generate an id. - pub fn generate_id(&mut self, id_type: IdType, block: BlockNumber) -> anyhow::Result { - let id = id_type.generate_id(block, self.seq)?; - self.seq += 1; - Ok(id) } /// Return the changes that have been made via `set` and `remove` as diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index eb4aebe90be..b4b688444be 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -9,7 +9,9 @@ use diesel::pg::Pg; use diesel::serialize::{Output, ToSql}; use diesel::sql_types::Integer; use diesel_derives::{AsExpression, FromSqlRow}; -pub use entity_cache::{EntityCache, EntityLfuCache, GetScope, ModificationsAndCache}; +pub use entity_cache::{ + EntityCache, EntityLfuCache, GetScope, ModificationsAndCache, SeqGenerator, +}; use slog::Logger; use tokio_stream::wrappers::ReceiverStream; diff --git a/graph/src/components/subgraph/instance.rs b/graph/src/components/subgraph/instance.rs index bbba7b5b32b..e6c25591edd 100644 --- a/graph/src/components/subgraph/instance.rs +++ b/graph/src/components/subgraph/instance.rs @@ -2,7 +2,7 @@ use crate::{ blockchain::{Blockchain, DataSourceTemplate as _}, components::{ metrics::block_state::BlockStateMetrics, - store::{EntityLfuCache, ReadStore, StoredDynamicDataSource}, + store::{EntityLfuCache, ReadStore, SeqGenerator, StoredDynamicDataSource}, }, data::subgraph::schema::SubgraphError, data_source::{DataSourceTemplate, DataSourceTemplateInfo}, @@ -87,9 +87,9 @@ pub struct BlockState { } impl BlockState { - pub fn new(store: impl ReadStore, lfu_cache: EntityLfuCache) -> Self { + pub fn new(store: impl ReadStore, lfu_cache: EntityLfuCache, vid_gen: SeqGenerator) -> Self { BlockState { - entity_cache: EntityCache::with_current(Arc::new(store), lfu_cache), + entity_cache: EntityCache::with_current(Arc::new(store), lfu_cache, vid_gen), deterministic_errors: Vec::new(), created_data_sources: Vec::new(), 
persisted_data_sources: Vec::new(), @@ -100,6 +100,10 @@ impl BlockState { write_capacity_remaining: ENV_VARS.block_write_capacity, } } + + pub fn vid_gen(&self) -> SeqGenerator { + self.entity_cache.vid_gen() + } } impl BlockState { diff --git a/graph/src/lib.rs b/graph/src/lib.rs index 37dfa549468..bf3777ae05e 100644 --- a/graph/src/lib.rs +++ b/graph/src/lib.rs @@ -128,8 +128,8 @@ pub mod prelude { EntityCollection, EntityFilter, EntityLink, EntityOperation, EntityOrder, EntityOrderByChild, EntityOrderByChildInfo, EntityQuery, EntityRange, EntityWindow, EthereumCallCache, ParentLink, PartialBlockPtr, PoolWaitStats, QueryStore, - QueryStoreManager, StoreError, StoreEvent, StoreEventStreamBox, SubgraphStore, - UnfailOutcome, WindowAttribute, BLOCK_NUMBER_MAX, + QueryStoreManager, SeqGenerator, StoreError, StoreEvent, StoreEventStreamBox, + SubgraphStore, UnfailOutcome, WindowAttribute, BLOCK_NUMBER_MAX, }; pub use crate::components::subgraph::{ BlockState, BlockStateCheckpoint, HostMetrics, InstanceDSTemplateInfo, RuntimeHost, diff --git a/runtime/test/src/common.rs b/runtime/test/src/common.rs index a760a1b588d..b08251c8604 100644 --- a/runtime/test/src/common.rs +++ b/runtime/test/src/common.rs @@ -1,5 +1,5 @@ use graph::blockchain::BlockTime; -use graph::components::store::DeploymentLocator; +use graph::components::store::{DeploymentLocator, SeqGenerator}; use graph::components::subgraph::SharedProofOfIndexing; use graph::data::subgraph::*; use graph::data_source; @@ -126,6 +126,7 @@ pub fn mock_context( )) .unwrap(), Default::default(), + SeqGenerator::new(12), ), proof_of_indexing: SharedProofOfIndexing::ignored(), host_fns: Arc::new(Vec::new()), diff --git a/runtime/test/src/test.rs b/runtime/test/src/test.rs index 7eadb653c80..52969ef82f3 100644 --- a/runtime/test/src/test.rs +++ b/runtime/test/src/test.rs @@ -1070,7 +1070,7 @@ async fn test_entity_store(api_version: Version) { let ctx = instance.store.data_mut(); let cache = std::mem::replace( &mut 
ctx.ctx.state.entity_cache, - EntityCache::new(Arc::new(writable.clone())), + EntityCache::new(Arc::new(writable.clone()), SeqGenerator::new(12)), ); let mut mods = cache .as_modifications(0, &STOPWATCH) @@ -1361,7 +1361,6 @@ impl Host { self.host_exports .store_set( &self.ctx.logger, - 12, // Arbitrary block number &mut self.ctx.state, &self.ctx.proof_of_indexing, entity_type.to_string(), diff --git a/runtime/wasm/src/host_exports.rs b/runtime/wasm/src/host_exports.rs index d72dd21e99c..ab45777c428 100644 --- a/runtime/wasm/src/host_exports.rs +++ b/runtime/wasm/src/host_exports.rs @@ -224,7 +224,6 @@ impl HostExports { pub(crate) async fn store_set( &self, logger: &Logger, - block: BlockNumber, state: &mut BlockState, proof_of_indexing: &SharedProofOfIndexing, block_time: BlockTime, @@ -251,7 +250,7 @@ impl HostExports { .into()); } let id_type = entity_type.id_type()?; - let id = state.entity_cache.generate_id(id_type, block)?; + let id = state.entity_cache.seq_gen.id(id_type)?; data.insert(store::ID.clone(), id.clone().into()); id.to_string() } else { @@ -338,12 +337,7 @@ impl HostExports { state .entity_cache - .set( - key, - entity, - block, - Some(&mut state.write_capacity_remaining), - ) + .set(key, entity, Some(&mut state.write_capacity_remaining)) .await?; Ok(()) @@ -537,7 +531,7 @@ impl HostExports { let host_metrics = wasm_ctx.host_metrics.clone(); let valid_module = wasm_ctx.valid_module.clone(); - let mut ctx = wasm_ctx.ctx.derive_with_empty_block_state(); + let ctx = wasm_ctx.ctx.derive_with_empty_block_state(); let callback = callback.to_owned(); // Create a base error message to avoid borrowing headaches let errmsg = format!( @@ -560,14 +554,9 @@ impl HostExports { let mut v = Vec::new(); while let Some(sv) = stream.next().await { let sv = sv?; - let mut derived = ctx.derive_with_empty_block_state(); - // Continue the vid sequence from the previous callback so - // each iteration doesn't reset to RESERVED_VIDS and - // produce duplicate VIDs. 
- derived.state.entity_cache.vid_seq = ctx.state.entity_cache.vid_seq; let module = WasmInstance::from_valid_module_with_ctx_boxed( valid_module.clone(), - derived, + ctx.derive_with_empty_block_state(), host_metrics.clone(), wasm_ctx.experimental_features, ) @@ -575,9 +564,6 @@ impl HostExports { let result = module .handle_json_callback(&callback, &sv.value, &user_data) .await?; - // Carry forward vid_seq so the next iteration continues - // the sequence. - ctx.state.entity_cache.vid_seq = result.entity_cache.vid_seq; // Log progress every 15s if last_log.elapsed() > Duration::from_secs(15) { debug!( @@ -1310,10 +1296,7 @@ pub mod test_support { use graph::{ blockchain::BlockTime, - components::{ - store::{BlockNumber, GetScope}, - subgraph::SharedProofOfIndexing, - }, + components::{store::GetScope, subgraph::SharedProofOfIndexing}, data::value::Word, prelude::{BlockState, Entity, StopwatchMetrics, Value}, runtime::{gas::GasCounter, HostExportError}, @@ -1338,7 +1321,6 @@ pub mod test_support { pub async fn store_set( &self, logger: &Logger, - block: BlockNumber, state: &mut BlockState, proof_of_indexing: &SharedProofOfIndexing, entity_type: String, @@ -1350,7 +1332,6 @@ pub mod test_support { self.host_exports .store_set( logger, - block, state, proof_of_indexing, self.block_time, diff --git a/runtime/wasm/src/mapping.rs b/runtime/wasm/src/mapping.rs index e2613d5a9fb..6709518584e 100644 --- a/runtime/wasm/src/mapping.rs +++ b/runtime/wasm/src/mapping.rs @@ -207,7 +207,11 @@ impl MappingContext { host_exports: self.host_exports.cheap_clone(), block_ptr: self.block_ptr.cheap_clone(), timestamp: self.timestamp, - state: BlockState::new(self.state.entity_cache.store.clone(), Default::default()), + state: BlockState::new( + self.state.entity_cache.store.clone(), + Default::default(), + self.state.vid_gen(), + ), proof_of_indexing: self.proof_of_indexing.cheap_clone(), host_fns: self.host_fns.cheap_clone(), debug_fork: self.debug_fork.cheap_clone(), diff --git 
a/runtime/wasm/src/module/context.rs b/runtime/wasm/src/module/context.rs index 3e4f26cfd58..68639e463b3 100644 --- a/runtime/wasm/src/module/context.rs +++ b/runtime/wasm/src/module/context.rs @@ -10,7 +10,7 @@ use std::sync::Arc; use std::time::Instant; use anyhow::Error; -use graph::components::store::GetScope; +use graph::components::store::{GetScope, SeqGenerator}; use never::Never; use crate::asc_abi::class::*; @@ -131,7 +131,11 @@ impl WasmInstanceData { std::mem::replace( state, - BlockState::new(state.entity_cache.store.cheap_clone(), LfuCache::default()), + BlockState::new( + state.entity_cache.store.cheap_clone(), + LfuCache::default(), + SeqGenerator::new(self.ctx.block_ptr.number), + ), ) } } @@ -256,7 +260,6 @@ impl WasmInstanceContext<'_> { ) -> Result<(), HostExportError> { let stopwatch = self.as_ref().host_metrics.stopwatch.cheap_clone(); let logger = self.as_ref().ctx.logger.cheap_clone(); - let block_number = self.as_ref().ctx.block_ptr.block_number(); stopwatch.start_section("host_export_store_set__wasm_instance_context_store_set"); let entity: String = asc_get(self, entity_ptr, gas)?; @@ -275,7 +278,6 @@ impl WasmInstanceContext<'_> { host_exports .store_set( &logger, - block_number, &mut ctx.state, &ctx.proof_of_indexing, ctx.timestamp, diff --git a/store/test-store/src/store.rs b/store/test-store/src/store.rs index b65080f3cf4..b90d244bbcc 100644 --- a/store/test-store/src/store.rs +++ b/store/test-store/src/store.rs @@ -2,7 +2,7 @@ use diesel; use graph::blockchain::mock::MockDataSource; use graph::blockchain::BlockTime; use graph::blockchain::ChainIdentifier; -use graph::components::store::BlockStore; +use graph::components::store::{BlockStore, SeqGenerator}; use graph::data::graphql::load_manager::LoadManager; use graph::data::query::QueryResults; use graph::data::query::QueryTarget; @@ -378,7 +378,10 @@ pub async fn transact_entities_and_dynamic_data_sources( store.shard().to_string(), ); - let mut entity_cache = 
EntityCache::new(Arc::new(store.clone())); + let mut entity_cache = EntityCache::new( + Arc::new(store.clone()), + SeqGenerator::new(block_ptr_to.number), + ); entity_cache.append(ops); let mods = entity_cache .as_modifications(block_ptr_to.number, &stopwatch_metrics) diff --git a/store/test-store/tests/graph/entity_cache.rs b/store/test-store/tests/graph/entity_cache.rs index 9b9f2c4c574..983ea578ded 100644 --- a/store/test-store/tests/graph/entity_cache.rs +++ b/store/test-store/tests/graph/entity_cache.rs @@ -194,7 +194,7 @@ fn sort_by_entity_key(mut mods: Vec) -> Vec id: "mogwai", name: "Mogwai" }; let mogwai_key = make_band_key("mogwai"); cache - .set(mogwai_key.clone(), mogwai_data.clone(), 0, None) + .set(mogwai_key.clone(), mogwai_data.clone(), None) .await .unwrap(); let mut sigurros_data = entity! { SCHEMA => id: "sigurros", name: "Sigur Ros" }; let sigurros_key = make_band_key("sigurros"); cache - .set(sigurros_key.clone(), sigurros_data.clone(), 0, None) + .set(sigurros_key.clone(), sigurros_data.clone(), None) .await .unwrap(); @@ -257,19 +257,19 @@ async fn overwrite_modifications() { }; let store = Arc::new(store); - let mut cache = EntityCache::new(store); + let mut cache = EntityCache::new(store, SeqGenerator::new(0)); let mut mogwai_data = entity! { SCHEMA => id: "mogwai", name: "Mogwai", founded: 1995 }; let mogwai_key = make_band_key("mogwai"); cache - .set(mogwai_key.clone(), mogwai_data.clone(), 0, None) + .set(mogwai_key.clone(), mogwai_data.clone(), None) .await .unwrap(); let mut sigurros_data = entity! 
{ SCHEMA => id: "sigurros", name: "Sigur Ros", founded: 1994}; let sigurros_key = make_band_key("sigurros"); cache - .set(sigurros_key.clone(), sigurros_data.clone(), 0, None) + .set(sigurros_key.clone(), sigurros_data.clone(), None) .await .unwrap(); @@ -298,19 +298,19 @@ async fn consecutive_modifications() { }; let store = Arc::new(store); - let mut cache = EntityCache::new(store); + let mut cache = EntityCache::new(store, SeqGenerator::new(0)); // First, add "founded" and change the "label". let update_data = entity! { SCHEMA => id: "mogwai", founded: 1995, label: "Rock Action Records" }; let update_key = make_band_key("mogwai"); - cache.set(update_key, update_data, 0, None).await.unwrap(); + cache.set(update_key, update_data, None).await.unwrap(); // Then, just reset the "label". let update_data = entity! { SCHEMA => id: "mogwai", label: Value::Null }; let update_key = make_band_key("mogwai"); cache - .set(update_key.clone(), update_data, 0, None) + .set(update_key.clone(), update_data, None) .await .unwrap(); @@ -331,7 +331,7 @@ async fn consecutive_modifications() { async fn check_vid_sequence() { let store = MockStore::new(BTreeMap::new()); let store = Arc::new(store); - let mut cache = EntityCache::new(store); + let mut cache = EntityCache::new(store, SeqGenerator::new(0)); for n in 0..10 { let id = (10 - n).to_string(); @@ -339,7 +339,7 @@ async fn check_vid_sequence() { let mogwai_key = make_band_key(id.as_str()); let mogwai_data = entity! { SCHEMA => id: id, name: name }; cache - .set(mogwai_key.clone(), mogwai_data.clone(), 0, None) + .set(mogwai_key.clone(), mogwai_data.clone(), None) .await .unwrap(); } @@ -367,39 +367,34 @@ async fn check_vid_sequence() { } // Test that demonstrates the VID collision bug when multiple offchain triggers -// (e.g. file data sources) are processed in the same block. Each trigger creates -// a fresh EntityCache with vid_seq reset to RESERVED_VIDS (100). 
When two triggers -// in the same block both insert an entity, they produce the same VID, violating -// the primary key constraint. -// -// The fix is to thread vid_seq from one trigger's EntityCache to the next. +// use separate VidGenerators. Each gets its own sequence starting at RESERVED_VIDS +// (100), so entities in the same block get identical VIDs. #[graph::test] -async fn offchain_trigger_vid_collision_without_fix() { - let block: i32 = 2_163_923; // any block number +async fn offchain_trigger_vid_collision_without_shared_generator() { + let block: i32 = 2_163_923; - // Simulate first offchain trigger: fresh EntityCache (vid_seq starts at 100) + // Simulate first offchain trigger with its own VidGenerator let store1 = Arc::new(MockStore::new(BTreeMap::new())); - let mut cache1 = EntityCache::new(store1); + let mut cache1 = EntityCache::new(store1, SeqGenerator::new(block)); let band1_data = entity! { SCHEMA => id: "band1", name: "First Band" }; let band1_key = make_band_key("band1"); cache1 - .set(band1_key.clone(), band1_data, block, None) + .set(band1_key.clone(), band1_data, None) .await .unwrap(); let result1 = cache1.as_modifications(block, &STOPWATCH).await.unwrap(); - // Simulate second offchain trigger: another fresh EntityCache (vid_seq ALSO starts at 100) + // Simulate second offchain trigger with a SEPARATE VidGenerator let store2 = Arc::new(MockStore::new(BTreeMap::new())); - let mut cache2 = EntityCache::new(store2); + let mut cache2 = EntityCache::new(store2, SeqGenerator::new(block)); let band2_data = entity! { SCHEMA => id: "band2", name: "Second Band" }; let band2_key = make_band_key("band2"); cache2 - .set(band2_key.clone(), band2_data, block, None) + .set(band2_key.clone(), band2_data, None) .await .unwrap(); let result2 = cache2.as_modifications(block, &STOPWATCH).await.unwrap(); - // Extract VIDs from both modifications let vid1 = match &result1.modifications[0] { EntityModification::Insert { data, .. 
} => data.vid(), _ => panic!("expected Insert"), @@ -409,52 +404,38 @@ async fn offchain_trigger_vid_collision_without_fix() { _ => panic!("expected Insert"), }; - // BUG: Both VIDs are identical! This is what causes - // "duplicate key value violates unique constraint" - // vid = (block << 32) + 100 for BOTH triggers + // BUG: Both VIDs are identical because each VidGenerator starts at 100 let expected_vid = ((block as i64) << 32) + 100; - assert_eq!( - vid1, expected_vid, - "first trigger vid should be (block << 32) + 100" - ); - assert_eq!( - vid2, expected_vid, - "second trigger vid should ALSO be (block << 32) + 100 — the bug!" - ); - assert_eq!( - vid1, vid2, - "VIDs collide — this is the bug that causes the DB constraint violation" - ); + assert_eq!(vid1, expected_vid); + assert_eq!(vid2, expected_vid); + assert_eq!(vid1, vid2, "VIDs collide when using separate VidGenerators"); } -// Test that demonstrates the fix: threading vid_seq from one trigger's -// EntityCache to the next prevents VID collisions. +// Test that demonstrates the fix: sharing a single VidGenerator across +// multiple EntityCaches prevents VID collisions. #[graph::test] -async fn offchain_trigger_vid_no_collision_with_fix() { +async fn offchain_trigger_vid_no_collision_with_shared_generator() { let block: i32 = 2_163_923; + let vid_gen = SeqGenerator::new(block); // First offchain trigger let store1 = Arc::new(MockStore::new(BTreeMap::new())); - let mut cache1 = EntityCache::new(store1); + let mut cache1 = EntityCache::new(store1, vid_gen.cheap_clone()); let band1_data = entity! 
{ SCHEMA => id: "band1", name: "First Band" }; let band1_key = make_band_key("band1"); cache1 - .set(band1_key.clone(), band1_data, block, None) + .set(band1_key.clone(), band1_data, None) .await .unwrap(); - - // THE FIX: capture vid_seq BEFORE as_modifications consumes the cache - let next_vid_seq = cache1.vid_seq; let result1 = cache1.as_modifications(block, &STOPWATCH).await.unwrap(); - // Second offchain trigger: set vid_seq to where first trigger left off + // Second offchain trigger shares the same VidGenerator let store2 = Arc::new(MockStore::new(BTreeMap::new())); - let mut cache2 = EntityCache::new(store2); - cache2.vid_seq = next_vid_seq; // <-- the fix + let mut cache2 = EntityCache::new(store2, vid_gen.cheap_clone()); let band2_data = entity! { SCHEMA => id: "band2", name: "Second Band" }; let band2_key = make_band_key("band2"); cache2 - .set(band2_key.clone(), band2_data, block, None) + .set(band2_key.clone(), band2_data, None) .await .unwrap(); let result2 = cache2.as_modifications(block, &STOPWATCH).await.unwrap(); @@ -468,10 +449,10 @@ async fn offchain_trigger_vid_no_collision_with_fix() { _ => panic!("expected Insert"), }; - // With the fix, VIDs are different + // With a shared VidGenerator, VIDs are different assert_ne!( vid1, vid2, - "VIDs should NOT collide when vid_seq is threaded" + "VIDs should NOT collide when sharing a VidGenerator" ); let expected_vid1 = ((block as i64) << 32) + 100; let expected_vid2 = ((block as i64) << 32) + 101; @@ -556,7 +537,7 @@ where let read_store = Arc::new(writable.clone()); - let cache = EntityCache::new(read_store); + let cache = EntityCache::new(read_store, SeqGenerator::new(0)); // Run test and wait for the background writer to finish its work so // it won't conflict with the next test test(cache, subgraph_store.clone(), deployment, writable.clone()).await; @@ -889,7 +870,7 @@ fn scoped_get() { let mut wallet5 = create_wallet_entity_no_vid("5", &account5, 100); let key5 = 
WALLET_TYPE.parse_key("5").unwrap(); cache - .set(key5.clone(), wallet5.clone(), 0, None) + .set(key5.clone(), wallet5.clone(), None) .await .unwrap(); @@ -917,7 +898,7 @@ fn scoped_get() { let mut wallet1 = wallet1; wallet1.set("balance", 70).unwrap(); cache - .set(key1.clone(), wallet1.clone(), 0, None) + .set(key1.clone(), wallet1.clone(), None) .await .unwrap(); wallet1a = wallet1; @@ -968,6 +949,6 @@ fn no_interface_mods() { let entity = entity! { LOAD_RELATED_SUBGRAPH => id: "1", balance: 100 }; - cache.set(key, entity, 0, None).await.unwrap_err(); + cache.set(key, entity, None).await.unwrap_err(); }) } diff --git a/store/test-store/tests/postgres/aggregation.rs b/store/test-store/tests/postgres/aggregation.rs index 7cc300b04d8..c2cb619d63f 100644 --- a/store/test-store/tests/postgres/aggregation.rs +++ b/store/test-store/tests/postgres/aggregation.rs @@ -9,7 +9,7 @@ use graph::{ store::{ AggregationCurrent, AttributeNames, BlockNumber, ChildMultiplicity, DeploymentLocator, EntityCache, EntityCollection, EntityFilter, EntityLink, EntityOperation, EntityQuery, - EntityWindow, ReadStore, StoreError, SubgraphStore as _, WindowAttribute, + EntityWindow, ReadStore, SeqGenerator, StoreError, SubgraphStore as _, WindowAttribute, WritableStore, }, }, @@ -108,7 +108,10 @@ pub async fn insert_entities( }) .collect(); - let mut entity_cache = EntityCache::new(Arc::new(store.clone())); + let mut entity_cache = EntityCache::new( + Arc::new(store.clone()), + SeqGenerator::new(block_ptr_to.number), + ); entity_cache.append(ops); let mods = entity_cache .as_modifications(block_ptr_to.number, &STOPWATCH) From d53c3fd4388de7e937b8ec450324bebe08080793 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 4 Mar 2026 15:47:39 -0800 Subject: [PATCH 2/3] core: thread EntityLfuCache instead of EntityCache across blocks The AMP runner previously created an EntityCache with a fake SeqGenerator(0) and shared it across block boundaries, even though an EntityCache should always be 
tied to a single block. Now only the LfuCache is threaded through, and EntityCache is created locally in process_record_batch_group with the correct block's SeqGenerator. --- .../amp_subgraph/runner/data_processing.rs | 35 +++++++++---------- core/src/amp_subgraph/runner/mod.rs | 15 +++----- 2 files changed, 22 insertions(+), 28 deletions(-) diff --git a/core/src/amp_subgraph/runner/data_processing.rs b/core/src/amp_subgraph/runner/data_processing.rs index 17d0690c0a8..b9b2f20aba1 100644 --- a/core/src/amp_subgraph/runner/data_processing.rs +++ b/core/src/amp_subgraph/runner/data_processing.rs @@ -11,7 +11,7 @@ use graph::{ }, blockchain::block_stream::FirehoseCursor, cheap_clone::CheapClone, - components::store::{EntityCache, ModificationsAndCache, SeqGenerator}, + components::store::{EntityCache, EntityLfuCache, ModificationsAndCache, SeqGenerator}, }; use slog::{debug, trace}; @@ -19,14 +19,14 @@ use super::{data_stream::TablePtr, Compat, Context, Error}; pub(super) async fn process_record_batch_groups( cx: &mut Context, - mut entity_cache: EntityCache, + mut entity_lfu_cache: EntityLfuCache, record_batch_groups: RecordBatchGroups, stream_table_ptr: Arc<[TablePtr]>, latest_block: BlockNumber, -) -> Result<EntityCache, Error> { +) -> Result<EntityLfuCache, Error> { if record_batch_groups.is_empty() { debug!(cx.logger, "Received no record batch groups"); - return Ok(entity_cache); + return Ok(entity_lfu_cache); } let from_block = record_batch_groups @@ -50,9 +50,9 @@ pub(super) async fn process_record_batch_groups( "record_batches_count" => record_batch_group.record_batches.len() ); - entity_cache = process_record_batch_group( + entity_lfu_cache = process_record_batch_group( cx, - entity_cache, + entity_lfu_cache, block_number, block_hash, record_batch_group, @@ -79,32 +79,36 @@ pub(super) async fn process_record_batch_groups( "to_block" => to_block ); - Ok(entity_cache) + Ok(entity_lfu_cache) } async fn process_record_batch_group( cx: &mut Context, - mut entity_cache: EntityCache, + entity_lfu_cache: 
EntityLfuCache, block_number: BlockNumber, block_hash: BlockHash, record_batch_group: RecordBatchGroup, stream_table_ptr: &[TablePtr], latest_block: BlockNumber, -) -> Result<EntityCache, Error> { +) -> Result<EntityLfuCache, Error> { let _section = cx .metrics .stopwatch .start_section("process_record_batch_group"); - entity_cache.seq_gen = SeqGenerator::new(block_number.compat()); - let RecordBatchGroup { record_batches } = record_batch_group; if record_batches.is_empty() { debug!(cx.logger, "Record batch group is empty"); - return Ok(entity_cache); + return Ok(entity_lfu_cache); } + let mut entity_cache = EntityCache::with_current( + cx.store.cheap_clone(), + entity_lfu_cache, + SeqGenerator::new(block_number.compat()), + ); + let block_timestamp = if cx.manifest.schema.has_aggregations() { decode_block_timestamp(&record_batches) .map_err(|e| e.context("failed to decode block timestamp"))? @@ -135,7 +139,6 @@ async fn process_record_batch_group( } let section = cx.metrics.stopwatch.start_section("as_modifications"); - let vid_gen = entity_cache.vid_gen(); let ModificationsAndCache { modifications, entity_lfu_cache, @@ -171,11 +174,7 @@ async fn process_record_batch_group( cx.metrics.deployment_synced.record(true); } - Ok(EntityCache::with_current( - cx.store.cheap_clone(), - entity_lfu_cache, - vid_gen, - )) + Ok(entity_lfu_cache) } async fn process_record_batch( diff --git a/core/src/amp_subgraph/runner/mod.rs b/core/src/amp_subgraph/runner/mod.rs index 5a85dc093f8..e90b5c257c2 100644 --- a/core/src/amp_subgraph/runner/mod.rs +++ b/core/src/amp_subgraph/runner/mod.rs @@ -11,10 +11,8 @@ use std::time::{Duration, Instant}; use anyhow::Result; use futures::StreamExt; use graph::{ - amp::Client, - cheap_clone::CheapClone, - components::store::{EntityCache, SeqGenerator}, - data::subgraph::schema::SubgraphError, + amp::Client, cheap_clone::CheapClone, components::store::EntityLfuCache, + data::subgraph::schema::SubgraphError, util::lfu_cache::LfuCache, }; use slog::{debug, error, warn}; use 
tokio_util::sync::CancellationToken; @@ -106,18 +104,15 @@ where .update(latest_block.min(cx.end_block())); let mut deployment_is_failed = cx.store.health().await?.is_failed(); - // The SeqGenerator gets replaced with one for the correct block - // number in `process_record_batch_groups`, so the initial value - // doesn't matter much. - let mut entity_cache = EntityCache::new(cx.store.cheap_clone(), SeqGenerator::new(0)); + let mut entity_lfu_cache: EntityLfuCache = LfuCache::new(); let mut stream = new_data_stream(cx, latest_block); while let Some(result) = stream.next().await { let (record_batch_groups, stream_table_ptr) = result?; - entity_cache = process_record_batch_groups( + entity_lfu_cache = process_record_batch_groups( cx, - entity_cache, + entity_lfu_cache, record_batch_groups, stream_table_ptr, latest_block, From 8245b86f7ad5b50e51d1d512e539d83fc03beea9 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 4 Mar 2026 16:08:10 -0800 Subject: [PATCH 3/3] test-store, tests: Add tests for VID collision prevention Add a unit test that simulates the ipfs.map() pattern (multiple EntityCache instances sharing one SeqGenerator) and verifies VIDs are sequential. Extend the file_data_sources runner test so that two offchain triggers in the same block each create an entity, exercising the shared VID generator end-to-end. 
--- store/test-store/tests/graph/entity_cache.rs | 31 +++++++++++++++ .../file-data-sources/src/mapping.ts | 7 ++++ tests/tests/runner_tests.rs | 39 +++++++++++++++---- 3 files changed, 70 insertions(+), 7 deletions(-) diff --git a/store/test-store/tests/graph/entity_cache.rs b/store/test-store/tests/graph/entity_cache.rs index 983ea578ded..b84524e8ec0 100644 --- a/store/test-store/tests/graph/entity_cache.rs +++ b/store/test-store/tests/graph/entity_cache.rs @@ -463,6 +463,37 @@ async fn offchain_trigger_vid_no_collision_with_shared_generator() { ); } +// Simulate the ipfs.map() pattern: multiple EntityCache instances each create +// an entity using a shared SeqGenerator. VIDs must be unique and sequential. +#[graph::test] +async fn ipfs_map_pattern_vid_uniqueness() { + let block: i32 = 42; + let vid_gen = SeqGenerator::new(block); + + let mut all_vids = Vec::new(); + for i in 0..5u32 { + let store = Arc::new(MockStore::new(BTreeMap::new())); + let mut cache = EntityCache::new(store, vid_gen.cheap_clone()); + let data = entity! { SCHEMA => id: format!("band{i}"), name: format!("Band {i}") }; + let key = make_band_key(&format!("band{i}")); + cache.set(key, data, None).await.unwrap(); + let result = cache.as_modifications(block, &STOPWATCH).await.unwrap(); + let vid = match &result.modifications[0] { + EntityModification::Insert { data, .. } => data.vid(), + _ => panic!("expected Insert"), + }; + all_vids.push(vid); + } + + for i in 1..all_vids.len() { + assert_eq!( + all_vids[i], + all_vids[i - 1] + 1, + "VIDs should be sequential" + ); + } +} + const ACCOUNT_GQL: &str = " type Account @entity { id: ID! 
diff --git a/tests/runner-tests/file-data-sources/src/mapping.ts b/tests/runner-tests/file-data-sources/src/mapping.ts index 19716ce4503..a7f6ffa70ab 100644 --- a/tests/runner-tests/file-data-sources/src/mapping.ts +++ b/tests/runner-tests/file-data-sources/src/mapping.ts @@ -126,6 +126,13 @@ export function handleFile(data: Bytes): void { let contextCommand = context.getString('command'); if (contextCommand == SPAWN_FDS_FROM_OFFCHAIN_HANDLER) { + // Create an entity for THIS file data source too, so that two offchain + // triggers in the same block each write an entity. This exercises the + // shared VID generator: if VIDs collide the DB will reject the insert. + let entity = new FileEntity(dataSource.stringParam()); + entity.content = data.toString(); + entity.save(); + let hash = context.getString('hash'); log.info('Creating file data source from handleFile: {}', [hash]); dataSource.createWithContext('File', [hash], new DataSourceContext()); diff --git a/tests/tests/runner_tests.rs b/tests/tests/runner_tests.rs index f1dbda62c41..27ffe9422f6 100644 --- a/tests/tests/runner_tests.rs +++ b/tests/tests/runner_tests.rs @@ -673,11 +673,25 @@ async fn file_data_sources() { assert!(datasources.len() == 1); } - // Create a File data source from a same type of file data source handler + // Create a File data source from a same type of file data source handler. + // Both hash_2 and hash_3 handlers create entities in the same block, + // exercising the shared VID generator: colliding VIDs would cause a DB + // constraint violation. { ctx.start_and_sync_to(test_ptr(4)).await; - let content = "EXAMPLE_3"; + let query_res = ctx + .query(&format!( + r#"{{ fileEntity(id: "{}") {{ id, content }} }}"#, + hash_2.clone() + )) + .await + .unwrap(); + assert_json_eq!( + query_res, + Some(object! 
{ fileEntity: object!{ id: hash_2.clone(), content: "EXAMPLE_2" } }) + ); + let query_res = ctx .query(&format!( r#"{{ fileEntity(id: "{}") {{ id, content }} }}"#, @@ -687,7 +701,7 @@ async fn file_data_sources() { .unwrap(); assert_json_eq!( query_res, - Some(object! { fileEntity: object!{ id: hash_3.clone(), content: content } }) + Some(object! { fileEntity: object!{ id: hash_3.clone(), content: "EXAMPLE_3" } }) ); } @@ -813,11 +827,22 @@ async fn file_data_sources() { chain.set_block_stream(blocks); - let message = "error while executing at wasm backtrace:\t 0: 0x3490 - !generated/schema/Foo#save\t 1: 0x3eb2 - !src/mapping/handleFile: entity type `Foo` is not on the 'entities' list for data source `File`. Hint: Add `Foo` to the 'entities' list, which currently is: `FileEntity`. in handler `handleFile` at block #5 () at block #5 (0000000000000000000000000000000000000000000000000000000000000005)"; - let err = ctx.start_and_sync_to_error(block_5.ptr()).await; - - assert_eq!(err.to_string(), message); + let err_msg = err.to_string(); + + // Don't check exact wasm hex offsets since they change with any + // modification to the wasm binary. + assert!( + err_msg + .contains("entity type `Foo` is not on the 'entities' list for data source `File`"), + "unexpected error: {err_msg}" + ); + assert!( + err_msg.contains( + "Hint: Add `Foo` to the 'entities' list, which currently is: `FileEntity`" + ), + "unexpected error: {err_msg}" + ); } }