Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion PandaPkgInfo.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
release_version = "0.7.1"
release_version = "0.7.2"
129 changes: 110 additions & 19 deletions pandajedi/jedimsgprocessor/jedi_job_generator_msg_processor.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import gc
import json
import time

from pandacommon.pandalogger import logger_utils
from pandacommon.pandautils.PandaUtils import try_malloc_trim

from pandajedi.jedicore.ThreadUtils import ListWithLock, ThreadPool
from pandajedi.jedicore.ThreadUtils import ListWithLock
from pandajedi.jediddm.DDMInterface import DDMInterface
from pandajedi.jedimsgprocessor.base_msg_processor import BaseMsgProcPlugin
from pandajedi.jediorder.JobGenerator import JobGeneratorThread
from pandajedi.jediorder.TaskSetupper import TaskSetupper
from pandaserver.srvcore import CoreUtils

base_logger = logger_utils.setup_logger(__name__.split(".")[-1])

Expand All @@ -22,6 +26,18 @@ def initialize(self):
# DDM interface
self.ddmIF = DDMInterface()
self.ddmIF.setupInterface()
self.task_setupper_map = {}
# cache heavy metadata objects with short TTL to reduce allocation churn
self._cache_ttl_sec = 300
self._site_mapper = None
self._site_mapper_ts = 0
self._work_queue_mapper = None
self._work_queue_mapper_ts = 0
self._resource_types = None
self._resource_types_ts = 0
self._nfiles_cache = {}
# memory limit to trigger early cleanup to avoid OOM killer. This is not a hard limit, just a threshold to trigger early cleanup.
self._mem_usage_threshold_mb = 1500
# get SiteMapper
# siteMapper = self.tbIF.get_site_mapper()
# get work queue mapper
Expand All @@ -31,8 +47,49 @@ def initialize(self):
# taskSetupper.initializeMods(self.tbIF, self.ddmIF)
self.pid = self.get_pid()

def _is_cache_valid(self, ts):
    """Tell whether a cache entry stamped at ``ts`` is still fresh.

    :param ts: timestamp, as produced by ``time.time()``, recorded when the entry was stored
    :return: True when the elapsed time since ``ts`` is strictly below ``self._cache_ttl_sec``
    """
    elapsed_sec = time.time() - ts
    return elapsed_sec < self._cache_ttl_sec

def _get_site_mapper(self):
    """Return the site mapper, reusing the cached object while it is fresh.

    The cached object is returned as long as one exists and its timestamp
    passes ``self._is_cache_valid``. Otherwise a new one is fetched through
    ``self.tbIF.get_site_mapper()`` and the timestamp is reset.

    :return: the current site mapper object
    """
    cached = self._site_mapper
    if cached is not None and self._is_cache_valid(self._site_mapper_ts):
        return cached
    # nothing cached yet, or the cached copy has expired: reload and restamp
    self._site_mapper = self.tbIF.get_site_mapper()
    self._site_mapper_ts = time.time()
    return self._site_mapper

def _get_work_queue_mapper(self):
    """Return the work queue mapper, reusing the cached object while it is fresh.

    A new mapper is fetched through ``self.tbIF.getWorkQueueMap()`` when none
    is cached or when the stored timestamp fails ``self._is_cache_valid``.

    :return: the current work queue mapper object
    """
    cached = self._work_queue_mapper
    if cached is not None and self._is_cache_valid(self._work_queue_mapper_ts):
        return cached
    # nothing cached yet, or the cached copy has expired: reload and restamp
    self._work_queue_mapper = self.tbIF.getWorkQueueMap()
    self._work_queue_mapper_ts = time.time()
    return self._work_queue_mapper

def _get_resource_types(self):
    """Return the resource types, reusing the cached value while it is fresh.

    The value is reloaded through ``self.tbIF.load_resource_types()`` when
    nothing is cached (``None``) or when the stored timestamp fails
    ``self._is_cache_valid``.

    :return: whatever ``load_resource_types()`` last returned
    """
    cached = self._resource_types
    if cached is not None and self._is_cache_valid(self._resource_types_ts):
        return cached
    # nothing cached yet, or the cached copy has expired: reload and restamp
    self._resource_types = self.tbIF.load_resource_types()
    self._resource_types_ts = time.time()
    return self._resource_types

def _get_nfiles(self, vo, queue_name):
    """Return the ``NFILES_<queue_name>`` setting for a VO, cached per (vo, queue_name).

    A cached value is returned while its timestamp passes
    ``self._is_cache_valid``. Otherwise the value is read with
    ``self.tbIF.getConfigValue("jobgen", "NFILES_<queue_name>", "jedi", vo)``,
    stored in ``self._nfiles_cache`` together with the current time, and returned.

    :param vo: value forwarded as the last argument of ``getConfigValue``
    :param queue_name: queue name used to build the ``NFILES_<queue_name>`` key
    :return: the configured value converted with ``int()``, or 100 when the lookup returns ``None``
    """
    key = (vo, queue_name)
    entry = self._nfiles_cache.get(key)
    if entry is not None:
        cached_value, stamped_at = entry
        if self._is_cache_valid(stamped_at):
            return cached_value
    # cache miss or expired entry: read the setting again
    configured = self.tbIF.getConfigValue("jobgen", f"NFILES_{queue_name}", "jedi", vo)
    value = 100 if configured is None else int(configured)
    self._nfiles_cache[key] = (value, time.time())
    return value

