Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pandajedi/jedirefine/TaskRefinerBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ def extractCommon(self, jediTaskID, taskParamMap, workQueueMapper, splitRule):
self.setSplitRule(taskParamMap, "nChunksToWait", JediTaskSpec.splitRuleToken["nChunksToWait"])
self.setSplitRule(taskParamMap, "retryRamOffset", JediTaskSpec.splitRuleToken["retryRamOffset"])
self.setSplitRule(taskParamMap, "retryRamStep", JediTaskSpec.splitRuleToken["retryRamStep"])
self.setSplitRule(taskParamMap, "retryRamMax", JediTaskSpec.splitRuleToken["retryRamMax"])
if "forceStaged" in taskParamMap:
taskParamMap["useLocalIO"] = taskParamMap["forceStaged"]
if "useLocalIO" in taskParamMap:
Expand Down
15 changes: 13 additions & 2 deletions pandaserver/taskbuffer/JediTaskSpec.py
Original file line number Diff line number Diff line change
Expand Up @@ -1574,13 +1574,24 @@ def get_ram_for_retry(self, current_ram):
step = int(tmpMatch.group(1))
else:
step = 0
tmpMatch = re.search(self.splitRuleToken["retryRamMax"] + r"=(\d+)", self.splitRule)
if tmpMatch:
max_ram = int(tmpMatch.group(1))
else:
max_ram = None
if not current_ram:
return current_ram
if current_ram < offset:
return offset
if max_ram is None:
return offset
else:
return min(offset, max_ram)
if not step:
return current_ram
return offset + math.ceil((current_ram - offset) / step) * step
if max_ram is None:
return offset + math.ceil((current_ram - offset) / step) * step
else:
return min(offset + math.ceil((current_ram - offset) / step) * step, max_ram)

# get number of events per input
def get_num_events_per_input(self):
Expand Down
37 changes: 31 additions & 6 deletions pandaserver/taskbuffer/db_proxy_mods/task_event_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -2633,7 +2633,7 @@ def increaseRamLimitJobJEDI_xtimes(self, job, jobRamCount, jediTaskID, attemptNr
"""
comment = " /* DBProxy.increaseRamLimitJobJEDI_xtimes */"
tmp_log = self.create_tagged_logger(comment, f"PandaID={job.PandaID}")
tmp_log.debug(f"start")
tmp_log.debug("start")

# Files defined as input types
input_types = ("input", "pseudo_input", "pp_input", "trn_log", "trn_output")
Expand Down Expand Up @@ -2667,20 +2667,45 @@ def increaseRamLimitJobJEDI_xtimes(self, job, jobRamCount, jediTaskID, attemptNr
# set default value
retryRamOffset = 0
retryRamStep = 1.0
retryRamMax = None
# set values from task
for tmpItem in items:
if tmpItem.startswith("RX="):
retryRamOffset = int(tmpItem.replace("RX=", ""))
if tmpItem.startswith("RY="):
retryRamStep = float(tmpItem.replace("RY=", ""))
if tmpItem.startswith("RZ="):
retryRamMax = float(tmpItem.replace("RZ=", ""))

jobRamCount = JobUtils.decompensate_ram_count(jobRamCount)

tmp_log.debug(
f"RAM limit task={taskRamCount}{taskRamUnit} cores={coreCount} baseRamCount={taskBaseRamCount} job={jobRamCount}{job.minRamUnit} jobPSS={job.maxPSS}kB retryRamOffset={retryRamOffset} retryRamStep={retryRamStep} attemptNr={attemptNr}"
f"RAM limit task={taskRamCount}{taskRamUnit} cores={coreCount} baseRamCount={taskBaseRamCount} job={jobRamCount}{job.minRamUnit} jobPSS={job.maxPSS}kB retryRamOffset={retryRamOffset} retryRamStep={retryRamStep} retryRamMax={retryRamMax} attemptNr={attemptNr}"
)

# normalize the job ram-count by base ram count and number of cores
try:
if taskRamUnit in [
"MBPerCore",
"MBPerCoreFixed",
] and job.minRamUnit in ("MB", None, "NULL"):
jobRamCount = jobRamCount / coreCount
except TypeError:
pass

# compute the boost multiplier as the ratio of the retry RAM step to the task ram-count
multiplier = retryRamStep * 1.0 / taskRamCount
minimumRam = retryRamOffset + taskRamCount * (multiplier**attemptNr)
# Boost memory based on the current job memory rather than on attemptNr, since some failures are not caused by memory limitations.
# minimumRam = retryRamOffset + taskRamCount * (multiplier**attemptNr)
minimumRam = jobRamCount * multiplier
tmp_log.debug(f"minimumRam {minimumRam} = jobRamCount {jobRamCount} * multiplier {multiplier}")
if retryRamMax:
try:
retryRamMaxPerCore = retryRamMax / coreCount
except Exception:
retryRamMaxPerCore = retryRamMax
minimumRam = min(minimumRam, retryRamMaxPerCore)
tmp_log.debug(f"retryRamMaxPerCore {retryRamMaxPerCore}, new minimumRam {minimumRam}")

if taskRamUnit != "MBPerCoreFixed":
# If more than x% of the task's jobs needed a memory increase, increase the task's memory instead
Expand Down Expand Up @@ -2718,12 +2743,12 @@ def increaseRamLimitJobJEDI_xtimes(self, job, jobRamCount, jediTaskID, attemptNr

# Ops could have increased task RamCount through direct DB access. In this case don't do anything
if taskRamCount > minimumRam:
tmp_log.debug(f"task ramcount has already been increased and is higher than minimumRam. Skipping")
tmp_log.debug("task ramcount has already been increased and is higher than minimumRam. Skipping")
return True

# skip if already at largest limit
if jobRamCount >= minimumRam:
tmp_log.debug(f"job ramcount is larger than minimumRam. Skipping")
tmp_log.debug("job ramcount is larger than minimumRam. Skipping")
return True
else:
nextLimit = minimumRam
Expand Down Expand Up @@ -2753,7 +2778,7 @@ def increaseRamLimitJobJEDI_xtimes(self, job, jobRamCount, jediTaskID, attemptNr
if not self._commit():
raise RuntimeError("Commit error")

tmp_log.debug(f"done")
tmp_log.debug("done")
return True
except Exception:
# roll back
Expand Down
1 change: 1 addition & 0 deletions pandaserver/taskbuffer/task_split_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
"randomSeed": "RS",
"retryRamOffset": "RX",
"retryRamStep": "RY",
"retryRamMax": "RZ",
"resurrectConsumers": "SC",
"switchEStoNormal": "SE",
"stayOutputOnSite": "SO",
Expand Down