Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements.conda.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
drmaa
hdf5plugin
ispyb>=11.1.0
ispyb>=11.1.2
junit-xml>=1.9
marshmallow-sqlalchemy
minio>=7.1.0
Expand Down
16 changes: 16 additions & 0 deletions src/dlstbx/crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,22 @@ def get_ssx_events_for_dcid(
return query.all()


def get_app_id_for_scaling_id(
    session: sqlalchemy.orm.session.Session,
    scaling_id: int,
) -> int:
    """Return the AutoProcProgram id that owns the given AutoProcScaling row.

    Joins AutoProc to AutoProcScaling on their shared autoProcId and looks
    up the autoProcProgramId for the scaling entry.

    Args:
        session: An open SQLAlchemy ORM session bound to the ISPyB database.
        scaling_id: Primary key of the AutoProcScaling row of interest.

    Returns:
        The autoProcProgramId of the matching AutoProc row.

    Raises:
        ValueError: If no AutoProcScaling row exists with ``scaling_id``.
    """
    # .scalar() yields the bare column value, or None when no row matches —
    # the previous .first() / attribute-access pattern raised an opaque
    # AttributeError ("'NoneType' object has no attribute ...") on a miss.
    app_id = (
        session.query(models.AutoProc.autoProcProgramId)
        .join(
            models.AutoProcScaling,
            models.AutoProcScaling.autoProcId == models.AutoProc.autoProcId,
        )
        .filter(models.AutoProcScaling.autoProcScalingId == scaling_id)
        .scalar()
    )
    if app_id is None:
        raise ValueError(f"No AutoProcScaling entry found for id {scaling_id}")
    return app_id


def insert_xray_centring(
xrc: schemas.XrayCentring,
session: sqlalchemy.orm.session.Session,
Expand Down
40 changes: 25 additions & 15 deletions src/dlstbx/services/ispybsvc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os.path
import pathlib
import time
from datetime import datetime
from typing import List

import ispyb.sqlalchemy
Expand Down Expand Up @@ -323,11 +324,21 @@ def do_store_dimple_failure(self, parameters, **kwargs):
)
return False

def do_register_processing(self, parameters, **kwargs):
def do_register_processing(
self, parameters, session: sqlalchemy.orm.Session, **kwargs
):
program = parameters("program")
cmdline = parameters("cmdline")
environment = parameters("environment") or ""
environment = parameters("environment") or {}
upstream_source = parameters("upstream_source") or ""
scaling_id = parameters("scaling_id") or environment.get("scaling_id")
if isinstance(scaling_id, list):
scaling_id = scaling_id[0]
parent_appid = (
crud.get_app_id_for_scaling_id(session, int(scaling_id))
if scaling_id
else None
)
processingpipelineid = self.get_pipeline_id(program, upstream_source)
if isinstance(environment, dict):
environment = ", ".join(
Expand All @@ -339,21 +350,20 @@ def do_register_processing(self, parameters, **kwargs):
self.log.error("Invalid processing id '%s'", rpid)
return False
try:
result = self.ispyb.mx_processing.upsert_program_ex(
job_id=rpid,
name=program,
command=cmdline,
environment=environment,
pipeline_id=processingpipelineid,
new_app = ispyb.sqlalchemy.AutoProcProgram(
processingJobId=rpid,
processingPrograms=program,
processingCommandLine=cmdline,
processingEnvironment=environment,
processingPipelineId=processingpipelineid,
parentAutoProcProgramId=parent_appid,
recordTimeStamp=datetime.now(),
)
session.add(new_app)
session.commit()
result = new_app.autoProcProgramId
self.log.info(
"Registered new program '%s' for processing id '%s' with command line '%s' and environment '%s' and pipeline id '%s' with result '%s'.",
program,
rpid,
cmdline,
environment,
processingpipelineid,
result,
f"Registered new program '{program}' for processing id '{rpid}' with command line '{cmdline}' and environment '{environment}', pipeline id '{processingpipelineid}' and parent program id '{parent_appid}' with result '{result}'.",
)
return {"success": True, "return_value": result}
except ispyb.ISPyBException as e:
Expand Down