diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go
index 94417b9b7..ac68f2cd8 100644
--- a/block/internal/executing/executor.go
+++ b/block/internal/executing/executor.go
@@ -270,6 +270,13 @@ func (e *Executor) initializeState() error {
 	e.logger.Info().Uint64("height", state.LastBlockHeight).
 		Str("chain_id", state.ChainID).Msg("initialized state")
 
+	// Migrate any old-style pending block (stored at height N+1 via SaveBlockData
+	// with empty signature) to the new metadata-key format.
+	// Todo remove in the future: https://github.com/evstack/ev-node/issues/2795
+	if err := e.migrateLegacyPendingBlock(e.ctx); err != nil {
+		return fmt.Errorf("failed to migrate legacy pending block: %w", err)
+	}
+
 	// Determine sync target: use Raft height if node is behind Raft consensus
 	syncTargetHeight := state.LastBlockHeight
 	if e.raftNode != nil {
@@ -429,12 +436,12 @@ func (e *Executor) ProduceBlock(ctx context.Context) error {
 
 	// Check if there's an already stored block at the newHeight
 	// If there is use that instead of creating a new block
-	pendingHeader, pendingData, err := e.store.GetBlockData(ctx, newHeight)
-	if err == nil {
+	pendingHeader, pendingData, err := e.getPendingBlock(ctx)
+	if err == nil && pendingHeader != nil && pendingHeader.Height() == newHeight {
 		e.logger.Info().Uint64("height", newHeight).Msg("using pending block")
 		header = pendingHeader
 		data = pendingData
-	} else if !errors.Is(err, datastore.ErrNotFound) {
+	} else if err != nil && !errors.Is(err, datastore.ErrNotFound) {
 		return fmt.Errorf("failed to get block data: %w", err)
 	} else {
 		// get batch from sequencer
@@ -452,18 +459,9 @@ func (e *Executor) ProduceBlock(ctx context.Context) error {
 		if err != nil {
 			return fmt.Errorf("failed to create block: %w", err)
 		}
-
-		// saved early for crash recovery, will be overwritten later with the final signature
-		batch, err := e.store.NewBatch(ctx)
-		if err != nil {
-			return fmt.Errorf("failed to create batch for early save: %w", err)
-		}
-		if err = batch.SaveBlockData(header, data, &types.Signature{}); err != nil {
+		if err := e.savePendingBlock(ctx, header, data); err != nil {
 			return fmt.Errorf("failed to save block data: %w", err)
 		}
-		if err = batch.Commit(); err != nil {
-			return fmt.Errorf("failed to commit early save batch: %w", err)
-		}
 	}
 
 	if e.raftNode != nil && !e.raftNode.HasQuorum() {
@@ -535,6 +533,10 @@ func (e *Executor) ProduceBlock(ctx context.Context) error {
 		}
 		e.logger.Debug().Uint64("height", newHeight).Msg("proposed block to raft")
 	}
+	if err := e.deletePendingBlock(batch); err != nil {
+		e.logger.Warn().Err(err).Uint64("height", newHeight).Msg("failed to delete pending block metadata")
+	}
+
 	if err := batch.Commit(); err != nil {
 		return fmt.Errorf("failed to commit batch: %w", err)
 	}
diff --git a/block/internal/executing/executor_restart_test.go b/block/internal/executing/executor_restart_test.go
index 571bc7521..e5c3b6af4 100644
--- a/block/internal/executing/executor_restart_test.go
+++ b/block/internal/executing/executor_restart_test.go
@@ -79,7 +79,7 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {
 	require.NoError(t, exec1.initializeState())
 
 	// Set up context for first executor
-	exec1.ctx, exec1.cancel = context.WithCancel(context.Background())
+	exec1.ctx, exec1.cancel = context.WithCancel(t.Context())
 
 	// First executor produces a block normally
 	mockSeq1.EXPECT().GetNextBatch(mock.Anything, mock.AnythingOfType("sequencer.GetNextBatchRequest")).
@@ -101,12 +101,12 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {
 	require.NoError(t, err)
 
 	// Verify first block was produced
-	h1, err := memStore.Height(context.Background())
+	h1, err := memStore.Height(t.Context())
 	require.NoError(t, err)
 	assert.Equal(t, uint64(1), h1)
 
 	// Store the produced block data for later verification
-	originalHeader, originalData, err := memStore.GetBlockData(context.Background(), 1)
+	originalHeader, originalData, err := memStore.GetBlockData(t.Context(), 1)
 	require.NoError(t, err)
 	assert.Equal(t, 2, len(originalData.Txs), "first block should have 2 transactions")
 
@@ -158,11 +158,7 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {
 	pendingHeader.DataHash = pendingData.DACommitment()
 
 	// Save pending block data (this is what would happen during a crash)
-	batch, err := memStore.NewBatch(context.Background())
-	require.NoError(t, err)
-	err = batch.SaveBlockData(pendingHeader, pendingData, &types.Signature{})
-	require.NoError(t, err)
-	err = batch.Commit()
+	err = exec1.savePendingBlock(t.Context(), pendingHeader, pendingData)
 	require.NoError(t, err)
 
 	// Stop first executor (simulating crash/restart)
@@ -199,7 +195,7 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {
 	require.NoError(t, exec2.initializeState())
 
 	// Set up context for second executor
-	exec2.ctx, exec2.cancel = context.WithCancel(context.Background())
+	exec2.ctx, exec2.cancel = context.WithCancel(t.Context())
 	defer exec2.cancel()
 
 	// Verify that the state is at height 1 (pending block at height 2 wasn't committed)
@@ -221,12 +217,12 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {
 	require.NoError(t, err)
 
 	// Verify height advanced to 2
-	h2, err := memStore.Height(context.Background())
+	h2, err := memStore.Height(t.Context())
 	require.NoError(t, err)
 	assert.Equal(t, uint64(2), h2, "height should advance to 2 using pending block")
 
 	// Verify the block at height 2 matches the pending block data
-	finalHeader, finalData, err := memStore.GetBlockData(context.Background(), 2)
+	finalHeader, finalData, err := memStore.GetBlockData(t.Context(), 2)
 	require.NoError(t, err)
 	assert.Equal(t, 3, len(finalData.Txs), "should use pending block with 3 transactions")
 	assert.Equal(t, []byte("pending_tx1"), []byte(finalData.Txs[0]))
@@ -388,7 +384,7 @@ func TestExecutor_RestartNoPendingHeader(t *testing.T) {
 	require.NoError(t, err)
 
 	// Verify normal operation
-	h, err := memStore.Height(context.Background())
+	h, err := memStore.Height(t.Context())
 	require.NoError(t, err)
 	assert.Equal(t, uint64(numBlocks+1), h)
 
diff --git a/block/internal/executing/pending.go b/block/internal/executing/pending.go
new file mode 100644
index 000000000..dc758e558
--- /dev/null
+++ b/block/internal/executing/pending.go
@@ -0,0 +1,147 @@
+package executing
+
+import (
+	"context"
+	"crypto/sha256"
+	"errors"
+	"fmt"
+
+	"github.com/evstack/ev-node/pkg/store"
+	"github.com/evstack/ev-node/types"
+	ds "github.com/ipfs/go-datastore"
+)
+
+const (
+	headerKey = "pending_header"
+	dataKey   = "pending_data"
+)
+
+// getPendingBlock retrieves the pending block from metadata if it exists
+func (e *Executor) getPendingBlock(ctx context.Context) (*types.SignedHeader, *types.Data, error) {
+	headerBytes, err := e.store.GetMetadata(ctx, headerKey)
+	if err != nil {
+		if errors.Is(err, ds.ErrNotFound) {
+			return nil, nil, nil
+		}
+		return nil, nil, err
+	}
+
+	dataBytes, err := e.store.GetMetadata(ctx, dataKey)
+	if err != nil {
+		if errors.Is(err, ds.ErrNotFound) {
+			return nil, nil, fmt.Errorf("pending header exists but data is missing: corrupt state")
+		}
+		return nil, nil, err
+	}
+
+	header := new(types.SignedHeader)
+	if err := header.UnmarshalBinary(headerBytes); err != nil {
+		return nil, nil, fmt.Errorf("unmarshal pending header: %w", err)
+	}
+
+	data := new(types.Data)
+	if err := data.UnmarshalBinary(dataBytes); err != nil {
+		return nil, nil, fmt.Errorf("unmarshal pending data: %w", err)
+	}
+	return header, data, nil
+}
+
+// savePendingBlock saves a block to metadata as pending
+func (e *Executor) savePendingBlock(ctx context.Context, header *types.SignedHeader, data *types.Data) error {
+	headerBytes, err := header.MarshalBinary()
+	if err != nil {
+		return fmt.Errorf("marshal header: %w", err)
+	}
+
+	dataBytes, err := data.MarshalBinary()
+	if err != nil {
+		return fmt.Errorf("marshal data: %w", err)
+	}
+
+	batch, err := e.store.NewBatch(ctx)
+	if err != nil {
+		return fmt.Errorf("create batch for early save: %w", err)
+	}
+
+	if err := batch.Put(ds.NewKey(store.GetMetaKey(headerKey)), headerBytes); err != nil {
+		return fmt.Errorf("save pending header: %w", err)
+	}
+
+	if err := batch.Put(ds.NewKey(store.GetMetaKey(dataKey)), dataBytes); err != nil {
+		return fmt.Errorf("save pending data: %w", err)
+	}
+
+	if err := batch.Commit(); err != nil {
+		return fmt.Errorf("commit pending block: %w", err)
+	}
+	return nil
+}
+
+// deletePendingBlock removes pending block metadata
+func (e *Executor) deletePendingBlock(batch store.Batch) error {
+	if err := batch.Delete(ds.NewKey(store.GetMetaKey(headerKey))); err != nil {
+		return fmt.Errorf("delete pending header: %w", err)
+	}
+
+	if err := batch.Delete(ds.NewKey(store.GetMetaKey(dataKey))); err != nil {
+		return fmt.Errorf("delete pending data: %w", err)
+	}
+	return nil
+}
+
+// migrateLegacyPendingBlock detects old-style pending blocks that were stored
+// at height N+1 via SaveBlockData with an empty signature (pre-upgrade format)
+// and migrates them to the new metadata-key format (m/pending_header, m/pending_data).
+//
+// This prevents double-signing when a node is upgraded: without migration the
+// new code would not find the pending block and would create+sign a new one at
+// the same height.
+func (e *Executor) migrateLegacyPendingBlock(ctx context.Context) error {
+	candidateHeight := e.getLastState().LastBlockHeight + 1
+	pendingHeader, pendingData, err := e.store.GetBlockData(ctx, candidateHeight)
+	if err != nil {
+		if !errors.Is(err, ds.ErrNotFound) {
+			return fmt.Errorf("get block data: %w", err)
+		}
+		return nil
+	}
+	if len(pendingHeader.Signature) != 0 {
+		return errors.New("pending block with signatures found")
+	}
+	// Migrate: write header+data to the new metadata keys.
+	if err := e.savePendingBlock(ctx, pendingHeader, pendingData); err != nil {
+		return fmt.Errorf("save migrated pending block: %w", err)
+	}
+
+	// Clean up old-style keys.
+	batch, err := e.store.NewBatch(ctx)
+	if err != nil {
+		return fmt.Errorf("create cleanup batch: %w", err)
+	}
+
+	headerBytes, err := pendingHeader.MarshalBinary()
+	if err != nil {
+		return fmt.Errorf("marshal header for hash: %w", err)
+	}
+	headerHash := sha256.Sum256(headerBytes)
+
+	for _, key := range []string{
+		store.GetHeaderKey(candidateHeight),
+		store.GetDataKey(candidateHeight),
+		store.GetSignatureKey(candidateHeight),
+		store.GetIndexKey(headerHash[:]),
+	} {
+		if err := batch.Delete(ds.NewKey(key)); err != nil && !errors.Is(err, ds.ErrNotFound) {
+			return fmt.Errorf("delete legacy key %s: %w", key, err)
+		}
+	}
+
+	if err := batch.Commit(); err != nil {
+		return fmt.Errorf("commit cleanup batch: %w", err)
+	}
+
+	e.logger.Info().
+		Uint64("height", candidateHeight).
+		Msg("migrated legacy pending block to metadata format")
+	return nil
+}
diff --git a/pkg/store/header_store_adapter_test.go b/pkg/store/header_store_adapter_test.go
index c80374aa7..33f804c67 100644
--- a/pkg/store/header_store_adapter_test.go
+++ b/pkg/store/header_store_adapter_test.go
@@ -599,7 +599,7 @@ func TestHeaderStoreAdapter_HeadPrefersPending(t *testing.T) {
 
 func TestHeaderStoreAdapter_GetFromPendingByHash(t *testing.T) {
 	t.Parallel()
-	ctx := context.Background()
+	ctx := t.Context()
 
 	ds, err := NewTestInMemoryKVStore()
 	require.NoError(t, err)
diff --git a/pkg/store/keys.go b/pkg/store/keys.go
index f64c9cadc..f2aa45d8a 100644
--- a/pkg/store/keys.go
+++ b/pkg/store/keys.go
@@ -39,30 +39,43 @@ const (
 	heightPrefix = "t"
 )
 
-func getHeaderKey(height uint64) string {
+// GetHeaderKey returns the store key for a block header at the given height.
+func GetHeaderKey(height uint64) string {
 	return GenerateKey([]string{headerPrefix, strconv.FormatUint(height, 10)})
 }
 
-func getDataKey(height uint64) string {
+func getHeaderKey(height uint64) string { return GetHeaderKey(height) }
+
+// GetDataKey returns the store key for block data at the given height.
+func GetDataKey(height uint64) string {
 	return GenerateKey([]string{dataPrefix, strconv.FormatUint(height, 10)})
 }
 
-func getSignatureKey(height uint64) string {
+func getDataKey(height uint64) string { return GetDataKey(height) }
+
+// GetSignatureKey returns the store key for a block signature at the given height.
+func GetSignatureKey(height uint64) string {
 	return GenerateKey([]string{signaturePrefix, strconv.FormatUint(height, 10)})
 }
 
+func getSignatureKey(height uint64) string { return GetSignatureKey(height) }
+
 func getStateAtHeightKey(height uint64) string {
 	return GenerateKey([]string{statePrefix, strconv.FormatUint(height, 10)})
 }
 
-func getMetaKey(key string) string {
+// GetMetaKey returns the store key for a metadata entry.
+func GetMetaKey(key string) string {
 	return GenerateKey([]string{metaPrefix, key})
 }
 
-func getIndexKey(hash types.Hash) string {
+// GetIndexKey returns the store key for indexing a block by its hash.
+func GetIndexKey(hash types.Hash) string {
 	return GenerateKey([]string{indexPrefix, hash.String()})
 }
 
+func getIndexKey(hash types.Hash) string { return GetIndexKey(hash) }
+
 func getHeightKey() string {
 	return GenerateKey([]string{heightPrefix})
 }
diff --git a/pkg/store/store.go b/pkg/store/store.go
index 908f42dbf..b04214904 100644
--- a/pkg/store/store.go
+++ b/pkg/store/store.go
@@ -113,6 +113,7 @@ func (s *DefaultStore) GetHeader(ctx context.Context, height uint64) (*types.Sig
 	if err = header.UnmarshalBinary(headerBlob); err != nil {
 		return nil, fmt.Errorf("unmarshal block header: %w", err)
 	}
+
 	return header, nil
 }
 
@@ -184,7 +185,7 @@ func (s *DefaultStore) DeleteStateAtHeight(ctx context.Context, height uint64) e
 //
 // Metadata is separated from other data by using prefix in KV.
 func (s *DefaultStore) SetMetadata(ctx context.Context, key string, value []byte) error {
-	err := s.db.Put(ctx, ds.NewKey(getMetaKey(key)), value)
+	err := s.db.Put(ctx, ds.NewKey(GetMetaKey(key)), value)
 	if err != nil {
 		return fmt.Errorf("failed to set metadata for key '%s': %w", key, err)
 	}
@@ -193,7 +194,7 @@ func (s *DefaultStore) SetMetadata(ctx context.Context, key string, value []byte
 
 // GetMetadata returns values stored for given key with SetMetadata.
 func (s *DefaultStore) GetMetadata(ctx context.Context, key string) ([]byte, error) {
-	data, err := s.db.Get(ctx, ds.NewKey(getMetaKey(key)))
+	data, err := s.db.Get(ctx, ds.NewKey(GetMetaKey(key)))
 	if err != nil {
 		return nil, fmt.Errorf("failed to get metadata for key '%s': %w", key, err)
 	}
@@ -204,7 +205,7 @@ func (s *DefaultStore) GetMetadata(ctx context.Context, key string) ([]byte, err
 // This is more efficient than iterating through known keys when the set of keys is unknown.
 func (s *DefaultStore) GetMetadataByPrefix(ctx context.Context, prefix string) ([]MetadataEntry, error) {
 	// The full key in the datastore includes the meta prefix
-	fullPrefix := getMetaKey(prefix)
+	fullPrefix := GetMetaKey(prefix)
 
 	results, err := s.db.Query(ctx, dsq.Query{Prefix: fullPrefix})
 	if err != nil {
@@ -221,7 +222,7 @@ func (s *DefaultStore) GetMetadataByPrefix(ctx context.Context, prefix string) (
 		// Extract the original key by removing the meta prefix
 		// The key from datastore is like "/m/cache/header-da-included/hash"
 		// We want to return "cache/header-da-included/hash"
-		metaKeyPrefix := getMetaKey("")
+		metaKeyPrefix := GetMetaKey("")
 		key := strings.TrimPrefix(result.Key, metaKeyPrefix)
 		key = strings.TrimPrefix(key, "/") // Remove leading slash for consistency
 
@@ -236,7 +237,7 @@ func (s *DefaultStore) GetMetadataByPrefix(ctx context.Context, prefix string) (
 
 // DeleteMetadata removes a metadata key from the store.
 func (s *DefaultStore) DeleteMetadata(ctx context.Context, key string) error {
-	err := s.db.Delete(ctx, ds.NewKey(getMetaKey(key)))
+	err := s.db.Delete(ctx, ds.NewKey(GetMetaKey(key)))
 	if err != nil {
 		return fmt.Errorf("failed to delete metadata for key '%s': %w", key, err)
 	}
@@ -287,7 +288,7 @@ func (s *DefaultStore) Rollback(ctx context.Context, height uint64, aggregator b
 	} else { // in case of syncing issues, rollback the included height is OK.
 		bz := make([]byte, 8)
 		binary.LittleEndian.PutUint64(bz, height)
-		if err := batch.Put(ctx, ds.NewKey(getMetaKey(DAIncludedHeightKey)), bz); err != nil {
+		if err := batch.Put(ctx, ds.NewKey(GetMetaKey(DAIncludedHeightKey)), bz); err != nil {
 			return fmt.Errorf("failed to update DA included height: %w", err)
 		}
 	}
@@ -402,12 +403,12 @@ func (s *DefaultStore) PruneBlocks(ctx context.Context, height uint64) error {
 		}
 
 		// Delete per-height DA metadata associated with this height, if any.
-		if err := batch.Delete(ctx, ds.NewKey(getMetaKey(GetHeightToDAHeightHeaderKey(h)))); err != nil {
+		if err := batch.Delete(ctx, ds.NewKey(GetMetaKey(GetHeightToDAHeightHeaderKey(h)))); err != nil {
 			if !errors.Is(err, ds.ErrNotFound) {
 				return fmt.Errorf("failed to delete header DA height metadata at height %d during pruning: %w", h, err)
 			}
 		}
-		if err := batch.Delete(ctx, ds.NewKey(getMetaKey(GetHeightToDAHeightDataKey(h)))); err != nil {
+		if err := batch.Delete(ctx, ds.NewKey(GetMetaKey(GetHeightToDAHeightDataKey(h)))); err != nil {
 			if !errors.Is(err, ds.ErrNotFound) {
 				return fmt.Errorf("failed to delete data DA height metadata at height %d during pruning: %w", h, err)
 			}
@@ -422,7 +423,7 @@ func (s *DefaultStore) PruneBlocks(ctx context.Context, height uint64) error {
 	}
 
 	// Persist the updated last pruned height.
-	if err := batch.Put(ctx, ds.NewKey(getMetaKey(LastPrunedBlockHeightKey)), encodeHeight(height)); err != nil {
+	if err := batch.Put(ctx, ds.NewKey(GetMetaKey(LastPrunedBlockHeightKey)), encodeHeight(height)); err != nil {
 		return fmt.Errorf("failed to update last pruned height: %w", err)
 	}
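A rough sketch of how the new pending-block helpers are expected to round-trip, for reviewers who want to exercise them locally. Only `savePendingBlock`, `getPendingBlock`, and `deletePendingBlock` come from this change; `newTestExecutor` and `makePendingBlock` are hypothetical stand-ins for the executor/in-memory-store setup and block-building code already present in `executor_restart_test.go`.

```go
package executing

import (
	"testing"

	"github.com/stretchr/testify/require"
)

// Sketch only: newTestExecutor and makePendingBlock are hypothetical helpers,
// standing in for the setup done in executor_restart_test.go.
func TestPendingBlockRoundTrip_Sketch(t *testing.T) {
	exec, memStore := newTestExecutor(t)   // hypothetical helper
	header, data := makePendingBlock(t, 2) // hypothetical helper

	// No pending block yet: getPendingBlock reports "not found" as (nil, nil, nil).
	h, d, err := exec.getPendingBlock(t.Context())
	require.NoError(t, err)
	require.Nil(t, h)
	require.Nil(t, d)

	// savePendingBlock writes both metadata keys in a single batch.
	require.NoError(t, exec.savePendingBlock(t.Context(), header, data))

	h, d, err = exec.getPendingBlock(t.Context())
	require.NoError(t, err)
	require.Equal(t, header.Height(), h.Height())
	require.Equal(t, len(data.Txs), len(d.Txs))

	// deletePendingBlock only stages deletes; the caller commits the batch,
	// which in ProduceBlock is the same batch that persists the final block.
	batch, err := memStore.NewBatch(t.Context())
	require.NoError(t, err)
	require.NoError(t, exec.deletePendingBlock(batch))
	require.NoError(t, batch.Commit())

	h, d, err = exec.getPendingBlock(t.Context())
	require.NoError(t, err)
	require.Nil(t, h)
	require.Nil(t, d)
}
```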