Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 28 additions & 10 deletions src/dlstbx/services/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,15 @@ class MultiplexParameters(pydantic.BaseModel):
spacegroup: Optional[str] = None
automatic: Optional[bool] = False
comment: Optional[str] = None
backoff_delay: float = pydantic.Field(default=8, alias="backoff-delay")
backoff_max_try: int = pydantic.Field(default=10, alias="backoff-max-try")
backoff_multiplier: float = pydantic.Field(default=2, alias="backoff-multiplier")
backoff_delay: Dict[str, float] = pydantic.Field(
default={"default": 8}, alias="backoff-delay"
)
backoff_max_try: Dict[str, int] = pydantic.Field(
default={"default": 10}, alias="backoff-max-try"
)
backoff_multiplier: Dict[str, float] = pydantic.Field(
default={"default": 2}, alias="backoff-multiplier"
)
wavelength_tolerance: float = pydantic.Field(default=1e-4, ge=0)
diffraction_plan_info: Optional[DiffractionPlanInfo] = None
recipe: Optional[str] = None
Expand Down Expand Up @@ -1785,9 +1791,9 @@ def trigger_multiplex(
"name": "sample_bar",
}
],
"backoff-delay": 8, # default
"backoff-max-try": 10, # default
"backoff-multiplier": 2, # default
"backoff-delay": {'default': 8}
"backoff-max-try": {'default': 10, 'i02-1': 7}
"backoff-multiplier": {'default': 2}
}
"""
dcid = parameters.dcid
Expand Down Expand Up @@ -1870,14 +1876,26 @@ def trigger_multiplex(
# Calculate message delay for exponential backoff in case a processing
# program for a related data collection is still running, in which case
# we checkpoint with the calculated message delay
backoff_delay = parameters.backoff_delay.get(
parameters.beamline, parameters.backoff_delay["default"]
)
backoff_multiplier = parameters.backoff_multiplier.get(
parameters.beamline, parameters.backoff_multiplier["default"]
)
backoff_max_try = parameters.backoff_max_try.get(
parameters.beamline, parameters.backoff_max_try["default"]
)

self.log.debug(
f"Using backoff parameters: delay {backoff_delay}, multiplier {backoff_multiplier}, max_try {backoff_max_try}"
)

status = {
"ntry": 0,
}
if isinstance(message, dict):
status.update(message.get("trigger-status", {}))
message_delay = int(
parameters.backoff_delay * parameters.backoff_multiplier ** status["ntry"]
)
message_delay = int(backoff_delay * backoff_multiplier ** status["ntry"])
status["ntry"] += 1
self.log.debug(f"dcid={dcid}\nmessage_delay={message_delay}\n{status}")

Expand Down Expand Up @@ -1934,7 +1952,7 @@ def trigger_multiplex(
row.AutoProcProgram.autoProcProgramId
for row in waiting_processing_jobs
]
if status["ntry"] >= parameters.backoff_max_try:
if status["ntry"] >= backoff_max_try:
# Give up waiting for this program to finish and trigger
# multiplex with remaining related results are available
self.log.info(
Expand Down