From c70c22345b5c8e2cd6ce426ac5c1072d06ad5ebf Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Tue, 16 Dec 2025 14:09:38 +0100 Subject: [PATCH 1/6] fix boosting memory --- pandajedi/jedirefine/TaskRefinerBase.py | 1 + pandaserver/server/panda.py | 5 +++++ pandaserver/taskbuffer/JediTaskSpec.py | 15 +++++++++++++-- .../taskbuffer/db_proxy_mods/task_event_module.py | 11 +++++++++-- pandaserver/taskbuffer/task_split_rules.py | 1 + 5 files changed, 29 insertions(+), 4 deletions(-) diff --git a/pandajedi/jedirefine/TaskRefinerBase.py b/pandajedi/jedirefine/TaskRefinerBase.py index 74fdd4fbf..78f1e2daa 100644 --- a/pandajedi/jedirefine/TaskRefinerBase.py +++ b/pandajedi/jedirefine/TaskRefinerBase.py @@ -293,6 +293,7 @@ def extractCommon(self, jediTaskID, taskParamMap, workQueueMapper, splitRule): self.setSplitRule(taskParamMap, "nChunksToWait", JediTaskSpec.splitRuleToken["nChunksToWait"]) self.setSplitRule(taskParamMap, "retryRamOffset", JediTaskSpec.splitRuleToken["retryRamOffset"]) self.setSplitRule(taskParamMap, "retryRamStep", JediTaskSpec.splitRuleToken["retryRamStep"]) + self.setSplitRule(taskParamMap, "retryRamMax", JediTaskSpec.splitRuleToken["retryRamMax"]) if "forceStaged" in taskParamMap: taskParamMap["useLocalIO"] = taskParamMap["forceStaged"] if "useLocalIO" in taskParamMap: diff --git a/pandaserver/server/panda.py b/pandaserver/server/panda.py index 3cf3d491f..6152e6c29 100755 --- a/pandaserver/server/panda.py +++ b/pandaserver/server/panda.py @@ -385,6 +385,11 @@ def validate_method(method_name, api_module, version): def encode_special_cases(obj): if isinstance(obj, datetime.datetime): return {"__datetime__": obj.isoformat()} + if isinstance(obj, decimal.Decimal): + if obj == obj.to_integral_value(): + return int(obj) + else: + return float(obj) raise TypeError(f"Type not serializable for {obj} ({type(obj)})") diff --git a/pandaserver/taskbuffer/JediTaskSpec.py b/pandaserver/taskbuffer/JediTaskSpec.py index a5f419253..fa63593e4 100644 --- a/pandaserver/taskbuffer/JediTaskSpec.py +++ b/pandaserver/taskbuffer/JediTaskSpec.py @@ -1565,13 +1565,24 @@ def get_ram_for_retry(self, current_ram): step = int(tmpMatch.group(1)) else: step = 0 + tmpMatch = re.search(self.splitRuleToken["retryRamMax"] + r"=(\d+)", self.splitRule) + if tmpMatch: + max_ram = int(tmpMatch.group(1)) + else: + max_ram = None if not current_ram: return current_ram if current_ram < offset: - return offset + if max_ram is None: + return offset + else: + return min(offset, max_ram) if not step: return current_ram - return offset + math.ceil((current_ram - offset) / step) * step + if max_ram is None: + return offset + math.ceil((current_ram - offset) / step) * step + else: + return min(offset + math.ceil((current_ram - offset) / step) * step, max_ram) # get number of events per input def get_num_events_per_input(self): diff --git a/pandaserver/taskbuffer/db_proxy_mods/task_event_module.py b/pandaserver/taskbuffer/db_proxy_mods/task_event_module.py index f41e54cc1..8cc01d775 100644 --- a/pandaserver/taskbuffer/db_proxy_mods/task_event_module.py +++ b/pandaserver/taskbuffer/db_proxy_mods/task_event_module.py @@ -2667,20 +2667,27 @@ def increaseRamLimitJobJEDI_xtimes(self, job, jobRamCount, jediTaskID, attemptNr # set default value retryRamOffset = 0 retryRamStep = 1.0 + retryRamMax = None # set values from task for tmpItem in items: if tmpItem.startswith("RX="): retryRamOffset = int(tmpItem.replace("RX=", "")) if tmpItem.startswith("RY="): retryRamStep = float(tmpItem.replace("RY=", "")) + if tmpItem.startswith("RZ="): + retryRamMax = float(tmpItem.replace("RZ=", "")) tmp_log.debug( - f"RAM limit task={taskRamCount}{taskRamUnit} cores={coreCount} baseRamCount={taskBaseRamCount} job={jobRamCount}{job.minRamUnit} jobPSS={job.maxPSS}kB retryRamOffset={retryRamOffset} retryRamStep={retryRamStep} attemptNr={attemptNr}" + f"RAM limit task={taskRamCount}{taskRamUnit} cores={coreCount} baseRamCount={taskBaseRamCount} job={jobRamCount}{job.minRamUnit} jobPSS={job.maxPSS}kB retryRamOffset={retryRamOffset} retryRamStep={retryRamStep} retryRamMax={retryRamMax} attemptNr={attemptNr}" ) # normalize the job ram-count by base ram count and number of cores multiplier = retryRamStep * 1.0 / taskRamCount - minimumRam = retryRamOffset + taskRamCount * (multiplier**attemptNr) + # should boost memory based on the current job memory, not based on the attemptNr. Because sometimes some failures are not caused by memory limitation. + # minimumRam = retryRamOffset + taskRamCount * (multiplier**attemptNr) + minimumRam = jobRamCount * multiplier + if retryRamMax: + minimumRam = min(minimumRam, retryRamMax) if taskRamUnit != "MBPerCoreFixed": # If more than x% of the task's jobs needed a memory increase, increase the task's memory instead diff --git a/pandaserver/taskbuffer/task_split_rules.py b/pandaserver/taskbuffer/task_split_rules.py index 448311e03..0bd6eb9b8 100644 --- a/pandaserver/taskbuffer/task_split_rules.py +++ b/pandaserver/taskbuffer/task_split_rules.py @@ -80,6 +80,7 @@ "randomSeed": "RS", "retryRamOffset": "RX", "retryRamStep": "RY", + "retryRamMax": "RZ", "resurrectConsumers": "SC", "switchEStoNormal": "SE", "stayOutputOnSite": "SO", From 61ed31acb426795cbd6f5316f4876057f8236177 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Wed, 17 Dec 2025 01:39:19 +0100 Subject: [PATCH 2/6] fix import --- pandaserver/server/panda.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pandaserver/server/panda.py b/pandaserver/server/panda.py index 6152e6c29..6091929d6 100755 --- a/pandaserver/server/panda.py +++ b/pandaserver/server/panda.py @@ -6,6 +6,7 @@ """ import datetime +import decimal import gzip import io import json From 3c2d266072fec6e1a59e54126532cfd74e6eb2fa Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Thu, 22 Jan 2026 14:53:09 +0100 Subject: [PATCH 3/6] add catchall keys --- pandaserver/taskbuffer/SiteSpec.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pandaserver/taskbuffer/SiteSpec.py b/pandaserver/taskbuffer/SiteSpec.py index 864f8181b..c1a806f16 100644 --- a/pandaserver/taskbuffer/SiteSpec.py +++ b/pandaserver/taskbuffer/SiteSpec.py @@ -22,6 +22,9 @@ "allowed_processing", "excluded_processing", "per_core_attr", + "allow_no_pilot", + "logging", + "loggingfile", ] } From 9257b16daa4073818095deb385b31f493510df1c Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Thu, 22 Jan 2026 23:35:17 +0100 Subject: [PATCH 4/6] remove catchall items for pilot --- pandaserver/taskbuffer/SiteSpec.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pandaserver/taskbuffer/SiteSpec.py b/pandaserver/taskbuffer/SiteSpec.py index c1a806f16..bf91ad109 100644 --- a/pandaserver/taskbuffer/SiteSpec.py +++ b/pandaserver/taskbuffer/SiteSpec.py @@ -23,8 +23,6 @@ "excluded_processing", "per_core_attr", "allow_no_pilot", - "logging", - "loggingfile", ] } From 49fea4057d792b2bb0011d96d31581d3f3f99262 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Mon, 26 Jan 2026 22:59:54 +0100 Subject: [PATCH 5/6] fix jobRamCount to normalize it with coreCount --- .../db_proxy_mods/task_event_module.py | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/pandaserver/taskbuffer/db_proxy_mods/task_event_module.py b/pandaserver/taskbuffer/db_proxy_mods/task_event_module.py index 8cc01d775..38def59e1 100644 --- a/pandaserver/taskbuffer/db_proxy_mods/task_event_module.py +++ b/pandaserver/taskbuffer/db_proxy_mods/task_event_module.py @@ -2633,7 +2633,7 @@ def increaseRamLimitJobJEDI_xtimes(self, job, jobRamCount, jediTaskID, attemptNr """ comment = " /* DBProxy.increaseRamLimitJobJEDI_xtimes */" tmp_log = self.create_tagged_logger(comment, f"PandaID={job.PandaID}") - tmp_log.debug(f"start") + tmp_log.debug("start") # Files defined as input types input_types = ("input", "pseudo_input", "pp_input", "trn_log", "trn_output") @@ -2677,17 +2677,31 @@ def increaseRamLimitJobJEDI_xtimes(self, job, jobRamCount, jediTaskID, attemptNr if tmpItem.startswith("RZ="): retryRamMax = float(tmpItem.replace("RZ=", "")) + jobRamCount = JobUtils.decompensate_ram_count(jobRamCount) + tmp_log.debug( f"RAM limit task={taskRamCount}{taskRamUnit} cores={coreCount} baseRamCount={taskBaseRamCount} job={jobRamCount}{job.minRamUnit} jobPSS={job.maxPSS}kB retryRamOffset={retryRamOffset} retryRamStep={retryRamStep} retryRamMax={retryRamMax} attemptNr={attemptNr}" ) + # normalize the job ram-count by base ram count and number of cores + try: + if taskRamUnit in [ + "MBPerCore", + "MBPerCoreFixed", + ] and job.minRamUnit in ("MB", None, "NULL"): + jobRamCount = jobRamCount / coreCount + except TypeError: + pass + # normalize the job ram-count by base ram count and number of cores multiplier = retryRamStep * 1.0 / taskRamCount # should boost memory based on the current job memory, not based on the attemptNr. Because sometimes some failures are not caused by memory limitation. # minimumRam = retryRamOffset + taskRamCount * (multiplier**attemptNr) minimumRam = jobRamCount * multiplier + tmp_log.debug(f"minimumRam {minimumRam} = jobRamCount {jobRamCount} * multiplier {multiplier}") if retryRamMax: minimumRam = min(minimumRam, retryRamMax) + tmp_log.debug(f"retryRamMax {retryRamMax}, new minimumRam {minimumRam}") if taskRamUnit != "MBPerCoreFixed": # If more than x% of the task's jobs needed a memory increase, increase the task's memory instead @@ -2725,12 +2739,12 @@ def increaseRamLimitJobJEDI_xtimes(self, job, jobRamCount, jediTaskID, attemptNr # Ops could have increased task RamCount through direct DB access. In this case don't do anything if taskRamCount > minimumRam: - tmp_log.debug(f"task ramcount has already been increased and is higher than minimumRam. Skipping") + tmp_log.debug("task ramcount has already been increased and is higher than minimumRam. Skipping") return True # skip if already at largest limit if jobRamCount >= minimumRam: - tmp_log.debug(f"job ramcount is larger than minimumRam. Skipping") + tmp_log.debug("job ramcount is larger than minimumRam. Skipping") return True else: nextLimit = minimumRam @@ -2760,7 +2774,7 @@ def increaseRamLimitJobJEDI_xtimes(self, job, jobRamCount, jediTaskID, attemptNr if not self._commit(): raise RuntimeError("Commit error") - tmp_log.debug(f"done") + tmp_log.debug("done") return True except Exception: # roll back From f106332f1d54528e4de673aece60c51a1a86e60f Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Tue, 27 Jan 2026 23:04:59 +0100 Subject: [PATCH 6/6] fix maxMem based on per core --- pandaserver/taskbuffer/db_proxy_mods/task_event_module.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pandaserver/taskbuffer/db_proxy_mods/task_event_module.py b/pandaserver/taskbuffer/db_proxy_mods/task_event_module.py index 38def59e1..dff95d7f2 100644 --- a/pandaserver/taskbuffer/db_proxy_mods/task_event_module.py +++ b/pandaserver/taskbuffer/db_proxy_mods/task_event_module.py @@ -2700,8 +2700,12 @@ def increaseRamLimitJobJEDI_xtimes(self, job, jobRamCount, jediTaskID, attemptNr minimumRam = jobRamCount * multiplier tmp_log.debug(f"minimumRam {minimumRam} = jobRamCount {jobRamCount} * multiplier {multiplier}") if retryRamMax: - minimumRam = min(minimumRam, retryRamMax) - tmp_log.debug(f"retryRamMax {retryRamMax}, new minimumRam {minimumRam}") + try: + retryRamMaxPerCore = retryRamMax / coreCount + except Exception: + retryRamMaxPerCore = retryRamMax + minimumRam = min(minimumRam, retryRamMaxPerCore) + tmp_log.debug(f"retryRamMaxPerCore {retryRamMaxPerCore}, new minimumRam {minimumRam}") if taskRamUnit != "MBPerCoreFixed": # If more than x% of the task's jobs needed a memory increase, increase the task's memory instead