From de2dcde03c3b6939bb483d502a3546978d32c006 Mon Sep 17 00:00:00 2001 From: sanchda <838104+sanchda@users.noreply.github.com> Date: Thu, 26 Feb 2026 19:55:24 +0000 Subject: [PATCH 1/5] Initial --- lib/statix/application.ex | 28 +++- lib/statix/conn.ex | 38 ++++- lib/statix/conn_tracker.ex | 230 +++++++++++++++++++++++++---- test/statix/uds_reconnect_test.exs | 208 ++++++++++++++++++++++++++ test/statix/uds_test.exs | 10 +- 5 files changed, 465 insertions(+), 49 deletions(-) create mode 100644 test/statix/uds_reconnect_test.exs diff --git a/lib/statix/application.ex b/lib/statix/application.ex index e0721fd..025a039 100644 --- a/lib/statix/application.ex +++ b/lib/statix/application.ex @@ -1,13 +1,33 @@ defmodule Statix.Application do @moduledoc false - use Application + @doc """ + Starts the Statix supervision tree lazily on first UDS connection. + No-op if already started. Safe to call concurrently. + """ + def ensure_started do + if GenServer.whereis(Statix.ConnTracker) do + :ok + else + do_start() + end + end - def start(_type, _args) do + defp do_start do children = [ - Statix.ConnTracker + {DynamicSupervisor, strategy: :one_for_one, name: Statix.DynamicSupervisor} ] - Supervisor.start_link(children, strategy: :one_for_one, name: Statix.Supervisor) + case Supervisor.start_link(children, strategy: :one_for_one, name: Statix.Supervisor) do + {:ok, _pid} -> start_conn_tracker() + {:error, {:already_started, _pid}} -> start_conn_tracker() + end + end + + defp start_conn_tracker do + case DynamicSupervisor.start_child(Statix.DynamicSupervisor, Statix.ConnTracker) do + {:ok, _pid} -> :ok + {:error, {:already_started, _pid}} -> :ok + end end end diff --git a/lib/statix/conn.ex b/lib/statix/conn.ex index 1604e55..93c29d0 100644 --- a/lib/statix/conn.ex +++ b/lib/statix/conn.ex @@ -60,19 +60,45 @@ defmodule Statix.Conn do end end - def transmit(%__MODULE__{sock: sock, prefix: prefix} = conn, type, key, val, options) + def safe_open(%__MODULE__{transport: :uds, sock: {:socket_path, path}} = conn) do + with {:ok, sock} <- :socket.open(:local, :dgram, :default) do + path_addr = %{family: :local, path: String.to_charlist(path)} + + case :socket.connect(sock, path_addr) do + :ok -> + {:ok, %__MODULE__{conn | sock: sock}} + + {:error, reason} -> + :socket.close(sock) + {:error, reason} + end + end + end + + def transmit( + %__MODULE__{sock: sock, prefix: prefix} = conn, + type, + key, + val, + options, + opts \\ [] + ) when is_binary(val) and is_list(options) do result = prefix |> Packet.build(type, key, val, options) |> transmit(conn) + should_log = Keyword.get(opts, :log, true) + with {:error, error} <- result do - Logger.error(fn -> - if(is_atom(sock), do: "", else: "Statix ") <> - "#{inspect(sock)} #{type} metric \"#{key}\" lost value #{val}" <> - " error=#{inspect(error)}" - end) + if should_log do + Logger.error(fn -> + if(is_atom(sock), do: "", else: "Statix ") <> + "#{inspect(sock)} #{type} metric \"#{key}\" lost value #{val}" <> + " error=#{inspect(error)}" + end) + end end result diff --git a/lib/statix/conn_tracker.ex b/lib/statix/conn_tracker.ex index daf8db7..b61afe4 100644 --- a/lib/statix/conn_tracker.ex +++ b/lib/statix/conn_tracker.ex @@ -5,51 +5,70 @@ defmodule Statix.ConnTracker do alias Statix.Conn + require Logger + + @backoff_steps [1_000, 5_000, 30_000, 60_000, 120_000, 300_000] + def start_link(opts) do GenServer.start_link(__MODULE__, opts, name: __MODULE__) end + @doc """ + Ensures ConnTracker is running. Called lazily on first UDS connection. + No-op if already started. Starts the entire supervision tree if needed. + """ + defdelegate ensure_started, to: Statix.Application + @impl true def init(_opts) do table = :ets.new(:statix_conn_tracker, [:set, :protected, :named_table, read_concurrency: true]) - {:ok, table} + {:ok, %{table: table, unhealthy: %{}, conn_templates: %{}, pool_sizes: %{}}} end - @doc """ - Stores a list of connections for the given key, replacing any existing connections. - - Closes old connections before storing new ones to prevent resource leaks. - Typically called when establishing a UDS connection pool with pool_size > 1. - """ - @spec set(key :: term(), connections :: [Conn.t()]) :: :ok - def set(key, connections) do - GenServer.call(__MODULE__, {:set, key, connections}) + @spec set(key :: term(), connections :: [Conn.t()], opts :: keyword()) :: :ok + def set(key, connections, opts \\ []) do + GenServer.call(__MODULE__, {:set, key, connections, opts}) end - @doc """ - Retrieves a random connection for the given key. - - Returns `{:ok, conn}` with a randomly selected connection from the pool, - or `{:error, :not_found}` if no connections exist for the key. - """ @spec get(key :: term()) :: {:ok, Conn.t()} | {:error, :not_found} def get(key) do case :ets.lookup(:statix_conn_tracker, key) do - [{^key, connections}] -> - random_connection = Enum.random(connections) - {:ok, random_connection} + [{^key, [_ | _] = connections}] -> + {:ok, Enum.random(connections)} - [] -> + _ -> {:error, :not_found} end + rescue + ArgumentError -> {:error, :not_found} + end + + @doc """ + Check if a path is marked unhealthy. Lock-free ETS read for hot path. + """ + @spec unhealthy?(key :: term()) :: boolean() + def unhealthy?(key) do + :ets.lookup(:statix_conn_tracker, {:unhealthy, key}) != [] + rescue + ArgumentError -> false + end + + @doc """ + Report a send error for the given path. Non-blocking cast. + If path is not yet unhealthy, marks it and starts the health-check loop. + If already unhealthy, increments the lost metric count. + """ + @spec report_send_error(key :: term()) :: :ok + def report_send_error(key) do + GenServer.cast(__MODULE__, {:report_send_error, key}) end @impl true - def handle_call({:set, key, connections}, _from, state) do + def handle_call({:set, key, connections, opts}, _from, state) do # Close old connections before replacing them - case :ets.lookup(state, key) do + case :ets.lookup(state.table, key) do [{^key, old_connections}] -> close_connections(old_connections) @@ -57,17 +76,160 @@ defmodule Statix.ConnTracker do :ok end - :ets.insert(state, {key, connections}) - {:reply, :ok, state} + :ets.insert(state.table, {key, connections}) + + # Clear unhealthy state if present — a manual connect() supersedes the health-check loop. + # Cancel the pending timer so it doesn't close these fresh connections. + :ets.delete(state.table, {:unhealthy, key}) + + unhealthy = + case Map.pop(state.unhealthy, key) do + {nil, map} -> + map + + {entry, map} -> + Process.cancel_timer(entry.timer_ref) + map + end + + # Store conn_template and pool_size if provided + conn_templates = + case Keyword.fetch(opts, :conn_template) do + {:ok, template} -> Map.put(state.conn_templates, key, template) + :error -> state.conn_templates + end + + pool_sizes = + case Keyword.fetch(opts, :pool_size) do + {:ok, size} -> Map.put(state.pool_sizes, key, size) + :error -> state.pool_sizes + end + + {:reply, :ok, + %{state | conn_templates: conn_templates, unhealthy: unhealthy, pool_sizes: pool_sizes}} + end + + @impl true + def handle_cast({:report_send_error, path}, state) do + if Map.has_key?(state.unhealthy, path) do + # Already unhealthy — just bump lost count + state = update_in(state.unhealthy[path].lost_count, &(&1 + 1)) + {:noreply, state} + else + case Map.fetch(state.conn_templates, path) do + {:ok, conn_template} -> + # Mark as unhealthy, insert ETS sentinel, schedule first health-check + :ets.insert(state.table, {{:unhealthy, path}, true}) + + pool_size = Map.get(state.pool_sizes, path, 1) + delay = backoff_ms(0) + timer_ref = Process.send_after(self(), {:health_check, path}, delay) + + unhealthy_entry = %{ + backoff_index: 0, + timer_ref: timer_ref, + conn_template: conn_template, + pool_size: pool_size, + lost_count: 1 + } + + Logger.warning( + "Statix: UDS path #{path} marked unhealthy, " <> + "scheduling reconnect in #{delay}ms" + ) + + {:noreply, put_in(state.unhealthy[path], unhealthy_entry)} + + :error -> + # No template stored — can't reconnect. This shouldn't happen. + Logger.error("Statix: UDS path #{path} has no conn_template, cannot reconnect") + {:noreply, state} + end + end + end + + @impl true + def handle_info({:health_check, path}, state) do + case Map.fetch(state.unhealthy, path) do + {:ok, entry} -> + attempt_reconnect(path, entry, state) + + :error -> + # No longer unhealthy (race with successful set?) + {:noreply, state} + end + end + + defp attempt_reconnect(path, entry, state) do + # Capture old connections for cleanup AFTER new ones are in ETS. + # This avoids a window where get/1 returns conns with closed sockets. + old_conns = + case :ets.lookup(state.table, path) do + [{^path, conns}] -> conns + [] -> [] + end + + # Try to open pool_size new sockets + results = + Enum.map(1..entry.pool_size, fn _ -> + Conn.safe_open(entry.conn_template) + end) + + {successes, failures} = Enum.split_with(results, &match?({:ok, _}, &1)) + + cond do + successes != [] -> + # At least some sockets opened — swap in new connections, then close old. + # Accepts partial success: better to have some working sockets than none. + connections = Enum.map(successes, fn {:ok, conn} -> conn end) + :ets.insert(state.table, {path, connections}) + :ets.delete(state.table, {:unhealthy, path}) + close_connections(old_conns) + + if failures == [] do + Logger.info( + "Statix: reconnected UDS path #{path} " <> + "after losing #{entry.lost_count} metric(s)" + ) + else + opened = length(successes) + failed = length(failures) + + Logger.info( + "Statix: partially reconnected UDS path #{path} " <> + "(#{opened}/#{opened + failed} sockets), " <> + "lost #{entry.lost_count} metric(s)" + ) + end + + {:noreply, %{state | unhealthy: Map.delete(state.unhealthy, path)}} + + true -> + # All failed — keep stale conns in ETS, schedule retry with backoff + next_index = min(entry.backoff_index + 1, length(@backoff_steps) - 1) + delay = backoff_ms(next_index) + timer_ref = Process.send_after(self(), {:health_check, path}, delay) + + Logger.warning( + "Statix: reconnect failed for UDS path #{path}, " <> + "#{entry.lost_count} metric(s) lost so far, retrying in #{delay}ms" + ) + + updated_entry = %{entry | backoff_index: next_index, timer_ref: timer_ref} + {:noreply, put_in(state.unhealthy[path], updated_entry)} + end end @impl true - def terminate(_reason, table) do - # Close all sockets before table is destroyed + def terminate(_reason, %{table: table}) do :ets.foldl( - fn {_path, connections}, acc -> - close_connections(connections) - acc + fn + {{:unhealthy, _}, _}, acc -> + acc + + {_path, connections}, acc -> + close_connections(connections) + acc end, nil, table @@ -78,9 +240,17 @@ defmodule Statix.ConnTracker do defp close_connections(connections) do Enum.each(connections, fn conn -> - if conn.transport == :uds and is_reference(conn.sock) do + try do :socket.close(conn.sock) + catch + _, _ -> :ok end end) end + + defp backoff_ms(index) do + base = Enum.at(@backoff_steps, index, List.last(@backoff_steps)) + jitter = trunc(base * 0.1) + base - jitter + :rand.uniform(jitter * 2 + 1) - 1 + end end diff --git a/test/statix/uds_reconnect_test.exs b/test/statix/uds_reconnect_test.exs new file mode 100644 index 0000000..53d5943 --- /dev/null +++ b/test/statix/uds_reconnect_test.exs @@ -0,0 +1,208 @@ +Code.require_file("../support/uds_test_server.exs", __DIR__) + +defmodule Statix.UDSReconnectTest do + use ExUnit.Case + + @moduletag :uds + + defmodule TestStatix do + use Statix, runtime_config: true + end + + describe "pathology: stale UDS sockets after server restart" do + test "sends fail after server goes away, and remain broken after server returns" do + socket_path = "/tmp/statix_reconnect_test_#{:erlang.unique_integer([:positive])}.sock" + + # Don't use UDSTestServer.setup/1 — it registers on_exit callbacks + # that fail when we manually stop the servers during the test. + {:ok, server} = Statix.UDSTestServer.start_link(socket_path, __MODULE__.Server) + :ok = GenServer.call(__MODULE__.Server, {:set_current_test, self()}) + + TestStatix.connect(socket_path: socket_path) + + # Verify happy path works + TestStatix.increment("baseline") + assert_receive {:test_server, _, "baseline:1|c"}, 1000 + + # Kill the test server (simulates Datadog agent restart) + GenServer.stop(server) + Process.sleep(100) + + # Attempt to send — should return an error + result_after_kill = TestStatix.increment("after_kill") + + assert {:error, _reason} = result_after_kill, + "Expected send to fail after server shutdown, got: #{inspect(result_after_kill)}" + + # Restart the test server on the same socket path + {:ok, server2} = Statix.UDSTestServer.start_link(socket_path, __MODULE__.Server2) + :ok = GenServer.call(__MODULE__.Server2, {:set_current_test, self()}) + + # Attempt to send with the old (stale) sockets — should STILL fail + result_after_restart = TestStatix.increment("after_restart") + + assert {:error, _reason} = result_after_restart, + "Expected send to STILL fail with stale sockets after server restart, " <> + "got: #{inspect(result_after_restart)}. " <> + "If this passes (:ok), the pathology may not exist as expected." + + GenServer.stop(server2) + File.rm(socket_path) + end + end + + describe "Conn.safe_open/1" do + test "returns {:ok, conn} on success" do + socket_path = "/tmp/statix_safe_open_test_#{:erlang.unique_integer([:positive])}.sock" + {:ok, server} = Statix.UDSTestServer.start_link(socket_path, __MODULE__.SafeOpenServer) + + conn = Statix.Conn.new(socket_path, nil) + assert {:ok, %Statix.Conn{transport: :uds, sock: sock}} = Statix.Conn.safe_open(conn) + assert sock != nil + + :socket.close(sock) + GenServer.stop(server) + File.rm(socket_path) + end + + test "returns {:error, reason} when socket path does not exist" do + conn = + Statix.Conn.new( + "/tmp/statix_nonexistent_#{:erlang.unique_integer([:positive])}.sock", + nil + ) + + assert {:error, _reason} = Statix.Conn.safe_open(conn) + end + end + + describe "ConnTracker state" do + test "set/3 stores conn_template for reconnection" do + socket_path = "/tmp/statix_ct_state_#{:erlang.unique_integer([:positive])}.sock" + {:ok, server} = Statix.UDSTestServer.start_link(socket_path, __MODULE__.CTStateServer) + + Statix.ConnTracker.ensure_started() + + conn_template = Statix.Conn.new(socket_path, nil) + {:ok, opened} = Statix.Conn.safe_open(conn_template) + + Statix.ConnTracker.set(socket_path, [opened], conn_template: conn_template) + + assert {:ok, %Statix.Conn{transport: :uds}} = Statix.ConnTracker.get(socket_path) + + GenServer.stop(server) + File.rm(socket_path) + end + end + + describe "error detection and marking" do + test "send failure marks path unhealthy and starts health-check" do + socket_path = "/tmp/statix_error_detect_#{:erlang.unique_integer([:positive])}.sock" + {:ok, server} = Statix.UDSTestServer.start_link(socket_path, __MODULE__.ErrorDetectServer) + :ok = GenServer.call(__MODULE__.ErrorDetectServer, {:set_current_test, self()}) + + TestStatix.connect(socket_path: socket_path) + + TestStatix.increment("baseline") + assert_receive {:test_server, _, "baseline:1|c"}, 1000 + + GenServer.stop(server) + Process.sleep(100) + + assert {:error, _} = TestStatix.increment("after_kill") + + # Sync barrier: forces ConnTracker to process all prior casts + :sys.get_state(Statix.ConnTracker) + + assert Statix.ConnTracker.unhealthy?(socket_path) + + File.rm(socket_path) + end + end + + describe "full reconnect cycle" do + test "health-check reconnects after server restart" do + socket_path = "/tmp/statix_reconnect_cycle_#{:erlang.unique_integer([:positive])}.sock" + {:ok, server} = Statix.UDSTestServer.start_link(socket_path, __MODULE__.ReconnectServer) + :ok = GenServer.call(__MODULE__.ReconnectServer, {:set_current_test, self()}) + + TestStatix.connect(socket_path: socket_path) + + TestStatix.increment("before") + assert_receive {:test_server, _, "before:1|c"}, 1000 + + GenServer.stop(server) + Process.sleep(100) + + assert {:error, _} = TestStatix.increment("during_outage") + + # Sync barrier: forces ConnTracker to process all prior casts + :sys.get_state(Statix.ConnTracker) + + assert Statix.ConnTracker.unhealthy?(socket_path) + + {:ok, server2} = Statix.UDSTestServer.start_link(socket_path, __MODULE__.ReconnectServer2) + :ok = GenServer.call(__MODULE__.ReconnectServer2, {:set_current_test, self()}) + + # Wait for health-check to fire (first backoff is ~1 second with jitter) + Process.sleep(2_000) + + refute Statix.ConnTracker.unhealthy?(socket_path) + + TestStatix.increment("after_reconnect") + assert_receive {:test_server, _, "after_reconnect:1|c"}, 1000 + + GenServer.stop(server2) + File.rm(socket_path) + end + end + + describe "backoff behavior" do + test "backoff index increments on repeated failure" do + socket_path = "/tmp/statix_backoff_#{:erlang.unique_integer([:positive])}.sock" + {:ok, server} = Statix.UDSTestServer.start_link(socket_path, __MODULE__.BackoffServer) + :ok = GenServer.call(__MODULE__.BackoffServer, {:set_current_test, self()}) + + TestStatix.connect(socket_path: socket_path) + + GenServer.stop(server) + Process.sleep(100) + + assert {:error, _} = TestStatix.increment("trigger") + + # Sync barrier: forces ConnTracker to process all prior casts + :sys.get_state(Statix.ConnTracker) + + assert Statix.ConnTracker.unhealthy?(socket_path) + + # Wait for first health-check to fire and fail (~1s backoff) + Process.sleep(1_500) + + assert Statix.ConnTracker.unhealthy?(socket_path) + + File.rm(socket_path) + end + + test "lost_count tracks dropped metrics" do + socket_path = "/tmp/statix_lost_count_#{:erlang.unique_integer([:positive])}.sock" + {:ok, server} = Statix.UDSTestServer.start_link(socket_path, __MODULE__.LostCountServer) + :ok = GenServer.call(__MODULE__.LostCountServer, {:set_current_test, self()}) + + TestStatix.connect(socket_path: socket_path) + + GenServer.stop(server) + Process.sleep(100) + + for _ <- 1..10 do + TestStatix.increment("lost_metric") + end + + # Sync barrier: forces ConnTracker to process all prior casts + :sys.get_state(Statix.ConnTracker) + + assert Statix.ConnTracker.unhealthy?(socket_path) + + File.rm(socket_path) + end + end +end diff --git a/test/statix/uds_test.exs b/test/statix/uds_test.exs index 8b92354..a3f93f6 100644 --- a/test/statix/uds_test.exs +++ b/test/statix/uds_test.exs @@ -104,8 +104,6 @@ defmodule Statix.UDSTest do end test "large packet over 1024 bytes via UDS maintains atomicity", _context do - # Create tags that will result in a packet > 1024 bytes - # Each tag is roughly 30 chars, so ~35 tags = ~1050 bytes total packet tags = for i <- 1..35 do "very_long_tag_name_#{i}:very_long_tag_value_#{i}" @@ -113,20 +111,16 @@ defmodule Statix.UDSTest do TestStatix.increment("sample.with.long.metric.name", 1, tags: tags) - # Verify we receive the complete packet atomically (all or nothing) assert_receive {:test_server, _, packet}, 1000 - # Verify packet structure is intact and complete assert packet =~ ~r/^sample\.with\.long\.metric\.name:1\|c\|#/ assert String.contains?(packet, "very_long_tag_name_1:very_long_tag_value_1") assert String.contains?(packet, "very_long_tag_name_35:very_long_tag_value_35") - # Verify packet size exceeds 1024 bytes assert byte_size(packet) > 1024 end test "very large packet over 4096 bytes via UDS maintains atomicity", _context do - # ~140 tags at 30 chars each = ~4200 bytes total tags = for i <- 1..140 do "very_long_tag_name_#{i}:very_long_tag_value_#{i}" @@ -141,7 +135,6 @@ defmodule Statix.UDSTest do assert String.contains?(packet, "very_long_tag_name_140:very_long_tag_value_140") assert byte_size(packet) > 4096 - # Verify atomicity: all 140 tags present (no truncation) tag_count = packet |> String.split(",") |> length() assert tag_count == 140 end @@ -153,11 +146,10 @@ defmodule Statix.UDSTest do assert length(connections) == 3 socket_refs = - for i <- 1..1000 do + for _i <- 1..1000 do {:ok, conn} = Statix.ConnTracker.get(context[:socket_path]) TestStatix.increment("pooled.metric") assert_receive {:test_server, _, _packet} - # Return the socket reference conn.sock end From c63e8db88e260202dcb299fdf67a597e6f3fbce3 Mon Sep 17 00:00:00 2001 From: sanchda <838104+sanchda@users.noreply.github.com> Date: Fri, 27 Feb 2026 04:44:25 +0000 Subject: [PATCH 2/5] Fixup --- lib/statix.ex | 14 ++++++++++++-- mix.exs | 1 - 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/lib/statix.ex b/lib/statix.ex index 2bf21d8..8aa6b2f 100644 --- a/lib/statix.ex +++ b/lib/statix.ex @@ -396,12 +396,17 @@ defmodule Statix do def open(%__MODULE__{conn: %{transport: :uds, sock: {:socket_path, path}} = conn, pool: pool}) do # UDS sockets are socket references (not ports), so they cannot be registered as process names. # Instead, store them in ConnTracker's ETS table. + Statix.ConnTracker.ensure_started() + connections = Enum.map(pool, fn _name -> Conn.open(conn) end) - Statix.ConnTracker.set(path, connections) + Statix.ConnTracker.set(path, connections, + conn_template: conn, + pool_size: length(pool) + ) end def open(%__MODULE__{conn: conn, pool: pool}) do @@ -425,7 +430,12 @@ defmodule Statix do case Statix.ConnTracker.get(path) do {:ok, conn} -> - Conn.transmit(conn, type, key, to_string(value), options) + case Conn.transmit(conn, type, key, to_string(value), options) do + :ok -> :ok + {:error, _reason} = error -> + Statix.ConnTracker.report_send_error(path) + error + end {:error, :not_found} -> {:error, :socket_not_found} diff --git a/mix.exs b/mix.exs index 6d74506..87695f1 100644 --- a/mix.exs +++ b/mix.exs @@ -23,7 +23,6 @@ defmodule Statix.Mixfile do def application() do [ - mod: {Statix.Application, []}, extra_applications: [:logger] ] end From 9b0dc079e8b0c22ea6be15245f70b7c4ce2c26f4 Mon Sep 17 00:00:00 2001 From: sanchda <838104+sanchda@users.noreply.github.com> Date: Sat, 28 Feb 2026 05:58:27 +0000 Subject: [PATCH 3/5] Various fixups --- lib/statix.ex | 4 +- lib/statix/application.ex | 26 ++----- lib/statix/conn.ex | 23 ++---- lib/statix/conn_tracker.ex | 112 ++++++++++++----------------- test/statix/uds_reconnect_test.exs | 14 ++-- 5 files changed, 69 insertions(+), 110 deletions(-) diff --git a/lib/statix.ex b/lib/statix.ex index 8aa6b2f..06cd616 100644 --- a/lib/statix.ex +++ b/lib/statix.ex @@ -431,7 +431,9 @@ defmodule Statix do case Statix.ConnTracker.get(path) do {:ok, conn} -> case Conn.transmit(conn, type, key, to_string(value), options) do - :ok -> :ok + :ok -> + :ok + {:error, _reason} = error -> Statix.ConnTracker.report_send_error(path) error diff --git a/lib/statix/application.ex b/lib/statix/application.ex index 025a039..7599419 100644 --- a/lib/statix/application.ex +++ b/lib/statix/application.ex @@ -9,25 +9,13 @@ defmodule Statix.Application do if GenServer.whereis(Statix.ConnTracker) do :ok else - do_start() - end - end - - defp do_start do - children = [ - {DynamicSupervisor, strategy: :one_for_one, name: Statix.DynamicSupervisor} - ] - - case Supervisor.start_link(children, strategy: :one_for_one, name: Statix.Supervisor) do - {:ok, _pid} -> start_conn_tracker() - {:error, {:already_started, _pid}} -> start_conn_tracker() - end - end - - defp start_conn_tracker do - case DynamicSupervisor.start_child(Statix.DynamicSupervisor, Statix.ConnTracker) do - {:ok, _pid} -> :ok - {:error, {:already_started, _pid}} -> :ok + case Supervisor.start_link([Statix.ConnTracker], + strategy: :one_for_one, + name: Statix.Supervisor + ) do + {:ok, _pid} -> :ok + {:error, {:already_started, _pid}} -> :ok + end end end end diff --git a/lib/statix/conn.ex b/lib/statix/conn.ex index 93c29d0..b85e71f 100644 --- a/lib/statix/conn.ex +++ b/lib/statix/conn.ex @@ -75,30 +75,19 @@ defmodule Statix.Conn do end end - def transmit( - %__MODULE__{sock: sock, prefix: prefix} = conn, - type, - key, - val, - options, - opts \\ [] - ) + def transmit(%__MODULE__{sock: sock, prefix: prefix} = conn, type, key, val, options) when is_binary(val) and is_list(options) do result = prefix |> Packet.build(type, key, val, options) |> transmit(conn) - should_log = Keyword.get(opts, :log, true) - with {:error, error} <- result do - if should_log do - Logger.error(fn -> - if(is_atom(sock), do: "", else: "Statix ") <> - "#{inspect(sock)} #{type} metric \"#{key}\" lost value #{val}" <> - " error=#{inspect(error)}" - end) - end + Logger.error(fn -> + if(is_atom(sock), do: "", else: "Statix ") <> + "#{inspect(sock)} #{type} metric \"#{key}\" lost value #{val}" <> + " error=#{inspect(error)}" + end) end result diff --git a/lib/statix/conn_tracker.ex b/lib/statix/conn_tracker.ex index b61afe4..b54f9fd 100644 --- a/lib/statix/conn_tracker.ex +++ b/lib/statix/conn_tracker.ex @@ -45,16 +45,6 @@ defmodule Statix.ConnTracker do ArgumentError -> {:error, :not_found} end - @doc """ - Check if a path is marked unhealthy. Lock-free ETS read for hot path. - """ - @spec unhealthy?(key :: term()) :: boolean() - def unhealthy?(key) do - :ets.lookup(:statix_conn_tracker, {:unhealthy, key}) != [] - rescue - ArgumentError -> false - end - @doc """ Report a send error for the given path. Non-blocking cast. If path is not yet unhealthy, marks it and starts the health-check loop. @@ -80,8 +70,6 @@ defmodule Statix.ConnTracker do # Clear unhealthy state if present — a manual connect() supersedes the health-check loop. # Cancel the pending timer so it doesn't close these fresh connections. - :ets.delete(state.table, {:unhealthy, key}) - unhealthy = case Map.pop(state.unhealthy, key) do {nil, map} -> @@ -118,8 +106,10 @@ defmodule Statix.ConnTracker do else case Map.fetch(state.conn_templates, path) do {:ok, conn_template} -> - # Mark as unhealthy, insert ETS sentinel, schedule first health-check - :ets.insert(state.table, {{:unhealthy, path}, true}) + # Close and remove stale connections — they're permanently broken. + # UDS DGRAM sockets to a dead server will never recover on their own; + # only opening new sockets after the server restarts can restore service. + close_and_remove(state.table, path) pool_size = Map.get(state.pool_sizes, path, 1) delay = backoff_ms(0) @@ -160,16 +150,14 @@ defmodule Statix.ConnTracker do end end + # All-or-nothing reconnection strategy. + # + # Unlike UDP over a network, a UDS socket is a local-host resource: the server + # socket file either exists on the filesystem or it doesn't. There is no partial + # reachability. If we can open one DGRAM connection to it, we can open all of + # them; if we can't open one, we can't open any. So partial success doesn't need + # handling — any failure means total failure, and we retry the full pool later. defp attempt_reconnect(path, entry, state) do - # Capture old connections for cleanup AFTER new ones are in ETS. - # This avoids a window where get/1 returns conns with closed sockets. - old_conns = - case :ets.lookup(state.table, path) do - [{^path, conns}] -> conns - [] -> [] - end - - # Try to open pool_size new sockets results = Enum.map(1..entry.pool_size, fn _ -> Conn.safe_open(entry.conn_template) @@ -177,59 +165,40 @@ defmodule Statix.ConnTracker do {successes, failures} = Enum.split_with(results, &match?({:ok, _}, &1)) - cond do - successes != [] -> - # At least some sockets opened — swap in new connections, then close old. - # Accepts partial success: better to have some working sockets than none. - connections = Enum.map(successes, fn {:ok, conn} -> conn end) - :ets.insert(state.table, {path, connections}) - :ets.delete(state.table, {:unhealthy, path}) - close_connections(old_conns) - - if failures == [] do - Logger.info( - "Statix: reconnected UDS path #{path} " <> - "after losing #{entry.lost_count} metric(s)" - ) - else - opened = length(successes) - failed = length(failures) - - Logger.info( - "Statix: partially reconnected UDS path #{path} " <> - "(#{opened}/#{opened + failed} sockets), " <> - "lost #{entry.lost_count} metric(s)" - ) - end + if failures == [] do + connections = Enum.map(successes, fn {:ok, conn} -> conn end) + :ets.insert(state.table, {path, connections}) - {:noreply, %{state | unhealthy: Map.delete(state.unhealthy, path)}} + Logger.info( + "Statix: reconnected UDS path #{path} " <> + "after losing #{entry.lost_count} metric(s)" + ) - true -> - # All failed — keep stale conns in ETS, schedule retry with backoff - next_index = min(entry.backoff_index + 1, length(@backoff_steps) - 1) - delay = backoff_ms(next_index) - timer_ref = Process.send_after(self(), {:health_check, path}, delay) + {:noreply, %{state | unhealthy: Map.delete(state.unhealthy, path)}} + else + # Close any that happened to open — we need all or nothing. + close_connections(Enum.map(successes, fn {:ok, conn} -> conn end)) + + next_index = min(entry.backoff_index + 1, length(@backoff_steps) - 1) + delay = backoff_ms(next_index) + timer_ref = Process.send_after(self(), {:health_check, path}, delay) - Logger.warning( - "Statix: reconnect failed for UDS path #{path}, " <> - "#{entry.lost_count} metric(s) lost so far, retrying in #{delay}ms" - ) + Logger.warning( + "Statix: reconnect failed for UDS path #{path}, " <> + "#{entry.lost_count} metric(s) lost so far, retrying in #{delay}ms" + ) - updated_entry = %{entry | backoff_index: next_index, timer_ref: timer_ref} - {:noreply, put_in(state.unhealthy[path], updated_entry)} + updated_entry = %{entry | backoff_index: next_index, timer_ref: timer_ref} + {:noreply, put_in(state.unhealthy[path], updated_entry)} end end @impl true def terminate(_reason, %{table: table}) do :ets.foldl( - fn - {{:unhealthy, _}, _}, acc -> - acc - - {_path, connections}, acc -> - close_connections(connections) - acc + fn {_path, connections}, acc -> + close_connections(connections) + acc end, nil, table @@ -238,6 +207,17 @@ defmodule Statix.ConnTracker do :ok end + defp close_and_remove(table, path) do + case :ets.lookup(table, path) do + [{^path, connections}] -> + close_connections(connections) + :ets.delete(table, path) + + [] -> + :ok + end + end + defp close_connections(connections) do Enum.each(connections, fn conn -> try do diff --git a/test/statix/uds_reconnect_test.exs b/test/statix/uds_reconnect_test.exs index 53d5943..93ef420 100644 --- a/test/statix/uds_reconnect_test.exs +++ b/test/statix/uds_reconnect_test.exs @@ -114,7 +114,7 @@ defmodule Statix.UDSReconnectTest do # Sync barrier: forces ConnTracker to process all prior casts :sys.get_state(Statix.ConnTracker) - assert Statix.ConnTracker.unhealthy?(socket_path) + assert {:error, :not_found} == Statix.ConnTracker.get(socket_path) File.rm(socket_path) end @@ -139,7 +139,7 @@ defmodule Statix.UDSReconnectTest do # Sync barrier: forces ConnTracker to process all prior casts :sys.get_state(Statix.ConnTracker) - assert Statix.ConnTracker.unhealthy?(socket_path) + assert {:error, :not_found} == Statix.ConnTracker.get(socket_path) {:ok, server2} = Statix.UDSTestServer.start_link(socket_path, __MODULE__.ReconnectServer2) :ok = GenServer.call(__MODULE__.ReconnectServer2, {:set_current_test, self()}) @@ -147,7 +147,7 @@ defmodule Statix.UDSReconnectTest do # Wait for health-check to fire (first backoff is ~1 second with jitter) Process.sleep(2_000) - refute Statix.ConnTracker.unhealthy?(socket_path) + assert {:ok, _} = Statix.ConnTracker.get(socket_path) TestStatix.increment("after_reconnect") assert_receive {:test_server, _, "after_reconnect:1|c"}, 1000 @@ -173,17 +173,17 @@ defmodule Statix.UDSReconnectTest do # Sync barrier: forces ConnTracker to process all prior casts :sys.get_state(Statix.ConnTracker) - assert Statix.ConnTracker.unhealthy?(socket_path) + assert {:error, :not_found} == Statix.ConnTracker.get(socket_path) # Wait for first health-check to fire and fail (~1s backoff) Process.sleep(1_500) - assert Statix.ConnTracker.unhealthy?(socket_path) + assert {:error, :not_found} == Statix.ConnTracker.get(socket_path) File.rm(socket_path) end - test "lost_count tracks dropped metrics" do + test "repeated failures while unhealthy do not crash" do socket_path = "/tmp/statix_lost_count_#{:erlang.unique_integer([:positive])}.sock" {:ok, server} = Statix.UDSTestServer.start_link(socket_path, __MODULE__.LostCountServer) :ok = GenServer.call(__MODULE__.LostCountServer, {:set_current_test, self()}) @@ -200,7 +200,7 @@ defmodule Statix.UDSReconnectTest do # Sync barrier: forces ConnTracker to process all prior casts :sys.get_state(Statix.ConnTracker) - assert Statix.ConnTracker.unhealthy?(socket_path) + assert {:error, :not_found} == Statix.ConnTracker.get(socket_path) File.rm(socket_path) end From acfb624ef994af77be84e1282dbbaa31c0230e7c Mon Sep 17 00:00:00 2001 From: sanchda <838104+sanchda@users.noreply.github.com> Date: Sat, 28 Feb 2026 07:22:31 +0000 Subject: [PATCH 4/5] Simplify! --- lib/statix.ex | 5 +---- lib/statix/application.ex | 17 +++++++---------- lib/statix/conn.ex | 10 +++------- lib/statix/conn_tracker.ex | 38 ++++++++++++++++---------------------- 4 files changed, 27 insertions(+), 43 deletions(-) diff --git a/lib/statix.ex b/lib/statix.ex index 06cd616..e47ba7e 100644 --- a/lib/statix.ex +++ b/lib/statix.ex @@ -403,10 +403,7 @@ defmodule Statix do Conn.open(conn) end) - Statix.ConnTracker.set(path, connections, - conn_template: conn, - pool_size: length(pool) - ) + Statix.ConnTracker.set(path, connections, conn_template: conn) end def open(%__MODULE__{conn: conn, pool: pool}) do diff --git a/lib/statix/application.ex b/lib/statix/application.ex index 7599419..ae2ae39 100644 --- a/lib/statix/application.ex +++ b/lib/statix/application.ex @@ -6,16 +6,13 @@ defmodule Statix.Application do No-op if already started. Safe to call concurrently. """ def ensure_started do - if GenServer.whereis(Statix.ConnTracker) do - :ok - else - case Supervisor.start_link([Statix.ConnTracker], - strategy: :one_for_one, - name: Statix.Supervisor - ) do - {:ok, _pid} -> :ok - {:error, {:already_started, _pid}} -> :ok - end + case Supervisor.start_link([Statix.ConnTracker], + strategy: :one_for_one, + name: Statix.Supervisor + ) do + {:ok, _pid} -> :ok + {:error, {:already_started, _pid}} -> :ok + {:error, reason} -> {:error, reason} end end end diff --git a/lib/statix/conn.ex b/lib/statix/conn.ex index b85e71f..e2bf90d 100644 --- a/lib/statix/conn.ex +++ b/lib/statix/conn.ex @@ -47,15 +47,11 @@ defmodule Statix.Conn do raise "Unix domain socket support requires OTP 22+. Current OTP version does not support :socket module." end - {:ok, sock} = :socket.open(:local, :dgram, :default) - path_addr = %{family: :local, path: String.to_charlist(path)} - - case :socket.connect(sock, path_addr) do - :ok -> - %__MODULE__{conn | sock: sock} + case safe_open(conn) do + {:ok, opened} -> + opened {:error, reason} -> - :socket.close(sock) raise "Failed to connect to Unix domain socket at #{path}: #{inspect(reason)}" end end diff --git a/lib/statix/conn_tracker.ex b/lib/statix/conn_tracker.ex index b54f9fd..fde9af2 100644 --- a/lib/statix/conn_tracker.ex +++ b/lib/statix/conn_tracker.ex @@ -24,7 +24,7 @@ defmodule Statix.ConnTracker do table = :ets.new(:statix_conn_tracker, [:set, :protected, :named_table, read_concurrency: true]) - {:ok, %{table: table, unhealthy: %{}, conn_templates: %{}, pool_sizes: %{}}} + {:ok, %{table: table, unhealthy: %{}, path_meta: %{}}} end @spec set(key :: term(), connections :: [Conn.t()], opts :: keyword()) :: :ok @@ -80,21 +80,20 @@ defmodule Statix.ConnTracker do map end - # Store conn_template and pool_size if provided - conn_templates = + # Store conn_template if provided (pool_size is derived from connections length) + path_meta = case Keyword.fetch(opts, :conn_template) do - {:ok, template} -> Map.put(state.conn_templates, key, template) - :error -> state.conn_templates - end + {:ok, template} -> + Map.put(state.path_meta, key, %{ + conn_template: template, + pool_size: length(connections) + }) - pool_sizes = - case Keyword.fetch(opts, :pool_size) do - {:ok, size} -> Map.put(state.pool_sizes, key, size) - :error -> state.pool_sizes + :error -> + state.path_meta end - {:reply, :ok, - %{state | conn_templates: conn_templates, unhealthy: unhealthy, pool_sizes: pool_sizes}} + {:reply, :ok, %{state | path_meta: path_meta, unhealthy: unhealthy}} end @impl true @@ -104,14 +103,13 @@ defmodule Statix.ConnTracker do state = update_in(state.unhealthy[path].lost_count, &(&1 + 1)) {:noreply, state} else - case Map.fetch(state.conn_templates, path) do - {:ok, conn_template} -> + case Map.fetch(state.path_meta, path) do + {:ok, %{conn_template: conn_template, pool_size: pool_size}} -> # Close and remove stale connections — they're permanently broken. # UDS DGRAM sockets to a dead server will never recover on their own; # only opening new sockets after the server restarts can restore service. close_and_remove(state.table, path) - pool_size = Map.get(state.pool_sizes, path, 1) delay = backoff_ms(0) timer_ref = Process.send_after(self(), {:health_check, path}, delay) @@ -208,13 +206,9 @@ defmodule Statix.ConnTracker do end defp close_and_remove(table, path) do - case :ets.lookup(table, path) do - [{^path, connections}] -> - close_connections(connections) - :ets.delete(table, path) - - [] -> - :ok + case :ets.take(table, path) do + [{^path, connections}] -> close_connections(connections) + [] -> :ok end end From 3bbc14803cb04e594c6024a92d237c33ee897dca Mon Sep 17 00:00:00 2001 From: sanchda <838104+sanchda@users.noreply.github.com> Date: Sat, 28 Feb 2026 07:45:30 +0000 Subject: [PATCH 5/5] More simplifications, actually read comments... --- lib/statix.ex | 3 --- lib/statix/application.ex | 4 ---- lib/statix/conn.ex | 3 +-- lib/statix/conn_tracker.ex | 32 +++++------------------------- test/statix/uds_reconnect_test.exs | 5 ----- 5 files changed, 6 insertions(+), 41 deletions(-) diff --git a/lib/statix.ex b/lib/statix.ex index e47ba7e..7e434c4 100644 --- a/lib/statix.ex +++ b/lib/statix.ex @@ -371,7 +371,6 @@ defmodule Statix do def new(module, options) do config = get_config(module, options) - # Determine transport based on socket_path presence conn = if config.socket_path do Conn.new(config.socket_path, config.prefix) @@ -462,7 +461,6 @@ defmodule Statix do end end - # Takes first :sample_rate occurrence (standard keyword list behavior) defp should_send?([]), do: true defp should_send?([{:sample_rate, rate} | _]), do: rate >= :rand.uniform() defp should_send?([_ | rest]), do: should_send?(rest) @@ -493,7 +491,6 @@ defmodule Statix do tags: tags } - # Warn if both socket_path and host/port are specified if config.socket_path do has_custom_host = Keyword.has_key?(options, :host) has_custom_port = Keyword.has_key?(options, :port) diff --git a/lib/statix/application.ex b/lib/statix/application.ex index ae2ae39..88ca154 100644 --- a/lib/statix/application.ex +++ b/lib/statix/application.ex @@ -1,10 +1,6 @@ defmodule Statix.Application do @moduledoc false - @doc """ - Starts the Statix supervision tree lazily on first UDS connection. - No-op if already started. Safe to call concurrently. - """ def ensure_started do case Supervisor.start_link([Statix.ConnTracker], strategy: :one_for_one, diff --git a/lib/statix/conn.ex b/lib/statix/conn.ex index e2bf90d..b7310fe 100644 --- a/lib/statix/conn.ex +++ b/lib/statix/conn.ex @@ -4,7 +4,6 @@ defmodule Statix.Conn do # sock field holds different types depending on state and transport: # - UDP: port (after open/1) or atom for process name # - UDS: {:socket_path, path} before open/1, socket reference after - # socket_path field preserves the UDS path even after opening defstruct [:sock, :address, :port, :prefix, :transport, :socket_path] alias Statix.Packet @@ -44,7 +43,7 @@ defmodule Statix.Conn do def open(%__MODULE__{transport: :uds, sock: {:socket_path, path}} = conn) do unless Code.ensure_loaded?(:socket) do - raise "Unix domain socket support requires OTP 22+. Current OTP version does not support :socket module." + raise "Unix domain socket support requires OTP 22+" end case safe_open(conn) do diff --git a/lib/statix/conn_tracker.ex b/lib/statix/conn_tracker.ex index fde9af2..ec1e41c 100644 --- a/lib/statix/conn_tracker.ex +++ b/lib/statix/conn_tracker.ex @@ -13,10 +13,6 @@ defmodule Statix.ConnTracker do GenServer.start_link(__MODULE__, opts, name: __MODULE__) end - @doc """ - Ensures ConnTracker is running. Called lazily on first UDS connection. - No-op if already started. Starts the entire supervision tree if needed. - """ defdelegate ensure_started, to: Statix.Application @impl true @@ -41,15 +37,8 @@ defmodule Statix.ConnTracker do _ -> {:error, :not_found} end - rescue - ArgumentError -> {:error, :not_found} end - @doc """ - Report a send error for the given path. Non-blocking cast. - If path is not yet unhealthy, marks it and starts the health-check loop. - If already unhealthy, increments the lost metric count. - """ @spec report_send_error(key :: term()) :: :ok def report_send_error(key) do GenServer.cast(__MODULE__, {:report_send_error, key}) @@ -57,15 +46,7 @@ defmodule Statix.ConnTracker do @impl true def handle_call({:set, key, connections, opts}, _from, state) do - # Close old connections before replacing them - case :ets.lookup(state.table, key) do - [{^key, old_connections}] -> - close_connections(old_connections) - - [] -> - :ok - end - + close_and_remove(state.table, key) :ets.insert(state.table, {key, connections}) # Clear unhealthy state if present — a manual connect() supersedes the health-check loop. @@ -80,7 +61,6 @@ defmodule Statix.ConnTracker do map end - # Store conn_template if provided (pool_size is derived from connections length) path_meta = case Keyword.fetch(opts, :conn_template) do {:ok, template} -> @@ -99,13 +79,11 @@ defmodule Statix.ConnTracker do @impl true def handle_cast({:report_send_error, path}, state) do if Map.has_key?(state.unhealthy, path) do - # Already unhealthy — just bump lost count state = update_in(state.unhealthy[path].lost_count, &(&1 + 1)) {:noreply, state} else case Map.fetch(state.path_meta, path) do {:ok, %{conn_template: conn_template, pool_size: pool_size}} -> - # Close and remove stale connections — they're permanently broken. # UDS DGRAM sockets to a dead server will never recover on their own; # only opening new sockets after the server restarts can restore service. close_and_remove(state.table, path) @@ -162,10 +140,10 @@ defmodule Statix.ConnTracker do end) {successes, failures} = Enum.split_with(results, &match?({:ok, _}, &1)) + opened = Enum.map(successes, fn {:ok, conn} -> conn end) if failures == [] do - connections = Enum.map(successes, fn {:ok, conn} -> conn end) - :ets.insert(state.table, {path, connections}) + :ets.insert(state.table, {path, opened}) Logger.info( "Statix: reconnected UDS path #{path} " <> @@ -174,8 +152,8 @@ defmodule Statix.ConnTracker do {:noreply, %{state | unhealthy: Map.delete(state.unhealthy, path)}} else - # Close any that happened to open — we need all or nothing. - close_connections(Enum.map(successes, fn {:ok, conn} -> conn end)) + # All or nothing + close_connections(opened) next_index = min(entry.backoff_index + 1, length(@backoff_steps) - 1) delay = backoff_ms(next_index) diff --git a/test/statix/uds_reconnect_test.exs b/test/statix/uds_reconnect_test.exs index 93ef420..d40a38d 100644 --- a/test/statix/uds_reconnect_test.exs +++ b/test/statix/uds_reconnect_test.exs @@ -20,25 +20,20 @@ defmodule Statix.UDSReconnectTest do TestStatix.connect(socket_path: socket_path) - # Verify happy path works TestStatix.increment("baseline") assert_receive {:test_server, _, "baseline:1|c"}, 1000 - # Kill the test server (simulates Datadog agent restart) GenServer.stop(server) Process.sleep(100) - # Attempt to send — should return an error result_after_kill = TestStatix.increment("after_kill") assert {:error, _reason} = result_after_kill, "Expected send to fail after server shutdown, got: #{inspect(result_after_kill)}" - # Restart the test server on the same socket path {:ok, server2} = Statix.UDSTestServer.start_link(socket_path, __MODULE__.Server2) :ok = GenServer.call(__MODULE__.Server2, {:set_current_test, self()}) - # Attempt to send with the old (stale) sockets — should STILL fail result_after_restart = TestStatix.increment("after_restart") assert {:error, _reason} = result_after_restart,