Skip to content

soltiHQ/taskvisor

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

68 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Minimum Rust 1.85 Crates.io Apache 2.0

Install

[dependencies]
taskvisor = "0.0"

With features:

[dependencies]
taskvisor = { version = "0.0", features = ["controller", "logging"] }

See crates.io for the latest version.

Taskvisor

Lightweight, event-driven task supervision for async Rust.

A small orchestrator that runs your background tasks, restarts them on failure, and tells you what happened.

 ┌────────────┐     runs & restarts    ┌──────────────┐
 │   Tasks    │ <───────────────────── │  Supervisor  │
 └────────────┘                        └──────┬───────┘
                                              |
                                         emits events
                                              |
                                       ┌──────────────┐
                                       │ Subscribers  │ <─ your metrics / logs
                                       └──────────────┘

Use it for long-lived jobs, background workers, or anything that must stay alive, observable, and restart safely.

API Docs Examples

Quick start

A task that prints "pong" every 10 seconds, restarts forever, and shuts down on Ctrl+C:

use std::time::Duration;
use tokio_util::sync::CancellationToken;
use taskvisor::{
    Supervisor, SupervisorConfig, TaskFn, TaskSpec, TaskRef,
    TaskError, RestartPolicy, BackoffPolicy,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let sup = Supervisor::new(SupervisorConfig::default(), vec![]);

    let ping: TaskRef = TaskFn::arc("ping", |ctx: CancellationToken| async move {
        tokio::select! {
            _ = ctx.cancelled() => return Err(TaskError::Canceled),
            _ = tokio::time::sleep(Duration::from_millis(100)) => {}
        }
        println!("[ping] pong");
        Ok(())
    });

    let spec = TaskSpec::new(
        ping,
        RestartPolicy::Always { interval: Some(Duration::from_secs(10)) },
        BackoffPolicy::default(),
        None, // no timeout
    );

    sup.run(vec![spec]).await?;
    Ok(())
}

Core concepts

Task & TaskFn — A Task is any Send + Sync + 'static type that implements fn spawn(&self, ctx: CancellationToken) -> BoxTaskFuture. TaskFn wraps a closure into a Task so you don't need a struct. TaskRef is just Arc<dyn Task>.

TaskSpec — Bundles a task with its policies: restart, backoff, and optional timeout. This is what you pass to the supervisor.

RestartPolicy — Controls when a task restarts after it exits:

Policy Behavior
Never Run once, done.
OnFailure (default) Restart only on error. Success = stop.
Always { interval } Always restart. interval: Some(10s) waits between successes.

BackoffPolicy — Controls retry delay after failure. Delay for attempt n = first * factor^n, capped at max, then jitter is applied:

Field Default Meaning
first 100ms Initial delay
max 30s Delay cap
factor 1.0 Multiplier per attempt (2.0 = exponential)
jitter None Randomization strategy (see below)

JitterPolicy — Prevents thundering herd when multiple tasks retry at the same time:

Policy Range Use when
None exact delay Single task, predictable timing
Full [0, delay] Maximum spread needed
Equal (recommended) [delay/2, delay] Balanced: preserves ~75% of backoff
Decorrelated [base, base*3] capped at max Sophisticated, self-adjusting

Supervisor — Owns the runtime. Spawns task actors, distributes events to subscribers, handles shutdown.

Events & Subscribe — Every lifecycle change (start, stop, fail, timeout, backoff, shutdown) is published to a broadcast bus. Implement Subscribe to observe them. Each subscriber gets its own queue — a slow subscriber never blocks others.


Recipes

Exponential backoff with jitter

use std::time::Duration;
use taskvisor::{BackoffPolicy, JitterPolicy};

let backoff = BackoffPolicy {
    first: Duration::from_millis(200),
    max: Duration::from_secs(30),
    factor: 2.0,                    // 200ms -> 400ms -> 800ms -> ...
    jitter: JitterPolicy::Equal,    // recommended: [delay/2, delay]
};

Per-task timeout

// Task gets 5s per attempt. If exceeded: TimeoutHit event + TaskError::Timeout + retry.
let spec = TaskSpec::new(task, RestartPolicy::OnFailure, backoff, Some(Duration::from_secs(5)));

Custom subscriber (metrics)

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
use taskvisor::{Subscribe, Event, EventKind};

struct Metrics { failures: AtomicU64 }

#[async_trait]
impl Subscribe for Metrics {
    async fn on_event(&self, event: &Event) {
        if matches!(event.kind, EventKind::TaskFailed) {
            self.failures.fetch_add(1, Ordering::Relaxed);
        }
    }
}

// Pass to supervisor:
let metrics = Arc::new(Metrics { failures: AtomicU64::new(0) });
let sup = Supervisor::new(config, vec![metrics]);

Periodic task (run every N seconds)

// Runs, completes, waits 30s, runs again. Forever.
let spec = TaskSpec::new(
    task,
    RestartPolicy::Always { interval: Some(Duration::from_secs(30)) },
    BackoffPolicy::default(), // backoff only applies on failure
    None,
);

