diff --git a/.storoku.json b/.storoku.json index 3b2e068..3941b11 100644 --- a/.storoku.json +++ b/.storoku.json @@ -27,6 +27,21 @@ ], "hashKey": "PK", "rangeKey": "SK" + }, + { + "name": "consolidated-records", + "attributes": [ + { + "name": "NodeDID", + "type": "S" + }, + { + "name": "ReceiptsBatchCID", + "type": "S" + } + ], + "hashKey": "NodeDID", + "rangeKey": "ReceiptsBatchCID" } ], "networks": [ diff --git a/cmd/etracker/start.go b/cmd/etracker/start.go index 0ca4cf1..25a7438 100644 --- a/cmd/etracker/start.go +++ b/cmd/etracker/start.go @@ -1,7 +1,12 @@ package main import ( + "context" "fmt" + "os" + "os/signal" + "syscall" + "time" "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/spf13/cobra" @@ -11,6 +16,8 @@ import ( "github.com/storacha/go-ucanto/principal/signer" "github.com/storacha/etracker/internal/config" + "github.com/storacha/etracker/internal/consolidator" + "github.com/storacha/etracker/internal/db/consolidated" "github.com/storacha/etracker/internal/db/egress" "github.com/storacha/etracker/internal/server" "github.com/storacha/etracker/internal/service" @@ -53,10 +60,35 @@ func init() { cobra.CheckErr(viper.BindPFlag("egress_table_name", startCmd.Flags().Lookup("egress-table-name"))) // bind flag to storoku-style environment variable cobra.CheckErr(viper.BindEnv("egress_table_name", "EGRESS_RECORDS_TABLE_ID")) + + startCmd.Flags().String( + "consolidated-table-name", + "", + "Name of the DynamoDB table to use for consolidated records", + ) + cobra.CheckErr(viper.BindPFlag("consolidated_table_name", startCmd.Flags().Lookup("consolidated-table-name"))) + cobra.CheckErr(viper.BindEnv("consolidated_table_name", "CONSOLIDATED_RECORDS_TABLE_ID")) + + startCmd.Flags().Int( + "consolidation-interval", + 12*60*60, + "Interval in seconds between consolidation runs", + ) + cobra.CheckErr(viper.BindPFlag("consolidation_interval", startCmd.Flags().Lookup("consolidation-interval"))) + + startCmd.Flags().Int( + "consolidation-batch-size", + 100, 
+ "Number of records to process in each consolidation batch", + ) + cobra.CheckErr(viper.BindPFlag("consolidation_batch_size", startCmd.Flags().Lookup("consolidation-batch-size"))) } func startService(cmd *cobra.Command, args []string) error { - cfg, err := config.Load(cmd.Context()) + ctx, cancel := context.WithCancel(cmd.Context()) + defer cancel() + + cfg, err := config.Load(ctx) if err != nil { return fmt.Errorf("loading config: %w", err) } @@ -77,15 +109,53 @@ func startService(cmd *cobra.Command, args []string) error { } } - svc, err := service.New(id, egress.NewDynamoEgressTable(dynamodb.NewFromConfig(cfg.AWSConfig), cfg.EgressTableName)) + // Create DynamoDB client + dynamoClient := dynamodb.NewFromConfig(cfg.AWSConfig) + + // Create database tables + egressTable := egress.NewDynamoEgressTable(dynamoClient, cfg.EgressTableName) + consolidatedTable := consolidated.NewDynamoConsolidatedTable(dynamoClient, cfg.ConsolidatedTableName) + + // Create service + svc, err := service.New(id, egressTable) if err != nil { return fmt.Errorf("creating service: %w", err) } + // Create server server, err := server.New(id, svc) if err != nil { return fmt.Errorf("creating server: %w", err) } - return server.ListenAndServe(fmt.Sprintf(":%d", cfg.Port)) + // Create and start consolidator + interval := time.Duration(cfg.ConsolidationInterval) * time.Second + batchSize := cfg.ConsolidationBatchSize + + cons := consolidator.New(id, egressTable, consolidatedTable, interval, batchSize) + + // Start consolidator in a goroutine + go cons.Start(ctx) + + // Handle graceful shutdown + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) + + errCh := make(chan error, 1) + go func() { + log.Infof("Starting server on port %d", cfg.Port) + errCh <- server.ListenAndServe(fmt.Sprintf(":%d", cfg.Port)) + }() + + select { + case err := <-errCh: + log.Errorf("Server error: %v", err) + cons.Stop() + return err + case sig := <-sigCh: + log.Infof("Received signal 
%v, shutting down gracefully", sig) + cons.Stop() + cancel() + return nil + } } diff --git a/deploy/app/main.tf b/deploy/app/main.tf index 7bf330d..c3a9698 100644 --- a/deploy/app/main.tf +++ b/deploy/app/main.tf @@ -82,6 +82,21 @@ module "app" { hash_key = "PK" range_key ="SK" }, + { + name = "consolidated-records" + attributes = [ + { + name = "NodeDID" + type = "S" + }, + { + name = "ReceiptsBatchCID" + type = "S" + }, + ] + hash_key = "NodeDID" + range_key = "ReceiptsBatchCID" + }, ] buckets = [ ] diff --git a/go.mod b/go.mod index 1f7a1a6..0c23e13 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,9 @@ require ( github.com/aws/aws-sdk-go-v2/service/dynamodb v1.47.0 github.com/go-playground/validator/v10 v10.27.0 github.com/google/uuid v1.6.0 + github.com/ipfs/go-cid v0.5.0 github.com/ipfs/go-log/v2 v2.7.0 + github.com/ipld/go-ipld-prime v0.21.1-0.20240917223228-6148356a4c2e github.com/spf13/cobra v1.2.1 github.com/spf13/viper v1.8.1 github.com/storacha/go-libstoracha v0.2.7 @@ -50,7 +52,6 @@ require ( github.com/ipfs/bbloom v0.0.4 // indirect github.com/ipfs/go-block-format v0.2.0 // indirect github.com/ipfs/go-blockservice v0.5.2 // indirect - github.com/ipfs/go-cid v0.5.0 // indirect github.com/ipfs/go-datastore v0.8.2 // indirect github.com/ipfs/go-ipfs-blockstore v1.3.1 // indirect github.com/ipfs/go-ipfs-ds-help v1.1.1 // indirect @@ -65,7 +66,6 @@ require ( github.com/ipfs/go-verifcid v0.0.3 // indirect github.com/ipld/go-car v0.6.2 // indirect github.com/ipld/go-codec-dagpb v1.6.0 // indirect - github.com/ipld/go-ipld-prime v0.21.1-0.20240917223228-6148356a4c2e // indirect github.com/klauspost/compress v1.18.0 // indirect github.com/klauspost/cpuid/v2 v2.2.10 // indirect github.com/leodido/go-urn v1.4.0 // indirect diff --git a/internal/config/config.go b/internal/config/config.go index 9856fce..e42caca 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -10,11 +10,14 @@ import ( ) type Config struct { - Port int `mapstructure:"port" 
validate:"required,min=1,max=65535"` - PrivateKey string `mapstructure:"private_key" validate:"required"` - DID string `mapstructure:"did" validate:"startswith=did:web:"` - AWSConfig aws.Config `mapstructure:"aws_config"` - EgressTableName string `mapstructure:"egress_table_name" validate:"required"` + Port int `mapstructure:"port" validate:"required,min=1,max=65535"` + PrivateKey string `mapstructure:"private_key" validate:"required"` + DID string `mapstructure:"did" validate:"startswith=did:web:"` + AWSConfig aws.Config `mapstructure:"aws_config"` + EgressTableName string `mapstructure:"egress_table_name" validate:"required"` + ConsolidatedTableName string `mapstructure:"consolidated_table_name" validate:"required"` + ConsolidationInterval int `mapstructure:"consolidation_interval" validate:"min=300"` + ConsolidationBatchSize int `mapstructure:"consolidation_batch_size" validate:"min=1"` } func Load(ctx context.Context) (*Config, error) { diff --git a/internal/consolidator/consolidator.go b/internal/consolidator/consolidator.go new file mode 100644 index 0000000..e50097f --- /dev/null +++ b/internal/consolidator/consolidator.go @@ -0,0 +1,263 @@ +package consolidator + +import ( + "context" + "fmt" + "net/http" + "net/url" + "strings" + "time" + + logging "github.com/ipfs/go-log/v2" + "github.com/storacha/go-libstoracha/capabilities/space/content" + capegress "github.com/storacha/go-libstoracha/capabilities/space/egress" + "github.com/storacha/go-ucanto/core/car" + "github.com/storacha/go-ucanto/core/delegation" + "github.com/storacha/go-ucanto/core/receipt" + "github.com/storacha/go-ucanto/core/receipt/ran" + "github.com/storacha/go-ucanto/core/result" + fdm "github.com/storacha/go-ucanto/core/result/failure/datamodel" + "github.com/storacha/go-ucanto/principal" + + "github.com/storacha/etracker/internal/db/consolidated" + "github.com/storacha/etracker/internal/db/egress" +) + +var log = logging.Logger("consolidator") + +type Consolidator struct { + id 
principal.Signer
+	egressTable       egress.EgressTable
+	consolidatedTable consolidated.ConsolidatedTable
+	httpClient        *http.Client
+	interval          time.Duration
+	batchSize         int
+	stopCh            chan struct{}
+}
+
+func New(id principal.Signer, egressTable egress.EgressTable, consolidatedTable consolidated.ConsolidatedTable, interval time.Duration, batchSize int) *Consolidator {
+	return &Consolidator{
+		id:                id,
+		egressTable:       egressTable,
+		consolidatedTable: consolidatedTable,
+		httpClient:        &http.Client{Timeout: 30 * time.Second},
+		interval:          interval,
+		batchSize:         batchSize,
+		stopCh:            make(chan struct{}),
+	}
+}
+
+func (c *Consolidator) Start(ctx context.Context) {
+	ticker := time.NewTicker(c.interval)
+	defer ticker.Stop()
+
+	log.Infof("Consolidator started with interval: %v", c.interval)
+
+	for {
+		select {
+		case <-ctx.Done():
+			log.Info("Consolidator stopping due to context cancellation")
+			return
+		case <-c.stopCh:
+			log.Info("Consolidator stopping")
+			return
+		case <-ticker.C:
+			if err := c.Consolidate(ctx); err != nil {
+				log.Errorf("Consolidation error: %v", err)
+			}
+		}
+	}
+}
+
+func (c *Consolidator) Stop() {
+	close(c.stopCh)
+}
+
+func (c *Consolidator) Consolidate(ctx context.Context) error {
+	log.Info("Starting consolidation cycle")
+
+	// Get unprocessed records
+	records, err := c.egressTable.GetUnprocessed(ctx, c.batchSize)
+	if err != nil {
+		return fmt.Errorf("fetching unprocessed records: %w", err)
+	}
+
+	if len(records) == 0 {
+		log.Debug("No unprocessed records found")
+		return nil
+	}
+
+	log.Infof("Processing %d unprocessed records", len(records))
+
+	// Process each record (each record represents a batch of receipts for a single node)
+	for _, record := range records {
+		// According to the spec, consolidation happens as a result of a `space/egress/consolidate` invocation.
+		// Since the service is invoking it on itself, we will generate it here.
+		// Is this acceptable or should we create and register a handler and follow the full go-ucanto flow instead?
+		inv, err := capegress.Consolidate.Invoke(
+			c.id,
+			c.id,
+			c.id.DID().String(),
+			capegress.ConsolidateCaveats{
+				Cause: record.Cause,
+			},
+			delegation.WithNoExpiration(),
+		)
+		if err != nil {
+			log.Errorf("generating consolidation invocation: %v", err)
+			continue
+		}
+
+		// Fetch receipts from the endpoint
+		receipts, err := c.fetchReceipts(ctx, record)
+		if err != nil {
+			log.Errorf("Failed to fetch receipts for record (nodeID=%s): %v", record.NodeID, err)
+			continue
+		}
+
+		// Process each receipt in the batch
+		totalBytes := uint64(0)
+		for _, rcpt := range receipts {
+			retrievalRcpt, err := receipt.Rebind[content.RetrieveOk, fdm.FailureModel](rcpt, content.RetrieveOkType(), fdm.FailureType())
+			if err != nil {
+				log.Warnf("receipt doesn't seem a retrieval receipt: %v", err)
+				continue
+			}
+
+			if err := c.validateReceipt(retrievalRcpt); err != nil {
+				log.Warnf("Invalid receipt: %v", err)
+				continue
+			}
+
+			size, err := c.extractSize(retrievalRcpt)
+			if err != nil {
+				log.Warnf("Failed to extract size from receipt: %v", err)
+				continue
+			}
+
+			totalBytes += size
+		}
+
+		// Store consolidated record (one per batch)
+		if err := c.consolidatedTable.Add(ctx, record.NodeID, record.Receipts, totalBytes); err != nil {
+			log.Errorf("Failed to add consolidated record for node %s, batch %s: %v", record.NodeID, record.Receipts, err)
+			continue
+		}
+
+		// Issue the receipt for the consolidation operation
+		// TODO: store in the DB
+		_, err = receipt.Issue(c.id, result.Ok[capegress.ConsolidateOk, capegress.ConsolidateError](capegress.ConsolidateOk{}), ran.FromInvocation(inv))
+		if err != nil {
+			log.Errorf("Failed to issue consolidation receipt: %v", err)
+			continue
+		}
+
+		log.Infof("Consolidated %d bytes for node %s (batch %s)", totalBytes, record.NodeID, record.Receipts)
+	}
+
+	// Mark records as processed
+	if err := c.egressTable.MarkAsProcessed(ctx, records); err != nil {
+		return fmt.Errorf("marking records as processed: %w", err)
+	}
+
+	log.Infof("Consolidation cycle completed. Processed %d records", len(records))
+
+	return nil
+}
+
+func (c *Consolidator) fetchReceipts(ctx context.Context, record egress.EgressRecord) ([]receipt.AnyReceipt, error) {
+	// Substitute {cid} in the endpoint URL with the receipts CID
+	batchURLStr := record.Endpoint
+	batchCID := record.Receipts.String()
+
+	// Handle both {cid} and :cid patterns
+	batchURLStr = strings.ReplaceAll(batchURLStr, "{cid}", batchCID)
+	batchURLStr = strings.ReplaceAll(batchURLStr, ":cid", batchCID)
+
+	batchURL, err := url.Parse(batchURLStr)
+	if err != nil {
+		return nil, fmt.Errorf("parsing batch URL: %w", err)
+	}
+
+	log.Debugf("Fetching receipts from %s", batchURL.String())
+
+	req, err := http.NewRequestWithContext(ctx, "GET", batchURL.String(), nil)
+	if err != nil {
+		return nil, fmt.Errorf("creating HTTP request: %w", err)
+	}
+
+	resp, err := c.httpClient.Do(req)
+	if err != nil {
+		return nil, fmt.Errorf("fetching receipts from %s: %w", batchURL.String(), err)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
+	}
+
+	// a receipt batch is a flat CAR file where each block is an archived receipt
+	_, blks, err := car.Decode(resp.Body)
+	if err != nil {
+		return nil, fmt.Errorf("decoding receipt batch: %w", err)
+	}
+
+	var rcpts []receipt.AnyReceipt
+	for blk, err := range blks {
+		if err != nil {
+			return nil, fmt.Errorf("iterating over receipt blocks: %w", err)
+		}
+
+		rcpt, err := receipt.Extract(blk.Bytes())
+		if err != nil {
+			return nil, fmt.Errorf("extracting receipt: %w", err)
+		}
+
+		rcpts = append(rcpts, rcpt)
+	}
+
+	return rcpts, nil
+}
+
+func (c *Consolidator) validateReceipt(retrievalRcpt receipt.Receipt[content.RetrieveOk, fdm.FailureModel]) error {
+	_, x := result.Unwrap(retrievalRcpt.Out())
+	var emptyFailure fdm.FailureModel
+	if x != emptyFailure {
+		return fmt.Errorf("receipt is a failure 
receipt") + } + + // TODO: do more validation here. + // At the very least that the invocation is a retrieval invocation and the audience is the node + + return nil +} + +func (c *Consolidator) extractSize(retrievalRcpt receipt.Receipt[content.RetrieveOk, fdm.FailureModel]) (uint64, error) { + _, x := result.Unwrap(retrievalRcpt.Out()) + var emptyFailure fdm.FailureModel + if x != emptyFailure { + return 0, fmt.Errorf("receipt is a failure receipt") + } + + inv, ok := retrievalRcpt.Ran().Invocation() + if !ok { + return 0, fmt.Errorf("expected the ran invocation to be attached to the receipt") + } + + caps := inv.Capabilities() + if len(caps) != 1 { + return 0, fmt.Errorf("expected exactly one capability in the invocation") + } + + cap := caps[0] + if cap.Can() != content.RetrieveAbility { + return 0, fmt.Errorf("original invocation is not a retrieval invocation, but a %s", cap.Can()) + } + + caveats, err := content.RetrieveCaveatsReader.Read(cap.Nb()) + if err != nil { + return 0, fmt.Errorf("reading caveats from invocation: %w", err) + } + + return caveats.Range.End - caveats.Range.Start + 1, nil +} diff --git a/internal/db/consolidated/consolidated.go b/internal/db/consolidated/consolidated.go new file mode 100644 index 0000000..6d68d68 --- /dev/null +++ b/internal/db/consolidated/consolidated.go @@ -0,0 +1,21 @@ +package consolidated + +import ( + "context" + + "github.com/storacha/go-ucanto/did" + "github.com/storacha/go-ucanto/ucan" +) + +type ConsolidatedRecord struct { + NodeDID did.DID + ReceiptsBatchCID ucan.Link + TotalBytes uint64 + ProcessedAt string +} + +type ConsolidatedTable interface { + Add(ctx context.Context, nodeDID did.DID, receiptsBatchCID ucan.Link, bytes uint64) error + Get(ctx context.Context, nodeDID did.DID, receiptsBatchCID ucan.Link) (*ConsolidatedRecord, error) + GetByNode(ctx context.Context, nodeDID did.DID) ([]ConsolidatedRecord, error) +} diff --git a/internal/db/consolidated/dynamodb.go b/internal/db/consolidated/dynamodb.go new 
file mode 100644 index 0000000..605ab45 --- /dev/null +++ b/internal/db/consolidated/dynamodb.go @@ -0,0 +1,126 @@ +package consolidated + +import ( + "context" + "fmt" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/ipfs/go-cid" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/storacha/go-ucanto/did" + "github.com/storacha/go-ucanto/ucan" +) + +var _ ConsolidatedTable = (*DynamoConsolidatedTable)(nil) + +type DynamoConsolidatedTable struct { + client *dynamodb.Client + tableName string +} + +func NewDynamoConsolidatedTable(client *dynamodb.Client, tableName string) *DynamoConsolidatedTable { + return &DynamoConsolidatedTable{client, tableName} +} + +type consolidatedRecord struct { + NodeDID string `dynamodbav:"NodeDID"` + ReceiptsBatchCID string `dynamodbav:"ReceiptsBatchCID"` + TotalBytes uint64 `dynamodbav:"TotalBytes"` + ProcessedAt string `dynamodbav:"ProcessedAt"` +} + +func (d *DynamoConsolidatedTable) Add(ctx context.Context, nodeDID did.DID, receiptsBatchCID ucan.Link, bytes uint64) error { + record := consolidatedRecord{ + NodeDID: nodeDID.String(), + ReceiptsBatchCID: receiptsBatchCID.String(), + TotalBytes: bytes, + ProcessedAt: time.Now().UTC().Format(time.RFC3339), + } + + item, err := attributevalue.MarshalMap(record) + if err != nil { + return fmt.Errorf("serializing consolidated record: %w", err) + } + + _, err = d.client.PutItem(ctx, &dynamodb.PutItemInput{ + TableName: aws.String(d.tableName), + Item: item, + }) + if err != nil { + return fmt.Errorf("storing consolidated record: %w", err) + } + + return nil +} + +func (d *DynamoConsolidatedTable) Get(ctx context.Context, nodeDID did.DID, receiptsBatchCID ucan.Link) (*ConsolidatedRecord, error) { + result, err := d.client.GetItem(ctx, &dynamodb.GetItemInput{ + TableName: aws.String(d.tableName), 
+ Key: map[string]types.AttributeValue{ + "NodeDID": &types.AttributeValueMemberS{Value: nodeDID.String()}, + "ReceiptsBatchCID": &types.AttributeValueMemberS{Value: receiptsBatchCID.String()}, + }, + }) + if err != nil { + return nil, fmt.Errorf("getting consolidated record: %w", err) + } + + if result.Item == nil { + return nil, fmt.Errorf("record not found") + } + + return d.unmarshalRecord(result.Item) +} + +func (d *DynamoConsolidatedTable) GetByNode(ctx context.Context, nodeDID did.DID) ([]ConsolidatedRecord, error) { + result, err := d.client.Query(ctx, &dynamodb.QueryInput{ + TableName: aws.String(d.tableName), + KeyConditionExpression: aws.String("NodeDID = :nodeDID"), + ExpressionAttributeValues: map[string]types.AttributeValue{ + ":nodeDID": &types.AttributeValueMemberS{Value: nodeDID.String()}, + }, + }) + if err != nil { + return nil, fmt.Errorf("querying consolidated records by node: %w", err) + } + + records := make([]ConsolidatedRecord, 0, len(result.Items)) + for _, item := range result.Items { + record, err := d.unmarshalRecord(item) + if err != nil { + return nil, err + } + records = append(records, *record) + } + + return records, nil +} + +func (d *DynamoConsolidatedTable) unmarshalRecord(item map[string]types.AttributeValue) (*ConsolidatedRecord, error) { + var record consolidatedRecord + if err := attributevalue.UnmarshalMap(item, &record); err != nil { + return nil, fmt.Errorf("unmarshaling consolidated record: %w", err) + } + + parsedDID, err := did.Parse(record.NodeDID) + if err != nil { + return nil, fmt.Errorf("parsing node DID: %w", err) + } + + c, err := cid.Decode(record.ReceiptsBatchCID) + if err != nil { + return nil, fmt.Errorf("parsing receipts batch CID: %w", err) + } + receiptsBatchCID := cidlink.Link{Cid: c} + + return &ConsolidatedRecord{ + NodeDID: parsedDID, + ReceiptsBatchCID: receiptsBatchCID, + TotalBytes: record.TotalBytes, + ProcessedAt: record.ProcessedAt, + }, nil +} diff --git a/internal/db/egress/dynamodb.go 
b/internal/db/egress/dynamodb.go index cac400f..9fa3e7a 100644 --- a/internal/db/egress/dynamodb.go +++ b/internal/db/egress/dynamodb.go @@ -10,7 +10,10 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/google/uuid" + "github.com/ipfs/go-cid" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/storacha/go-ucanto/did" "github.com/storacha/go-ucanto/ucan" ) @@ -40,6 +43,7 @@ type egressRecord struct { Endpoint string `dynamodbav:"endpoint"` Cause string `dynamodbav:"cause"` ReceivedAt string `dynamodbav:"receivedAt"` + Processed bool `dynamodbav:"proc"` } func newRecord(nodeID did.DID, receipts ucan.Link, endpoint *url.URL, cause ucan.Link) egressRecord { @@ -49,15 +53,17 @@ func newRecord(nodeID did.DID, receipts ucan.Link, endpoint *url.URL, cause ucan shard := rand.Intn(10) pk := fmt.Sprintf("%s#%d", dateStr, shard) sk := fmt.Sprintf("%s#%s#%s", dateStr, nodeID, uuid.New()) + endpointStr, _ := url.PathUnescape(endpoint.String()) return egressRecord{ PK: pk, SK: sk, NodeID: nodeID.String(), Receipts: receipts.String(), - Endpoint: endpoint.String(), + Endpoint: endpointStr, Cause: cause.String(), ReceivedAt: receivedAt.Format(time.RFC3339), + Processed: false, } } @@ -75,3 +81,93 @@ func (d *DynamoEgressTable) Record(ctx context.Context, nodeID did.DID, receipts } return nil } + +func (d *DynamoEgressTable) GetUnprocessed(ctx context.Context, limit int) ([]EgressRecord, error) { + // Scan all shards for the current date for unprocessed records + today := time.Now().UTC().Format("2006-01-02") + var allRecords []EgressRecord + + for shard := range 10 { + pk := fmt.Sprintf("%s#%d", today, shard) + + result, err := d.client.Query(ctx, &dynamodb.QueryInput{ + TableName: aws.String(d.tableName), + KeyConditionExpression: aws.String("PK = :pk"), + FilterExpression: aws.String("proc = 
:false"), + ExpressionAttributeValues: map[string]types.AttributeValue{ + ":pk": &types.AttributeValueMemberS{Value: pk}, + ":false": &types.AttributeValueMemberBOOL{Value: false}, + }, + Limit: aws.Int32(int32(limit)), + }) + if err != nil { + return nil, fmt.Errorf("querying unprocessed records for shard %d: %w", shard, err) + } + + for _, item := range result.Items { + var record egressRecord + if err := attributevalue.UnmarshalMap(item, &record); err != nil { + return nil, fmt.Errorf("unmarshaling egress record: %w", err) + } + + nodeID, err := did.Parse(record.NodeID) + if err != nil { + return nil, fmt.Errorf("parsing node DID: %w", err) + } + + c, err := cid.Decode(record.Receipts) + if err != nil { + return nil, fmt.Errorf("parsing receipts CID: %w", err) + } + receipts := cidlink.Link{Cid: c} + + receivedAt, err := time.Parse(time.RFC3339, record.ReceivedAt) + if err != nil { + return nil, fmt.Errorf("parsing received at time: %w", err) + } + + cause, err := cid.Decode(record.Cause) + if err != nil { + return nil, fmt.Errorf("parsing cause CID: %w", err) + } + causeLink := cidlink.Link{Cid: cause} + + allRecords = append(allRecords, EgressRecord{ + PK: record.PK, + SK: record.SK, + NodeID: nodeID, + Receipts: receipts, + Endpoint: record.Endpoint, + Cause: causeLink, + ReceivedAt: receivedAt, + Processed: record.Processed, + }) + + if len(allRecords) >= limit { + return allRecords, nil + } + } + } + + return allRecords, nil +} + +func (d *DynamoEgressTable) MarkAsProcessed(ctx context.Context, records []EgressRecord) error { + for _, record := range records { + _, err := d.client.UpdateItem(ctx, &dynamodb.UpdateItemInput{ + TableName: aws.String(d.tableName), + Key: map[string]types.AttributeValue{ + "PK": &types.AttributeValueMemberS{Value: record.PK}, + "SK": &types.AttributeValueMemberS{Value: record.SK}, + }, + UpdateExpression: aws.String("SET proc = :true"), + ExpressionAttributeValues: map[string]types.AttributeValue{ + ":true": 
&types.AttributeValueMemberBOOL{Value: true}, + }, + }) + if err != nil { + return fmt.Errorf("marking record as processed (PK=%s, SK=%s): %w", record.PK, record.SK, err) + } + } + return nil +} diff --git a/internal/db/egress/egress.go b/internal/db/egress/egress.go index 7ac3898..caac0f1 100644 --- a/internal/db/egress/egress.go +++ b/internal/db/egress/egress.go @@ -3,11 +3,25 @@ package egress import ( "context" "net/url" + "time" "github.com/storacha/go-ucanto/did" "github.com/storacha/go-ucanto/ucan" ) +type EgressRecord struct { + PK string + SK string + NodeID did.DID + Receipts ucan.Link + Endpoint string + Cause ucan.Link + ReceivedAt time.Time + Processed bool +} + type EgressTable interface { Record(ctx context.Context, nodeID did.DID, receipt ucan.Link, endpoint *url.URL, cause ucan.Link) error + GetUnprocessed(ctx context.Context, limit int) ([]EgressRecord, error) + MarkAsProcessed(ctx context.Context, records []EgressRecord) error } diff --git a/internal/server/handlers.go b/internal/server/handlers.go index 858dddd..0336279 100644 --- a/internal/server/handlers.go +++ b/internal/server/handlers.go @@ -5,6 +5,8 @@ import ( "io" "net/http" + "github.com/ipfs/go-cid" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/storacha/go-ucanto/principal/signer" ucanhttp "github.com/storacha/go-ucanto/transport/http" @@ -45,3 +47,19 @@ func (s *Server) ucanHandler() http.HandlerFunc { } } } + +func (s *Server) getReceiptsHandler() http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + cidStr := r.PathValue("cid") + cid, err := cid.Decode(cidStr) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + _ = cidlink.Link{Cid: cid} + + // TODO: fetch receipt from DB + w.WriteHeader(http.StatusNotFound) + } +} diff --git a/internal/server/server.go b/internal/server/server.go index 628d182..6dd1db4 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -39,6 +39,7 @@ func (s *Server) 
ListenAndServe(addr string) error { mux.HandleFunc("GET /", s.getRootHandler()) mux.HandleFunc("POST /", s.ucanHandler()) + mux.HandleFunc("GET /receipts/{cid}", s.getReceiptsHandler()) log.Infof("Listening on %s", addr) return http.ListenAndServe(addr, mux)