diff --git a/backend/app/api/routes/stt_evaluations/evaluation.py b/backend/app/api/routes/stt_evaluations/evaluation.py index 8f255ed92..6b96f4414 100644 --- a/backend/app/api/routes/stt_evaluations/evaluation.py +++ b/backend/app/api/routes/stt_evaluations/evaluation.py @@ -7,7 +7,7 @@ from app.api.deps import AuthContextDep, SessionDep from app.api.permissions import Permission, require_permission -from app.celery.utils import start_low_priority_job +from app.celery.utils import start_stt_batch_submission from app.core.cloud import get_cloud_storage from app.crud.stt_evaluations import ( create_stt_run, @@ -83,8 +83,7 @@ def start_stt_evaluation( # Offload batch submission (signed URLs, JSONL, Gemini upload) to Celery worker trace_id = correlation_id.get() or "N/A" try: - celery_task_id = start_low_priority_job( - function_path="app.services.stt_evaluations.batch_job.execute_batch_submission", + celery_task_id = start_stt_batch_submission( project_id=auth_context.project_.id, job_id=str(run.id), trace_id=trace_id, diff --git a/backend/app/api/routes/tts_evaluations/evaluation.py b/backend/app/api/routes/tts_evaluations/evaluation.py index 8c85f0296..6751d8219 100644 --- a/backend/app/api/routes/tts_evaluations/evaluation.py +++ b/backend/app/api/routes/tts_evaluations/evaluation.py @@ -7,7 +7,7 @@ from app.api.deps import AuthContextDep, SessionDep from app.api.permissions import Permission, require_permission -from app.celery.utils import start_low_priority_job +from app.celery.utils import start_tts_batch_submission from app.core.cloud import get_cloud_storage from app.crud.tts_evaluations import ( create_tts_run, @@ -86,8 +86,7 @@ def start_tts_evaluation( # Offload batch submission (result creation, JSONL, Gemini upload) to Celery worker trace_id = correlation_id.get() or "N/A" try: - celery_task_id = start_low_priority_job( - function_path="app.services.tts_evaluations.batch_job.execute_batch_submission", + celery_task_id = start_tts_batch_submission( project_id=auth_context.project_.id, job_id=str(run.id), trace_id=trace_id, diff --git a/backend/app/celery/celery_app.py b/backend/app/celery/celery_app.py index ce126735c..463a1c982 100644 --- a/backend/app/celery/celery_app.py +++ b/backend/app/celery/celery_app.py @@ -41,18 +41,10 @@ Queue("cron", exchange=default_exchange, routing_key="cron"), Queue("default", exchange=default_exchange, routing_key="default"), ), - # Task routing + # Task routing — queue is set per-task via @celery_app.task(queue=...). + # Only cron tasks need an explicit override here. task_routes={ - "app.celery.tasks.job_execution.execute_high_priority_task": { - "queue": "high_priority", - "priority": 9, - }, - "app.celery.tasks.job_execution.execute_low_priority_task": { - "queue": "low_priority", - "priority": 1, - }, "app.celery.tasks.*_cron_*": {"queue": "cron"}, - "app.celery.tasks.*": {"queue": "default"}, }, task_default_queue="default", # Enable priority support @@ -93,5 +85,3 @@ broker_pool_limit=settings.CELERY_BROKER_POOL_LIMIT, ) -# Auto-discover tasks -# celery_app.autodiscover_tasks() diff --git a/backend/app/celery/tasks/job_execution.py b/backend/app/celery/tasks/job_execution.py index 58c961902..aaa763830 100644 --- a/backend/app/celery/tasks/job_execution.py +++ b/backend/app/celery/tasks/job_execution.py @@ -1,143 +1,153 @@ import logging -from collections.abc import Callable -from celery import current_task + from asgi_correlation_id import correlation_id +from celery import current_task from app.celery.celery_app import celery_app -import app.services.llm.jobs as _llm_jobs -import app.services.response.jobs as _response_jobs -import app.services.doctransform.job as _doctransform_job -import app.services.collections.create_collection as _create_collection -import app.services.collections.delete_collection as _delete_collection -import app.services.stt_evaluations.batch_job as _stt_batch_job -import app.services.stt_evaluations.metric_job as _stt_metric_job -import app.services.tts_evaluations.batch_job as _tts_batch_job -import app.services.tts_evaluations.batch_result_processing as _tts_result_processing logger = logging.getLogger(__name__) -# Hardcoded dispatch table — avoids dynamic importlib at task execution time. -# Imports above happen once in the main Celery process before worker forks, -# so all child workers inherit them via copy-on-write instead of each loading -# them independently (which was causing OOM with warmup_job_modules). -_FUNCTION_REGISTRY: dict[str, Callable] = { - "app.services.llm.jobs.execute_job": _llm_jobs.execute_job, - "app.services.llm.jobs.execute_chain_job": _llm_jobs.execute_chain_job, - "app.services.response.jobs.execute_job": _response_jobs.execute_job, - "app.services.doctransform.job.execute_job": _doctransform_job.execute_job, - "app.services.collections.create_collection.execute_job": _create_collection.execute_job, - "app.services.collections.delete_collection.execute_job": _delete_collection.execute_job, - "app.services.stt_evaluations.batch_job.execute_batch_submission": _stt_batch_job.execute_batch_submission, - "app.services.stt_evaluations.metric_job.execute_metric_computation": _stt_metric_job.execute_metric_computation, - "app.services.tts_evaluations.batch_job.execute_batch_submission": _tts_batch_job.execute_batch_submission, - "app.services.tts_evaluations.batch_result_processing.execute_tts_result_processing": _tts_result_processing.execute_tts_result_processing, -} - - -@celery_app.task(bind=True, queue="high_priority") -def execute_high_priority_task( - self, - function_path: str, - project_id: int, - job_id: str, - trace_id: str, - **kwargs, -): - """ - High priority Celery task to execute any job function. - Use this for urgent operations that need immediate processing. - - Args: - function_path: Import path to the execute_job function (e.g., "app.services.doctransform.service.execute_job") - project_id: ID of the project executing the job - job_id: ID of the job (should already exist in database) - trace_id: Trace/correlation ID to preserve context across Celery tasks - **kwargs: Additional arguments to pass to the execute_job function - """ - return _execute_job_internal( - self, function_path, project_id, job_id, "high_priority", trace_id, **kwargs + +def _set_trace(trace_id: str) -> None: + correlation_id.set(trace_id) + logger.info(f"[_set_trace] Set correlation ID: {trace_id}") + + +@celery_app.task(bind=True, queue="high_priority", priority=9) +def run_llm_job(self, project_id: int, job_id: str, trace_id: str, **kwargs): + from app.services.llm.jobs import execute_job + + _set_trace(trace_id) + return execute_job( + project_id=project_id, + job_id=job_id, + task_id=current_task.request.id, + task_instance=self, + **kwargs, ) -@celery_app.task(bind=True, queue="low_priority") -def execute_low_priority_task( - self, - function_path: str, - project_id: int, - job_id: str, - trace_id: str, - **kwargs, -): - """ - Low priority Celery task to execute any job function. - Use this for background operations that can wait. - - Args: - function_path: Import path to the execute_job function (e.g., "app.services.doctransform.service.execute_job") - project_id: ID of the project executing the job - job_id: ID of the job (should already exist in database) - trace_id: Trace/correlation ID to preserve context across Celery tasks - **kwargs: Additional arguments to pass to the execute_job function - """ - return _execute_job_internal( - self, function_path, project_id, job_id, "low_priority", trace_id, **kwargs +@celery_app.task(bind=True, queue="high_priority", priority=9) +def run_llm_chain_job(self, project_id: int, job_id: str, trace_id: str, **kwargs): + from app.services.llm.jobs import execute_chain_job + + _set_trace(trace_id) + return execute_chain_job( + project_id=project_id, + job_id=job_id, + task_id=current_task.request.id, + task_instance=self, + **kwargs, ) -def _execute_job_internal( - task_instance, - function_path: str, - project_id: int, - job_id: str, - priority: str, - trace_id: str, - **kwargs, -): - """ - Internal function to execute job logic for both priority levels. - - Args: - task_instance: Celery task instance (for progress updates, retries, etc.) - function_path: Import path to the execute_job function - project_id: ID of the project executing the job - job_id: ID of the job (should already exist in database) - priority: Priority level ("high_priority" or "low_priority") - trace_id: Trace/correlation ID to preserve context across Celery tasks - **kwargs: Additional arguments to pass to the execute_job function - """ - task_id = current_task.request.id +@celery_app.task(bind=True, queue="high_priority", priority=9) +def run_response_job(self, project_id: int, job_id: str, trace_id: str, **kwargs): + from app.services.response.jobs import execute_job - correlation_id.set(trace_id) - logger.info(f"Set correlation ID context: {trace_id} for job {job_id}") - - try: - execute_function = _FUNCTION_REGISTRY.get(function_path) - if execute_function is None: - raise ValueError( - f"[_execute_job_internal] Unknown function path: {function_path}" - ) - - logger.info( - f"Executing {priority} job {job_id} (task {task_id}) using function {function_path}" - ) - - # Execute the business logic function with standardized parameters - result = execute_function( - project_id=project_id, - job_id=job_id, - task_id=task_id, - task_instance=task_instance, # For progress updates, retries if needed - **kwargs, - ) - - logger.info( - f"{priority.capitalize()} job {job_id} (task {task_id}) completed successfully" - ) - return result - - except Exception as exc: - logger.error( - f"{priority.capitalize()} job {job_id} (task {task_id}) failed: {exc}", - exc_info=True, - ) - raise + _set_trace(trace_id) + return execute_job( + project_id=project_id, + job_id=job_id, + task_id=current_task.request.id, + task_instance=self, + **kwargs, + ) + + +@celery_app.task(bind=True, queue="low_priority", priority=1) +def run_doctransform_job(self, project_id: int, job_id: str, trace_id: str, **kwargs): + from app.services.doctransform.job import execute_job + + _set_trace(trace_id) + return execute_job( + project_id=project_id, + job_id=job_id, + task_id=current_task.request.id, + task_instance=self, + **kwargs, + ) + + +@celery_app.task(bind=True, queue="low_priority", priority=1) +def run_create_collection_job(self, project_id: int, job_id: str, trace_id: str, **kwargs): + from app.services.collections.create_collection import execute_job + + _set_trace(trace_id) + return execute_job( + project_id=project_id, + job_id=job_id, + task_id=current_task.request.id, + task_instance=self, + **kwargs, + ) + + +@celery_app.task(bind=True, queue="low_priority", priority=1) +def run_delete_collection_job(self, project_id: int, job_id: str, trace_id: str, **kwargs): + from app.services.collections.delete_collection import execute_job + + _set_trace(trace_id) + return execute_job( + project_id=project_id, + job_id=job_id, + task_id=current_task.request.id, + task_instance=self, + **kwargs, + ) + + +@celery_app.task(bind=True, queue="low_priority", priority=1) +def run_stt_batch_submission(self, project_id: int, job_id: str, trace_id: str, **kwargs): + from app.services.stt_evaluations.batch_job import execute_batch_submission + + _set_trace(trace_id) + return execute_batch_submission( + project_id=project_id, + job_id=job_id, + task_id=current_task.request.id, + task_instance=self, + **kwargs, + ) + + +@celery_app.task(bind=True, queue="low_priority", priority=1) +def run_stt_metric_computation(self, project_id: int, job_id: str, trace_id: str, **kwargs): + from app.services.stt_evaluations.metric_job import execute_metric_computation + + _set_trace(trace_id) + return execute_metric_computation( + project_id=project_id, + job_id=job_id, + task_id=current_task.request.id, + task_instance=self, + **kwargs, + ) + + +@celery_app.task(bind=True, queue="low_priority", priority=1) +def run_tts_batch_submission(self, project_id: int, job_id: str, trace_id: str, **kwargs): + from app.services.tts_evaluations.batch_job import execute_batch_submission + + _set_trace(trace_id) + return execute_batch_submission( + project_id=project_id, + job_id=job_id, + task_id=current_task.request.id, + task_instance=self, + **kwargs, + ) + + +@celery_app.task(bind=True, queue="low_priority", priority=1) +def run_tts_result_processing(self, project_id: int, job_id: str, trace_id: str, **kwargs): + from app.services.tts_evaluations.batch_result_processing import execute_tts_result_processing + + _set_trace(trace_id) + return execute_tts_result_processing( + project_id=project_id, + job_id=job_id, + task_id=current_task.request.id, + task_instance=self, + **kwargs, + ) diff --git a/backend/app/celery/utils.py b/backend/app/celery/utils.py index e500a5d63..3fd871724 100644 --- a/backend/app/celery/utils.py +++ b/backend/app/celery/utils.py @@ -3,7 +3,8 @@ Business logic modules can use these functions without knowing Celery internals. """ import logging -from typing import Any, Dict, Optional +from typing import Any, Dict + from celery.result import AsyncResult from app.celery.celery_app import celery_app @@ -11,76 +12,107 @@ logger = logging.getLogger(__name__) -def start_high_priority_job( - function_path: str, project_id: int, job_id: str, trace_id: str = "N/A", **kwargs +def start_llm_job( + project_id: int, job_id: str, trace_id: str = "N/A", **kwargs ) -> str: - """ - Start a high priority job using Celery. - - Args: - function_path: Import path to the execute_job function (e.g., "app.services.doctransform.service.execute_job") - project_id: ID of the project executing the job - job_id: ID of the job (should already exist in database) - trace_id: Trace/correlation ID to preserve context across Celery tasks - **kwargs: Additional arguments to pass to the execute_job function - - Returns: - Celery task ID (different from job_id) - """ - from app.celery.tasks.job_execution import execute_high_priority_task - - task = execute_high_priority_task.delay( - function_path=function_path, - project_id=project_id, - job_id=job_id, - trace_id=trace_id, - **kwargs, - ) - - logger.info(f"Started high priority job {job_id} with Celery task {task.id}") + from app.celery.tasks.job_execution import run_llm_job + + task = run_llm_job.delay(project_id=project_id, job_id=job_id, trace_id=trace_id, **kwargs) + logger.info(f"[start_llm_job] Started job {job_id} with Celery task {task.id}") return task.id -def start_low_priority_job( - function_path: str, project_id: int, job_id: str, trace_id: str = "N/A", **kwargs +def start_llm_chain_job( + project_id: int, job_id: str, trace_id: str = "N/A", **kwargs ) -> str: - """ - Start a low priority job using Celery. - - Args: - function_path: Import path to the execute_job function (e.g., "app.services.doctransform.service.execute_job") - project_id: ID of the project executing the job - job_id: ID of the job (should already exist in database) - trace_id: Trace/correlation ID to preserve context across Celery tasks - **kwargs: Additional arguments to pass to the execute_job function - - Returns: - Celery task ID (different from job_id) - """ - from app.celery.tasks.job_execution import execute_low_priority_task - - task = execute_low_priority_task.delay( - function_path=function_path, - project_id=project_id, - job_id=job_id, - trace_id=trace_id, - **kwargs, - ) - - logger.info(f"Started low priority job {job_id} with Celery task {task.id}") + from app.celery.tasks.job_execution import run_llm_chain_job + + task = run_llm_chain_job.delay(project_id=project_id, job_id=job_id, trace_id=trace_id, **kwargs) + logger.info(f"[start_llm_chain_job] Started job {job_id} with Celery task {task.id}") return task.id -def get_task_status(task_id: str) -> Dict[str, Any]: - """ - Get the status of a Celery task. +def start_response_job( + project_id: int, job_id: str, trace_id: str = "N/A", **kwargs +) -> str: + from app.celery.tasks.job_execution import run_response_job + + task = run_response_job.delay(project_id=project_id, job_id=job_id, trace_id=trace_id, **kwargs) + logger.info(f"[start_response_job] Started job {job_id} with Celery task {task.id}") + return task.id + + +def start_doctransform_job( + project_id: int, job_id: str, trace_id: str = "N/A", **kwargs +) -> str: + from app.celery.tasks.job_execution import run_doctransform_job + + task = run_doctransform_job.delay(project_id=project_id, job_id=job_id, trace_id=trace_id, **kwargs) + logger.info(f"[start_doctransform_job] Started job {job_id} with Celery task {task.id}") + return task.id - Args: - task_id: Celery task ID - Returns: - Dictionary with task status information - """ +def start_create_collection_job( + project_id: int, job_id: str, trace_id: str = "N/A", **kwargs +) -> str: + from app.celery.tasks.job_execution import run_create_collection_job + + task = run_create_collection_job.delay(project_id=project_id, job_id=job_id, trace_id=trace_id, **kwargs) + logger.info(f"[start_create_collection_job] Started job {job_id} with Celery task {task.id}") + return task.id + + +def start_delete_collection_job( + project_id: int, job_id: str, trace_id: str = "N/A", **kwargs +) -> str: + from app.celery.tasks.job_execution import run_delete_collection_job + + task = run_delete_collection_job.delay(project_id=project_id, job_id=job_id, trace_id=trace_id, **kwargs) + logger.info(f"[start_delete_collection_job] Started job {job_id} with Celery task {task.id}") + return task.id + + +def start_stt_batch_submission( + project_id: int, job_id: str, trace_id: str = "N/A", **kwargs +) -> str: + from app.celery.tasks.job_execution import run_stt_batch_submission + + task = run_stt_batch_submission.delay(project_id=project_id, job_id=job_id, trace_id=trace_id, **kwargs) + logger.info(f"[start_stt_batch_submission] Started job {job_id} with Celery task {task.id}") + return task.id + + +def start_stt_metric_computation( + project_id: int, job_id: str, trace_id: str = "N/A", **kwargs +) -> str: + from app.celery.tasks.job_execution import run_stt_metric_computation + + task = run_stt_metric_computation.delay(project_id=project_id, job_id=job_id, trace_id=trace_id, **kwargs) + logger.info(f"[start_stt_metric_computation] Started job {job_id} with Celery task {task.id}") + return task.id + + +def start_tts_batch_submission( + project_id: int, job_id: str, trace_id: str = "N/A", **kwargs +) -> str: + from app.celery.tasks.job_execution import run_tts_batch_submission + + task = run_tts_batch_submission.delay(project_id=project_id, job_id=job_id, trace_id=trace_id, **kwargs) + logger.info(f"[start_tts_batch_submission] Started job {job_id} with Celery task {task.id}") + return task.id + + +def start_tts_result_processing( + project_id: int, job_id: str, trace_id: str = "N/A", **kwargs +) -> str: + from app.celery.tasks.job_execution import run_tts_result_processing + + task = run_tts_result_processing.delay(project_id=project_id, job_id=job_id, trace_id=trace_id, **kwargs) + logger.info(f"[start_tts_result_processing] Started job {job_id} with Celery task {task.id}") + return task.id + + +def get_task_status(task_id: str) -> Dict[str, Any]: result = AsyncResult(task_id) return { "task_id": task_id, @@ -91,20 +123,10 @@ def get_task_status(task_id: str) -> Dict[str, Any]: def revoke_task(task_id: str, terminate: bool = False) -> bool: - """ - Revoke (cancel) a Celery task. - - Args: - task_id: Celery task ID - terminate: Whether to terminate the task if it's already running - - Returns: - True if task was revoked successfully - """ try: celery_app.control.revoke(task_id, terminate=terminate) - logger.info(f"Revoked task {task_id}") + logger.info(f"[revoke_task] Revoked task {task_id}") return True except Exception as e: - logger.error(f"Failed to revoke task {task_id}: {e}") + logger.error(f"[revoke_task] Failed to revoke task {task_id}: {e}") return False diff --git a/backend/app/core/config.py b/backend/app/core/config.py index a7cb7376a..05e28440c 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -109,7 +109,7 @@ def AWS_S3_BUCKET(self) -> str: # Celery Configuration CELERY_WORKER_CONCURRENCY: int | None = None - CELERY_WORKER_MAX_TASKS_PER_CHILD: int = 1000 + CELERY_WORKER_MAX_TASKS_PER_CHILD: int = 1 CELERY_WORKER_MAX_MEMORY_PER_CHILD: int = 200000 CELERY_TASK_SOFT_TIME_LIMIT: int = 300 CELERY_TASK_TIME_LIMIT: int = 600 diff --git a/backend/app/crud/stt_evaluations/cron.py b/backend/app/crud/stt_evaluations/cron.py index 0284639e1..feca397fb 100644 --- a/backend/app/crud/stt_evaluations/cron.py +++ b/backend/app/crud/stt_evaluations/cron.py @@ -13,7 +13,7 @@ from sqlmodel import Session -from app.celery.utils import start_low_priority_job +from app.celery.utils import start_stt_metric_computation from app.core.batch import ( BATCH_KEY, GeminiBatchProvider, @@ -174,8 +174,7 @@ async def _on_batch_succeeded(batch_job: BatchJob, provider_name: str) -> bool: # Trigger automated metric computation (WER, CER, lenient WER, WIP) if result.any_succeeded: try: - celery_task_id = start_low_priority_job( - function_path="app.services.stt_evaluations.metric_job.execute_metric_computation", + celery_task_id = start_stt_metric_computation( project_id=run.project_id, job_id=str(run.id), organization_id=run.organization_id, diff --git a/backend/app/crud/tts_evaluations/cron.py b/backend/app/crud/tts_evaluations/cron.py index 62c6a4faa..2f3c67aaf 100644 --- a/backend/app/crud/tts_evaluations/cron.py +++ b/backend/app/crud/tts_evaluations/cron.py @@ -13,7 +13,7 @@ from sqlmodel import Session -from app.celery.utils import start_low_priority_job +from app.celery.utils import start_tts_result_processing from app.core.batch import GeminiBatchProvider from app.crud.evaluations.cron_utils import ( get_batch_jobs_for_run, @@ -33,11 +33,6 @@ logger = logging.getLogger(__name__) -# Function path for Celery task dispatch -_TTS_RESULT_PROCESSING_PATH = ( - "app.services.tts_evaluations.batch_result_processing.execute_tts_result_processing" -) - async def poll_all_pending_tts_evaluations( session: Session, @@ -77,8 +72,7 @@ def _dispatch_tts_result_processing( Returns: str: Celery task ID """ - celery_task_id = start_low_priority_job( - function_path=_TTS_RESULT_PROCESSING_PATH, + celery_task_id = start_tts_result_processing( project_id=run.project_id, job_id=str(batch_job.id), organization_id=org_id, diff --git a/backend/app/services/collections/create_collection.py b/backend/app/services/collections/create_collection.py index dd4016616..b69154ac4 100644 --- a/backend/app/services/collections/create_collection.py +++ b/backend/app/services/collections/create_collection.py @@ -27,7 +27,7 @@ to_collection_public, ) from app.services.collections.providers.registry import get_llm_provider -from app.celery.utils import start_low_priority_job +from app.celery.utils import start_create_collection_job from app.utils import send_callback, APIResponse @@ -49,8 +49,7 @@ def start_job( collection_job_id, CollectionJobUpdate(trace_id=trace_id) ) - task_id = start_low_priority_job( - function_path="app.services.collections.create_collection.execute_job", + task_id = start_create_collection_job( project_id=project_id, job_id=str(collection_job_id), trace_id=trace_id, diff --git a/backend/app/services/collections/delete_collection.py b/backend/app/services/collections/delete_collection.py index db02a3538..8d10cc119 100644 --- a/backend/app/services/collections/delete_collection.py +++ b/backend/app/services/collections/delete_collection.py @@ -16,7 +16,7 @@ from app.models.collection import DeletionRequest from app.services.collections.helpers import extract_error_message from app.services.collections.providers.registry import get_llm_provider -from app.celery.utils import start_low_priority_job +from app.celery.utils import start_delete_collection_job from app.utils import send_callback, APIResponse @@ -37,12 +37,11 @@ def start_job( collection_job_id, CollectionJobUpdate(trace_id=trace_id) ) - task_id = start_low_priority_job( - function_path="app.services.collections.delete_collection.execute_job", + task_id = start_delete_collection_job( project_id=project_id, job_id=str(collection_job_id), - collection_id=str(request.collection_id), trace_id=trace_id, + collection_id=str(request.collection_id), request=request.model_dump(mode="json"), organization_id=organization_id, ) diff --git a/backend/app/services/doctransform/job.py b/backend/app/services/doctransform/job.py index 3018ffc4b..cdbe295f8 100644 --- a/backend/app/services/doctransform/job.py +++ b/backend/app/services/doctransform/job.py @@ -22,7 +22,7 @@ DocTransformationJob, ) from app.core.cloud import get_cloud_storage -from app.celery.utils import start_low_priority_job +from app.celery.utils import start_doctransform_job from app.utils import send_callback, APIResponse from app.services.doctransform.registry import convert_document, FORMAT_TO_EXTENSION from app.core.db import engine @@ -43,12 +43,11 @@ def start_job( job_crud.update(job_id, DocTransformJobUpdate(trace_id=trace_id)) job = job_crud.read_one(job_id) - task_id = start_low_priority_job( - function_path="app.services.doctransform.job.execute_job", + task_id = start_doctransform_job( project_id=project_id, job_id=str(job.id), - source_document_id=str(job.source_document_id), trace_id=trace_id, + source_document_id=str(job.source_document_id), transformer_name=transformer_name, target_format=target_format, callback_url=callback_url, diff --git a/backend/app/services/llm/jobs.py b/backend/app/services/llm/jobs.py index b28c01cdc..0e2a983b0 100644 --- a/backend/app/services/llm/jobs.py +++ b/backend/app/services/llm/jobs.py @@ -7,7 +7,7 @@ from fastapi import HTTPException from sqlmodel import Session -from app.celery.utils import start_high_priority_job +from app.celery.utils import start_llm_chain_job, start_llm_job from app.core.db import engine from app.core.langfuse.langfuse import observe_llm_execution from app.crud.config import ConfigVersionCrud @@ -57,8 +57,7 @@ def start_job( ) try: - task_id = start_high_priority_job( - function_path="app.services.llm.jobs.execute_job", + task_id = start_llm_job( project_id=project_id, job_id=str(job.id), trace_id=trace_id, @@ -99,8 +98,7 @@ def start_chain_job( ) try: - task_id = start_high_priority_job( - function_path="app.services.llm.jobs.execute_chain_job", + task_id = start_llm_chain_job( project_id=project_id, job_id=str(job.id), trace_id=trace_id, diff --git a/backend/app/services/response/jobs.py b/backend/app/services/response/jobs.py index cab0f0e83..405243498 100644 --- a/backend/app/services/response/jobs.py +++ b/backend/app/services/response/jobs.py @@ -5,7 +5,7 @@ from asgi_correlation_id import correlation_id from app.crud import JobCrud from app.models import JobType, JobStatus, JobUpdate, ResponsesAPIRequest -from app.celery.utils import start_high_priority_job +from app.celery.utils import start_response_job from app.services.response.response import process_response from app.services.response.callbacks import send_response_callback @@ -22,8 +22,7 @@ def start_job( job = job_crud.create(job_type=JobType.RESPONSE, trace_id=trace_id) try: - task_id = start_high_priority_job( - function_path="app.services.response.jobs.execute_job", + task_id = start_response_job( project_id=project_id, job_id=str(job.id), trace_id=trace_id,