Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
185 commits
Select commit Hold shift + click to select a range
a0d72ac
workflow: regain changes
mightqxc May 20, 2025
f79ba0b
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc May 21, 2025
abadf0e
workflow: temp func
mightqxc May 21, 2025
b2b3925
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Jun 2, 2025
f8cda98
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Jul 28, 2025
c4dcd68
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Jul 28, 2025
1206334
preliminary workflow core
mightqxc Jul 29, 2025
c3d78a1
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Aug 4, 2025
95ea64b
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Aug 5, 2025
17d8128
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Aug 6, 2025
b0dd610
workflow: preliminary dbproxy methods
mightqxc Aug 12, 2025
53d6126
fix
mightqxc Aug 12, 2025
25a59db
workflow: add test register
mightqxc Aug 13, 2025
730f36c
workflow: update taskbuffer
mightqxc Aug 13, 2025
7bc10bd
workflow: add insert functions
mightqxc Aug 13, 2025
2976f4a
fixes
mightqxc Aug 13, 2025
204a61a
workflow: add process registered workflow
mightqxc Aug 15, 2025
da06be0
fix
mightqxc Aug 15, 2025
4edf1f7
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Aug 19, 2025
2cb0860
workflow: update schema, preliminary handlers
mightqxc Aug 25, 2025
4927322
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Aug 25, 2025
3a9b210
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Aug 25, 2025
0974cc9
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Aug 26, 2025
7d90a5e
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Aug 29, 2025
ffc8f03
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Sep 1, 2025
ef30129
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Sep 2, 2025
574507e
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Sep 3, 2025
51f5591
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Sep 8, 2025
222bab5
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Sep 16, 2025
0f01c99
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Sep 18, 2025
f35473a
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Sep 22, 2025
0f22282
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Sep 22, 2025
84693b7
add workflow api
mightqxc Sep 23, 2025
7d22bdd
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Sep 24, 2025
4ad2ba5
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Sep 24, 2025
34db560
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Sep 29, 2025
b958b53
workflows4: allow to submit raw request
mightqxc Sep 29, 2025
17d333e
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Sep 30, 2025
deefcc3
workflows4: parse workflow
mightqxc Oct 1, 2025
4c253af
workflows4: fix
mightqxc Oct 2, 2025
e013b08
workflows4: parsed to starting
mightqxc Oct 7, 2025
05b68ee
workflows4: workflow starting
mightqxc Oct 8, 2025
017f9e5
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Oct 9, 2025
26c9fe6
fix
mightqxc Oct 9, 2025
3616531
fix
mightqxc Oct 9, 2025
418968d
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Oct 10, 2025
d22b754
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Oct 13, 2025
66aba52
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Oct 13, 2025
16bccf7
workflows4: step submit target
mightqxc Oct 13, 2025
24d9029
workflows4: fix submit task
mightqxc Oct 14, 2025
78c9b04
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Oct 14, 2025
937fb57
workflows4: reuse plugin instance
mightqxc Oct 14, 2025
f23c17b
workflows4: add check step and result obj
mightqxc Oct 14, 2025
23df71e
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Oct 15, 2025
0296e16
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Oct 16, 2025
bf69565
workflows4: process running workflow
mightqxc Oct 16, 2025
cb26be4
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Oct 20, 2025
e52b64a
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Oct 20, 2025
4df06c1
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Oct 21, 2025
858fa64
workflows4: more status transition, add ddm data handler
mightqxc Oct 23, 2025
06c05fc
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Oct 23, 2025
09ff0fb
workflows4: add data process methods and ddm_collection data handler
mightqxc Oct 27, 2025
7a48337
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Oct 27, 2025
e25637f
workflows4: add native workflow watchdog
mightqxc Oct 27, 2025
f954b66
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Oct 27, 2025
fa60a58
workflows4: fix
mightqxc Oct 27, 2025
9070f7b
workflows4: fix
mightqxc Oct 27, 2025
189f08a
workflows4: fix
mightqxc Oct 27, 2025
3065119
workflows4: fix data handler and data check status
mightqxc Oct 28, 2025
ec247ee
workflows4: fix mid data
mightqxc Oct 28, 2025
119b01f
workflows4: fix
mightqxc Oct 28, 2025
0796dc9
workflows4: fix, add data step_source_id
mightqxc Oct 28, 2025
c65995f
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Oct 30, 2025
2f9423c
workflows4: pretty
mightqxc Oct 30, 2025
10dfc8e
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Oct 30, 2025
528a75c
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Nov 3, 2025
db321c2
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Nov 3, 2025
c6a0379
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Nov 4, 2025
131a0f6
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Nov 5, 2025
5de0932
workflows4: fix by test
mightqxc Nov 5, 2025
afc6e93
workflows4: data target suffix about output_types
mightqxc Nov 5, 2025
9f14ecc
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Nov 6, 2025
a200c8c
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Nov 7, 2025
d59339b
workflows4: fix data status
mightqxc Nov 7, 2025
2152dca
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Nov 7, 2025
c45228d
workflows4: add panda task data handler, rename data status
mightqxc Nov 10, 2025
9678ec4
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Nov 10, 2025
7fe46da
workflows4: fixes
mightqxc Nov 10, 2025
b74c6ee
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Nov 10, 2025
602c3a7
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Nov 11, 2025
1f29d5a
workflows4: fix
mightqxc Nov 11, 2025
7b056f4
workflows4: fixes
mightqxc Nov 11, 2025
7630d70
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Nov 12, 2025
9bd6f8e
workflows4: fix about data status
mightqxc Nov 12, 2025
393c9c3
workflows4: fix step start_time and end_time
mightqxc Nov 12, 2025
4ee9eaf
workflows4: rename step status submitted to starting
mightqxc Nov 13, 2025
01c06ad
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Nov 13, 2025
421900f
workflows4: log
mightqxc Nov 15, 2025
c16e559
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Nov 17, 2025
088d504
workflows4: check step output during checking
mightqxc Nov 17, 2025
dd5617d
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Nov 17, 2025
7faa455
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Nov 17, 2025
6dc443d
workflows4: introduce data status binding
mightqxc Nov 18, 2025
5faeffc
workflows4: fix binding
mightqxc Nov 18, 2025
acb96f3
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Nov 18, 2025
e145f56
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Nov 18, 2025
6281fbc
workflows4: rename statuses
mightqxc Nov 18, 2025
568f4df
workflows4: status pretty
mightqxc Nov 18, 2025
ba79dc3
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Nov 19, 2025
02f428f
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Nov 19, 2025
4359e3a
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Nov 19, 2025
b4cd251
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Nov 20, 2025
8e5a3c1
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Nov 21, 2025
c061194
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Nov 24, 2025
3366682
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Nov 24, 2025
766ec5f
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Nov 24, 2025
cc0098c
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Nov 24, 2025
cc485c9
workflows4: add workflowHoldup task param
mightqxc Nov 24, 2025
a9fab09
workflows4: fix for workflowHoldup
mightqxc Nov 24, 2025
4c4000e
workflows4: fix test mutable for workflowholdup
mightqxc Nov 24, 2025
e2b227d
workflows4: fix
mightqxc Nov 25, 2025
2bb5f44
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Nov 25, 2025
7420a02
workflows4: fix
mightqxc Nov 25, 2025
84cb281
tmp fix
mightqxc Nov 25, 2025
94413de
workflows4: fix for holdup by nReady
mightqxc Nov 25, 2025
7ed55dc
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Nov 25, 2025
6dbee69
workflows4: fix
mightqxc Nov 25, 2025
672caeb
workflows4: improve hook to release task
mightqxc Nov 25, 2025
391d178
workflows4: fix
mightqxc Nov 25, 2025
6d9d25f
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Nov 27, 2025
b29aa28
workflows4: rename watchdog to be workflow manager
mightqxc Nov 27, 2025
dc90947
workflows4: add messaging for fast forward transient status
mightqxc Nov 27, 2025
ec662de
workflows4: fixes
mightqxc Nov 27, 2025
51837c8
workflows4: break down processing methods
mightqxc Nov 27, 2025
b198f3e
workflows4: fix step and data lock for messaging
mightqxc Nov 27, 2025
dfcd9b0
workflows4: pretty log
mightqxc Nov 27, 2025
256eeb1
pretty
mightqxc Nov 27, 2025
0165649
workflows4: log
mightqxc Nov 28, 2025
5723ff4
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Dec 5, 2025
1d341bb
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Dec 9, 2025
3f876b0
Merge remote-tracking branch 'origin/master' into workflows4
Jan 5, 2026
b27df67
Merge remote-tracking branch 'origin/master' into workflows4
Jan 19, 2026
b2034d5
workflows4: improve step status map for pending task
mightqxc Jan 19, 2026
8a1be88
test
mightqxc Jan 19, 2026
2b0d14e
test
mightqxc Jan 19, 2026
17248f6
test
mightqxc Jan 19, 2026
d8aab08
workflows4: improve step status mapping with task superstatus
mightqxc Jan 27, 2026
409d95b
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Jan 29, 2026
e5be752
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Feb 2, 2026
a934a65
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Feb 5, 2026
8d98105
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Feb 9, 2026
fcbebd1
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Feb 17, 2026
173e246
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Mar 4, 2026
5f2ecbb
workflows4: fixes by copilot
mightqxc Mar 4, 2026
2e48f5f
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Mar 4, 2026
b6b59b1
Update pandaserver/workflow/workflow_utils.py
mightqxc Mar 4, 2026
bb50773
Update pandaserver/workflow/workflow_parser.py
mightqxc Mar 4, 2026
1bae97a
Update pandaserver/workflow/workflow_parser.py
mightqxc Mar 4, 2026
31bd56b
typo in pandaserver/taskbuffer/db_proxy_mods/workflow_module.py
mightqxc Mar 4, 2026
209c393
typo pandaserver/workflow/workflow_utils.py
mightqxc Mar 4, 2026
8e8a66a
Update pandaserver/workflow/workflow_parser.py
mightqxc Mar 4, 2026
6cc92f6
workflows4: fix parser to guarantee chdir to original dir
mightqxc Mar 4, 2026
f97ac89
workflows4: use tarfile instead of tar command
mightqxc Mar 4, 2026
b9eb181
workflows4: fix utils according to copilot
mightqxc Mar 4, 2026
d2433da
workflows4: improve test script
mightqxc Mar 4, 2026
9dcb224
workflows4: fix stats of status change of entities
mightqxc Mar 4, 2026
2d2d2bf
Update pandaserver/workflow/workflow_core_smoketest.py
mightqxc Mar 4, 2026
812cfaf
Update pandaserver/workflow/step_handler_plugins/panda_task_step_hand…
mightqxc Mar 4, 2026
c498fac
Update pandajedi/jedidog/AtlasWorkflowManagerWatchDog.py
mightqxc Mar 4, 2026
e8fd04f
Update pandaserver/workflow/step_handler_plugins/base_step_handler.py
mightqxc Mar 4, 2026
7b3f568
workflows4: update workflow checktime
mightqxc Mar 4, 2026
178455e
workflows4: fix resolve_nodes
mightqxc Mar 4, 2026
38135ba
Merge remote-tracking branch 'origin/workflows4' into workflows4
mightqxc Mar 4, 2026
18a5fd7
workflows4: default lock time
mightqxc Mar 4, 2026
a5a7e97
v0.7.2
mightqxc Mar 4, 2026
cf5eba8
v0.8.0; update DB schema version
mightqxc Mar 5, 2026
86f76fe
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Mar 9, 2026
fc3636e
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Mar 9, 2026
4f7da4e
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Mar 10, 2026
ab70b08
Merge remote-tracking branch 'origin/master' into workflows4
mightqxc Mar 10, 2026
00086db
update dependency
mightqxc Mar 11, 2026
ff8c93a
workflows4: add cancel_workflow
mightqxc Mar 11, 2026
d5b6f1d
workflows4: fixes
mightqxc Mar 11, 2026
c0f77c9
log pretty
mightqxc Mar 11, 2026
388667c
workflows4: reuse DDM interface
mightqxc Mar 12, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion PandaPkgInfo.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
release_version = "0.7.2"
release_version = "0.8.0"
2 changes: 1 addition & 1 deletion pandajedi/jedicore/JediDBSchemaInfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ class JediDBSchemaInfo:
schema_version = None

