diff --git a/pandajedi/jedirefine/TaskRefinerBase.py b/pandajedi/jedirefine/TaskRefinerBase.py index 74fdd4fbf..78f1e2daa 100644 --- a/pandajedi/jedirefine/TaskRefinerBase.py +++ b/pandajedi/jedirefine/TaskRefinerBase.py @@ -293,6 +293,7 @@ def extractCommon(self, jediTaskID, taskParamMap, workQueueMapper, splitRule): self.setSplitRule(taskParamMap, "nChunksToWait", JediTaskSpec.splitRuleToken["nChunksToWait"]) self.setSplitRule(taskParamMap, "retryRamOffset", JediTaskSpec.splitRuleToken["retryRamOffset"]) self.setSplitRule(taskParamMap, "retryRamStep", JediTaskSpec.splitRuleToken["retryRamStep"]) + self.setSplitRule(taskParamMap, "retryRamMax", JediTaskSpec.splitRuleToken["retryRamMax"]) if "forceStaged" in taskParamMap: taskParamMap["useLocalIO"] = taskParamMap["forceStaged"] if "useLocalIO" in taskParamMap: diff --git a/pandaserver/taskbuffer/JediTaskSpec.py b/pandaserver/taskbuffer/JediTaskSpec.py index 791e16604..233e257a8 100644 --- a/pandaserver/taskbuffer/JediTaskSpec.py +++ b/pandaserver/taskbuffer/JediTaskSpec.py @@ -1574,13 +1574,24 @@ def get_ram_for_retry(self, current_ram): step = int(tmpMatch.group(1)) else: step = 0 + tmpMatch = re.search(self.splitRuleToken["retryRamMax"] + r"=(\d+)", self.splitRule) + if tmpMatch: + max_ram = int(tmpMatch.group(1)) + else: + max_ram = None if not current_ram: return current_ram if current_ram < offset: - return offset + if max_ram is None: + return offset + else: + return min(offset, max_ram) if not step: return current_ram - return offset + math.ceil((current_ram - offset) / step) * step + if max_ram is None: + return offset + math.ceil((current_ram - offset) / step) * step + else: + return min(offset + math.ceil((current_ram - offset) / step) * step, max_ram) # get number of events per input def get_num_events_per_input(self): diff --git a/pandaserver/taskbuffer/db_proxy_mods/task_event_module.py b/pandaserver/taskbuffer/db_proxy_mods/task_event_module.py index c30b51c3f..4f0f9a827 100644 --- a/pandaserver/taskbuffer/db_proxy_mods/task_event_module.py +++ b/pandaserver/taskbuffer/db_proxy_mods/task_event_module.py @@ -2633,7 +2633,7 @@ def increaseRamLimitJobJEDI_xtimes(self, job, jobRamCount, jediTaskID, attemptNr """ comment = " /* DBProxy.increaseRamLimitJobJEDI_xtimes */" tmp_log = self.create_tagged_logger(comment, f"PandaID={job.PandaID}") - tmp_log.debug(f"start") + tmp_log.debug("start") # Files defined as input types input_types = ("input", "pseudo_input", "pp_input", "trn_log", "trn_output") @@ -2667,20 +2667,45 @@ def increaseRamLimitJobJEDI_xtimes(self, job, jobRamCount, jediTaskID, attemptNr # set default value retryRamOffset = 0 retryRamStep = 1.0 + retryRamMax = None # set values from task for tmpItem in items: if tmpItem.startswith("RX="): retryRamOffset = int(tmpItem.replace("RX=", "")) if tmpItem.startswith("RY="): retryRamStep = float(tmpItem.replace("RY=", "")) + if tmpItem.startswith("RZ="): + retryRamMax = float(tmpItem.replace("RZ=", "")) + + jobRamCount = JobUtils.decompensate_ram_count(jobRamCount) tmp_log.debug( - f"RAM limit task={taskRamCount}{taskRamUnit} cores={coreCount} baseRamCount={taskBaseRamCount} job={jobRamCount}{job.minRamUnit} jobPSS={job.maxPSS}kB retryRamOffset={retryRamOffset} retryRamStep={retryRamStep} attemptNr={attemptNr}" + f"RAM limit task={taskRamCount}{taskRamUnit} cores={coreCount} baseRamCount={taskBaseRamCount} job={jobRamCount}{job.minRamUnit} jobPSS={job.maxPSS}kB retryRamOffset={retryRamOffset} retryRamStep={retryRamStep} retryRamMax={retryRamMax} attemptNr={attemptNr}" ) + # normalize the job ram-count by base ram count and number of cores + try: + if taskRamUnit in [ + "MBPerCore", + "MBPerCoreFixed", + ] and job.minRamUnit in ("MB", None, "NULL"): + jobRamCount = jobRamCount / coreCount + except TypeError: + pass + # normalize the job ram-count by base ram count and number of cores multiplier = retryRamStep * 1.0 / taskRamCount - minimumRam = retryRamOffset + taskRamCount * (multiplier**attemptNr) + # should boost memory based on the current job memory, not based on the attemptNr. Because sometimes some failures are not caused by memory limitation. + # minimumRam = retryRamOffset + taskRamCount * (multiplier**attemptNr) + minimumRam = jobRamCount * multiplier + tmp_log.debug(f"minimumRam {minimumRam} = jobRamCount {jobRamCount} * multiplier {multiplier}") + if retryRamMax: + try: + retryRamMaxPerCore = retryRamMax / coreCount + except Exception: + retryRamMaxPerCore = retryRamMax + minimumRam = min(minimumRam, retryRamMaxPerCore) + tmp_log.debug(f"retryRamMaxPerCore {retryRamMaxPerCore}, new minimumRam {minimumRam}") if taskRamUnit != "MBPerCoreFixed": # If more than x% of the task's jobs needed a memory increase, increase the task's memory instead @@ -2718,12 +2743,12 @@ def increaseRamLimitJobJEDI_xtimes(self, job, jobRamCount, jediTaskID, attemptNr # Ops could have increased task RamCount through direct DB access. In this case don't do anything if taskRamCount > minimumRam: - tmp_log.debug(f"task ramcount has already been increased and is higher than minimumRam. Skipping") + tmp_log.debug("task ramcount has already been increased and is higher than minimumRam. Skipping") return True # skip if already at largest limit if jobRamCount >= minimumRam: - tmp_log.debug(f"job ramcount is larger than minimumRam. Skipping") + tmp_log.debug("job ramcount is larger than minimumRam. Skipping") return True else: nextLimit = minimumRam @@ -2753,7 +2778,7 @@ def increaseRamLimitJobJEDI_xtimes(self, job, jobRamCount, jediTaskID, attemptNr if not self._commit(): raise RuntimeError("Commit error") - tmp_log.debug(f"done") + tmp_log.debug("done") return True except Exception: # roll back diff --git a/pandaserver/taskbuffer/task_split_rules.py b/pandaserver/taskbuffer/task_split_rules.py index 448311e03..0bd6eb9b8 100644 --- a/pandaserver/taskbuffer/task_split_rules.py +++ b/pandaserver/taskbuffer/task_split_rules.py @@ -80,6 +80,7 @@ "randomSeed": "RS", "retryRamOffset": "RX", "retryRamStep": "RY", + "retryRamMax": "RZ", "resurrectConsumers": "SC", "switchEStoNormal": "SE", "stayOutputOnSite": "SO",