Skip to content

potatman/EventHorizon

Repository files navigation

EventHorizon

CI License: MIT

EventHorizon is a .NET framework for Event Sourcing and Event Streaming, providing a clean abstraction layer over multiple storage and streaming backends. Build event-driven applications with pluggable persistence (MongoDB, Elasticsearch, Apache Ignite, In-Memory) and messaging (Apache Pulsar, Kafka, In-Memory).

Table of Contents

Features

  • Event Sourcing β€” Snapshot and View stores with automatic event application
  • Event Streaming β€” Publish/subscribe with topic-based routing
  • CQRS β€” Commands, Events, Requests, and Responses as first-class citizens
  • Pluggable Backends β€” Swap storage and streaming providers without changing business logic
  • Aggregate Pattern β€” Built-in aggregate lifecycle management with locking
  • Middleware β€” Extensible pipeline for aggregate processing
  • Multi-Stream Subscriptions β€” Subscribe to multiple event streams in a single consumer
  • Migration Support β€” Built-in tooling for migrating between state schemas

Supported Platforms

.NET Version Support Level End of Support
.NET 10 βœ… LTS November 2028
.NET 9 βœ… STS May 2026
.NET 8 βœ… LTS November 2026

NuGet Packages

All packages are published to NuGet.org with the Cts. prefix.

Package Description
Cts.EventHorizon.Abstractions Core interfaces, models, and attributes
Cts.EventHorizon.EventStore Event store abstractions (CRUD stores, locks)
Cts.EventHorizon.EventStore.InMemory In-memory event store (great for testing)
Cts.EventHorizon.EventStore.MongoDb MongoDB-backed event store
Cts.EventHorizon.EventStore.ElasticSearch Elasticsearch-backed event store
Cts.EventHorizon.EventStore.Ignite Apache Ignite-backed event store
Cts.EventHorizon.EventStreaming Event streaming abstractions
Cts.EventHorizon.EventStreaming.InMemory In-memory streaming (great for testing)
Cts.EventHorizon.EventStreaming.Pulsar Apache Pulsar streaming provider
Cts.EventHorizon.EventSourcing Event sourcing orchestration (aggregates, senders, subscriptions)

Quick Start

1. Install packages

# Core + In-Memory (for getting started / testing)
dotnet add package Cts.EventHorizon.EventSourcing
dotnet add package Cts.EventHorizon.EventStore.InMemory
dotnet add package Cts.EventHorizon.EventStreaming.InMemory

2. Define your state

using EventHorizon.Abstractions.Attributes;
using EventHorizon.Abstractions.Interfaces;
using EventHorizon.Abstractions.Interfaces.Actions;
using EventHorizon.Abstractions.Interfaces.Handlers;

[SnapshotStore("my_app_accounts")]
[Stream("$type")]
public class Account : IState,
    IHandleCommand<CreateAccount>,
    IApplyEvent<AccountCreated>
{
    public string Id { get; set; }
    public string Name { get; set; }
    public int Balance { get; set; }

    public void Handle(CreateAccount command, AggregateContext context)
    {
        context.AddEvent(new AccountCreated(command.Name, command.InitialBalance));
    }

    public void Apply(AccountCreated @event)
    {
        Name = @event.Name;
        Balance = @event.Balance;
    }
}

3. Define actions

using EventHorizon.Abstractions.Interfaces.Actions;

public record CreateAccount(string Name, int InitialBalance) : ICommand<Account>;
public record AccountCreated(string Name, int Balance) : IEvent<Account>;

4. Register services

using EventHorizon.Abstractions.Extensions;
using EventHorizon.EventSourcing.Extensions;
using EventHorizon.EventStore.InMemory.Extensions;
using EventHorizon.EventStreaming.InMemory.Extensions;

services.AddEventHorizon(x =>
{
    x.AddEventSourcing()
        .AddInMemorySnapshotStore()
        .AddInMemoryViewStore()
        .AddInMemoryEventStream()
        .ApplyCommandsToSnapshot<Account>();
});

5. Use the client

using EventHorizon.EventSourcing;

public class AccountService
{
    private readonly EventSourcingClient<Account> _client;

    public AccountService(EventSourcingClient<Account> client)
    {
        _client = client;
    }

    public async Task CreateAccountAsync(string name, int balance)
    {
        await _client.CreateSender()
            .Send(new CreateAccount(name, balance))
            .ExecuteAsync();
    }
}

Core Concepts

State (IState)

The root entity that represents the current state of your domain object. Must implement IState with an Id property.