def method(self):
schema_version = "0.0.30"
schema_version = "0.1.1"
_logger.debug(f"PanDA schema version required for JEDI is : {schema_version}")
return schema_version
63 changes: 63 additions & 0 deletions pandajedi/jedidog/AtlasWorkflowManagerWatchDog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import sys
import traceback

# logger
from pandacommon.pandalogger.PandaLogger import PandaLogger

from pandajedi.jedicore.MsgWrapper import MsgWrapper
from pandaserver.workflow.workflow_core import WorkflowInterface

from .WatchDogBase import WatchDogBase

logger = PandaLogger().getLogger(__name__.split(".")[-1])


class AtlasWorkflowManagerWatchDog(WatchDogBase):
"""
Workflow manager watchdog for ATLAS
"""

# constructor
def __init__(self, taskBufferIF, ddmIF):
WatchDogBase.__init__(self, taskBufferIF, ddmIF)
self.vo = "atlas"
self.workflow_interface = WorkflowInterface(taskBufferIF)

def doProcessWorkflows(self):
"""
Action to process active workflows
"""
tmpLog = MsgWrapper(logger, " #ATM #KV doProcessWorkflows")
tmpLog.debug("start")
try:
# watchdog lock
got_lock = self.get_process_lock("AtlasWFManagerDog.doProcessWorkflows", timeLimit=5)
if not got_lock:
tmpLog.debug("locked by another watchdog process. Skipped")
return
tmpLog.debug("got watchdog lock")
# process active workflows
stats = self.workflow_interface.process_active_workflows()
tmpLog.info(f"processed workflows: {stats}")
# done
tmpLog.debug("done")
except Exception:
errtype, errvalue = sys.exc_info()[:2]
tmpLog.error(f"failed with {errtype} {errvalue} {traceback.format_exc()}")