def process(self, msg_obj):
tmp_log = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name="process")
input_list = None
gen_thr = None
tmp_list = None
taskSpec = None
taskSetupper = None
# start
tmp_log.info("start")
tmp_log.debug(f"sub_id={msg_obj.sub_id} ; msg_id={msg_obj.msg_id}")
Expand All @@ -52,7 +109,7 @@ def process(self, msg_obj):
raise
if msg_type != "jedi_job_generator":
# FIXME
err_str = f"got unknown msg_type {msg_type} , skipped "
err_str = f"got unknown msg_type {msg_type} , skipped"
tmp_log.error(err_str)
raise
# run
Expand All @@ -61,28 +118,39 @@ def process(self, msg_obj):
task_id = int(msg_dict["taskid"])
_, taskSpec = self.tbIF.getTaskWithID_JEDI(task_id)
if not taskSpec:
tmp_log.debug(f"unknown task {task_id}")
tmp_log.debug(f"unknown task_id={task_id}, skipped")
else:
tmp_log.debug(f"processing task_id={task_id} workqueue_id={taskSpec.workQueue_ID} status={taskSpec.status}")
# get WQ
vo = taskSpec.vo
prodSourceLabel = taskSpec.prodSourceLabel
workQueue = self.tbIF.getWorkQueueMap().getQueueWithIDGshare(taskSpec.workQueue_ID, taskSpec.gshare)
workQueue = self._get_work_queue_mapper().getQueueWithIDGshare(taskSpec.workQueue_ID, taskSpec.gshare)
# get resource types
resource_types = self.tbIF.load_resource_types()
resource_types = self._get_resource_types()
if not resource_types:
raise RuntimeError("failed to get resource types")
# nFiles from config with a short TTL cache
nFiles = 100
if workQueue is None:
tmp_log.warning(f"workQueue_ID={taskSpec.workQueue_ID} gshare={taskSpec.gshare} not found in work queue mapper")
else:
nFiles = self._get_nfiles(vo, workQueue.queue_name)
# get inputs
tmpList = self.tbIF.getTasksToBeProcessed_JEDI(self.pid, None, workQueue, None, None, nFiles=1000, target_tasks=[task_id])
if tmpList:
inputList = ListWithLock(tmpList)
# create thread
threadPool = ThreadPool()
siteMapper = self.tbIF.get_site_mapper()
taskSetupper = TaskSetupper(vo, prodSourceLabel)
taskSetupper.initializeMods(self.tbIF, self.ddmIF)
tmp_list = self.tbIF.getTasksToBeProcessed_JEDI(self.pid, None, workQueue, None, None, nFiles=nFiles, target_tasks=[task_id])
if tmp_list:
input_list = ListWithLock(tmp_list)
# run generator inline to avoid creating an extra worker thread.
siteMapper = self._get_site_mapper()
setupper_key = (vo, prodSourceLabel)
taskSetupper = self.task_setupper_map.get(setupper_key)
if taskSetupper is None:
taskSetupper = TaskSetupper(vo, prodSourceLabel)
taskSetupper.initializeMods(self.tbIF, self.ddmIF)
self.task_setupper_map[setupper_key] = taskSetupper
# build the job generator; memory usage is not logged here, it is only checked in the finally block after the run.
gen_thr = JobGeneratorThread(
inputList,
threadPool,
input_list,
None,
self.tbIF,
self.ddmIF,
siteMapper,
Expand All @@ -97,14 +165,37 @@ def process(self, msg_obj):
False,
resource_types,
)
gen_thr.start()
gen_thr.join()
tmp_log.info(f"generated jobs for task {task_id}")
gen_thr.run()
tmp_log.info(f"task_id={task_id} generated jobs")
else:
tmp_log.debug(f"task {task_id} is not considered to be processed; skipped ")
tmp_log.debug(f"task_id={task_id} is not considered to be processed, skipped")
except Exception as e:
err_str = f"failed to run, skipped. {e.__class__.__name__} : {e}"
tmp_log.error(err_str)
raise
finally:
# Explicitly clear large per-run maps to release references as early as possible.
if gen_thr is not None:
try:
gen_thr.buildSpecMap.clear()
gen_thr.finished_lib_specs_map.clear()
gen_thr.active_lib_specs_map.clear()
gen_thr.inputList = None
except Exception:
pass
gen_thr = None
input_list = None
tmp_list = None
taskSpec = None
taskSetupper = None
# If memory usage is above the threshold, trigger early cleanup to avoid OOM killer.
mem_usage = CoreUtils.getMemoryUsage()
if mem_usage is None:
tmp_log.warning("failed to get memory usage, skipped memory cleanup")
elif mem_usage > self._mem_usage_threshold_mb:
gc.collect()
try_malloc_trim(tmp_log)
new_mem_usage = CoreUtils.getMemoryUsage()
tmp_log.debug(f"trimmed memory usage from {mem_usage} MB to {new_mem_usage} MB")
# done
tmp_log.info("done")
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ authors = [
{ name = "PanDA Team", email = "panda-support@cern.ch" },
]
dependencies = [
'panda-common>=0.1.5',
'panda-common>=0.1.6',
'panda-client-light>=1.5.55',
'pyOpenSSL',
'python-daemon',
Expand Down