From bdcb4d9633e12bdfd270d2f586ff736f690e3ddc Mon Sep 17 00:00:00 2001 From: Vicente Olmedo Date: Thu, 2 Oct 2025 21:01:31 +0200 Subject: [PATCH 1/7] initial implementation --- cmd/etracker/start.go | 76 +++++++- deploy/app/main.tf | 15 ++ go.mod | 6 +- go.sum | 12 -- internal/config/config.go | 13 +- internal/consolidator/consolidator.go | 229 +++++++++++++++++++++++ internal/db/consolidated/consolidated.go | 21 +++ internal/db/consolidated/dynamodb.go | 126 +++++++++++++ internal/db/egress/dynamodb.go | 93 +++++++++ internal/db/egress/egress.go | 13 ++ 10 files changed, 581 insertions(+), 23 deletions(-) create mode 100644 internal/consolidator/consolidator.go create mode 100644 internal/db/consolidated/consolidated.go create mode 100644 internal/db/consolidated/dynamodb.go diff --git a/cmd/etracker/start.go b/cmd/etracker/start.go index 0ca4cf1..07447d2 100644 --- a/cmd/etracker/start.go +++ b/cmd/etracker/start.go @@ -1,7 +1,12 @@ package main import ( + "context" "fmt" + "os" + "os/signal" + "syscall" + "time" "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/spf13/cobra" @@ -11,6 +16,8 @@ import ( "github.com/storacha/go-ucanto/principal/signer" "github.com/storacha/etracker/internal/config" + "github.com/storacha/etracker/internal/consolidator" + "github.com/storacha/etracker/internal/db/consolidated" "github.com/storacha/etracker/internal/db/egress" "github.com/storacha/etracker/internal/server" "github.com/storacha/etracker/internal/service" @@ -53,10 +60,35 @@ func init() { cobra.CheckErr(viper.BindPFlag("egress_table_name", startCmd.Flags().Lookup("egress-table-name"))) // bind flag to storoku-style environment variable cobra.CheckErr(viper.BindEnv("egress_table_name", "EGRESS_RECORDS_TABLE_ID")) + + startCmd.Flags().String( + "consolidated-table-name", + "", + "Name of the DynamoDB table to use for consolidated records", + ) + cobra.CheckErr(viper.BindPFlag("consolidated_table_name", startCmd.Flags().Lookup("consolidated-table-name"))) + cobra.CheckErr(viper.BindEnv("consolidated_table_name", "CONSOLIDATED_RECORDS_TABLE_ID")) + + startCmd.Flags().Int( + "consolidation-interval", + 12*60*60, + "Interval in seconds between consolidation runs", + ) + cobra.CheckErr(viper.BindPFlag("consolidation_interval", startCmd.Flags().Lookup("consolidation-interval"))) + + startCmd.Flags().Int( + "consolidation-batch-size", + 100, + "Number of records to process in each consolidation batch", + ) + cobra.CheckErr(viper.BindPFlag("consolidation_batch_size", startCmd.Flags().Lookup("consolidation-batch-size"))) } func startService(cmd *cobra.Command, args []string) error { - cfg, err := config.Load(cmd.Context()) + ctx, cancel := context.WithCancel(cmd.Context()) + defer cancel() + + cfg, err := config.Load(ctx) if err != nil { return fmt.Errorf("loading config: %w", err) } @@ -77,15 +109,53 @@ func startService(cmd *cobra.Command, args []string) error { } } - svc, err := service.New(id, egress.NewDynamoEgressTable(dynamodb.NewFromConfig(cfg.AWSConfig), cfg.EgressTableName)) + // Create DynamoDB client + dynamoClient := dynamodb.NewFromConfig(cfg.AWSConfig) + + // Create database tables + egressTable := egress.NewDynamoEgressTable(dynamoClient, cfg.EgressTableName) + consolidatedTable := consolidated.NewDynamoConsolidatedTable(dynamoClient, cfg.ConsolidatedTableName) + + // Create service + svc, err := service.New(id, egressTable) if err != nil { return fmt.Errorf("creating service: %w", err) } + // Create server server, err := server.New(id, svc) if err != nil { return fmt.Errorf("creating server: %w", err) } - return server.ListenAndServe(fmt.Sprintf(":%d", cfg.Port)) + // Create and start consolidator + interval := time.Duration(cfg.ConsolidationInterval) * time.Second + batchSize := cfg.ConsolidationBatchSize + + cons := consolidator.New(egressTable, consolidatedTable, interval, batchSize) + + // Start consolidator in a goroutine + go cons.Start(ctx) + + // Handle graceful shutdown + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) + + errCh := make(chan error, 1) + go func() { + log.Infof("Starting server on port %d", cfg.Port) + errCh <- server.ListenAndServe(fmt.Sprintf(":%d", cfg.Port)) + }() + + select { + case err := <-errCh: + log.Errorf("Server error: %v", err) + cons.Stop() + return err + case sig := <-sigCh: + log.Infof("Received signal %v, shutting down gracefully", sig) + cons.Stop() + cancel() + return nil + } } diff --git a/deploy/app/main.tf b/deploy/app/main.tf index 7bf330d..79bcaf3 100644 --- a/deploy/app/main.tf +++ b/deploy/app/main.tf @@ -82,6 +82,21 @@ module "app" { hash_key = "PK" range_key ="SK" }, + { + name = "consolidated" + attributes = [ + { + name = "NodeDID" + type = "S" + }, + { + name = "ReceiptsBatchCID" + type = "S" + }, + ] + hash_key = "NodeDID" + range_key = "ReceiptsBatchCID" + }, ] buckets = [ ] diff --git a/go.mod b/go.mod index 3334bfe..0c23e13 100644 --- a/go.mod +++ b/go.mod @@ -4,11 +4,14 @@ go 1.24.4 require ( github.com/aws/aws-sdk-go-v2 v1.39.1 + github.com/aws/aws-sdk-go-v2/config v1.31.10 github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.20.3 github.com/aws/aws-sdk-go-v2/service/dynamodb v1.47.0 github.com/go-playground/validator/v10 v10.27.0 github.com/google/uuid v1.6.0 + github.com/ipfs/go-cid v0.5.0 github.com/ipfs/go-log/v2 v2.7.0 + github.com/ipld/go-ipld-prime v0.21.1-0.20240917223228-6148356a4c2e github.com/spf13/cobra v1.2.1 github.com/spf13/viper v1.8.1 github.com/storacha/go-libstoracha v0.2.7 @@ -16,7 +19,6 @@ require ( ) require ( - github.com/aws/aws-sdk-go-v2/config v1.31.10 // indirect github.com/aws/aws-sdk-go-v2/credentials v1.18.14 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.8 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.8 // indirect @@ -50,7 +52,6 @@ require ( github.com/ipfs/bbloom v0.0.4 // indirect github.com/ipfs/go-block-format v0.2.0 // indirect github.com/ipfs/go-blockservice v0.5.2 // indirect - github.com/ipfs/go-cid v0.5.0 // indirect github.com/ipfs/go-datastore v0.8.2 // indirect github.com/ipfs/go-ipfs-blockstore v1.3.1 // indirect github.com/ipfs/go-ipfs-ds-help v1.1.1 // indirect @@ -65,7 +66,6 @@ require ( github.com/ipfs/go-verifcid v0.0.3 // indirect github.com/ipld/go-car v0.6.2 // indirect github.com/ipld/go-codec-dagpb v1.6.0 // indirect - github.com/ipld/go-ipld-prime v0.21.1-0.20240917223228-6148356a4c2e // indirect github.com/klauspost/compress v1.18.0 // indirect github.com/klauspost/cpuid/v2 v2.2.10 // indirect github.com/leodido/go-urn v1.4.0 // indirect diff --git a/go.sum b/go.sum index 5274a90..98de1d8 100644 --- a/go.sum +++ b/go.sum @@ -51,8 +51,6 @@ github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kd github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= -github.com/aws/aws-sdk-go-v2 v1.38.0 h1:UCRQ5mlqcFk9HJDIqENSLR3wiG1VTWlyUfLDEvY7RxU= -github.com/aws/aws-sdk-go-v2 v1.38.0/go.mod h1:9Q0OoGQoboYIAJyslFyF1f5K1Ryddop8gqMhWx/n4Wg= github.com/aws/aws-sdk-go-v2 v1.39.1 h1:fWZhGAwVRK/fAN2tmt7ilH4PPAE11rDj7HytrmbZ2FE= github.com/aws/aws-sdk-go-v2 v1.39.1/go.mod h1:sDioUELIUO9Znk23YVmIk86/9DOpkbyyVb1i/gUNFXY= github.com/aws/aws-sdk-go-v2/config v1.31.10 h1:7LllDZAegXU3yk41mwM6KcPu0wmjKGQB1bg99bNdQm4= @@ -63,12 +61,8 @@ github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.20.3 h1:RrxJ6g7+ github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.20.3/go.mod h1:e4y84j44vA9IFksSDDuAtNj9t3W20iJlsbXhbo/JU10= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.8 h1:gLD09eaJUdiszm7vd1btiQUYE0Hj+0I2b8AS+75z9AY= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.8/go.mod h1:4RW3oMPt1POR74qVOC4SbubxAwdP4pCT0nSw3jycOU4= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.3 h1:o9RnO+YZ4X+kt5Z7Nvcishlz0nksIt2PIzDglLMP0vA= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.3/go.mod h1:+6aLJzOG1fvMOyzIySYjOFjcguGvVRL68R+uoRencN4= github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.8 h1:6bgAZgRyT4RoFWhxS+aoGMFyE0cD1bSzFnEEi4bFPGI= github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.8/go.mod h1:KcGkXFVU8U28qS4KvLEcPxytPZPBcRawaH2Pf/0jptE= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.3 h1:joyyUFhiTQQmVK6ImzNU9TQSNRNeD9kOklqTzyk5v6s= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.3/go.mod h1:+vNIyZQP3b3B1tSLI0lxvrU9cfM7gpdRXMFfm67ZcPc= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.8 h1:HhJYoES3zOz34yWEpGENqJvRVPqpmJyR3+AFg9ybhdY= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.8/go.mod h1:JnA+hPWeYAVbDssp83tv+ysAG8lTfLVXvSsyKg/7xNA= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 h1:bIqFDwgGXXN1Kpp99pDOdKMTTb5d2KyU5X/BZxjOkRo= @@ -77,8 +71,6 @@ github.com/aws/aws-sdk-go-v2/service/dynamodb v1.47.0 h1:A5zeikrrAgz3YtNzhMat4K8 github.com/aws/aws-sdk-go-v2/service/dynamodb v1.47.0/go.mod h1:tMQ/Edfn5xLcBFSVd3JDreJPias8GqBq0dVbCbMz9vs= github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.29.0 h1:SNys2IbAlovw/c/7Q+f0GXlSMnY/vML5Ex9LStTF0Zc= github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.29.0/go.mod h1:GoaIvEhueZB2eDyU7wV8m9K6Wez1e3Pt4f0JrAyIr08= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.0 h1:6+lZi2JeGKtCraAj1rpoZfKqnQ9SptseRZioejfUOLM= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.0/go.mod h1:eb3gfbVIxIoGgJsi9pGne19dhCBpK6opTYpQqAmdy44= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.1 h1:oegbebPEMA/1Jny7kvwejowCaHz1FWZAQ94WXFNCyTM= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.1/go.mod h1:kemo5Myr9ac0U9JfSjMo9yHLtw+pECEHsFtJ9tqCEI8= github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.11.3 h1:xMmJPUT0G1q9+I0mzH4B6oN9fB5PkDoD+jvpVIcom1I= @@ -91,8 +83,6 @@ github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.0 h1:I7ghctfGXrscr7r1Ga/mDqSJ github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.0/go.mod h1:Zo9id81XP6jbayIFWNuDpA6lMBWhsVy+3ou2jLa4JnA= github.com/aws/aws-sdk-go-v2/service/sts v1.38.5 h1:+LVB0xBqEgjQoqr9bGZbRzvg212B0f17JdflleJRNR4= github.com/aws/aws-sdk-go-v2/service/sts v1.38.5/go.mod h1:xoaxeqnnUaZjPjaICgIy5B+MHCSb/ZSOn4MvkFNOUA0= -github.com/aws/smithy-go v1.22.5 h1:P9ATCXPMb2mPjYBgueqJNCA5S9UfktsW0tTxi+a7eqw= -github.com/aws/smithy-go v1.22.5/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI= github.com/aws/smithy-go v1.23.0 h1:8n6I3gXzWJB2DxBDnfxgBaSX6oe0d/t10qGz7OKqMCE= github.com/aws/smithy-go v1.23.0/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI= github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o= @@ -590,8 +580,6 @@ github.com/spf13/viper v1.8.1 h1:Kq1fyeebqsBfbjZj4EL7gj2IO0mMaiyjYUWcUsl2O44= github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH9Ns= github.com/storacha/go-libstoracha v0.2.7 h1:IlsffQq3mr8hDMqXIFyRLKVTHhq7qelyq8if/eivNC0= github.com/storacha/go-libstoracha v0.2.7/go.mod h1:zzeqIZhBBuWR2dkGygYqv4Bhg3JsvHuuvDCcdXCwGhg= -github.com/storacha/go-ucanto v0.5.0 h1:BCYfTOjJ7DxmoGwpZn4N1XITWj1BdKIbk5Ok7kMoQ6I= -github.com/storacha/go-ucanto v0.5.0/go.mod h1:/I6qtE+oDHc+6lBc/glN+RFx0cbP/mDj4gihD7YezWc= github.com/storacha/go-ucanto v0.6.4 h1:2a0IKtdIVqq1Y36LYFKtUVQXo7t5fxxvZ17B3AhICWM= github.com/storacha/go-ucanto v0.6.4/go.mod h1:O35Ze4x18EWtz3ftRXXd/mTZ+b8OQVjYYrnadJ/xNjg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/internal/config/config.go b/internal/config/config.go index 9856fce..e42caca 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -10,11 +10,14 @@ import ( ) type Config struct { - Port int `mapstructure:"port" validate:"required,min=1,max=65535"` - PrivateKey string `mapstructure:"private_key" validate:"required"` - DID string `mapstructure:"did" validate:"startswith=did:web:"` - AWSConfig aws.Config `mapstructure:"aws_config"` - EgressTableName string `mapstructure:"egress_table_name" validate:"required"` + Port int `mapstructure:"port" validate:"required,min=1,max=65535"` + PrivateKey string `mapstructure:"private_key" validate:"required"` + DID string `mapstructure:"did" validate:"startswith=did:web:"` + AWSConfig aws.Config `mapstructure:"aws_config"` + EgressTableName string `mapstructure:"egress_table_name" validate:"required"` + ConsolidatedTableName string `mapstructure:"consolidated_table_name" validate:"required"` + ConsolidationInterval int `mapstructure:"consolidation_interval" validate:"min=300"` + ConsolidationBatchSize int `mapstructure:"consolidation_batch_size" validate:"min=1"` } func Load(ctx context.Context) (*Config, error) { diff --git a/internal/consolidator/consolidator.go b/internal/consolidator/consolidator.go new file mode 100644 index 0000000..456aeb5 --- /dev/null +++ b/internal/consolidator/consolidator.go @@ -0,0 +1,229 @@ +package consolidator + +import ( + "context" + "fmt" + "net/http" + "net/url" + "strings" + "time" + + logging "github.com/ipfs/go-log/v2" + "github.com/storacha/go-libstoracha/capabilities/space/content" + "github.com/storacha/go-ucanto/core/car" + "github.com/storacha/go-ucanto/core/receipt" + "github.com/storacha/go-ucanto/core/result" + fdm "github.com/storacha/go-ucanto/core/result/failure/datamodel" + + "github.com/storacha/etracker/internal/db/consolidated" + "github.com/storacha/etracker/internal/db/egress" +) + +var log = logging.Logger("consolidator") + +type Consolidator struct { + egressTable egress.EgressTable + consolidatedTable consolidated.ConsolidatedTable + httpClient *http.Client + interval time.Duration + batchSize int + stopCh chan struct{} +} + +func New(egressTable egress.EgressTable, consolidatedTable consolidated.ConsolidatedTable, interval time.Duration, batchSize int) *Consolidator { + return &Consolidator{ + egressTable: egressTable, + consolidatedTable: consolidatedTable, + httpClient: &http.Client{Timeout: 30 * time.Second}, + interval: interval, + batchSize: batchSize, + stopCh: make(chan struct{}), + } +} + +func (c *Consolidator) Start(ctx context.Context) { + ticker := time.NewTicker(c.interval) + + log.Infof("Consolidator started with interval: %v", c.interval) + + for { + select { + case <-ctx.Done(): + log.Info("Consolidator stopping due to context cancellation") + return + case <-c.stopCh: + log.Info("Consolidator stopping") + return + case <-ticker.C: + if err := c.Consolidate(ctx); err != nil { + log.Errorf("Consolidation error: %v", err) + } + } + } +} + +func (c *Consolidator) Stop() { + close(c.stopCh) +} + +func (c *Consolidator) Consolidate(ctx context.Context) error { + log.Info("Starting consolidation cycle") + + // Get unprocessed records + records, err := c.egressTable.GetUnprocessed(ctx, c.batchSize) + if err != nil { + return fmt.Errorf("fetching unprocessed records: %w", err) + } + + if len(records) == 0 { + log.Debug("No unprocessed records found") + return nil + } + + log.Infof("Processing %d unprocessed records", len(records)) + + // Process each record (each record represents a batch of receipts for a single node) + for _, record := range records { + // Fetch receipts from the endpoint + receipts, err := c.fetchReceipts(ctx, record) + if err != nil { + log.Errorf("Failed to fetch receipts for record (nodeID=%s): %v", record.NodeID, err) + continue + } + + // Process each receipt in the batch + totalBytes := uint64(0) + for _, rcpt := range receipts { + retrievalRcpt, err := receipt.Rebind[content.RetrieveOk, fdm.FailureModel](rcpt, content.RetrieveOkType(), fdm.FailureType()) + if err != nil { + log.Warnf("receipt doesn't seem a retrieval receipt: %w", err) + continue + } + + if err := c.validateReceipt(retrievalRcpt); err != nil { + log.Warnf("Invalid receipt: %v", err) + continue + } + + size, err := c.extractSize(retrievalRcpt) + if err != nil { + log.Warnf("Failed to extract size from receipt: %v", err) + continue + } + + totalBytes += size + } + + // Store consolidated record (one per batch) + if err := c.consolidatedTable.Add(ctx, record.NodeID, record.Receipts, totalBytes); err != nil { + log.Errorf("Failed to add consolidated record for node %s, batch %s: %v", record.NodeID, record.Receipts, err) + continue + } + log.Infof("Consolidated %d bytes for node %s (batch %s)", totalBytes, record.NodeID, record.Receipts) + } + + // Mark records as processed + if err := c.egressTable.MarkAsProcessed(ctx, records); err != nil { + return fmt.Errorf("marking records as processed: %w", err) + } + + log.Infof("Consolidation cycle completed. Processed %d records", len(records)) + + return nil +} + +func (c *Consolidator) fetchReceipts(ctx context.Context, record egress.EgressRecord) ([]receipt.AnyReceipt, error) { + // Substitute {cid} in the endpoint URL with the receipts CID + batchURLStr := record.Endpoint.String() + batchCID := record.Receipts.String() + + // Handle both {cid} and :cid patterns + batchURLStr = strings.ReplaceAll(batchURLStr, "{cid}", batchCID) + batchURLStr = strings.ReplaceAll(batchURLStr, ":cid", batchCID) + + batchURL, err := url.Parse(batchURLStr) + if err != nil { + return nil, fmt.Errorf("parsing batch URL: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, "GET", batchURL.String(), nil) + if err != nil { + return nil, fmt.Errorf("creating HTTP request: %w", err) + } + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("fetching receipts from %s: %w", batchURL.String(), err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } + + // a receipt batch is a flat CAR file where each block is an archived receipt + _, blks, err := car.Decode(resp.Body) + if err != nil { + return nil, fmt.Errorf("decoding receipt batch: %w", err) + } + defer resp.Body.Close() + + var rcpts []receipt.AnyReceipt + for blk, err := range blks { + if err != nil { + return nil, fmt.Errorf("iterating over receipt blocks: %w", err) + } + + rcpt, err := receipt.Extract(blk.Bytes()) + if err != nil { + return nil, fmt.Errorf("extracting receipt: %w", err) + } + + rcpts = append(rcpts, rcpt) + } + + return rcpts, nil +} + +func (c *Consolidator) validateReceipt(retrievalRcpt receipt.Receipt[content.RetrieveOk, fdm.FailureModel]) error { + _, x := result.Unwrap(retrievalRcpt.Out()) + var emptyFailure fdm.FailureModel + if x != emptyFailure { + return fmt.Errorf("receipt is a failure receipt") + } + + // TODO: do more validation here. + // At the very least that the invocation is a retrieval invocation and the audience is the node + + return nil +} + +func (c *Consolidator) extractSize(retrievalRcpt receipt.Receipt[content.RetrieveOk, fdm.FailureModel]) (uint64, error) { + _, x := result.Unwrap(retrievalRcpt.Out()) + var emptyFailure fdm.FailureModel + if x != emptyFailure { + return 0, fmt.Errorf("receipt is a failure receipt") + } + + inv, ok := retrievalRcpt.Ran().Invocation() + if !ok { + return 0, fmt.Errorf("expected the ran invocation to be attached to the receipt") + } + + caps := inv.Capabilities() + if len(caps) != 1 { + return 0, fmt.Errorf("expected exactly one capability in the invocation") + } + + cap := caps[0] + if cap.Can() != content.RetrieveAbility { + return 0, fmt.Errorf("original invocation is not a retrieval invocation, but a %s", cap.Can()) + } + + caveats, err := content.RetrieveCaveatsReader.Read(cap.Nb()) + if err != nil { + return 0, fmt.Errorf("reading caveats from invocation: %w", err) + } + + return caveats.Range.End - caveats.Range.Start + 1, nil +} diff --git a/internal/db/consolidated/consolidated.go b/internal/db/consolidated/consolidated.go new file mode 100644 index 0000000..3a41fe2 --- /dev/null +++ b/internal/db/consolidated/consolidated.go @@ -0,0 +1,21 @@ +package consolidated + +import ( + "context" + + "github.com/storacha/go-ucanto/did" + "github.com/storacha/go-ucanto/ucan" +) + +type ConsolidatedRecord struct { + NodeDID did.DID + ReceiptsBatchCID ucan.Link + TotalBytes uint64 + ProcessedAt string +} + +type ConsolidatedTable interface { + Add(ctx context.Context, nodeDID did.DID, receiptsBatchCID ucan.Link, bytes uint64) error + Get(ctx context.Context, nodeDID did.DID, receiptsBatchCID ucan.Link) (*ConsolidatedRecord, error) + GetByNode(ctx context.Context, nodeDID did.DID) ([]ConsolidatedRecord, error) +} \ No newline at end of file diff --git a/internal/db/consolidated/dynamodb.go b/internal/db/consolidated/dynamodb.go new file mode 100644 index 0000000..782eaca --- /dev/null +++ b/internal/db/consolidated/dynamodb.go @@ -0,0 +1,126 @@ +package consolidated + +import ( + "context" + "fmt" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/ipfs/go-cid" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/storacha/go-ucanto/did" + "github.com/storacha/go-ucanto/ucan" +) + +var _ ConsolidatedTable = (*DynamoConsolidatedTable)(nil) + +type DynamoConsolidatedTable struct { + client *dynamodb.Client + tableName string +} + +func NewDynamoConsolidatedTable(client *dynamodb.Client, tableName string) *DynamoConsolidatedTable { + return &DynamoConsolidatedTable{client, tableName} +} + +type consolidatedRecord struct { + NodeDID string `dynamodbav:"NodeDID"` + ReceiptsBatchCID string `dynamodbav:"ReceiptsBatchCID"` + TotalBytes uint64 `dynamodbav:"TotalBytes"` + ProcessedAt string `dynamodbav:"ProcessedAt"` +} + +func (d *DynamoConsolidatedTable) Add(ctx context.Context, nodeDID did.DID, receiptsBatchCID ucan.Link, bytes uint64) error { + record := consolidatedRecord{ + NodeDID: nodeDID.String(), + ReceiptsBatchCID: receiptsBatchCID.String(), + TotalBytes: bytes, + ProcessedAt: time.Now().UTC().Format(time.RFC3339), + } + + item, err := attributevalue.MarshalMap(record) + if err != nil { + return fmt.Errorf("serializing consolidated record: %w", err) + } + + _, err = d.client.PutItem(ctx, &dynamodb.PutItemInput{ + TableName: aws.String(d.tableName), + Item: item, + }) + if err != nil { + return fmt.Errorf("storing consolidated record: %w", err) + } + + return nil +} + +func (d *DynamoConsolidatedTable) Get(ctx context.Context, nodeDID did.DID, receiptsBatchCID ucan.Link) (*ConsolidatedRecord, error) { + result, err := d.client.GetItem(ctx, &dynamodb.GetItemInput{ + TableName: aws.String(d.tableName), + Key: map[string]types.AttributeValue{ + "NodeDID": &types.AttributeValueMemberS{Value: nodeDID.String()}, + "ReceiptsBatchCID": &types.AttributeValueMemberS{Value: receiptsBatchCID.String()}, + }, + }) + if err != nil { + return nil, fmt.Errorf("getting consolidated record: %w", err) + } + + if result.Item == nil { + return nil, fmt.Errorf("record not found") + } + + return d.unmarshalRecord(result.Item) +} + +func (d *DynamoConsolidatedTable) GetByNode(ctx context.Context, nodeDID did.DID) ([]ConsolidatedRecord, error) { + result, err := d.client.Query(ctx, &dynamodb.QueryInput{ + TableName: aws.String(d.tableName), + KeyConditionExpression: aws.String("NodeDID = :nodeDID"), + ExpressionAttributeValues: map[string]types.AttributeValue{ + ":nodeDID": &types.AttributeValueMemberS{Value: nodeDID.String()}, + }, + }) + if err != nil { + return nil, fmt.Errorf("querying consolidated records by node: %w", err) + } + + records := make([]ConsolidatedRecord, 0, len(result.Items)) + for _, item := range result.Items { + record, err := d.unmarshalRecord(item) + if err != nil { + return nil, err + } + records = append(records, *record) + } + + return records, nil +} + +func (d *DynamoConsolidatedTable) unmarshalRecord(item map[string]types.AttributeValue) (*ConsolidatedRecord, error) { + var record consolidatedRecord + if err := attributevalue.UnmarshalMap(item, &record); err != nil { + return nil, fmt.Errorf("unmarshaling consolidated record: %w", err) + } + + parsedDID, err := did.Parse(record.NodeDID) + if err != nil { + return nil, fmt.Errorf("parsing node DID: %w", err) + } + + c, err := cid.Decode(record.ReceiptsBatchCID) + if err != nil { + return nil, fmt.Errorf("parsing receipts batch CID: %w", err) + } + receiptsBatchCID := cidlink.Link{Cid: c} + + return &ConsolidatedRecord{ + NodeDID: parsedDID, + ReceiptsBatchCID: receiptsBatchCID, + TotalBytes: record.TotalBytes, + ProcessedAt: record.ProcessedAt, + }, nil +} \ No newline at end of file diff --git a/internal/db/egress/dynamodb.go b/internal/db/egress/dynamodb.go index 2debef4..78d36e9 100644 --- a/internal/db/egress/dynamodb.go +++ b/internal/db/egress/dynamodb.go @@ -10,7 +10,10 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/google/uuid" + "github.com/ipfs/go-cid" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/storacha/go-ucanto/did" "github.com/storacha/go-ucanto/ucan" ) @@ -39,6 +42,7 @@ type egressRecord struct { Receipts string `dynamodbav:"receipts"` Endpoint string `dynamodbav:"endpoint"` ReceivedAt string `dynamodbav:"receivedAt"` + Processed bool `dynamodbav:"processed"` } func newRecord(nodeID did.DID, receipts ucan.Link, endpoint *url.URL) egressRecord { @@ -56,6 +60,7 @@ func newRecord(nodeID did.DID, receipts ucan.Link, endpoint *url.URL) egressReco Receipts: receipts.String(), Endpoint: endpoint.String(), ReceivedAt: receivedAt.Format(time.RFC3339), + Processed: false, } } @@ -73,3 +78,91 @@ func (d *DynamoEgressTable) Record(ctx context.Context, nodeID did.DID, receipts } return nil } + +func (d *DynamoEgressTable) GetUnprocessed(ctx context.Context, limit int) ([]EgressRecord, error) { + // Scan all shards for the current date for unprocessed records + today := time.Now().UTC().Format("2006-01-02") + var allRecords []EgressRecord + + for shard := range 10 { + pk := fmt.Sprintf("%s#%d", today, shard) + + result, err := d.client.Query(ctx, &dynamodb.QueryInput{ + TableName: aws.String(d.tableName), + KeyConditionExpression: aws.String("PK = :pk"), + FilterExpression: aws.String("attribute_not_exists(processed) OR processed = :false"), + ExpressionAttributeValues: map[string]types.AttributeValue{ + ":pk": &types.AttributeValueMemberS{Value: pk}, + ":false": &types.AttributeValueMemberBOOL{Value: false}, + }, + Limit: aws.Int32(int32(limit)), + }) + if err != nil { + return nil, fmt.Errorf("querying unprocessed records for shard %d: %w", shard, err) + } + + for _, item := range result.Items { + var record egressRecord + if err := attributevalue.UnmarshalMap(item, &record); err != nil { + return nil, fmt.Errorf("unmarshaling egress record: %w", err) + } + + nodeID, err := did.Parse(record.NodeID) + if err != nil { + return nil, fmt.Errorf("parsing node DID: %w", err) + } + + c, err := cid.Decode(record.Receipts) + if err != nil { + return nil, fmt.Errorf("parsing receipts CID: %w", err) + } + receipts := cidlink.Link{Cid: c} + + endpoint, err := url.Parse(record.Endpoint) + if err != nil { + return nil, fmt.Errorf("parsing endpoint URL: %w", err) + } + + receivedAt, err := time.Parse(time.RFC3339, record.ReceivedAt) + if err != nil { + return nil, fmt.Errorf("parsing received at time: %w", err) + } + + allRecords = append(allRecords, EgressRecord{ + PK: record.PK, + SK: record.SK, + NodeID: nodeID, + Receipts: receipts, + Endpoint: endpoint, + ReceivedAt: receivedAt, + Processed: record.Processed, + }) + + if len(allRecords) >= limit { + return allRecords, nil + } + } + } + + return allRecords, nil +} + +func (d *DynamoEgressTable) MarkAsProcessed(ctx context.Context, records []EgressRecord) error { + for _, record := range records { + _, err := d.client.UpdateItem(ctx, &dynamodb.UpdateItemInput{ + TableName: aws.String(d.tableName), + Key: map[string]types.AttributeValue{ + "PK": &types.AttributeValueMemberS{Value: record.PK}, + "SK": &types.AttributeValueMemberS{Value: record.SK}, + }, + UpdateExpression: aws.String("SET processed = :true"), + ExpressionAttributeValues: map[string]types.AttributeValue{ + ":true": &types.AttributeValueMemberBOOL{Value: true}, + }, + }) + if err != nil { + return fmt.Errorf("marking record as processed (PK=%s, SK=%s): %w", record.PK, record.SK, err) + } + } + return nil +} diff --git a/internal/db/egress/egress.go b/internal/db/egress/egress.go index ce05f9b..04904e3 100644 --- a/internal/db/egress/egress.go +++ b/internal/db/egress/egress.go @@ -3,11 +3,24 @@ package egress import ( "context" "net/url" + "time" "github.com/storacha/go-ucanto/did" "github.com/storacha/go-ucanto/ucan" ) +type EgressRecord struct { + PK string + SK string + NodeID did.DID + Receipts ucan.Link + Endpoint *url.URL + ReceivedAt time.Time + Processed bool +} + type EgressTable interface { Record(ctx context.Context, nodeID did.DID, receipt ucan.Link, endpoint *url.URL) error + GetUnprocessed(ctx context.Context, limit int) ([]EgressRecord, error) + MarkAsProcessed(ctx context.Context, records []EgressRecord) error } From adb5db26b376203221ccb559fddd435432ed795a Mon Sep 17 00:00:00 2001 From: Vicente Olmedo Date: Fri, 3 Oct 2025 11:09:01 +0200 Subject: [PATCH 2/7] generate consolidate invocation and receipt --- cmd/etracker/start.go | 2 +- internal/consolidator/consolidator.go | 34 ++++++++++++++++++++++++++- internal/db/egress/dynamodb.go | 15 +++++++++--- internal/db/egress/egress.go | 3 ++- internal/server/methods.go | 2 +- internal/service/service.go | 4 ++-- 6 files changed, 51 insertions(+), 9 deletions(-) diff --git a/cmd/etracker/start.go b/cmd/etracker/start.go index 07447d2..25a7438 100644 --- a/cmd/etracker/start.go +++ b/cmd/etracker/start.go @@ -132,7 +132,7 @@ func startService(cmd *cobra.Command, args []string) error { interval := time.Duration(cfg.ConsolidationInterval) * time.Second batchSize := cfg.ConsolidationBatchSize - cons := consolidator.New(egressTable, consolidatedTable, interval, batchSize) + cons := consolidator.New(id, egressTable, consolidatedTable, interval, batchSize) // Start consolidator in a goroutine go cons.Start(ctx) diff --git a/internal/consolidator/consolidator.go b/internal/consolidator/consolidator.go index 456aeb5..8f5c480 100644 --- a/internal/consolidator/consolidator.go +++ b/internal/consolidator/consolidator.go @@ -10,10 +10,14 @@ import ( logging "github.com/ipfs/go-log/v2" "github.com/storacha/go-libstoracha/capabilities/space/content" + capegress "github.com/storacha/go-libstoracha/capabilities/space/egress" "github.com/storacha/go-ucanto/core/car" + "github.com/storacha/go-ucanto/core/delegation" "github.com/storacha/go-ucanto/core/receipt" + "github.com/storacha/go-ucanto/core/receipt/ran" "github.com/storacha/go-ucanto/core/result" fdm "github.com/storacha/go-ucanto/core/result/failure/datamodel" + "github.com/storacha/go-ucanto/principal" "github.com/storacha/etracker/internal/db/consolidated" "github.com/storacha/etracker/internal/db/egress" @@ -22,6 +26,7 @@ import ( var log = logging.Logger("consolidator") type Consolidator struct { + id principal.Signer egressTable egress.EgressTable consolidatedTable consolidated.ConsolidatedTable httpClient *http.Client @@ -30,8 +35,9 @@ type Consolidator struct { stopCh chan struct{} } -func New(egressTable egress.EgressTable, consolidatedTable consolidated.ConsolidatedTable, interval time.Duration, batchSize int) *Consolidator { +func New(id principal.Signer, egressTable egress.EgressTable, consolidatedTable consolidated.ConsolidatedTable, interval time.Duration, batchSize int) *Consolidator { return &Consolidator{ + id: id, egressTable: egressTable, consolidatedTable: consolidatedTable, httpClient: &http.Client{Timeout: 30 * time.Second}, @@ -84,6 +90,23 @@ func (c *Consolidator) Consolidate(ctx context.Context) error { // Process each record (each record represents a batch of receipts for a single node) for _, record := range records { + // According to the spec, consolidation happens as a result of a `space/egress/consolidate` invocation. + // Since the service is invoking it on itself, we will generate it here. + // Is this acceptable or should we create and register a handler and follow the full go-ucanto flow instead? + inv, err := capegress.Consolidate.Invoke( + c.id, + c.id, + c.id.DID().String(), + capegress.ConsolidateCaveats{ + Cause: record.Cause, + }, + delegation.WithNoExpiration(), + ) + if err != nil { + log.Errorf("generating consolidation invocation: %w", err) + continue + } + // Fetch receipts from the endpoint receipts, err := c.fetchReceipts(ctx, record) if err != nil { @@ -119,6 +142,15 @@ func (c *Consolidator) Consolidate(ctx context.Context) error { log.Errorf("Failed to add consolidated record for node %s, batch %s: %v", record.NodeID, record.Receipts, err) continue } + + // Issue the receipt for the consolidation operation + // TODO: store in the DB + _, err = receipt.Issue(c.id, result.Ok[capegress.ConsolidateOk, capegress.ConsolidateError](capegress.ConsolidateOk{}), ran.FromInvocation(inv)) + if err != nil { + log.Errorf("Failed to issue consolidation receipt: %v", err) + continue + } + log.Infof("Consolidated %d bytes for node %s (batch %s)", totalBytes, record.NodeID, record.Receipts) } diff --git a/internal/db/egress/dynamodb.go b/internal/db/egress/dynamodb.go index 78d36e9..b1b2d50 100644 --- a/internal/db/egress/dynamodb.go +++ b/internal/db/egress/dynamodb.go @@ -41,11 +41,12 @@ type egressRecord struct { NodeID string `dynamodbav:"nodeID"` Receipts string `dynamodbav:"receipts"` Endpoint string `dynamodbav:"endpoint"` + Cause string `dynamodbav:"cause"` ReceivedAt string `dynamodbav:"receivedAt"` Processed bool `dynamodbav:"processed"` } -func newRecord(nodeID did.DID, receipts ucan.Link, endpoint *url.URL) egressRecord { +func newRecord(nodeID did.DID, receipts ucan.Link, endpoint *url.URL, cause ucan.Link) egressRecord { // TODO: review keys to improve performance and access patterns receivedAt := time.Now().UTC() dateStr := receivedAt.Format("2006-01-02") @@ -59,13 +60,14 @@ func newRecord(nodeID did.DID, receipts ucan.Link, endpoint *url.URL) egressReco NodeID: nodeID.String(), Receipts: receipts.String(), Endpoint: endpoint.String(), + Cause: cause.String(), ReceivedAt: receivedAt.Format(time.RFC3339), Processed: false, } } -func (d *DynamoEgressTable) Record(ctx context.Context, nodeID did.DID, receipts ucan.Link, endpoint *url.URL) error { - item, err := attributevalue.MarshalMap(newRecord(nodeID, receipts, endpoint)) +func (d *DynamoEgressTable) Record(ctx context.Context, nodeID did.DID, receipts ucan.Link, endpoint *url.URL, cause ucan.Link) error { + item, err := attributevalue.MarshalMap(newRecord(nodeID, receipts, endpoint, cause)) if err != nil { return fmt.Errorf("serializing egress record: %w", err) } @@ -128,12 +130,19 @@ func (d *DynamoEgressTable) GetUnprocessed(ctx context.Context, limit int) ([]Eg return nil, fmt.Errorf("parsing received at time: %w", err) } + cause, err := cid.Decode(record.Cause) + if err != nil { + return nil, fmt.Errorf("parsing cause CID: %w", err) + } + causeLink := cidlink.Link{Cid: cause} + allRecords = append(allRecords, EgressRecord{ PK: record.PK, SK: record.SK, NodeID: nodeID, Receipts: receipts, Endpoint: endpoint, + Cause: causeLink, ReceivedAt: receivedAt, Processed: record.Processed, }) diff --git a/internal/db/egress/egress.go b/internal/db/egress/egress.go index 04904e3..3730bd1 100644 --- a/internal/db/egress/egress.go +++ b/internal/db/egress/egress.go @@ -15,12 +15,13 @@ type EgressRecord struct { NodeID did.DID Receipts ucan.Link Endpoint *url.URL + Cause ucan.Link ReceivedAt time.Time Processed bool } type EgressTable interface { - Record(ctx context.Context, nodeID did.DID, receipt ucan.Link, endpoint *url.URL) error + Record(ctx context.Context, nodeID did.DID, receipt ucan.Link, endpoint *url.URL, cause ucan.Link) error GetUnprocessed(ctx context.Context, limit int) ([]EgressRecord, error) MarkAsProcessed(ctx context.Context, records []EgressRecord) error } diff --git a/internal/server/methods.go b/internal/server/methods.go index 2798f26..a31235e 100644 --- a/internal/server/methods.go +++ b/internal/server/methods.go @@ -38,7 +38,7 @@ func ucanTrackHandler(svc *service.Service) func( receipts := cap.Nb().Receipts endpoint := cap.Nb().Endpoint - err := svc.Record(ctx, nodeDID, receipts, endpoint) + err := svc.Record(ctx, nodeDID, receipts, endpoint, inv.Link()) if err != nil { return result.Error[egress.TrackOk, egress.TrackError](egress.NewTrackError(err.Error())), nil, nil } diff --git a/internal/service/service.go b/internal/service/service.go index 3c85f21..0b47024 100644 --- a/internal/service/service.go +++ b/internal/service/service.go @@ -20,6 +20,6 @@ func New(id principal.Signer, egressTable egress.EgressTable) (*Service, error) return &Service{id: id, egressTable: egressTable}, nil } -func (s *Service) Record(ctx context.Context, nodeDID did.DID, receipts ucan.Link, endpoint *url.URL) error { - return s.egressTable.Record(ctx, nodeDID, receipts, endpoint) +func (s *Service) Record(ctx context.Context, nodeDID did.DID, receipts ucan.Link, endpoint *url.URL, cause ucan.Link) error { + return s.egressTable.Record(ctx, nodeDID, receipts, endpoint, cause) } From 31bab278a58c9eb49560d0f591049109d65bb154 Mon Sep 17 00:00:00 2001 From: Vicente Olmedo Date: Fri, 3 Oct 2025 11:24:40 +0200 Subject: [PATCH 3/7] add receipts endpoint --- internal/server/handlers.go | 18 ++++++++++++++++++ internal/server/server.go | 1 + 2 files changed, 19 insertions(+) diff --git a/internal/server/handlers.go b/internal/server/handlers.go index 858dddd..0336279 100644 --- a/internal/server/handlers.go +++ b/internal/server/handlers.go @@ -5,6 +5,8 @@ import ( "io" "net/http" + "github.com/ipfs/go-cid" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/storacha/go-ucanto/principal/signer" ucanhttp "github.com/storacha/go-ucanto/transport/http" @@ -45,3 +47,19 @@ func (s *Server) ucanHandler() http.HandlerFunc { } } } + +func (s *Server) getReceiptsHandler() http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + cidStr := r.PathValue("cid") + cid, err := cid.Decode(cidStr) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + _ = cidlink.Link{Cid: cid} + + // TODO: fetch receipt from DB + w.WriteHeader(http.StatusNotFound) + } +} diff --git a/internal/server/server.go b/internal/server/server.go index 9099710..d3dc6e6 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -39,6 +39,7 @@ func (s *Server) ListenAndServe(addr string) error { mux.HandleFunc("GET /", s.getRootHandler()) mux.HandleFunc("POST /track", s.ucanHandler()) + mux.HandleFunc("GET /receipts/{cid}", s.getReceiptsHandler()) log.Infof("Listening on %s", addr) return http.ListenAndServe(addr, mux) From c981df53730b88753cd78193a963c5c2dace92ba Mon Sep 17 00:00:00 2001 From: Vicente Olmedo Date: Fri, 3 Oct 2025 19:19:25 +0200 Subject: [PATCH 4/7] rename consolidated table --- .storoku.json | 15 +++++++++++++++ deploy/app/main.tf | 2 +- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/.storoku.json b/.storoku.json index 3b2e068..3941b11 100644 --- a/.storoku.json +++ b/.storoku.json @@ -27,6 +27,21 @@ ], "hashKey": "PK", "rangeKey": "SK" + }, + { + "name": "consolidated-records", + "attributes": [ + { + "name": "NodeDID", + "type": "S" + }, + { + "name": "ReceiptsBatchCID", + "type": "S" + } + ], + "hashKey": "NodeDID", + "rangeKey": "ReceiptsBatchCID" } ], "networks": [ diff --git a/deploy/app/main.tf b/deploy/app/main.tf index 79bcaf3..c3a9698 100644 --- a/deploy/app/main.tf +++ b/deploy/app/main.tf @@ -83,7 +83,7 @@ module "app" { range_key ="SK" }, { - name = "consolidated" + name = "consolidated-records" attributes = [ { name = "NodeDID" From 9b50e931914942228f06a845976d203f21b376a6 Mon Sep 17 00:00:00 2001 From: Vicente Olmedo Date: Fri, 3 Oct 2025 19:24:21 +0200 Subject: [PATCH 5/7] handle batch endpoint as string to only unescape once --- internal/consolidator/consolidator.go | 4 +++- internal/db/egress/dynamodb.go | 10 +++------- internal/db/egress/egress.go | 2 +- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/internal/consolidator/consolidator.go b/internal/consolidator/consolidator.go index 8f5c480..e50097f 100644 --- a/internal/consolidator/consolidator.go +++ b/internal/consolidator/consolidator.go @@ -166,7 +166,7 @@ func (c *Consolidator) Consolidate(ctx context.Context) error { func (c *Consolidator) fetchReceipts(ctx context.Context, record egress.EgressRecord) ([]receipt.AnyReceipt, error) { // Substitute {cid} in the endpoint URL with the receipts CID - batchURLStr := record.Endpoint.String() + batchURLStr := record.Endpoint batchCID := record.Receipts.String() // Handle both {cid} and :cid patterns @@ -178,6 +178,8 @@ func (c *Consolidator) fetchReceipts(ctx context.Context, record egress.EgressRe return nil, fmt.Errorf("parsing batch URL: %w", err) } + log.Debugf("Fetching receipts from %s", batchURL.String()) + req, err := http.NewRequestWithContext(ctx, "GET", batchURL.String(), nil) if err != nil { return nil, fmt.Errorf("creating HTTP request: %w", err) diff --git a/internal/db/egress/dynamodb.go b/internal/db/egress/dynamodb.go index b1b2d50..31f9e0b 100644 --- a/internal/db/egress/dynamodb.go +++ b/internal/db/egress/dynamodb.go @@ -53,13 +53,14 @@ func newRecord(nodeID did.DID, receipts ucan.Link, endpoint *url.URL, cause ucan shard := rand.Intn(10) pk := fmt.Sprintf("%s#%d", dateStr, shard) sk := fmt.Sprintf("%s#%s#%s", dateStr, nodeID, uuid.New()) + endpointStr, _ := url.PathUnescape(endpoint.String()) return egressRecord{ PK: pk, SK: sk, NodeID: nodeID.String(), Receipts: receipts.String(), - Endpoint: endpoint.String(), + Endpoint: endpointStr, Cause: cause.String(), ReceivedAt: receivedAt.Format(time.RFC3339), Processed: false, @@ -120,11 +121,6 @@ func (d *DynamoEgressTable) GetUnprocessed(ctx context.Context, limit int) ([]Eg } receipts := cidlink.Link{Cid: c} - endpoint, err := url.Parse(record.Endpoint) - if err != nil { - return nil, fmt.Errorf("parsing endpoint URL: %w", err) - } - receivedAt, err := time.Parse(time.RFC3339, record.ReceivedAt) if err != nil { return nil, fmt.Errorf("parsing received at time: %w", err) @@ -141,7 +137,7 @@ func (d *DynamoEgressTable) GetUnprocessed(ctx context.Context, limit int) ([]Eg SK: record.SK, NodeID: nodeID, Receipts: receipts, - Endpoint: endpoint, + Endpoint: record.Endpoint, Cause: causeLink, ReceivedAt: receivedAt, Processed: record.Processed, diff --git a/internal/db/egress/egress.go b/internal/db/egress/egress.go index 3730bd1..caac0f1 100644 --- a/internal/db/egress/egress.go +++ b/internal/db/egress/egress.go @@ -14,7 +14,7 @@ type EgressRecord struct { SK string NodeID did.DID Receipts ucan.Link - Endpoint *url.URL + Endpoint string Cause ucan.Link ReceivedAt time.Time Processed bool From d4d59448245ba200cbd5930c92b382be72a285b8 Mon Sep 17 00:00:00 2001 From: Vicente Olmedo Date: Fri, 3 Oct 2025 19:26:12 +0200 Subject: [PATCH 6/7] processed is a reserved keyword in dynamoDB --- internal/db/egress/dynamodb.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/db/egress/dynamodb.go b/internal/db/egress/dynamodb.go index 31f9e0b..9fa3e7a 100644 --- a/internal/db/egress/dynamodb.go +++ b/internal/db/egress/dynamodb.go @@ -43,7 +43,7 @@ type egressRecord struct { Endpoint string `dynamodbav:"endpoint"` Cause string `dynamodbav:"cause"` ReceivedAt string `dynamodbav:"receivedAt"` - Processed bool `dynamodbav:"processed"` + Processed bool `dynamodbav:"proc"` } func newRecord(nodeID did.DID, receipts ucan.Link, endpoint *url.URL, cause ucan.Link) egressRecord { @@ -93,7 +93,7 @@ func (d *DynamoEgressTable) GetUnprocessed(ctx context.Context, limit int) ([]Eg result, err := d.client.Query(ctx, &dynamodb.QueryInput{ TableName: aws.String(d.tableName), KeyConditionExpression: aws.String("PK = :pk"), - FilterExpression: aws.String("attribute_not_exists(processed) OR processed = :false"), + FilterExpression: aws.String("proc = :false"), ExpressionAttributeValues: map[string]types.AttributeValue{ ":pk": &types.AttributeValueMemberS{Value: pk}, ":false": &types.AttributeValueMemberBOOL{Value: false}, @@ -160,7 +160,7 @@ func (d *DynamoEgressTable) MarkAsProcessed(ctx context.Context, records []Egres "PK": &types.AttributeValueMemberS{Value: record.PK}, "SK": &types.AttributeValueMemberS{Value: record.SK}, }, - UpdateExpression: aws.String("SET processed = :true"), + UpdateExpression: aws.String("SET proc = :true"), ExpressionAttributeValues: map[string]types.AttributeValue{ ":true": &types.AttributeValueMemberBOOL{Value: true}, }, From 9c35728f03f4637312a0e23593515591e67434fc Mon Sep 17 00:00:00 2001 From: Vicente Olmedo Date: Fri, 3 Oct 2025 19:41:42 +0200 Subject: [PATCH 7/7] format --- internal/db/consolidated/consolidated.go | 2 +- internal/db/consolidated/dynamodb.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/db/consolidated/consolidated.go b/internal/db/consolidated/consolidated.go index 3a41fe2..6d68d68 100644 --- a/internal/db/consolidated/consolidated.go +++ b/internal/db/consolidated/consolidated.go @@ -18,4 +18,4 @@ type ConsolidatedTable interface { Add(ctx context.Context, nodeDID did.DID, receiptsBatchCID ucan.Link, bytes uint64) error Get(ctx context.Context, nodeDID did.DID, receiptsBatchCID ucan.Link) (*ConsolidatedRecord, error) GetByNode(ctx context.Context, nodeDID did.DID) ([]ConsolidatedRecord, error) -} \ No newline at end of file +} diff --git a/internal/db/consolidated/dynamodb.go b/internal/db/consolidated/dynamodb.go index 782eaca..605ab45 100644 --- a/internal/db/consolidated/dynamodb.go +++ b/internal/db/consolidated/dynamodb.go @@ -123,4 +123,4 @@ func (d *DynamoConsolidatedTable) unmarshalRecord(item map[string]types.Attribut TotalBytes: record.TotalBytes, ProcessedAt: record.ProcessedAt, }, nil -} \ No newline at end of file +}