# main
def doAction(self):
try:
# get logger
origTmpLog = MsgWrapper(logger)
origTmpLog.debug("start")
# clean up
# check
# process workflows
self.doProcessWorkflows()
except Exception:
errtype, errvalue = sys.exc_info()[:2]
origTmpLog.error(f"failed with {errtype} {errvalue}")
# return
origTmpLog.debug("done")
return self.SC_SUCCEEDED
89 changes: 89 additions & 0 deletions pandajedi/jedimsgprocessor/workflow_manager_msg_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import json

from pandacommon.pandalogger import logger_utils

from pandajedi.jedimsgprocessor.base_msg_processor import BaseMsgProcPlugin
from pandaserver.workflow.workflow_core import WorkflowInterface

base_logger = logger_utils.setup_logger(__name__.split(".")[-1])


# Workflow manager message processor plugin
class WorkflowManagerMsgProcPlugin(BaseMsgProcPlugin):
"""
Message-driven workflow manager
"""

def initialize(self):
"""
Initialize the plugin
"""
BaseMsgProcPlugin.initialize(self)
self.workflow_interface = WorkflowInterface(self.tbIF)

def process(self, msg_obj):
"""
Process the message
Typical message data looks like:
{"msg_type":"workflow", "workflow_id": 123, "timestamp": 987654321}
{"msg_type":"wfstep", "step_id": 456, "timestamp": 987654321}
{"msg_type":"wfdata", "data_id": 789, "timestamp": 987654321}

Args:
msg_obj: message object
"""
tmp_log = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name="process")
# start
tmp_log.info("start")
tmp_log.debug(f"sub_id={msg_obj.sub_id} ; msg_id={msg_obj.msg_id}")
# parse json
try:
msg_dict = json.loads(msg_obj.data)
except Exception as e:
err_str = f"failed to parse message json {msg_obj.data} , skipped. {e.__class__.__name__} : {e}"
tmp_log.error(err_str)
raise
# sanity check
try:
msg_type = msg_dict["msg_type"]
except Exception as e:
err_str = f"failed to parse message object dict {msg_dict} , skipped. {e.__class__.__name__} : {e}"
tmp_log.error(err_str)
raise
if msg_type not in ("workflow", "wfstep", "wfdata"):
err_str = f"got unknown msg_type {msg_type} , skipped "
tmp_log.error(err_str)
raise
# run
try:
tmp_log.info(f"got message {msg_dict}")
if msg_type == "workflow":
workflow_id = msg_dict["workflow_id"]
workflow_spec = self.tbIF.get_workflow(workflow_id)
if workflow_spec is None:
tmp_log.warning(f"workflow_id={workflow_id} not found; skipped")
return
stats, workflow_spec = self.workflow_interface.process_workflow(workflow_spec, by="msg")
tmp_log.info(f"processed workflow_id={workflow_id}")
elif msg_type == "wfstep":
step_id = msg_dict["step_id"]
step_spec = self.tbIF.get_workflow_step(step_id)
if step_spec is None:
tmp_log.warning(f"step_id={step_id} not found; skipped")
return
stats, step_spec = self.workflow_interface.process_step(step_spec, by="msg")
tmp_log.info(f"processed step_id={step_id}")
elif msg_type == "wfdata":
data_id = msg_dict["data_id"]
data_spec = self.tbIF.get_workflow_data(data_id)
if data_spec is None:
tmp_log.warning(f"data_id={data_id} not found; skipped")
return
stats, data_spec = self.workflow_interface.process_data(data_spec, by="msg")
tmp_log.info(f"processed data_id={data_id}")
except Exception as e:
err_str = f"failed to run, skipped. {e.__class__.__name__} : {e}"
tmp_log.error(err_str)
raise
# done
tmp_log.info("done")
16 changes: 15 additions & 1 deletion pandajedi/jediorder/ContentsFeeder.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ def feed_contents_to_tasks(self, task_ds_list, real_run=True):
parentOutDatasets.add(tmpParentOutDataset.containerName + "/")
# loop over all datasets
nFilesMaster = 0
nFilesMasterReady = 0
checkedMaster = False
setFrozenTime = True
master_offset = None
Expand Down Expand Up @@ -240,6 +241,9 @@ def feed_contents_to_tasks(self, task_ds_list, real_run=True):
):
# dummy metadata when parent is running
tmpMetadata = {"state": "mutable"}
# set mutable when workflow holdup is set
if taskSpec.is_workflow_holdup():
tmpMetadata = {"state": "mutable"}
except Exception:
errtype, errvalue = sys.exc_info()[:2]
tmpLog.error(f"{self.__class__.__name__} failed to get metadata to {errtype.__name__}:{errvalue}")
Expand Down Expand Up @@ -529,7 +533,7 @@ def feed_contents_to_tasks(self, task_ds_list, real_run=True):
orderBy = None
# feed files to the contents table
tmpLog.debug("update contents")
retDB, missingFileList, nFilesUnique, diagMap = self.taskBufferIF.insertFilesForDataset_JEDI(
res_dict = self.taskBufferIF.insertFilesForDataset_JEDI(
datasetSpec,
tmpRet,
tmpMetadata["state"],
Expand Down Expand Up @@ -565,6 +569,10 @@ def feed_contents_to_tasks(self, task_ds_list, real_run=True):
maxFileRecords,
skip_short_output,
)
retDB = res_dict["ret_val"]
missingFileList = res_dict["missingFileList"]
nFilesUnique = res_dict["numUniqueLfn"]
diagMap = res_dict["diagMap"]
if retDB is False:
taskSpec.setErrDiag(f"failed to insert files for {datasetSpec.datasetName}. {diagMap['errMsg']}")
allUpdated = False
Expand Down Expand Up @@ -595,6 +603,7 @@ def feed_contents_to_tasks(self, task_ds_list, real_run=True):
if datasetSpec.isMaster():
checkedMaster = True
nFilesMaster += nFilesUnique
nFilesMasterReady += res_dict.get("nReady", 0)
master_offset = datasetSpec.getOffset()
# running task
if diagMap["isRunningTask"]:
Expand All @@ -618,6 +627,11 @@ def feed_contents_to_tasks(self, task_ds_list, real_run=True):
setFrozenTime = False
skip_secondaries = True
tmpLog.debug("end loop")
# task holdup by workflow if no master inputs are ready
if not taskOnHold and not taskBroken and allUpdated and nFilesMasterReady == 0 and checkedMaster and taskSpec.is_workflow_holdup():
# hold up by the workflow
taskOnHold = True
tmpLog.debug("task to hold up by workflow")
# no master input
if not taskOnHold and not taskBroken and allUpdated and nFilesMaster == 0 and checkedMaster:
tmpErrStr = "no master input files. input dataset is empty"
Expand Down
2 changes: 2 additions & 0 deletions pandajedi/jedirefine/TaskRefinerBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,8 @@ def extractCommon(self, jediTaskID, taskParamMap, workQueueMapper, splitRule):
self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["allowIncompleteInDS"])
if "noAutoPause" in taskParamMap and taskParamMap["noAutoPause"]:
self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["noAutoPause"])
if "workflowHoldup" in taskParamMap and taskParamMap["workflowHoldup"]:
self.setSplitRule(None, 1, JediTaskSpec.splitRuleToken["workflowHoldup"])
# work queue
workQueue = None
if "workQueueName" in taskParamMap:
Expand Down
136 changes: 136 additions & 0 deletions pandaserver/api/v1/workflow_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import datetime
import json
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
from typing import Any, Dict, List

