
rbaliyan/event


Event v3


A production-grade publish-subscribe event library for Go with support for distributed event handling, multiple transports, and exactly-once processing semantics (at-least-once delivery combined with idempotency checks). Comparable to MassTransit (.NET), Axon (Java), and Spring Cloud Stream.

Features

Core

  • Type-Safe Generics: Event[T] ensures compile-time type safety
  • Multiple Transports: Channel (in-memory), Redis Streams, NATS JetStream, Kafka
  • Fire-and-Forget API: Publish() and Subscribe() are void - events are facts
  • Delivery Modes: Broadcast (fan-out) or WorkerPool (load balancing)

Reliability

  • Transactional Outbox: Atomic publish with database writes (PostgreSQL, MongoDB, Redis)
  • Idempotency: Prevent duplicate processing (Redis, PostgreSQL, in-memory)
  • Poison Detection: Auto-quarantine repeatedly failing messages
  • At-Least-Once Delivery: Via Redis Streams, NATS, or Kafka

Advanced

  • Circuit Breaker: Failure isolation pattern
  • Schema Registry: Publisher-defined event configuration with subscriber auto-sync
  • Backoff Strategies: Exponential, linear, constant with jitter support

Observability

  • OpenTelemetry Tracing: Distributed tracing across services
  • OpenTelemetry Metrics: Out-of-the-box monitoring
  • Health Checks: Transport health and consumer lag monitoring
  • Event Monitoring: Track event processing status, duration, and errors

Ecosystem

The event library is part of a larger ecosystem of packages:

| Package | Description | Install |
|---|---|---|
| event | Core event bus with transports | go get github.com/rbaliyan/event/v3 |
| event-mongodb | MongoDB Change Stream transport (CDC) | go get github.com/rbaliyan/event-mongodb |
| event-dlq | Dead Letter Queue management | go get github.com/rbaliyan/event-dlq |
| event-scheduler | Delayed/scheduled message delivery | go get github.com/rbaliyan/event-scheduler |
| event-extras | Rate limiting and saga orchestration | go get github.com/rbaliyan/event-extras |

All packages share consistent patterns:

  • Functional options for configuration
  • Health checks via health.Checker interface
  • OpenTelemetry metrics support
  • Multiple backend implementations (PostgreSQL, MongoDB, Redis)

Installation

go get github.com/rbaliyan/event/v3

Quick Start

Basic Usage with Type Safety

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/rbaliyan/event/v3"
    "github.com/rbaliyan/event/v3/transport/channel"
)

type Order struct {
    ID     string
    Amount float64
}

func main() {
    ctx := context.Background()

    // Create a bus with channel transport
    bus, err := event.NewBus("my-app", event.WithTransport(channel.New()))
    if err != nil {
        log.Fatal(err)
    }
    defer bus.Close(ctx)

    // Create and register a type-safe event
    orderEvent := event.New[Order]("order.created")
    if err := event.Register(ctx, bus, orderEvent); err != nil {
        log.Fatal(err)
    }

    // Subscribe with type-safe handler
    orderEvent.Subscribe(ctx, func(ctx context.Context, e event.Event[Order], order Order) error {
        fmt.Printf("Order received: %s, Amount: $%.2f\n", order.ID, order.Amount)
        return nil
    })

    // Publish (fire-and-forget)
    orderEvent.Publish(ctx, Order{ID: "ORD-123", Amount: 99.99})
}

Transports

Redis Streams (Recommended for Production)

Redis Streams provides at-least-once delivery with consumer groups:

import (
    "context"
    "time"

    "github.com/rbaliyan/event/v3"
    "github.com/rbaliyan/event/v3/transport/redis"
    redisclient "github.com/redis/go-redis/v9"
)

func main() {
    ctx := context.Background()

    rdb := redisclient.NewClient(&redisclient.Options{
        Addr: "localhost:6379",
    })

    transport, _ := redis.New(rdb,
        redis.WithConsumerGroup("order-service"),
        redis.WithMaxLen(10000),
        redis.WithMaxAge(24*time.Hour),
        redis.WithClaimInterval(30*time.Second, time.Minute),
    )

    bus, _ := event.NewBus("order-service", event.WithTransport(transport))
    defer bus.Close(ctx)
}

NATS JetStream

For durable messaging with native broker features:

import (
    "context"
    "time"

    "github.com/rbaliyan/event/v3"
    "github.com/rbaliyan/event/v3/transport/nats"
    natsgo "github.com/nats-io/nats.go"
)

func main() {
    ctx := context.Background()

    nc, _ := natsgo.Connect("nats://localhost:4222")
    js, _ := nc.JetStream()

    transport, _ := nats.NewJetStream(js,
        nats.WithStreamName("ORDERS"),
        nats.WithDeduplication(time.Hour),
        nats.WithMaxDeliver(5),
        nats.WithAckWait(30*time.Second),
    )

    bus, _ := event.NewBus("my-app", event.WithTransport(transport))
    defer bus.Close(ctx)
}

Kafka

Kafka with native dead letter topic (DLT) support:

import (
    "context"

    "github.com/IBM/sarama"
    "github.com/rbaliyan/event/v3"
    "github.com/rbaliyan/event/v3/transport/kafka"
)

func main() {
    ctx := context.Background()

    config := sarama.NewConfig()
    config.Consumer.Offsets.AutoCommit.Enable = false

    transport, _ := kafka.New(
        []string{"localhost:9092"},
        config,
        kafka.WithConsumerGroup("order-service"),
        kafka.WithDeadLetterTopic("orders.dlq"),
        kafka.WithMaxRetries(3),
    )

    bus, _ := event.NewBus("my-app", event.WithTransport(transport))
    defer bus.Close(ctx)
}

MongoDB Change Streams (CDC)

For Change Data Capture scenarios, use the separate event-mongodb package:

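import (
    "context"
    "fmt"

    "github.com/rbaliyan/event/v3"
    mongodb "github.com/rbaliyan/event-mongodb"
    "go.mongodb.org/mongo-driver/v2/mongo"
    "go.mongodb.org/mongo-driver/v2/mongo/options"
)

func main() {
    ctx := context.Background()

    // mongo-driver v2: Connect takes only client options, not a context
    client, _ := mongo.Connect(options.Client().ApplyURI("mongodb://localhost:27017"))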
    db := client.Database("myapp")

    // Watch a specific collection
    transport := mongodb.New(db,
        mongodb.WithCollection("orders"),
        mongodb.WithFullDocument(mongodb.FullDocumentUpdateLookup),
        mongodb.WithResumeTokenStore(mongodb.NewMongoResumeTokenStore(db)),
    )

    bus, _ := event.NewBus("order-watcher", event.WithTransport(transport))
    defer bus.Close(ctx)

    // Subscribe to changes
    changes := event.New[mongodb.ChangeEvent]("db-changes")
    event.Register(ctx, bus, changes)

    changes.Subscribe(ctx, func(ctx context.Context, e event.Event[mongodb.ChangeEvent], change mongodb.ChangeEvent) error {
        fmt.Printf("Change: %s on %s.%s\n", change.OperationType, change.Database, change.Collection)
        return nil
    })

    // Publishing via Bus is NOT supported - write directly to MongoDB
    // ordersCol.InsertOne(ctx, order) triggers the subscriber
}

Note: MongoDB transport is subscribe-only (CDC). Publishing happens via direct MongoDB writes.

Transport Feature Comparison

Feature Redis Streams NATS JetStream Kafka MongoDB CDC
Persistence
At-Least-Once
Consumer Groups ❌ (Broadcast)
Native Deduplication
Native DLQ/DLT
Publish Support
WorkerPool Mode via distributed*

* MongoDB CDC supports WorkerPool mode through the distributed package, which emulates worker semantics using database atomic state transitions. See Distributed WorkerPool.

Health Checks

All transports and stores implement the health.Checker interface:

import "github.com/rbaliyan/event/v3/health"

// Check transport health
result := transport.Health(ctx)
fmt.Printf("Status: %s, Latency: %v\n", result.Status, result.Latency)

// Check all components
results := health.CheckAll(ctx,
    health.Named("transport", transport),
    health.Named("idempotency", idempStore),
    health.Named("monitor", monitorStore),
)

for name, result := range results {
    fmt.Printf("%s: %s\n", name, result.Status)
}

Health status levels:

  • StatusHealthy - Component is fully operational
  • StatusDegraded - Component is operational but has issues (e.g., high latency, pending messages)
  • StatusUnhealthy - Component is not operational
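
When several components are checked together, a caller usually wants a single overall status. The sketch below shows one plausible way to reduce per-component results to the worst level. The `status` type and `overall` function are hypothetical names for illustration and are not part of the health package.

```go
package main

import "fmt"

// status is ordered from best to worst so that a plain comparison
// picks the more severe level.
type status int

const (
	healthy status = iota
	degraded
	unhealthy
)

func (s status) String() string {
	return [...]string{"healthy", "degraded", "unhealthy"}[s]
}

// overall returns the worst status among all components.
// An empty result set is treated as healthy (an assumption of this sketch).
func overall(results map[string]status) status {
	worst := healthy
	for _, s := range results {
		if s > worst {
			worst = s
		}
	}
	return worst
}

func main() {
	results := map[string]status{
		"transport":   healthy,
		"idempotency": degraded,
		"monitor":     healthy,
	}
	fmt.Println("overall:", overall(results)) // overall: degraded
}
```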

Backoff Strategies

Configure retry behavior with pluggable backoff strategies:

import "github.com/rbaliyan/event/v3/backoff"

// Exponential backoff (recommended)
exponential := &backoff.Exponential{
    Initial:    100 * time.Millisecond,
    Multiplier: 2.0,
    Max:        30 * time.Second,
    Jitter:     0.1, // 10% randomization
}

// Linear backoff
linear := &backoff.Linear{
    Initial:   100 * time.Millisecond,
    Increment: 100 * time.Millisecond,
    Max:       5 * time.Second,
}

// Constant delay
constant := &backoff.Constant{
    Delay: 500 * time.Millisecond,
}

// Use one of the strategies with event options
orderEvent := event.New[Order]("order.created",
    event.WithBackoff(exponential),
    event.WithMaxRetries(5),
)
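
To make the exponential parameters concrete, here is a minimal standalone sketch of how a delay could be derived from them. The `expBackoff` type and its `Delay` method are assumptions for illustration; the field names mirror the configuration above, but the library's actual computation may differ (for example, in how jitter is applied).

```go
package main

import (
	"fmt"
	"math"
	"math/rand"
	"time"
)

// expBackoff is a hypothetical stand-in with the same fields as the
// exponential configuration shown above.
type expBackoff struct {
	Initial    time.Duration
	Multiplier float64
	Max        time.Duration
	Jitter     float64 // fraction of the delay to randomize, e.g. 0.1 = +/-10%
}

// Delay returns the wait before retry number attempt (0-based):
// Initial * Multiplier^attempt, randomized by Jitter, capped at Max.
func (b expBackoff) Delay(attempt int) time.Duration {
	d := float64(b.Initial) * math.Pow(b.Multiplier, float64(attempt))
	if b.Jitter > 0 {
		// Spread the delay uniformly within +/-Jitter of its nominal value.
		d += d * b.Jitter * (2*rand.Float64() - 1)
	}
	if limit := float64(b.Max); d > limit {
		d = limit
	}
	return time.Duration(d)
}

func main() {
	// Jitter is left at zero here so the printed delays are deterministic.
	b := expBackoff{Initial: 100 * time.Millisecond, Multiplier: 2.0, Max: 30 * time.Second}
	for attempt := 0; attempt < 5; attempt++ {
		fmt.Println(attempt, b.Delay(attempt))
	}
}
```

With jitter disabled, the delays double from 100ms on each attempt until they reach the 30s cap.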

Delivery Modes

Broadcast (Default)

All subscribers receive every message:

orderEvent.Subscribe(ctx, notifyWarehouse, event.AsBroadcast[Order]())
orderEvent.Subscribe(ctx, notifyShipping, event.AsBroadcast[Order]())
// Both handlers receive every order

Worker Pool

Only one subscriber receives each message (load balancing):

orderEvent.Subscribe(ctx, processOrder, event.AsWorker[Order]())
orderEvent.Subscribe(ctx, processOrder, event.AsWorker[Order]())
// Each order processed by exactly one worker
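
The difference between the two modes can be illustrated without the library. The following sketch uses plain functions and channels; `broadcast`, `workerPool`, and `countDeliveries` are hypothetical helpers, not library APIs. It counts how many handler invocations each mode produces for the same messages.

```go
package main

import (
	"fmt"
	"sync"
)

// broadcast delivers every message to every handler.
func broadcast(msgs []string, handlers []func(string)) {
	for _, m := range msgs {
		for _, h := range handlers {
			h(m)
		}
	}
}

// workerPool delivers each message to exactly one of n competing workers.
func workerPool(msgs []string, n int, handle func(worker int, msg string)) {
	queue := make(chan string)
	var wg sync.WaitGroup
	for w := 0; w < n; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for m := range queue { // workers compete for the next message
				handle(id, m)
			}
		}(w)
	}
	for _, m := range msgs {
		queue <- m
	}
	close(queue)
	wg.Wait()
}

// countDeliveries returns the total number of handler calls in each mode.
func countDeliveries(msgs []string, subscribers int) (broadcastTotal, workerTotal int) {
	handlers := make([]func(string), subscribers)
	for i := range handlers {
		handlers[i] = func(string) { broadcastTotal++ }
	}
	broadcast(msgs, handlers)

	var mu sync.Mutex
	workerPool(msgs, subscribers, func(int, string) {
		mu.Lock()
		workerTotal++
		mu.Unlock()
	})
	return broadcastTotal, workerTotal
}

func main() {
	b, w := countDeliveries([]string{"ORD-1", "ORD-2", "ORD-3"}, 2)
	fmt.Println("broadcast deliveries:", b)   // 3 messages x 2 subscribers = 6
	fmt.Println("worker pool deliveries:", w) // 3 messages, one worker each = 3
}
```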

Worker Groups

Multiple groups, each receiving all messages. Workers within a group compete:

// Group A: Order processors (3 workers compete)
orderEvent.Subscribe(ctx, processOrder,
    event.AsWorker[Order](),
    event.WithWorkerGroup[Order]("order-processors"))

// Group B: Analytics (2 workers compete)
orderEvent.Subscribe(ctx, trackAnalytics,
    event.AsWorker[Order](),
    event.WithWorkerGroup[Order]("analytics"))

// Each order goes to 1 processor AND 1 analytics worker

Distributed WorkerPool

The distributed package enables WorkerPool semantics on Broadcast-only transports (like MongoDB Change Streams) using database atomic state transitions. Only one worker processes each message, with automatic failover and payload recovery.

Basic Usage

import "github.com/rbaliyan/event/v3/distributed"

// Create a coordinator (Redis for distributed deployments)
coord := distributed.NewRedisStateManager(redisClient,
    distributed.WithCompletedTTL(48*time.Hour),
)

// Subscribe with WorkerPool emulation
mongoEvent.Subscribe(ctx, handler,
    event.WithMiddleware(
        distributed.WorkerPoolMiddleware[Order](coord, 5*time.Minute),
    ),
)
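
The idea of an atomic state transition can be sketched in memory as follows. This is only an illustration under stated assumptions: the `coordinator` type, its `acquire` and `complete` methods, and the claim-expiry rule are hypothetical and do not describe the distributed package's real interfaces or storage layout. Time is passed in explicitly so the scenario is deterministic.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type claim struct {
	done    bool      // true once a worker finished the message
	expires time.Time // when an unfinished claim may be taken over
}

// coordinator is a hypothetical in-memory stand-in for a state manager.
type coordinator struct {
	mu     sync.Mutex
	claims map[string]claim
}

// acquire atomically claims a message. It succeeds only if the message is
// not completed and nobody holds a live (unexpired) claim on it.
func (c *coordinator) acquire(id string, ttl time.Duration, now time.Time) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if cl, ok := c.claims[id]; ok && (cl.done || now.Before(cl.expires)) {
		return false
	}
	c.claims[id] = claim{expires: now.Add(ttl)}
	return true
}

// complete marks the message as processed so it is never claimed again.
func (c *coordinator) complete(id string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.claims[id] = claim{done: true}
}

func simulate() (first, second, takeover, afterDone bool) {
	c := &coordinator{claims: map[string]claim{}}
	start, ttl := time.Unix(0, 0), 5*time.Minute

	first = c.acquire("msg-1", ttl, start)                        // worker A wins
	second = c.acquire("msg-1", ttl, start.Add(time.Minute))      // worker B loses
	takeover = c.acquire("msg-1", ttl, start.Add(6*time.Minute))  // A's claim expired
	c.complete("msg-1")
	afterDone = c.acquire("msg-1", ttl, start.Add(time.Hour)) // already completed
	return
}

func main() {
	fmt.Println(simulate()) // true false true false
}
```

In a real deployment the check-and-set would be a single atomic operation in the backing store rather than a mutex-protected map.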

Payload Recovery

For transports without re-delivery (e.g., MongoDB Change Streams), the middleware automatically stores message payload so the RecoveryRunner can re-publish stale events if a worker crashes:

coord := distributed.NewMongoStateManager(db)

// RecoveryRunner detects PayloadStore capability automatically
runner := distributed.NewRecoveryRunner(coord,
    distributed.WithPublisher(bus),     // enables re-publishing
    distributed.WithStaleTimeout(2*time.Minute),
    distributed.WithCheckInterval(30*time.Second),
)

go runner.Run(ctx)

Recovery is two-phase:

  1. Re-publish: Stale entries with stored payload are re-published via the bus with a new event ID
  2. Reset: Remaining stale entries (no payload) are reset for reacquisition
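
The two phases above can be sketched as a simple partition over stale entries. The `staleEntry` type and `recoverStale` function are hypothetical, and falling back to a reset when re-publishing fails is an assumption of this sketch rather than documented behavior.

```go
package main

import "fmt"

// staleEntry is a hypothetical record of a message whose worker went quiet.
type staleEntry struct {
	ID      string
	Payload []byte // nil when no payload was stored
}

// recoverStale applies the two phases: entries with a stored payload are
// re-published; everything else is reset so a worker can reacquire it.
func recoverStale(entries []staleEntry, republish func(payload []byte) error) (republished, reset []string) {
	var remaining []staleEntry

	// Phase 1: re-publish entries that carry a payload.
	for _, e := range entries {
		if e.Payload != nil && republish(e.Payload) == nil {
			republished = append(republished, e.ID)
			continue
		}
		remaining = append(remaining, e)
	}

	// Phase 2: reset what is left for reacquisition.
	for _, e := range remaining {
		reset = append(reset, e.ID)
	}
	return republished, reset
}

func main() {
	entries := []staleEntry{
		{ID: "msg-1", Payload: []byte(`{"id":"ORD-1"}`)},
		{ID: "msg-2"},
		{ID: "msg-3", Payload: []byte(`{"id":"ORD-3"}`)},
	}
	republished, reset := recoverStale(entries, func([]byte) error { return nil })
	fmt.Println("republished:", republished) // republished: [msg-1 msg-3]
	fmt.Println("reset:", reset)             // reset: [msg-2]
}
```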

Worker Groups

Use separate coordinators with different prefixes per group:

smA := distributed.NewRedisStateManager(redis, distributed.WithPrefix("processors:"))
smB := distributed.NewRedisStateManager(redis, distributed.WithPrefix("analytics:"))

orderEvent.Subscribe(ctx, processOrder,
    event.WithMiddleware(distributed.WorkerPoolMiddleware[Order](smA, ttl)))
orderEvent.Subscribe(ctx, collectAnalytics,
    event.WithMiddleware(distributed.WorkerPoolMiddleware[Order](smB, ttl)))

Coordinator Backends

| Backend | Constructor | Use Case |
|---|---|---|
| Redis | distributed.NewRedisStateManager | Distributed deployments (recommended) |
| MongoDB | distributed.NewMongoStateManager | When MongoDB is already your primary store |
| Memory | distributed.NewMemoryStateManager | Single-instance or testing |

All three backends implement both Coordinator and PayloadStore interfaces.

Worker Observability

Query active and completed worker states using the WorkerStore interface (implemented by MongoStateManager and MemoryStateManager):

page, _ := sm.ListWorkers(ctx, distributed.WorkerFilter{
    Status: []distributed.WorkerState{distributed.WorkerStateProcessing},
    Limit:  100,
})

count, _ := sm.CountWorkers(ctx, distributed.WorkerFilter{
    StaleTimeout: 5 * time.Minute,
})

Note: RedisStateManager does not implement WorkerStore due to Redis SCAN's O(N) cost.

Transactional Outbox Pattern

Ensure atomic publish with database writes:

import (
    "github.com/rbaliyan/event/v3"
    "github.com/rbaliyan/event/v3/outbox"
)

func main() {
    ctx := context.Background()

    store := outbox.NewMongoStore(mongoClient.Database("myapp"))

    bus, _ := event.NewBus("order-service",
        event.WithTransport(transport),
        event.WithOutbox(store),
    )
    defer bus.Close(ctx)

    orderEvent := event.New[Order]("order.created")
    event.Register(ctx, bus, orderEvent)

    // Normal publish - goes directly to transport
    orderEvent.Publish(ctx, Order{ID: "123"})

    // Inside transaction - automatically routes to outbox
    err := outbox.Transaction(ctx, mongoClient, func(ctx context.Context) error {
        _, err := ordersCol.InsertOne(ctx, order)
        if err != nil {
            return err
        }
        return orderEvent.Publish(ctx, order) // Goes to outbox
    })

    // Start relay to publish from outbox to transport
    relay := outbox.NewMongoRelay(store, transport)
    go relay.Start(ctx)
}
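
Why the outbox makes the write and the publish atomic can be seen in a small in-memory model. In this sketch the business row and the outbox row are committed together or not at all, and a relay later drains the outbox. All names (`store`, `transaction`, `relay`) are hypothetical and stand in for a real database transaction and the library's relay.

```go
package main

import (
	"errors"
	"fmt"
)

// store models a database with a business table and an outbox table.
type store struct {
	orders []string
	outbox []string // pending events, written in the same transaction
}

// transaction runs fn against a copy and commits only if fn succeeds,
// so the order and its event are stored together or not at all.
func (s *store) transaction(fn func(tx *store) error) error {
	tx := &store{
		orders: append([]string(nil), s.orders...),
		outbox: append([]string(nil), s.outbox...),
	}
	if err := fn(tx); err != nil {
		return err // rollback: the copy is discarded
	}
	*s = *tx
	return nil
}

// relay publishes pending outbox entries in order and removes each one
// only after a successful publish. It returns the number published.
func (s *store) relay(publish func(event string) error) int {
	sent := 0
	for len(s.outbox) > 0 {
		if err := publish(s.outbox[0]); err != nil {
			break // retry on the next relay run
		}
		s.outbox = s.outbox[1:]
		sent++
	}
	return sent
}

func simulate() (orders, pending, published int) {
	s := &store{}
	_ = s.transaction(func(tx *store) error {
		tx.orders = append(tx.orders, "ORD-1")
		tx.outbox = append(tx.outbox, "order.created:ORD-1")
		return nil
	})
	_ = s.transaction(func(tx *store) error {
		tx.orders = append(tx.orders, "ORD-2")
		tx.outbox = append(tx.outbox, "order.created:ORD-2")
		return errors.New("constraint violation") // neither write survives
	})
	orders, pending = len(s.orders), len(s.outbox)
	published = s.relay(func(string) error { return nil })
	return
}

func main() {
	fmt.Println(simulate()) // 1 1 1
}
```

Because the relay removes an entry only after publishing it, a crash between the two steps leads to a duplicate publish rather than a lost event, which is why the outbox is typically paired with idempotent consumers.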

Idempotency

Prevent duplicate message processing:

import "github.com/rbaliyan/event/v3/idempotency"

store := idempotency.NewRedisStore(redisClient, time.Hour)

bus, _ := event.NewBus("order-service",
    event.WithTransport(transport),
    event.WithBusIdempotency(store),
)

// All subscribers automatically get deduplication
orderEvent.Subscribe(ctx, func(ctx context.Context, e event.Event[Order], order Order) error {
    return processOrder(ctx, order) // Duplicates automatically skipped
})
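
As a rough illustration of what deduplication involves, the sketch below keeps message IDs with an expiry and skips any ID seen within the window. The `seenStore` type and its methods are hypothetical. Releasing the ID when the handler fails, so that a redelivery can retry, is a design choice of this sketch and not necessarily what the library does.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// seenStore is a hypothetical in-memory idempotency store.
type seenStore struct {
	mu   sync.Mutex
	ttl  time.Duration
	seen map[string]time.Time // message ID -> expiry of the record
}

// claim reports whether id is new (or its record expired) and records it.
func (s *seenStore) claim(id string, now time.Time) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if exp, ok := s.seen[id]; ok && now.Before(exp) {
		return false
	}
	s.seen[id] = now.Add(s.ttl)
	return true
}

func (s *seenStore) forget(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.seen, id)
}

// process runs handler at most once per ID within the TTL window.
func (s *seenStore) process(id string, now time.Time, handler func() error) error {
	if !s.claim(id, now) {
		return nil // duplicate: skip silently
	}
	if err := handler(); err != nil {
		s.forget(id) // let a redelivery retry the message
		return err
	}
	return nil
}

func simulate() (handled, flakyAttempts int) {
	s := &seenStore{ttl: time.Hour, seen: map[string]time.Time{}}
	start := time.Unix(0, 0)
	ok := func() error { handled++; return nil }
	flaky := func() error {
		flakyAttempts++
		if flakyAttempts == 1 {
			return errors.New("temporary failure")
		}
		return nil
	}

	_ = s.process("evt-1", start, ok)                  // handled
	_ = s.process("evt-1", start.Add(time.Minute), ok) // duplicate, skipped
	_ = s.process("evt-1", start.Add(2*time.Hour), ok) // record expired, handled
	_ = s.process("evt-2", start, flaky)               // fails, record released
	_ = s.process("evt-2", start, flaky)               // retried, succeeds
	_ = s.process("evt-2", start, flaky)               // duplicate, skipped
	return
}

func main() {
	fmt.Println(simulate()) // 2 2
}
```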

Poison Message Detection

Auto-quarantine messages that keep failing:

import "github.com/rbaliyan/event/v3/poison"

store := poison.NewRedisStore(redisClient)
detector := poison.NewDetector(store,
    poison.WithThreshold(5),
    poison.WithQuarantineTime(time.Hour),
)

bus, _ := event.NewBus("order-service",
    event.WithTransport(transport),
    event.WithBusPoisonDetection(detector),
)

// Messages failing 5+ times are automatically quarantined
orderEvent.Subscribe(ctx, processOrder)

// Release a message from quarantine
detector.Release(ctx, messageID)
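
The threshold and quarantine settings can be read as a per-message failure counter. The following sketch shows that reading. The `detector` type here is a hypothetical in-memory model (not safe for concurrent use) and is unrelated to the real implementation in the poison package.

```go
package main

import (
	"fmt"
	"time"
)

// detector is a hypothetical in-memory model of poison detection.
type detector struct {
	threshold  int
	quarantine time.Duration
	failures   map[string]int
	until      map[string]time.Time // message ID -> end of quarantine
}

func newDetector(threshold int, quarantine time.Duration) *detector {
	return &detector{
		threshold:  threshold,
		quarantine: quarantine,
		failures:   map[string]int{},
		until:      map[string]time.Time{},
	}
}

// recordFailure counts a failed attempt and quarantines the message
// once the count reaches the threshold.
func (d *detector) recordFailure(id string, now time.Time) {
	d.failures[id]++
	if d.failures[id] >= d.threshold {
		d.until[id] = now.Add(d.quarantine)
	}
}

// isQuarantined reports whether the message should be skipped right now.
func (d *detector) isQuarantined(id string, now time.Time) bool {
	until, ok := d.until[id]
	return ok && now.Before(until)
}

// release clears the failure history and lifts the quarantine.
func (d *detector) release(id string) {
	delete(d.failures, id)
	delete(d.until, id)
}

func simulate() (afterFour, afterFive, afterRelease bool) {
	d := newDetector(5, time.Hour)
	now := time.Unix(0, 0)
	for i := 0; i < 4; i++ {
		d.recordFailure("msg-1", now)
	}
	afterFour = d.isQuarantined("msg-1", now)
	d.recordFailure("msg-1", now) // fifth failure reaches the threshold
	afterFive = d.isQuarantined("msg-1", now)
	d.release("msg-1")
	afterRelease = d.isQuarantined("msg-1", now)
	return
}

func main() {
	fmt.Println(simulate()) // false true false
}
```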

Event Monitoring

Track event processing status, duration, and errors:

import "github.com/rbaliyan/event/v3/monitor"

store := monitor.NewPostgresStore(db)

bus, _ := event.NewBus("order-service",
    event.WithTransport(transport),
    event.WithMonitor(store),
)

// Query monitoring data
entries, _ := store.List(ctx, monitor.Filter{
    Status:    []monitor.Status{monitor.StatusFailed},
    StartTime: time.Now().Add(-time.Hour),
    Limit:     100,
})

for _, entry := range entries {
    fmt.Printf("Event %s: %s (duration: %v)\n",
        entry.EventID, entry.Status, entry.Duration)
}

Monitor HTTP API

import monitorhttp "github.com/rbaliyan/event/v3/monitor/http"

handler := monitorhttp.New(store)
http.Handle("/", handler)
http.ListenAndServe(":8080", nil)

Endpoints:

  • GET /v1/monitor/entries - List entries with filters
  • GET /v1/monitor/entries/{event_id} - Get entries for an event
  • DELETE /v1/monitor/entries?older_than=1h - Delete old entries

Worker Pool State (HTTP)

When using distributed worker pools with MongoDB, expose worker state via the monitor HTTP handler:

handler := monitorhttp.New(store, monitorhttp.WithWorkerStore(sm))

Endpoints:

  • GET /v1/workers - List workers (filters: status, event_name, stale_timeout, cursor, limit)
  • GET /v1/workers/{message_id} - Get single worker
  • GET /v1/workers/count - Count workers matching filter

Schema Registry

Define event configuration centrally:

import "github.com/rbaliyan/event/v3/schema"

provider := schema.NewPostgresProvider(db, nil)
defer provider.Close()

bus, _ := event.NewBus("order-service",
    event.WithTransport(transport),
    event.WithSchemaProvider(provider),
    event.WithIdempotency(idempStore),
    event.WithMonitor(monitorStore),
)

// Publisher: Define schema
provider.Set(ctx, &schema.EventSchema{
    Name:              "order.created",
    Version:           1,
    SubTimeout:        30 * time.Second,
    MaxRetries:        3,
    EnableMonitor:     true,
    EnableIdempotency: true,
})

// Subscriber: Schema auto-loaded on Register()
orderEvent := event.New[Order]("order.created")
event.Register(ctx, bus, orderEvent) // Loads schema automatically

Error Handling

Use semantic error types to control message acknowledgment:

orderEvent.Subscribe(ctx, func(ctx context.Context, e event.Event[Order], order Order) error {
    err := processOrder(ctx, order)

    switch {
    case err == nil:
        return nil // ACK - message processed

    case errors.Is(err, ErrTemporary):
        return event.ErrNack // NACK - retry immediately

    case errors.Is(err, ErrTransient):
        return event.ErrDefer // NACK - retry with backoff

    case errors.Is(err, ErrPermanent):
        return event.ErrReject // ACK + send to DLQ

    default:
        return event.ErrDefer.Wrap(err) // Default: retry with backoff
    }
})
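
The handler above refers to `ErrTemporary`, `ErrTransient`, and `ErrPermanent` without defining them; they are application-level sentinel errors. A minimal sketch of how such sentinels could be declared and classified follows. The `action` values are hypothetical labels standing in for the library's acknowledgment errors. Because `errors.Is` walks the wrap chain, wrapped errors are classified the same way as bare ones.

```go
package main

import (
	"errors"
	"fmt"
)

// Application-defined sentinel errors (assumed; not provided by the library).
var (
	ErrTemporary = errors.New("temporary failure")
	ErrTransient = errors.New("transient failure")
	ErrPermanent = errors.New("permanent failure")
)

// action is a hypothetical label for the acknowledgment decision.
type action string

const (
	actAck          action = "ack"
	actRetryNow     action = "nack: retry immediately"
	actRetryBackoff action = "nack: retry with backoff"
	actReject       action = "ack and send to DLQ"
)

// classify maps a handler error to an acknowledgment decision.
// Unknown errors default to retrying with backoff.
func classify(err error) action {
	switch {
	case err == nil:
		return actAck
	case errors.Is(err, ErrTemporary):
		return actRetryNow
	case errors.Is(err, ErrPermanent):
		return actReject
	default: // includes ErrTransient
		return actRetryBackoff
	}
}

// wrap adds context while keeping the sentinel discoverable via errors.Is.
func wrap(err error) error {
	return fmt.Errorf("process order: %w", err)
}

func main() {
	fmt.Println(classify(nil))                // ack
	fmt.Println(classify(wrap(ErrTemporary))) // nack: retry immediately
	fmt.Println(classify(wrap(ErrTransient))) // nack: retry with backoff
	fmt.Println(classify(wrap(ErrPermanent))) // ack and send to DLQ
}
```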

Dead Letter Queue

Use event-dlq for failed message management:

import dlq "github.com/rbaliyan/event-dlq"

store := dlq.NewPostgresStore(db)
manager := dlq.NewManager(store, transport)

// Store failed message
manager.Store(ctx, "order.created", msgID, payload, metadata, err, retryCount, "order-service")

// Replay failed messages
replayed, _ := manager.Replay(ctx, dlq.Filter{
    EventName:      "order.created",
    ExcludeRetried: true,
})

// Get statistics
stats, _ := manager.Stats(ctx)
fmt.Printf("Pending: %d\n", stats.PendingMessages)

Scheduled Messages

Use event-scheduler for delayed delivery:

import scheduler "github.com/rbaliyan/event-scheduler"

sched := scheduler.NewRedisScheduler(redisClient, transport,
    scheduler.WithPollInterval(100*time.Millisecond),
)
defer sched.Close(ctx)

go sched.Start(ctx)

// Schedule for future delivery
sched.Schedule(ctx, "order.reminder", payload, time.Now().Add(24*time.Hour))

// Schedule with ID for cancellation
sched.ScheduleWithID(ctx, "reminder-123", "order.reminder", payload, deliverAt)
sched.Cancel(ctx, "reminder-123")

Rate Limiting

Use event-extras/ratelimit for rate limiting:

import "github.com/rbaliyan/event-extras/ratelimit"

// Option 1: local (in-process) rate limiter
limiter := ratelimit.NewTokenBucket(100, 10) // 100 rps, burst of 10

// Option 2: distributed rate limiter backed by Redis (use instead of option 1)
// limiter := ratelimit.NewRedisLimiter(redisClient, "api-service", 100, time.Second)

// Use in handler
if limiter.Allow(ctx) {
    processRequest()
} else {
    return errors.New("rate limited")
}

// Or block until allowed
if err := limiter.Wait(ctx); err != nil {
    return err
}
processRequest()
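
For readers unfamiliar with token buckets, the sketch below shows the general technique behind a "100 rps, burst of 10" limiter: tokens refill continuously at the configured rate up to the burst capacity, and each allowed request spends one token. The `tokenBucket` type is a hypothetical standalone implementation with an explicit clock, not the one in event-extras.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// tokenBucket is a hypothetical, single-goroutine token bucket.
type tokenBucket struct {
	rate   float64   // tokens added per second
	burst  float64   // bucket capacity
	tokens float64   // tokens currently available
	last   time.Time // time of the last refill
}

func newTokenBucket(rate, burst float64, now time.Time) *tokenBucket {
	return &tokenBucket{rate: rate, burst: burst, tokens: burst, last: now}
}

// allow refills the bucket for the elapsed time, then spends one token
// if at least one is available.
func (b *tokenBucket) allow(now time.Time) bool {
	b.tokens = math.Min(b.burst, b.tokens+now.Sub(b.last).Seconds()*b.rate)
	b.last = now
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

// countAllowed issues n requests at the same instant and counts successes.
func countAllowed(b *tokenBucket, now time.Time, n int) int {
	allowed := 0
	for i := 0; i < n; i++ {
		if b.allow(now) {
			allowed++
		}
	}
	return allowed
}

func simulate() (burst, drained, refilled int) {
	start := time.Unix(0, 0)
	b := newTokenBucket(100, 10, start)
	burst = countAllowed(b, start, 15)                     // bucket starts full
	drained = countAllowed(b, start, 5)                    // no time has passed
	refilled = countAllowed(b, start.Add(time.Second), 15) // refill capped at burst
	return
}

func main() {
	fmt.Println(simulate()) // 10 0 10
}
```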

Saga Orchestration

Use event-extras/saga for distributed transactions:

import "github.com/rbaliyan/event-extras/saga"

orderSaga := saga.New("order-creation",
    &CreateOrderStep{orderService},
    &ReserveInventoryStep{inventoryService},
    &ProcessPaymentStep{paymentService},
).
    WithStore(saga.NewRedisStore(redisClient)).
    WithBackoff(&backoff.Exponential{Initial: time.Second, Max: 30*time.Second}).
    WithMaxRetries(3)

// Execute saga
sagaID := uuid.New().String()
if err := orderSaga.Execute(ctx, sagaID, order); err != nil {
    // Compensations were automatically run
    log.Error("order creation failed", "saga_id", sagaID, "error", err)
}
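
The comment about compensations running automatically refers to the usual saga rule: when a step fails, the steps that already succeeded are undone in reverse order. A minimal sketch of that rule, using a hypothetical `step` type and `runSaga` function (no persistence, retries, or backoff), might look like this:

```go
package main

import (
	"errors"
	"fmt"
)

// step is a hypothetical saga step with a forward and a compensating action.
type step struct {
	name       string
	execute    func() error
	compensate func()
}

// runSaga executes steps in order. If a step fails, the previously
// completed steps are compensated in reverse order and the error is returned.
func runSaga(steps []step) (trace []string, err error) {
	for i, s := range steps {
		trace = append(trace, "run "+s.name)
		if err = s.execute(); err != nil {
			for j := i - 1; j >= 0; j-- {
				steps[j].compensate()
				trace = append(trace, "undo "+steps[j].name)
			}
			return trace, err
		}
	}
	return trace, nil
}

func simulate() ([]string, error) {
	ok := func() error { return nil }
	noop := func() {}
	return runSaga([]step{
		{"create-order", ok, noop},
		{"reserve-inventory", ok, noop},
		{"process-payment", func() error { return errors.New("card declined") }, noop},
	})
}

func main() {
	trace, err := simulate()
	for _, line := range trace {
		fmt.Println(line)
	}
	fmt.Println("error:", err)
}
```

Here the payment step fails, so the trace ends with the inventory reservation and then the order creation being undone.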

Database Support

Component PostgreSQL MongoDB Redis In-Memory
Outbox -
Idempotency -
Poison - -
Monitor -
Schema Registry
DLQ
Scheduler -
Saga
Distributed WP -

Testing

Use built-in test utilities:

func TestOrderHandler(t *testing.T) {
    ctx := context.Background()

    bus := event.TestBus(channel.New())
    defer bus.Close(ctx)

    handler := event.NewTestHandler(func(ctx context.Context, e event.Event[Order], order Order) error {
        return nil
    })

    orderEvent := event.New[Order]("order.created")
    event.Register(ctx, bus, orderEvent)

    orderEvent.Subscribe(ctx, handler.Handler())
    orderEvent.Publish(ctx, Order{ID: "test"})

    if !handler.WaitFor(1, 100*time.Millisecond) {
        t.Fatal("handler not called") // stop here: Received() would be empty below
    }

    orders := handler.Received()
    if orders[0].ID != "test" {
        t.Error("wrong order ID")
    }
}

License

MIT License - see LICENSE for details.
