
feat: egress records consolidation #5

Merged
volmedo merged 8 commits into main from vic/feat/consolidator-mvp
Oct 6, 2025

Conversation


@volmedo volmedo commented Oct 2, 2025

Ref: storacha/piri#174

Implement egress record consolidation.

A periodic task:

  • looks for unprocessed records in the DB
  • for each record, fetches the corresponding receipt batch from the node's receipt batch endpoint
  • for each receipt in the batch, validates the receipt (more careful validation still to-do) and takes note of the bytes egressed
  • when all receipts in the batch have been processed, a consolidated record is stored with the total amount of bytes the node egressed in that batch

A new /receipts endpoint is offered for nodes to fetch space/egress/consolidate receipts so that they can check the results of consolidation against their own records.
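The per-batch accounting step described above can be sketched roughly as follows. This is an illustrative sketch only: `Receipt` and its fields are stand-ins, not the actual types in this PR.

```go
package main

import "fmt"

// Receipt is a hypothetical stand-in for a retrieval receipt in a batch.
type Receipt struct {
	Valid         bool   // whether the receipt passed validation
	BytesEgressed uint64 // bytes the node reported as egressed
}

// consolidate sums egressed bytes over the valid receipts of one batch.
func consolidate(receipts []Receipt) uint64 {
	var total uint64
	for _, r := range receipts {
		if !r.Valid {
			// Failed receipts are currently discarded silently; per the
			// spec, these errors should eventually be surfaced to the node.
			continue
		}
		total += r.BytesEgressed
	}
	return total
}

func main() {
	batch := []Receipt{{true, 100}, {false, 50}, {true, 25}}
	fmt.Println(consolidate(batch)) // 125
}
```

The resulting total is what gets stored as the consolidated record for the batch.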

Known limitations

  • space/egress/conclude invocations are supposed to be invoked on a consolidator service as per the spec. The consolidator service has been implemented as a sub-component of the etracker service, so the invocation is issued and executed by the etracker service itself. In this implementation, instead of setting up a new service method and issuing the invocation over the wire, we just create the invocation and generate the receipt while processing the records.
  • when processing receipts, those that fail are silently discarded. The spec states that these failures need to be communicated to nodes by including errors in the receipt. This still needs to be implemented.
  • conclude receipts are not stored and the /receipts endpoint always returns 404. I'll add a new DynamoDB table to store receipts keyed by the corresponding invocation CID.

@volmedo volmedo self-assigned this Oct 2, 2025
@volmedo volmedo requested a review from alanshaw October 3, 2025 17:53
@volmedo volmedo marked this pull request as ready for review October 3, 2025 17:55
Member

@frrist frrist left a comment


Good stuff!

The main thing here I'd like to discuss is the Consolidate routine marking records as processed even when a transient failure occurs while processing.

Comment on lines +229 to +230
// TODO: do more validation here.
// At the very least that the invocation is a retrieval invocation and the audience is the node
Member


Just thinking about storacha/RFC#68 here, I wonder how challenging it would be to perform filtering such that retrievals resulting from replication, or indexing could be excluded. Alternatively this responsibility could be pushed down to piri.

Member Author


mmm, I think the billing service is the right place for that business logic, where rules about what we want or don't want to pay for could live.

But yeah, I'm not sure how easy it will be to differentiate them, because they are all retrieval requests in the end. An idea that comes to mind is keeping a white-list of actors that are not charged for egress/do not produce billable egress. Something like "if it was the indexing-service that requested the retrieval, then this receipt doesn't count. And if it was a storage node, it doesn't count either". That would require a registry of nodes in the network, but I think this is something we will need for other use cases anyway.
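The white-list idea could look something like this minimal sketch. The DIDs and the registry shape here are purely hypothetical; the real registry of network actors does not exist yet.

```go
package main

import "fmt"

// nonBillable is an assumed registry of infrastructure principals whose
// retrievals should not produce billable egress (e.g. the indexing service
// or other storage nodes). The DID below is a made-up placeholder.
var nonBillable = map[string]bool{
	"did:web:indexer.example.network": true,
}

// billable reports whether egress attributed to the given issuer should
// count towards the node's billable total.
func billable(issuerDID string) bool {
	return !nonBillable[issuerDID]
}

func main() {
	fmt.Println(billable("did:web:indexer.example.network")) // false
	fmt.Println(billable("did:key:someclient"))              // true
}
```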


func (d *DynamoEgressTable) GetUnprocessed(ctx context.Context, limit int) ([]EgressRecord, error) {
// Scan all shards for the current date for unprocessed records
today := time.Now().UTC().Format("2006-01-02")
Member


anything significant about this date?

Member Author


surprisingly enough, that's a standard format string in Go. The way you tell the formatter where you want the year to be in the final string is by using the "2006" or "06" sequences (if you want the long or the short format), "01" is the month, "02" the day and so on. Take a look at the time.Format example in the docs to see how absolutely weird it is.

today := time.Now().UTC().Format("2006-01-02")
var allRecords []EgressRecord

for shard := range 10 {
Member


why 10?

Member Author


because the AI said so. All of that will change, the current implementation comes from a moment when the spec was still WIP and access patterns were not clear. Please ignore that for now.

}
}

func (s *Server) getReceiptsHandler() http.HandlerFunc {
Member


Does https://github.com/storacha/piri/blob/main/pkg/service/egresstracker/service.go#L247 call this? Just curious to make sure I understand the flow of things here.

Member Author


yes, that's the endpoint where nodes will be able to fetch consolidate receipts from, exactly in that call you linked

Comment on lines +157 to +160
// Mark records as processed
if err := c.egressTable.MarkAsProcessed(ctx, records); err != nil {
return fmt.Errorf("marking records as processed: %w", err)
}
Member


Are all records really processed at the end of this method? e.g. say a Piri node is offline (due to an update or whatever) when fetchReceipts is called, the method fails, and we mark the record as processed anyway. I think we'll need to be more granular here.

Member Author


yeah, proper error handling is not implemented yet


volmedo commented Oct 4, 2025

thanks for the input @frrist. The main goal of this PR is to implement the bare minimum functionality to allow the interaction with the nodes to "work", which means the receipts endpoint and the basic structure are there.

Consolidation needs more work, as does validation in the nodes. My aim is to get the basic flow working so that remaining changes are isolated to their own components.

So please review this as "MVP, almost WIP" code 🙂

@volmedo volmedo merged commit 35cc8d6 into main Oct 6, 2025
4 of 5 checks passed
@volmedo volmedo deleted the vic/feat/consolidator-mvp branch October 6, 2025 09:41

frrist commented Oct 6, 2025

My approval on this is based on the work being an MVP/WIP. Let's not forget to file an issue for the point raised here: #5 (comment)