from pandacommon.pandalogger.LogWrapper import LogWrapper
from pandacommon.pandalogger.PandaLogger import PandaLogger
from pandacommon.pandautils.PandaUtils import naive_utcnow

from pandaserver.api.v1.common import (
MESSAGE_DATABASE,
TIME_OUT,
TimedMethod,
generate_response,
get_dn,
has_production_role,
request_validation,
)
from pandaserver.srvcore.panda_request import PandaRequest
from pandaserver.taskbuffer.TaskBuffer import TaskBuffer
from pandaserver.workflow.workflow_core import WorkflowInterface

_logger = PandaLogger().getLogger("api_workflow")

# These global variables are initialized in the init_task_buffer method
global_task_buffer = None
global_wfif = None

# These global variables don't depend on DB access and can be initialized here
# global_proxy_cache = panda_proxy_cache.MyProxyInterface()
# global_token_cache = token_cache.TokenCache()


def init_task_buffer(task_buffer: TaskBuffer) -> None:
"""
Initialize the task buffer and other interfaces. This method needs to be called before any other method in this module.
"""
global global_task_buffer
global_task_buffer = task_buffer

global global_wfif
global_wfif = WorkflowInterface(global_task_buffer)


@request_validation(_logger, secure=True, production=False, request_method="POST")
def submit_workflow_raw_request(req: PandaRequest, params: dict | str) -> dict:
"""
Submit raw request of PanDA native workflow.

API details:
HTTP Method: POST
Path: /v1/workflow/submit_workflow_raw_request

Args:
req(PandaRequest): internally generated request object containing the env variables
params (dict|str): dictionary or JSON of parameters of the raw request

Returns:
dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`
"""