Actions

EventHorizon uses a CQRS-style action hierarchy:

Action Interface Purpose
Command ICommand<T> Mutates state β€” handled by IHandleCommand<T> on the state class
Event IEvent<T> Records what happened β€” applied by IApplyEvent<T> on the state class
Request IRequest<T, TResponse> Query or operation that returns a response
Response IResponse<T> Result of a request

Snapshots and Views

  • Snapshot (Snapshot<T>) β€” The authoritative persisted state, rebuilt by replaying events
  • View (View<T>) β€” A read-optimized projection derived from events, can combine data from multiple streams

Aggregates

The AggregateBuilder manages the lifecycle of loading state from a store, applying actions, and persisting results. It handles optimistic concurrency via sequence IDs and distributed locking.

Subscriptions

SubscriptionBuilder<T> creates durable consumers that process messages from one or more streams. Implement IStreamConsumer<T> to handle batches of messages:

public class MyConsumer : IStreamConsumer<Event>
{
    public Task OnBatch(SubscriptionContext<Event> context)
    {
        foreach (var message in context.Messages)
        {
            var payload = message.Data.GetPayload();
            // Process event...
        }
        return Task.CompletedTask;
    }
}

Register with:

x.AddSubscription<MyConsumer, Event>(s => s.AddStream<Account>());

Middleware

Aggregate processing supports middleware for cross-cutting concerns:

x.ApplyEventsToView<MyView>(h => h.UseMiddleware<MyMiddleware>());

Architecture

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    Your Application                         β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                β”‚
β”‚  β”‚ EventSourcingClientβ”‚  β”‚  StreamingClient  β”‚               β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚           β”‚   EventHorizon Core  β”‚                          β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”                β”‚
β”‚  β”‚  AggregateBuilder β”‚  β”‚ SubscriptionBuilderβ”‚               β”‚
β”‚  β”‚  SenderBuilder    β”‚  β”‚ PublisherBuilder   β”‚               β”‚
β”‚  β”‚  ICrudStore<T>    β”‚  β”‚ ReaderBuilder      β”‚               β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”                β”‚
β”‚  β”‚   Event Stores    β”‚  β”‚ Event Streaming   β”‚                β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚  β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚                β”‚
β”‚  β”‚  β”‚  MongoDB     β”‚  β”‚  β”‚ β”‚  Pulsar     β”‚  β”‚                β”‚
β”‚  β”‚  β”‚  Elastic     β”‚  β”‚  β”‚ β”‚  Kafka      β”‚  β”‚                β”‚
β”‚  β”‚  β”‚  Ignite      β”‚  β”‚  β”‚ β”‚  In-Memory  β”‚  β”‚                β”‚
β”‚  β”‚  β”‚  In-Memory   β”‚  β”‚  β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚                β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                                       β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Storage Backends

MongoDB

x.AddMongoDbSnapshotStore(config.GetSection("MongoDb").Bind)
 .AddMongoDbViewStore(config.GetSection("MongoDb").Bind);
{
  "MongoDb": {
    "ConnectionString": "mongodb://localhost:27017",
    "Database": "my_database"
  }
}

Elasticsearch

x.AddElasticSnapshotStore(config.GetSection("ElasticSearch").Bind)
 .AddElasticViewStore(config.GetSection("ElasticSearch").Bind);
{
  "ElasticSearch": {
    "Uri": "http://localhost:9200"
  }
}

Apache Ignite

x.AddIgniteSnapshotStore(config.GetSection("Ignite").Bind)
 .AddIgniteViewStore(config.GetSection("Ignite").Bind);

In-Memory

x.AddInMemorySnapshotStore()
 .AddInMemoryViewStore();

Best suited for unit/integration testing. No external dependencies required.

Streaming Backends

Apache Pulsar

x.AddPulsarEventStream(config.GetSection("Pulsar").Bind);
{
  "Pulsar": {
    "ServiceUrl": "pulsar://localhost:6650"
  }
}

In-Memory

x.AddInMemoryEventStream();

Best suited for unit/integration testing. No external dependencies required.

Configuration

Attributes

Attribute Target Purpose
[SnapshotStore("bucket_id")] Class Configures the snapshot store bucket/collection name
[ViewStore("database")] Class Configures the view store database/index name
[Stream("topic")] Class Maps a type to a streaming topic
[StreamPartitionKey] Property Designates the property used for stream partitioning

Docker Compose

Development infrastructure is provided in the compose/ directory:

