From 17704ba1b5a5273a842de6ee61d089c88c6a6334 Mon Sep 17 00:00:00 2001 From: Matt McKenna Date: Fri, 30 Jan 2026 11:37:15 -0500 Subject: [PATCH 1/2] Add --json flag to tq CLI for programmatic queue inspection Enable agents and status lines to read task queue state by adding structured JSON output to list, logs, and clear commands. The command field is now stored in the queue table and included in JSON output, allowing consumers to see what's actually running (e.g., "./gradlew build"). Changes: - Add --json flag to tq list, logs, and clear subcommands - Add command column to queue schema with migration for existing DBs - Add TestJsonSchemaContracts test class to enforce JSON structure stability - JSON mode for clear skips interactive confirmation (implies --force) Co-Authored-By: Claude Opus 4.5 --- queue_core.py | 17 +++- tests/test_tq_cli.py | 226 +++++++++++++++++++++++++++++++++++++++++++ tq.py | 100 ++++++++++++++++--- 3 files changed, 322 insertions(+), 21 deletions(-) diff --git a/queue_core.py b/queue_core.py index ab1c5e9..a846e18 100644 --- a/queue_core.py +++ b/queue_core.py @@ -52,6 +52,7 @@ def from_data_dir(cls, data_dir: Path) -> "QueuePaths": pid INTEGER, server_id TEXT, child_pid INTEGER, + command TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) @@ -62,6 +63,11 @@ def from_data_dir(cls, data_dir: Path) -> "QueuePaths": ALTER TABLE queue ADD COLUMN server_id TEXT """ +# Migration to add command column to existing databases +QUEUE_MIGRATION_COMMAND = """ +ALTER TABLE queue ADD COLUMN command TEXT +""" + QUEUE_INDEX = """ CREATE INDEX IF NOT EXISTS idx_queue_status ON queue(queue_name, status) """ @@ -91,11 +97,12 @@ def init_db(paths: QueuePaths): with get_db(paths.db_path) as conn: conn.execute(QUEUE_SCHEMA) conn.execute(QUEUE_INDEX) - # Run migration for existing databases without server_id column - try: - conn.execute(QUEUE_MIGRATION_SERVER_ID) - except sqlite3.OperationalError: - pass # Column already exists + # Run migrations for existing databases + for migration in [QUEUE_MIGRATION_SERVER_ID, QUEUE_MIGRATION_COMMAND]: + try: + conn.execute(migration) + except sqlite3.OperationalError: + pass # Column already exists def ensure_db(paths: QueuePaths): diff --git a/tests/test_tq_cli.py b/tests/test_tq_cli.py index 7733c40..98a497e 100644 --- a/tests/test_tq_cli.py +++ b/tests/test_tq_cli.py @@ -4,9 +4,12 @@ """ import json +import os +import signal import subprocess import sys import tempfile +import time from pathlib import Path import pytest @@ -153,6 +156,173 @@ def test_list_no_database(self, temp_data_dir): assert result.returncode == 0 assert "No queue database" in result.stdout or "empty" in result.stdout.lower() + def test_list_json_empty_queue(self, temp_data_dir): + """Test list --json command with empty queue.""" + result = run_tq("list", "--json", data_dir=temp_data_dir) + + assert result.returncode == 0 + output = json.loads(result.stdout) + assert output == { + "tasks": [], + "summary": {"total": 0, "running": 0, "waiting": 0} + } + + def test_list_json_no_database(self, temp_data_dir): + """Test list --json command when database doesn't exist.""" + result = run_tq("list", "--json", data_dir=temp_data_dir) + + assert result.returncode == 0 + output = json.loads(result.stdout) + assert output["tasks"] == [] + assert output["summary"]["total"] == 0 + + +class TestJsonSchemaContracts: + """ + Schema contract tests for JSON output. + + These tests ensure the JSON structure remains stable for programmatic consumers + (e.g., Claude Code status lines). Any changes to these schemas should be + intentional and backward-compatible. + """ + + # Expected fields for each schema - used to enforce contracts + LIST_REQUIRED_KEYS = {"tasks", "summary"} + LIST_SUMMARY_REQUIRED_KEYS = {"total", "running", "waiting"} + LIST_TASK_REQUIRED_KEYS = {"id", "queue_name", "status", "command", "pid", "child_pid", "created_at", "updated_at"} + + LOGS_REQUIRED_KEYS = {"entries"} + LOGS_ENTRY_REQUIRED_KEYS = {"event", "timestamp"} # Base keys all entries must have + + CLEAR_REQUIRED_KEYS = {"cleared", "success"} + + def test_list_json_schema_empty(self, temp_data_dir): + """Verify list --json schema structure when empty.""" + result = run_tq("list", "--json", data_dir=temp_data_dir) + output = json.loads(result.stdout) + + # Top-level keys + assert set(output.keys()) == self.LIST_REQUIRED_KEYS, \ + f"list --json must have exactly keys {self.LIST_REQUIRED_KEYS}" + + # Summary keys + assert set(output["summary"].keys()) == self.LIST_SUMMARY_REQUIRED_KEYS, \ + f"list --json summary must have exactly keys {self.LIST_SUMMARY_REQUIRED_KEYS}" + + # Type checks + assert isinstance(output["tasks"], list) + assert isinstance(output["summary"]["total"], int) + assert isinstance(output["summary"]["running"], int) + assert isinstance(output["summary"]["waiting"], int) + + def test_list_json_schema_with_running_task(self, temp_data_dir): + """Verify list --json task schema with an active task.""" + # Start a long-running task + proc = subprocess.Popen( + [sys.executable, str(TQ_PATH), f"--data-dir={temp_data_dir}", "sleep", "30"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + start_new_session=True, + ) + + try: + # Wait for it to start + time.sleep(0.5) + + result = run_tq("list", "--json", data_dir=temp_data_dir) + output = json.loads(result.stdout) + + # Verify structure + assert set(output.keys()) == self.LIST_REQUIRED_KEYS + assert len(output["tasks"]) >= 1 + + # Verify task object schema + task = output["tasks"][0] + assert set(task.keys()) == self.LIST_TASK_REQUIRED_KEYS, \ + f"Task object must have exactly keys {self.LIST_TASK_REQUIRED_KEYS}, got {set(task.keys())}" + + # Verify task field types + assert isinstance(task["id"], int) + assert isinstance(task["queue_name"], str) + assert task["status"] in ("running", "waiting") + assert task["command"] is None or isinstance(task["command"], str) + assert task["pid"] is None or isinstance(task["pid"], int) + assert task["child_pid"] is None or isinstance(task["child_pid"], int) + assert task["created_at"] is None or isinstance(task["created_at"], str) + assert task["updated_at"] is None or isinstance(task["updated_at"], str) + + # Verify command is populated for the running task + assert task["command"] == "sleep 30", f"Expected command 'sleep 30', got {task['command']}" + + # Verify summary counts are accurate + assert output["summary"]["total"] == len(output["tasks"]) + running_count = sum(1 for t in output["tasks"] if t["status"] == "running") + waiting_count = sum(1 for t in output["tasks"] if t["status"] == "waiting") + assert output["summary"]["running"] == running_count + assert output["summary"]["waiting"] == waiting_count + + finally: + # Clean up + try: + os.killpg(os.getpgid(proc.pid), signal.SIGTERM) + except Exception: + proc.terminate() + proc.wait(timeout=5) + + def test_logs_json_schema_empty(self, temp_data_dir): + """Verify logs --json schema structure when empty.""" + result = run_tq("logs", "--json", data_dir=temp_data_dir) + output = json.loads(result.stdout) + + assert set(output.keys()) == self.LOGS_REQUIRED_KEYS, \ + f"logs --json must have exactly keys {self.LOGS_REQUIRED_KEYS}" + assert isinstance(output["entries"], list) + + def test_logs_json_schema_with_entries(self, temp_data_dir): + """Verify logs --json entry schema with actual log entries.""" + # Generate some logs + run_tq("echo", "test", data_dir=temp_data_dir) + + result = run_tq("logs", "--json", data_dir=temp_data_dir) + output = json.loads(result.stdout) + + assert set(output.keys()) == self.LOGS_REQUIRED_KEYS + assert len(output["entries"]) >= 3 # queued, started, completed + + # Verify each entry has required base keys + for entry in output["entries"]: + assert self.LOGS_ENTRY_REQUIRED_KEYS.issubset(set(entry.keys())), \ + f"Log entry must have at least keys {self.LOGS_ENTRY_REQUIRED_KEYS}, got {set(entry.keys())}" + assert isinstance(entry["event"], str) + assert isinstance(entry["timestamp"], str) + + # Verify specific event schemas + for entry in output["entries"]: + if entry["event"] == "task_queued": + assert "task_id" in entry + assert "queue_name" in entry + elif entry["event"] == "task_started": + assert "task_id" in entry + assert "queue_name" in entry + assert "wait_time_seconds" in entry + elif entry["event"] == "task_completed": + assert "task_id" in entry + assert "queue_name" in entry + assert "exit_code" in entry + assert "duration_seconds" in entry + + def test_clear_json_schema(self, temp_data_dir): + """Verify clear --json schema structure.""" + result = run_tq("clear", "--json", data_dir=temp_data_dir) + output = json.loads(result.stdout) + + assert set(output.keys()) == self.CLEAR_REQUIRED_KEYS, \ + f"clear --json must have exactly keys {self.CLEAR_REQUIRED_KEYS}" + assert isinstance(output["cleared"], int) + assert isinstance(output["success"], bool) + assert output["cleared"] >= 0 + assert output["success"] is True + class TestTqLogs: """Tests for the tq logs command.""" @@ -189,6 +359,43 @@ def test_logs_n_option(self, temp_data_dir): lines = [line for line in result.stdout.strip().split("\n") if line] assert len(lines) == 3 + def test_logs_json_no_file(self, temp_data_dir): + """Test logs --json command when no log file exists.""" + result = run_tq("logs", "--json", data_dir=temp_data_dir) + + assert result.returncode == 0 + output = json.loads(result.stdout) + assert output == {"entries": []} + + def test_logs_json_shows_activity(self, temp_data_dir): + """Test logs --json command shows task activity.""" + # Run a task first to generate logs + run_tq("echo", "test", data_dir=temp_data_dir) + + result = run_tq("logs", "--json", data_dir=temp_data_dir) + + assert result.returncode == 0 + output = json.loads(result.stdout) + assert "entries" in output + assert len(output["entries"]) >= 3 # queued, started, completed + + events = [e["event"] for e in output["entries"]] + assert "task_queued" in events + assert "task_started" in events + assert "task_completed" in events + + def test_logs_json_n_option(self, temp_data_dir): + """Test logs --json -n option to limit entries.""" + # Run multiple tasks + for i in range(5): + run_tq("echo", f"test {i}", data_dir=temp_data_dir) + + result = run_tq("logs", "--json", "-n", "3", data_dir=temp_data_dir) + + assert result.returncode == 0 + output = json.loads(result.stdout) + assert len(output["entries"]) == 3 + class TestTqClear: """Tests for the tq clear command.""" @@ -203,6 +410,25 @@ def test_clear_empty_queue(self, temp_data_dir): assert result.returncode == 0 assert "already empty" in result.stdout.lower() + def test_clear_json_empty_queue(self, temp_data_dir): + """Test clear --json command with empty queue.""" + # Initialize database by running a task that completes + run_tq("echo", "init", data_dir=temp_data_dir) + + result = run_tq("clear", "--json", data_dir=temp_data_dir, timeout=5) + + assert result.returncode == 0 + output = json.loads(result.stdout) + assert output == {"cleared": 0, "success": True} + + def test_clear_json_no_database(self, temp_data_dir): + """Test clear --json command when no database exists.""" + result = run_tq("clear", "--json", data_dir=temp_data_dir, timeout=5) + + assert result.returncode == 0 + output = json.loads(result.stdout) + assert output == {"cleared": 0, "success": True} + class TestTqHelp: """Tests for help output.""" diff --git a/tq.py b/tq.py index 28a95c9..48de3bd 100644 --- a/tq.py +++ b/tq.py @@ -51,10 +51,14 @@ def get_paths(args) -> QueuePaths: def cmd_list(args): """List all tasks in the queue.""" paths = get_paths(args) + json_output = getattr(args, "json", False) if not paths.db_path.exists(): - print(f"No queue database found at {paths.db_path}") - print("Queue is empty (no tasks have been run yet)") + if json_output: + print(json.dumps({"tasks": [], "summary": {"total": 0, "running": 0, "waiting": 0}})) + else: + print(f"No queue database found at {paths.db_path}") + print("Queue is empty (no tasks have been run yet)") return conn = sqlite3.connect(paths.db_path, timeout=5.0) @@ -65,6 +69,38 @@ def cmd_list(args): "SELECT * FROM queue ORDER BY queue_name, id" ).fetchall() + if json_output: + tasks = [] + running_count = 0 + waiting_count = 0 + for row in rows: + task = { + "id": row["id"], + "queue_name": row["queue_name"], + "status": row["status"], + "command": row["command"] if "command" in row.keys() else None, + "pid": row["pid"], + "child_pid": row["child_pid"], + "created_at": row["created_at"], + "updated_at": row["updated_at"], + } + tasks.append(task) + if row["status"] == "running": + running_count += 1 + elif row["status"] == "waiting": + waiting_count += 1 + + output = { + "tasks": tasks, + "summary": { + "total": len(tasks), + "running": running_count, + "waiting": waiting_count, + }, + } + print(json.dumps(output)) + return + if not rows: print("Queue is empty") return @@ -106,9 +142,13 @@ def cmd_list(args): def cmd_clear(args): """Clear all tasks from the queue.""" paths = get_paths(args) + json_output = getattr(args, "json", False) if not paths.db_path.exists(): - print("No queue database found") + if json_output: + print(json.dumps({"cleared": 0, "success": True})) + else: + print("No queue database found") return conn = sqlite3.connect(paths.db_path, timeout=5.0) @@ -116,17 +156,26 @@ def cmd_clear(args): # Check how many tasks exist count = conn.execute("SELECT COUNT(*) FROM queue").fetchone()[0] if count == 0: - print("Queue is already empty") + if json_output: + print(json.dumps({"cleared": 0, "success": True})) + else: + print("Queue is already empty") return - response = input(f"Clear {count} task(s) from queue? [y/N] ") - if response.lower() != 'y': - print("Cancelled") - return + # JSON mode skips confirmation (implies --force) + if not json_output: + response = input(f"Clear {count} task(s) from queue? [y/N] ") + if response.lower() != 'y': + print("Cancelled") + return cursor = conn.execute("DELETE FROM queue") conn.commit() - print(f"Cleared {cursor.rowcount} task(s) from queue") + + if json_output: + print(json.dumps({"cleared": cursor.rowcount, "success": True})) + else: + print(f"Cleared {cursor.rowcount} task(s) from queue") finally: conn.close() @@ -134,14 +183,30 @@ def cmd_clear(args): def cmd_logs(args): """Show recent log entries.""" paths = get_paths(args) + json_output = getattr(args, "json", False) if not paths.metrics_path.exists(): - print(f"No log file found at {paths.metrics_path}") + if json_output: + print(json.dumps({"entries": []})) + else: + print(f"No log file found at {paths.metrics_path}") return lines = paths.metrics_path.read_text().strip().split("\n") recent = lines[-args.n:] if len(lines) > args.n else lines + if json_output: + entries = [] + for line in recent: + try: + entry = json.loads(line) + entries.append(entry) + except json.JSONDecodeError: + # Skip malformed lines in JSON mode + pass + print(json.dumps({"entries": entries})) + return + for line in recent: try: entry = json.loads(line) @@ -214,13 +279,13 @@ def cleanup_queue(conn, queue_name: str, paths: QueuePaths): print(f"[tq] WARNING: Cleared task from old CLI instance (ID: {task['id']}, old_instance: {task['server_id']})") -def register_task(conn, queue_name: str, paths: QueuePaths) -> int: +def register_task(conn, queue_name: str, paths: QueuePaths, command: str = None) -> int: """Register a task in the queue. Returns task_id immediately.""" my_pid = os.getpid() cursor = conn.execute( - "INSERT INTO queue (queue_name, status, pid, server_id) VALUES (?, ?, ?, ?)", - (queue_name, "waiting", my_pid, CLI_INSTANCE_ID), + "INSERT INTO queue (queue_name, status, pid, server_id, command) VALUES (?, ?, ?, ?, ?)", + (queue_name, "waiting", my_pid, CLI_INSTANCE_ID, command), ) conn.commit() task_id = cursor.lastrowid @@ -364,7 +429,7 @@ def cleanup_handler(signum, frame): cleanup_queue(conn, queue_name, paths) # Register task first so task_id is available for cleanup if interrupted - task_id = register_task(conn, queue_name, paths) + task_id = register_task(conn, queue_name, paths, command=command) wait_for_turn(conn, queue_name, task_id, paths) print(f"[tq] Running: {command}") @@ -477,14 +542,17 @@ def main(): run_parser.add_argument("run_command", nargs=argparse.REMAINDER, metavar="COMMAND", help="Command to run") # list - subparsers.add_parser("list", help="List tasks in queue") + list_parser = subparsers.add_parser("list", help="List tasks in queue") + list_parser.add_argument("--json", action="store_true", help="Output in JSON format") # clear - subparsers.add_parser("clear", help="Clear all tasks from queue") + clear_parser = subparsers.add_parser("clear", help="Clear all tasks from queue") + clear_parser.add_argument("--json", action="store_true", help="Output in JSON format (implies --force)") # logs logs_parser = subparsers.add_parser("logs", help="Show recent log entries") logs_parser.add_argument("-n", type=int, default=20, help="Number of entries (default: 20)") + logs_parser.add_argument("--json", action="store_true", help="Output in JSON format") # Handle implicit run: tq ./gradlew build -> tq run ./gradlew build # Pre-process argv to insert 'run' if needed From f0aad988841bfbf52cc350108afbffea1dea955b Mon Sep 17 00:00:00 2001 From: Matt McKenna Date: Fri, 30 Jan 2026 12:09:09 -0500 Subject: [PATCH 2/2] Address PR review feedback - Fix --json help text: say "skip confirmation" instead of "implies --force" - Fix test_list_json_empty_queue to actually test DB-exists-but-empty case Co-Authored-By: Claude Opus 4.5 --- tests/test_tq_cli.py | 5 ++++- tq.py | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/test_tq_cli.py b/tests/test_tq_cli.py index 98a497e..15c0290 100644 --- a/tests/test_tq_cli.py +++ b/tests/test_tq_cli.py @@ -157,7 +157,10 @@ def test_list_no_database(self, temp_data_dir): assert "No queue database" in result.stdout or "empty" in result.stdout.lower() def test_list_json_empty_queue(self, temp_data_dir): - """Test list --json command with empty queue.""" + """Test list --json command with DB exists but queue is empty.""" + # Initialize DB by running a task that completes + run_tq("echo", "init", data_dir=temp_data_dir) + result = run_tq("list", "--json", data_dir=temp_data_dir) assert result.returncode == 0 diff --git a/tq.py b/tq.py index 48de3bd..9ab1e50 100644 --- a/tq.py +++ b/tq.py @@ -547,7 +547,7 @@ def main(): # clear clear_parser = subparsers.add_parser("clear", help="Clear all tasks from queue") - clear_parser.add_argument("--json", action="store_true", help="Output in JSON format (implies --force)") + clear_parser.add_argument("--json", action="store_true", help="Output in JSON format and skip confirmation") # logs logs_parser = subparsers.add_parser("logs", help="Show recent log entries")