user_dn = get_dn(req)
prodsourcelabel = "user"

# FIXME: only for analysis temporarily
# if has_production_role(req):
# prodsourcelabel = "managed"

tmp_logger = LogWrapper(_logger, f'submit_workflow_raw_request prodsourcelabel={prodsourcelabel} user_dn="{user_dn}" ')
tmp_logger.debug("Start")
success, message, data = False, "", None
time_start = naive_utcnow()

if isinstance(params, str):
try:
params = json.loads(params)
except Exception as exc:
message = f"Failed to parse params: {params} {str(exc)}"
tmp_logger.error(message)
return generate_response(success, message, data)

workflow_id = global_wfif.register_workflow(prodsourcelabel, user_dn, raw_request_params=params)

if workflow_id is not None:
success = True
data = {"workflow_id": workflow_id}
else:
message = "Failed to submit raw workflow request"

time_delta = naive_utcnow() - time_start
tmp_logger.debug(f"Done. Took {time_delta.seconds}.{time_delta.microseconds // 1000:03d} sec")

return generate_response(success, message, data)


@request_validation(_logger, secure=True, production=False, request_method="POST")
def submit_workflow(req: PandaRequest, workflow_definition: dict) -> dict:
"""
Submit a PanDA native workflow.

API details:
HTTP Method: POST
Path: /v1/workflow/submit_workflow

Args:
req(PandaRequest): internally generated request object containing the env variables
workflow_definition (dict): dictionary of workflow definition

Returns:
dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`
"""

user_dn = get_dn(req)
prodsourcelabel = "user"
if has_production_role(req):
prodsourcelabel = "managed"
workflow_name = workflow_definition.get("workflow_name", None)

tmp_logger = LogWrapper(_logger, f'submit_workflow prodsourcelabel={prodsourcelabel} user_dn="{user_dn}" workflow_name={workflow_name}')
tmp_logger.debug("Start")
success, message, data = False, "", None
time_start = naive_utcnow()

workflow_id = global_wfif.register_workflow(prodsourcelabel, user_dn, workflow_name, workflow_definition)

if workflow_id is not None:
success = True
data = {"workflow_id": workflow_id}
else:
message = "Failed to submit workflow"

time_delta = naive_utcnow() - time_start
tmp_logger.debug(f"Done. Took {time_delta.seconds}.{time_delta.microseconds // 1000:03d} sec")

return generate_response(success, message, data)
Loading
Loading