diff --git a/lib/statix.ex b/lib/statix.ex index 2bf21d8..7e434c4 100644 --- a/lib/statix.ex +++ b/lib/statix.ex @@ -371,7 +371,6 @@ defmodule Statix do def new(module, options) do config = get_config(module, options) - # Determine transport based on socket_path presence conn = if config.socket_path do Conn.new(config.socket_path, config.prefix) @@ -396,12 +395,14 @@ defmodule Statix do def open(%__MODULE__{conn: %{transport: :uds, sock: {:socket_path, path}} = conn, pool: pool}) do # UDS sockets are socket references (not ports), so they cannot be registered as process names. # Instead, store them in ConnTracker's ETS table. + Statix.ConnTracker.ensure_started() + connections = Enum.map(pool, fn _name -> Conn.open(conn) end) - Statix.ConnTracker.set(path, connections) + Statix.ConnTracker.set(path, connections, conn_template: conn) end def open(%__MODULE__{conn: conn, pool: pool}) do @@ -425,7 +426,14 @@ defmodule Statix do case Statix.ConnTracker.get(path) do {:ok, conn} -> - Conn.transmit(conn, type, key, to_string(value), options) + case Conn.transmit(conn, type, key, to_string(value), options) do + :ok -> + :ok + + {:error, _reason} = error -> + Statix.ConnTracker.report_send_error(path) + error + end {:error, :not_found} -> {:error, :socket_not_found} @@ -453,7 +461,6 @@ defmodule Statix do end end - # Takes first :sample_rate occurrence (standard keyword list behavior) defp should_send?([]), do: true defp should_send?([{:sample_rate, rate} | _]), do: rate >= :rand.uniform() defp should_send?([_ | rest]), do: should_send?(rest) @@ -484,7 +491,6 @@ defmodule Statix do tags: tags } - # Warn if both socket_path and host/port are specified if config.socket_path do has_custom_host = Keyword.has_key?(options, :host) has_custom_port = Keyword.has_key?(options, :port) diff --git a/lib/statix/application.ex b/lib/statix/application.ex index e0721fd..88ca154 100644 --- a/lib/statix/application.ex +++ b/lib/statix/application.ex @@ -1,13 +1,14 @@ defmodule Statix.Application do @moduledoc false - use Application - - def start(_type, _args) do - children = [ - Statix.ConnTracker - ] - - Supervisor.start_link(children, strategy: :one_for_one, name: Statix.Supervisor) + def ensure_started do + case Supervisor.start_link([Statix.ConnTracker], + strategy: :one_for_one, + name: Statix.Supervisor + ) do + {:ok, _pid} -> :ok + {:error, {:already_started, _pid}} -> :ok + {:error, reason} -> {:error, reason} + end end end diff --git a/lib/statix/conn.ex b/lib/statix/conn.ex index 1604e55..b7310fe 100644 --- a/lib/statix/conn.ex +++ b/lib/statix/conn.ex @@ -4,7 +4,6 @@ defmodule Statix.Conn do # sock field holds different types depending on state and transport: # - UDP: port (after open/1) or atom for process name # - UDS: {:socket_path, path} before open/1, socket reference after - # socket_path field preserves the UDS path even after opening defstruct [:sock, :address, :port, :prefix, :transport, :socket_path] alias Statix.Packet @@ -44,22 +43,33 @@ defmodule Statix.Conn do def open(%__MODULE__{transport: :uds, sock: {:socket_path, path}} = conn) do unless Code.ensure_loaded?(:socket) do - raise "Unix domain socket support requires OTP 22+. Current OTP version does not support :socket module." + raise "Unix domain socket support requires OTP 22+" end - {:ok, sock} = :socket.open(:local, :dgram, :default) - path_addr = %{family: :local, path: String.to_charlist(path)} - - case :socket.connect(sock, path_addr) do - :ok -> - %__MODULE__{conn | sock: sock} + case safe_open(conn) do + {:ok, opened} -> + opened {:error, reason} -> - :socket.close(sock) raise "Failed to connect to Unix domain socket at #{path}: #{inspect(reason)}" end end + def safe_open(%__MODULE__{transport: :uds, sock: {:socket_path, path}} = conn) do + with {:ok, sock} <- :socket.open(:local, :dgram, :default) do + path_addr = %{family: :local, path: String.to_charlist(path)} + + case :socket.connect(sock, path_addr) do + :ok -> + {:ok, %__MODULE__{conn | sock: sock}} + + {:error, reason} -> + :socket.close(sock) + {:error, reason} + end + end + end + def transmit(%__MODULE__{sock: sock, prefix: prefix} = conn, type, key, val, options) when is_binary(val) and is_list(options) do result = diff --git a/lib/statix/conn_tracker.ex b/lib/statix/conn_tracker.ex index daf8db7..ec1e41c 100644 --- a/lib/statix/conn_tracker.ex +++ b/lib/statix/conn_tracker.ex @@ -5,65 +5,172 @@ defmodule Statix.ConnTracker do alias Statix.Conn + require Logger + + @backoff_steps [1_000, 5_000, 30_000, 60_000, 120_000, 300_000] + def start_link(opts) do GenServer.start_link(__MODULE__, opts, name: __MODULE__) end + defdelegate ensure_started, to: Statix.Application + @impl true def init(_opts) do table = :ets.new(:statix_conn_tracker, [:set, :protected, :named_table, read_concurrency: true]) - {:ok, table} + {:ok, %{table: table, unhealthy: %{}, path_meta: %{}}} end - @doc """ - Stores a list of connections for the given key, replacing any existing connections. - - Closes old connections before storing new ones to prevent resource leaks. - Typically called when establishing a UDS connection pool with pool_size > 1. - """ - @spec set(key :: term(), connections :: [Conn.t()]) :: :ok - def set(key, connections) do - GenServer.call(__MODULE__, {:set, key, connections}) + @spec set(key :: term(), connections :: [Conn.t()], opts :: keyword()) :: :ok + def set(key, connections, opts \\ []) do + GenServer.call(__MODULE__, {:set, key, connections, opts}) end - @doc """ - Retrieves a random connection for the given key. - - Returns `{:ok, conn}` with a randomly selected connection from the pool, - or `{:error, :not_found}` if no connections exist for the key. - """ @spec get(key :: term()) :: {:ok, Conn.t()} | {:error, :not_found} def get(key) do case :ets.lookup(:statix_conn_tracker, key) do - [{^key, connections}] -> - random_connection = Enum.random(connections) - {:ok, random_connection} + [{^key, [_ | _] = connections}] -> + {:ok, Enum.random(connections)} - [] -> + _ -> {:error, :not_found} end end + @spec report_send_error(key :: term()) :: :ok + def report_send_error(key) do + GenServer.cast(__MODULE__, {:report_send_error, key}) + end + @impl true - def handle_call({:set, key, connections}, _from, state) do - # Close old connections before replacing them - case :ets.lookup(state, key) do - [{^key, old_connections}] -> - close_connections(old_connections) - - [] -> - :ok + def handle_call({:set, key, connections, opts}, _from, state) do + close_and_remove(state.table, key) + :ets.insert(state.table, {key, connections}) + + # Clear unhealthy state if present — a manual connect() supersedes the health-check loop. + # Cancel the pending timer so it doesn't close these fresh connections. + unhealthy = + case Map.pop(state.unhealthy, key) do + {nil, map} -> + map + + {entry, map} -> + Process.cancel_timer(entry.timer_ref) + map + end + + path_meta = + case Keyword.fetch(opts, :conn_template) do + {:ok, template} -> + Map.put(state.path_meta, key, %{ + conn_template: template, + pool_size: length(connections) + }) + + :error -> + state.path_meta + end + + {:reply, :ok, %{state | path_meta: path_meta, unhealthy: unhealthy}} + end + + @impl true + def handle_cast({:report_send_error, path}, state) do + if Map.has_key?(state.unhealthy, path) do + state = update_in(state.unhealthy[path].lost_count, &(&1 + 1)) + {:noreply, state} + else + case Map.fetch(state.path_meta, path) do + {:ok, %{conn_template: conn_template, pool_size: pool_size}} -> + # UDS DGRAM sockets to a dead server will never recover on their own; + # only opening new sockets after the server restarts can restore service. + close_and_remove(state.table, path) + + delay = backoff_ms(0) + timer_ref = Process.send_after(self(), {:health_check, path}, delay) + + unhealthy_entry = %{ + backoff_index: 0, + timer_ref: timer_ref, + conn_template: conn_template, + pool_size: pool_size, + lost_count: 1 + } + + Logger.warning( + "Statix: UDS path #{path} marked unhealthy, " <> + "scheduling reconnect in #{delay}ms" + ) + + {:noreply, put_in(state.unhealthy[path], unhealthy_entry)} + + :error -> + # No template stored — can't reconnect. This shouldn't happen. + Logger.error("Statix: UDS path #{path} has no conn_template, cannot reconnect") + {:noreply, state} + end end + end - :ets.insert(state, {key, connections}) - {:reply, :ok, state} + @impl true + def handle_info({:health_check, path}, state) do + case Map.fetch(state.unhealthy, path) do + {:ok, entry} -> + attempt_reconnect(path, entry, state) + + :error -> + # No longer unhealthy (race with successful set?) + {:noreply, state} + end + end + + # All-or-nothing reconnection strategy. + # + # Unlike UDP over a network, a UDS socket is a local-host resource: the server + # socket file either exists on the filesystem or it doesn't. There is no partial + # reachability. If we can open one DGRAM connection to it, we can open all of + # them; if we can't open one, we can't open any. So partial success doesn't need + # handling — any failure means total failure, and we retry the full pool later. + defp attempt_reconnect(path, entry, state) do + results = + Enum.map(1..entry.pool_size, fn _ -> + Conn.safe_open(entry.conn_template) + end) + + {successes, failures} = Enum.split_with(results, &match?({:ok, _}, &1)) + opened = Enum.map(successes, fn {:ok, conn} -> conn end) + + if failures == [] do + :ets.insert(state.table, {path, opened}) + + Logger.info( + "Statix: reconnected UDS path #{path} " <> + "after losing #{entry.lost_count} metric(s)" + ) + + {:noreply, %{state | unhealthy: Map.delete(state.unhealthy, path)}} + else + # All or nothing + close_connections(opened) + + next_index = min(entry.backoff_index + 1, length(@backoff_steps) - 1) + delay = backoff_ms(next_index) + timer_ref = Process.send_after(self(), {:health_check, path}, delay) + + Logger.warning( + "Statix: reconnect failed for UDS path #{path}, " <> + "#{entry.lost_count} metric(s) lost so far, retrying in #{delay}ms" + ) + + updated_entry = %{entry | backoff_index: next_index, timer_ref: timer_ref} + {:noreply, put_in(state.unhealthy[path], updated_entry)} + end end @impl true - def terminate(_reason, table) do - # Close all sockets before table is destroyed + def terminate(_reason, %{table: table}) do :ets.foldl( fn {_path, connections}, acc -> close_connections(connections) @@ -76,11 +183,26 @@ defmodule Statix.ConnTracker do :ok end + defp close_and_remove(table, path) do + case :ets.take(table, path) do + [{^path, connections}] -> close_connections(connections) + [] -> :ok + end + end + defp close_connections(connections) do Enum.each(connections, fn conn -> - if conn.transport == :uds and is_reference(conn.sock) do + try do :socket.close(conn.sock) + catch + _, _ -> :ok end end) end + + defp backoff_ms(index) do + base = Enum.at(@backoff_steps, index, List.last(@backoff_steps)) + jitter = trunc(base * 0.1) + base - jitter + :rand.uniform(jitter * 2 + 1) - 1 + end end diff --git a/mix.exs b/mix.exs index 6d74506..87695f1 100644 --- a/mix.exs +++ b/mix.exs @@ -23,7 +23,6 @@ defmodule Statix.Mixfile do def application() do [ - mod: {Statix.Application, []}, extra_applications: [:logger] ] end diff --git a/test/statix/uds_reconnect_test.exs b/test/statix/uds_reconnect_test.exs new file mode 100644 index 0000000..d40a38d --- /dev/null +++ b/test/statix/uds_reconnect_test.exs @@ -0,0 +1,203 @@ +Code.require_file("../support/uds_test_server.exs", __DIR__) + +defmodule Statix.UDSReconnectTest do + use ExUnit.Case + + @moduletag :uds + + defmodule TestStatix do + use Statix, runtime_config: true + end + + describe "pathology: stale UDS sockets after server restart" do + test "sends fail after server goes away, and remain broken after server returns" do + socket_path = "/tmp/statix_reconnect_test_#{:erlang.unique_integer([:positive])}.sock" + + # Don't use UDSTestServer.setup/1 — it registers on_exit callbacks + # that fail when we manually stop the servers during the test. + {:ok, server} = Statix.UDSTestServer.start_link(socket_path, __MODULE__.Server) + :ok = GenServer.call(__MODULE__.Server, {:set_current_test, self()}) + + TestStatix.connect(socket_path: socket_path) + + TestStatix.increment("baseline") + assert_receive {:test_server, _, "baseline:1|c"}, 1000 + + GenServer.stop(server) + Process.sleep(100) + + result_after_kill = TestStatix.increment("after_kill") + + assert {:error, _reason} = result_after_kill, + "Expected send to fail after server shutdown, got: #{inspect(result_after_kill)}" + + {:ok, server2} = Statix.UDSTestServer.start_link(socket_path, __MODULE__.Server2) + :ok = GenServer.call(__MODULE__.Server2, {:set_current_test, self()}) + + result_after_restart = TestStatix.increment("after_restart") + + assert {:error, _reason} = result_after_restart, + "Expected send to STILL fail with stale sockets after server restart, " <> + "got: #{inspect(result_after_restart)}. " <> + "If this passes (:ok), the pathology may not exist as expected." + + GenServer.stop(server2) + File.rm(socket_path) + end + end + + describe "Conn.safe_open/1" do + test "returns {:ok, conn} on success" do + socket_path = "/tmp/statix_safe_open_test_#{:erlang.unique_integer([:positive])}.sock" + {:ok, server} = Statix.UDSTestServer.start_link(socket_path, __MODULE__.SafeOpenServer) + + conn = Statix.Conn.new(socket_path, nil) + assert {:ok, %Statix.Conn{transport: :uds, sock: sock}} = Statix.Conn.safe_open(conn) + assert sock != nil + + :socket.close(sock) + GenServer.stop(server) + File.rm(socket_path) + end + + test "returns {:error, reason} when socket path does not exist" do + conn = + Statix.Conn.new( + "/tmp/statix_nonexistent_#{:erlang.unique_integer([:positive])}.sock", + nil + ) + + assert {:error, _reason} = Statix.Conn.safe_open(conn) + end + end + + describe "ConnTracker state" do + test "set/3 stores conn_template for reconnection" do + socket_path = "/tmp/statix_ct_state_#{:erlang.unique_integer([:positive])}.sock" + {:ok, server} = Statix.UDSTestServer.start_link(socket_path, __MODULE__.CTStateServer) + + Statix.ConnTracker.ensure_started() + + conn_template = Statix.Conn.new(socket_path, nil) + {:ok, opened} = Statix.Conn.safe_open(conn_template) + + Statix.ConnTracker.set(socket_path, [opened], conn_template: conn_template) + + assert {:ok, %Statix.Conn{transport: :uds}} = Statix.ConnTracker.get(socket_path) + + GenServer.stop(server) + File.rm(socket_path) + end + end + + describe "error detection and marking" do + test "send failure marks path unhealthy and starts health-check" do + socket_path = "/tmp/statix_error_detect_#{:erlang.unique_integer([:positive])}.sock" + {:ok, server} = Statix.UDSTestServer.start_link(socket_path, __MODULE__.ErrorDetectServer) + :ok = GenServer.call(__MODULE__.ErrorDetectServer, {:set_current_test, self()}) + + TestStatix.connect(socket_path: socket_path) + + TestStatix.increment("baseline") + assert_receive {:test_server, _, "baseline:1|c"}, 1000 + + GenServer.stop(server) + Process.sleep(100) + + assert {:error, _} = TestStatix.increment("after_kill") + + # Sync barrier: forces ConnTracker to process all prior casts + :sys.get_state(Statix.ConnTracker) + + assert {:error, :not_found} == Statix.ConnTracker.get(socket_path) + + File.rm(socket_path) + end + end + + describe "full reconnect cycle" do + test "health-check reconnects after server restart" do + socket_path = "/tmp/statix_reconnect_cycle_#{:erlang.unique_integer([:positive])}.sock" + {:ok, server} = Statix.UDSTestServer.start_link(socket_path, __MODULE__.ReconnectServer) + :ok = GenServer.call(__MODULE__.ReconnectServer, {:set_current_test, self()}) + + TestStatix.connect(socket_path: socket_path) + + TestStatix.increment("before") + assert_receive {:test_server, _, "before:1|c"}, 1000 + + GenServer.stop(server) + Process.sleep(100) + + assert {:error, _} = TestStatix.increment("during_outage") + + # Sync barrier: forces ConnTracker to process all prior casts + :sys.get_state(Statix.ConnTracker) + + assert {:error, :not_found} == Statix.ConnTracker.get(socket_path) + + {:ok, server2} = Statix.UDSTestServer.start_link(socket_path, __MODULE__.ReconnectServer2) + :ok = GenServer.call(__MODULE__.ReconnectServer2, {:set_current_test, self()}) + + # Wait for health-check to fire (first backoff is ~1 second with jitter) + Process.sleep(2_000) + + assert {:ok, _} = Statix.ConnTracker.get(socket_path) + + TestStatix.increment("after_reconnect") + assert_receive {:test_server, _, "after_reconnect:1|c"}, 1000 + + GenServer.stop(server2) + File.rm(socket_path) + end + end + + describe "backoff behavior" do + test "backoff index increments on repeated failure" do + socket_path = "/tmp/statix_backoff_#{:erlang.unique_integer([:positive])}.sock" + {:ok, server} = Statix.UDSTestServer.start_link(socket_path, __MODULE__.BackoffServer) + :ok = GenServer.call(__MODULE__.BackoffServer, {:set_current_test, self()}) + + TestStatix.connect(socket_path: socket_path) + + GenServer.stop(server) + Process.sleep(100) + + assert {:error, _} = TestStatix.increment("trigger") + + # Sync barrier: forces ConnTracker to process all prior casts + :sys.get_state(Statix.ConnTracker) + + assert {:error, :not_found} == Statix.ConnTracker.get(socket_path) + + # Wait for first health-check to fire and fail (~1s backoff) + Process.sleep(1_500) + + assert {:error, :not_found} == Statix.ConnTracker.get(socket_path) + + File.rm(socket_path) + end + + test "repeated failures while unhealthy do not crash" do + socket_path = "/tmp/statix_lost_count_#{:erlang.unique_integer([:positive])}.sock" + {:ok, server} = Statix.UDSTestServer.start_link(socket_path, __MODULE__.LostCountServer) + :ok = GenServer.call(__MODULE__.LostCountServer, {:set_current_test, self()}) + + TestStatix.connect(socket_path: socket_path) + + GenServer.stop(server) + Process.sleep(100) + + for _ <- 1..10 do + TestStatix.increment("lost_metric") + end + + # Sync barrier: forces ConnTracker to process all prior casts + :sys.get_state(Statix.ConnTracker) + + assert {:error, :not_found} == Statix.ConnTracker.get(socket_path) + + File.rm(socket_path) + end + end +end diff --git a/test/statix/uds_test.exs b/test/statix/uds_test.exs index 8b92354..a3f93f6 100644 --- a/test/statix/uds_test.exs +++ b/test/statix/uds_test.exs @@ -104,8 +104,6 @@ defmodule Statix.UDSTest do end test "large packet over 1024 bytes via UDS maintains atomicity", _context do - # Create tags that will result in a packet > 1024 bytes - # Each tag is roughly 30 chars, so ~35 tags = ~1050 bytes total packet tags = for i <- 1..35 do "very_long_tag_name_#{i}:very_long_tag_value_#{i}" @@ -113,20 +111,16 @@ defmodule Statix.UDSTest do TestStatix.increment("sample.with.long.metric.name", 1, tags: tags) - # Verify we receive the complete packet atomically (all or nothing) assert_receive {:test_server, _, packet}, 1000 - # Verify packet structure is intact and complete assert packet =~ ~r/^sample\.with\.long\.metric\.name:1\|c\|#/ assert String.contains?(packet, "very_long_tag_name_1:very_long_tag_value_1") assert String.contains?(packet, "very_long_tag_name_35:very_long_tag_value_35") - # Verify packet size exceeds 1024 bytes assert byte_size(packet) > 1024 end test "very large packet over 4096 bytes via UDS maintains atomicity", _context do - # ~140 tags at 30 chars each = ~4200 bytes total tags = for i <- 1..140 do "very_long_tag_name_#{i}:very_long_tag_value_#{i}" @@ -141,7 +135,6 @@ defmodule Statix.UDSTest do assert String.contains?(packet, "very_long_tag_name_140:very_long_tag_value_140") assert byte_size(packet) > 4096 - # Verify atomicity: all 140 tags present (no truncation) tag_count = packet |> String.split(",") |> length() assert tag_count == 140 end @@ -153,11 +146,10 @@ defmodule Statix.UDSTest do assert length(connections) == 3 socket_refs = - for i <- 1..1000 do + for _i <- 1..1000 do {:ok, conn} = Statix.ConnTracker.get(context[:socket_path]) TestStatix.increment("pooled.metric") assert_receive {:test_server, _, _packet} - # Return the socket reference conn.sock end