Runtime task management

// Add/remove/cancel while running
sup.add_task(spec).await?;
sup.cancel("task-name").await?;
sup.remove_task("task-name").await?;
let alive = sup.is_alive("task-name").await;
let tasks = sup.list_tasks().await;

Error handling

Return these from your task to control what happens next:

Return Retryable What happens
Ok(()) - Task completed. RestartPolicy decides next step.
Err(TaskError::Fail { reason }) Yes Retryable failure. Backoff, then retry.
Err(TaskError::Timeout { .. }) Yes Set automatically when per-task timeout is exceeded.
Err(TaskError::Fatal { reason }) No Permanent failure. Actor stops, publishes ActorDead.
Err(TaskError::Canceled) No Graceful shutdown. Not an error.

Cancellation pattern

Tasks must observe cancellation. Two equivalent patterns:

// Pattern 1: select! (recommended for long-running tasks)
tokio::select! {
    _ = ctx.cancelled() => Err(TaskError::Canceled),
    result = do_work() => result,
}

// Pattern 2: check before work (ok for short tasks)
if ctx.is_cancelled() { return Err(TaskError::Canceled); }
do_work().await

Events

Every lifecycle change publishes an Event to the bus. Subscribe to observe them.

Event Meaning
Lifecycle
TaskStarting Attempt is beginning
TaskStopped Completed or cancelled gracefully
TaskFailed Attempt failed (retryable or fatal)
TimeoutHit Attempt exceeded its timeout
BackoffScheduled Retry delay scheduled (includes delay duration)
Terminal
ActorExhausted Restart policy says stop (normal end-of-life)
ActorDead Fatal error, actor will not restart
Management
TaskAdded / TaskRemoved Task registered / unregistered
TaskAddRequested / TaskRemoveRequested Add/remove commands received
Shutdown
ShutdownRequested OS signal caught (SIGTERM/SIGINT)
AllStoppedWithinGrace Clean shutdown
GraceExceeded Some tasks didn't stop in time
Subscriber
SubscriberPanicked A subscriber panicked (isolated, others unaffected)
SubscriberOverflow Subscriber queue full, event dropped

Each event carries: kind, task (name), attempt, reason, delay_ms, timeout_ms, seq (monotonic ordering), at (timestamp).


Configuration

use taskvisor::SupervisorConfig;

let mut cfg = SupervisorConfig::default();
cfg.grace = Duration::from_secs(30);   // shutdown grace period
cfg.timeout = Duration::from_secs(5);  // default per-task timeout (0 = none)
cfg.max_concurrent = 4;                // task concurrency limit (0 = unlimited)
cfg.bus_capacity = 2048;               // event bus ring buffer size
cfg.restart = RestartPolicy::OnFailure;
cfg.backoff = BackoffPolicy::default();
Field Default Meaning
grace 60s How long to wait for tasks to stop on shutdown
timeout 0s (none) Default per-task attempt timeout
max_concurrent 0 (unlimited) Global semaphore for running tasks
bus_capacity 1024 Broadcast channel size. Slow subscribers see Lagged
restart OnFailure Default restart policy for tasks
backoff 100ms / 1.0x / 30s max Default backoff for tasks

Controller (feature = controller)

Slot-based admission control. Tasks submit to named slots; the policy decides what happens when a slot is busy.

Policy Behavior
Queue FIFO queue. New task waits until current one finishes.
Replace Cancels running task, starts the new one immediately.
DropIfRunning Rejects submission if slot is already busy.
use taskvisor::{ControllerSpec, ControllerConfig};

let sup = Supervisor::builder(cfg)
    .with_controller(ControllerConfig::default())
    .build();

// Submit with admission policy
sup.submit(ControllerSpec::queue(spec)).await?;
sup.submit(ControllerSpec::replace(spec)).await?;
sup.submit(ControllerSpec::drop_if_running(spec)).await?;

How it works

TaskFn (your async code)
   |
   v
TaskSpec (task + RestartPolicy + BackoffPolicy + timeout)
   |
   v
Supervisor
   ├── Registry
   │     └── TaskActor (per task)
   │           ├── attempt loop
   │           │     ├── run task with timeout
   │           │     ├── apply RestartPolicy
   │           │     └── apply BackoffPolicy on failure
   │           └── publish events to Bus
   └── Bus
         └── Subscribers (your metrics, logs, alerts)

Examples

cargo run --example subscriber
cargo run --example control
cargo run --example controller --features controller
Example What it shows
subscriber.rs Custom event subscriber tracking task metrics
control.rs Add, remove, cancel tasks at runtime
controller.rs Admission policies: Queue, Replace, DropIfRunning

Optional features

Feature What it enables
controller Controller, ControllerSpec, ControllerConfig, AdmissionPolicy
logging Built-in LogWriter subscriber that prints events to stdout (demo only)

Contributing

Found a bug? Have an idea? Pull requests and issues are welcome.

About

Event-driven task orchestration library for Rust

Topics

Resources

License

Code of conduct

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages