Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .storoku.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,21 @@
],
"hashKey": "PK",
"rangeKey": "SK"
},
{
"name": "consolidated-records",
"attributes": [
{
"name": "NodeDID",
"type": "S"
},
{
"name": "ReceiptsBatchCID",
"type": "S"
}
],
"hashKey": "NodeDID",
"rangeKey": "ReceiptsBatchCID"
}
],
"networks": [
Expand Down
76 changes: 73 additions & 3 deletions cmd/etracker/start.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
package main

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/spf13/cobra"
Expand All @@ -11,6 +16,8 @@ import (
"github.com/storacha/go-ucanto/principal/signer"

"github.com/storacha/etracker/internal/config"
"github.com/storacha/etracker/internal/consolidator"
"github.com/storacha/etracker/internal/db/consolidated"
"github.com/storacha/etracker/internal/db/egress"
"github.com/storacha/etracker/internal/server"
"github.com/storacha/etracker/internal/service"
Expand Down Expand Up @@ -53,10 +60,35 @@ func init() {
cobra.CheckErr(viper.BindPFlag("egress_table_name", startCmd.Flags().Lookup("egress-table-name")))
// bind flag to storoku-style environment variable
cobra.CheckErr(viper.BindEnv("egress_table_name", "EGRESS_RECORDS_TABLE_ID"))

startCmd.Flags().String(
"consolidated-table-name",
"",
"Name of the DynamoDB table to use for consolidated records",
)
cobra.CheckErr(viper.BindPFlag("consolidated_table_name", startCmd.Flags().Lookup("consolidated-table-name")))
cobra.CheckErr(viper.BindEnv("consolidated_table_name", "CONSOLIDATED_RECORDS_TABLE_ID"))

startCmd.Flags().Int(
"consolidation-interval",
12*60*60,
"Interval in seconds between consolidation runs",
)
cobra.CheckErr(viper.BindPFlag("consolidation_interval", startCmd.Flags().Lookup("consolidation-interval")))

startCmd.Flags().Int(
"consolidation-batch-size",
100,
"Number of records to process in each consolidation batch",
)
cobra.CheckErr(viper.BindPFlag("consolidation_batch_size", startCmd.Flags().Lookup("consolidation-batch-size")))
}

func startService(cmd *cobra.Command, args []string) error {
cfg, err := config.Load(cmd.Context())
ctx, cancel := context.WithCancel(cmd.Context())
defer cancel()

cfg, err := config.Load(ctx)
if err != nil {
return fmt.Errorf("loading config: %w", err)
}
Expand All @@ -77,15 +109,53 @@ func startService(cmd *cobra.Command, args []string) error {
}
}

svc, err := service.New(id, egress.NewDynamoEgressTable(dynamodb.NewFromConfig(cfg.AWSConfig), cfg.EgressTableName))
// Create DynamoDB client
dynamoClient := dynamodb.NewFromConfig(cfg.AWSConfig)

// Create database tables
egressTable := egress.NewDynamoEgressTable(dynamoClient, cfg.EgressTableName)
consolidatedTable := consolidated.NewDynamoConsolidatedTable(dynamoClient, cfg.ConsolidatedTableName)

// Create service
svc, err := service.New(id, egressTable)
if err != nil {
return fmt.Errorf("creating service: %w", err)
}

// Create server
server, err := server.New(id, svc)
if err != nil {
return fmt.Errorf("creating server: %w", err)
}

return server.ListenAndServe(fmt.Sprintf(":%d", cfg.Port))
// Create and start consolidator
interval := time.Duration(cfg.ConsolidationInterval) * time.Second
batchSize := cfg.ConsolidationBatchSize

cons := consolidator.New(id, egressTable, consolidatedTable, interval, batchSize)

// Start consolidator in a goroutine
go cons.Start(ctx)

// Handle graceful shutdown
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)

errCh := make(chan error, 1)
go func() {
log.Infof("Starting server on port %d", cfg.Port)
errCh <- server.ListenAndServe(fmt.Sprintf(":%d", cfg.Port))
}()

select {
case err := <-errCh:
log.Errorf("Server error: %v", err)
cons.Stop()
return err
case sig := <-sigCh:
log.Infof("Received signal %v, shutting down gracefully", sig)
cons.Stop()
cancel()
return nil
}
}
15 changes: 15 additions & 0 deletions deploy/app/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,21 @@ module "app" {
hash_key = "PK"
range_key ="SK"
},
{
name = "consolidated-records"
attributes = [
{
name = "NodeDID"
type = "S"
},
{
name = "ReceiptsBatchCID"
type = "S"
},
]
hash_key = "NodeDID"
range_key = "ReceiptsBatchCID"
},
]
buckets = [
]
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ require (
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.47.0
github.com/go-playground/validator/v10 v10.27.0
github.com/google/uuid v1.6.0
github.com/ipfs/go-cid v0.5.0
github.com/ipfs/go-log/v2 v2.7.0
github.com/ipld/go-ipld-prime v0.21.1-0.20240917223228-6148356a4c2e
github.com/spf13/cobra v1.2.1
github.com/spf13/viper v1.8.1
github.com/storacha/go-libstoracha v0.2.7
Expand Down Expand Up @@ -50,7 +52,6 @@ require (
github.com/ipfs/bbloom v0.0.4 // indirect
github.com/ipfs/go-block-format v0.2.0 // indirect
github.com/ipfs/go-blockservice v0.5.2 // indirect
github.com/ipfs/go-cid v0.5.0 // indirect
github.com/ipfs/go-datastore v0.8.2 // indirect
github.com/ipfs/go-ipfs-blockstore v1.3.1 // indirect
github.com/ipfs/go-ipfs-ds-help v1.1.1 // indirect
Expand All @@ -65,7 +66,6 @@ require (
github.com/ipfs/go-verifcid v0.0.3 // indirect
github.com/ipld/go-car v0.6.2 // indirect
github.com/ipld/go-codec-dagpb v1.6.0 // indirect
github.com/ipld/go-ipld-prime v0.21.1-0.20240917223228-6148356a4c2e // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.10 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
Expand Down
13 changes: 8 additions & 5 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@ import (
)

type Config struct {
Port int `mapstructure:"port" validate:"required,min=1,max=65535"`
PrivateKey string `mapstructure:"private_key" validate:"required"`
DID string `mapstructure:"did" validate:"startswith=did:web:"`
AWSConfig aws.Config `mapstructure:"aws_config"`
EgressTableName string `mapstructure:"egress_table_name" validate:"required"`
Port int `mapstructure:"port" validate:"required,min=1,max=65535"`
PrivateKey string `mapstructure:"private_key" validate:"required"`
DID string `mapstructure:"did" validate:"startswith=did:web:"`
AWSConfig aws.Config `mapstructure:"aws_config"`
EgressTableName string `mapstructure:"egress_table_name" validate:"required"`
ConsolidatedTableName string `mapstructure:"consolidated_table_name" validate:"required"`
ConsolidationInterval int `mapstructure:"consolidation_interval" validate:"min=300"`
ConsolidationBatchSize int `mapstructure:"consolidation_batch_size" validate:"min=1"`
}

func Load(ctx context.Context) (*Config, error) {
Expand Down
Loading
Loading