# Start MongoDB
docker compose -f compose/MongoDb/docker-compose.yml up -d

# Start Elasticsearch
docker compose -f compose/ElasticSearch/docker-compose.yml up -d

# Start Pulsar
docker compose -f compose/Pulsar/docker-compose.yml up -d

# Start Ignite
docker compose -f compose/Ignite/docker-compose.yml up -d

Testing

The test suite uses xUnit with Bogus for data generation.

# Run unit tests only
dotnet test --filter "Category!=Integration"

# Run all tests (requires Docker services)
dotnet test

Writing Tests

Use the in-memory providers for fast, isolated unit tests:

services.AddEventHorizon(x =>
{
    x.AddInMemorySnapshotStore()
     .AddInMemoryViewStore()
     .AddInMemoryEventStream()
     .AddEventSourcing();
});

Integration tests use [Collection("Integration")] and require running Docker Compose services.

CI/CD

This project uses GitHub Actions (.github/workflows/ci.yml) with GitVersion for automatic semantic versioning based on the GitFlow branching model.

Versions are derived from git history and tags β€” no manual version bumping required after initial setup.

Branch/Tag Pre-release Label Example Version
v* tag (stable) 1.3.0
master / main (stable) 1.3.0
release/* rc 1.3.0-rc.3
hotfix/* hf 1.3.1-hf.1
develop preview 1.4.0-preview.12
feature/* {branch} 1.4.0-my-feature.1

How versioning works

  • Tag a release on main/master (e.g., v1.3.0) to set the version baseline
  • All subsequent commits on branches derive their version from git tags and merge history
  • Commit messages with +semver: major, +semver: minor, or +semver: fix control version increments
  • Configuration lives in GitVersion.yml at the repo root

All packages are published with the Cts.* prefix (e.g., Cts.EventHorizon.Abstractions).

Trusted Publishing

NuGet packages are published using trusted publishing via GitHub's OIDC tokens β€” no API keys or secrets required. The trusted publisher is configured on nuget.org to trust the ci.yml workflow in this repository.

Samples

Working examples are in the samples/ directory:

  • EventHorizon.EventSourcing.Samples β€” Full event sourcing example with accounts, commands, events, views, and subscriptions using MongoDB + Elasticsearch + Pulsar
  • EventHorizon.EventStreaming.Samples β€” Standalone streaming example with multi-topic subscription and publishing

Run samples with:

# Start required infrastructure
docker compose -f compose/MongoDb/docker-compose.yml up -d
docker compose -f compose/ElasticSearch/docker-compose.yml up -d
docker compose -f compose/Pulsar/docker-compose.yml up -d

# Run the event sourcing sample
dotnet run --project samples/EventHorizon.EventSourcing.Samples

Project Structure

EventHorizon/
β”œβ”€β”€ src/
β”‚   β”œβ”€β”€ EventHorizon.Abstractions/          # Core interfaces, models, attributes
β”‚   β”œβ”€β”€ EventHorizon.EventStore/            # Store abstractions (ICrudStore, Lock)
β”‚   β”œβ”€β”€ EventHorizon.EventStore.InMemory/   # In-memory store implementation
β”‚   β”œβ”€β”€ EventHorizon.EventStore.MongoDb/    # MongoDB store implementation
β”‚   β”œβ”€β”€ EventHorizon.EventStore.ElasticSearch/ # Elasticsearch store implementation
β”‚   β”œβ”€β”€ EventHorizon.EventStore.Ignite/     # Apache Ignite store implementation
β”‚   β”œβ”€β”€ EventHorizon.EventStreaming/        # Streaming abstractions
β”‚   β”œβ”€β”€ EventHorizon.EventStreaming.InMemory/ # In-memory streaming
β”‚   β”œβ”€β”€ EventHorizon.EventStreaming.Pulsar/ # Apache Pulsar streaming
β”‚   └── EventHorizon.EventSourcing/        # Event sourcing orchestration
β”œβ”€β”€ test/                                   # Unit and integration tests
β”œβ”€β”€ samples/                                # Working example applications
β”œβ”€β”€ benchmark/                              # Performance benchmarks
β”œβ”€β”€ compose/                                # Docker Compose files for local dev
└── charts/                                 # Helm charts for Kubernetes deployment

Contributing

  1. Fork the repository
  2. Create a feature branch (feature/my-feature)
  3. Commit changes with clear messages
  4. Open a pull request against develop

License

This project is licensed under the MIT License.

About

No description, website, or topics provided.

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Contributors 6