From ee5703f0ab94f844c7f9cb3f1d7a773126a38ace Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=A9my=20J?= Date: Mon, 23 Feb 2026 16:55:22 +0100 Subject: [PATCH 1/2] Added cleanup commands --- CHANGELOG.md | 3 ++ README.md | 5 ++- src/Command/JobCleanupCommand.php | 32 +++++++++++++++++++ src/Command/SetCrashedCommand.php | 32 +++++++++++++++++++ .../CodeRhapsodieDataflowExtension.php | 2 ++ src/DependencyInjection/Configuration.php | 14 ++++++++ src/Entity/Job.php | 1 + src/Repository/JobRepository.php | 27 ++++++++++++++++ src/Resources/config/services.yaml | 14 +++++++- 9 files changed, 128 insertions(+), 2 deletions(-) create mode 100644 src/Command/JobCleanupCommand.php create mode 100644 src/Command/SetCrashedCommand.php diff --git a/CHANGELOG.md b/CHANGELOG.md index 4bc8f7c..152bfeb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +# Version 5.5.0 +* Added cleanup commands + # Version 5.4.1 * Fix File Exceptions integration diff --git a/README.md b/README.md index 84638b5..dab6151 100644 --- a/README.md +++ b/README.md @@ -243,7 +243,6 @@ implementing `DataflowTypeInterface`. Otherwise, manually add the tag `coderhapsodie.dataflow.type` in your dataflow type service configuration: -```yaml ```yaml CodeRhapsodie\DataflowExemple\DataflowType\MyFirstDataflowType: tags: @@ -598,6 +597,10 @@ the messenger component instead. `code-rhapsodie:dataflow:dump-schema` Generates schema create / update SQL queries +`code-rhapsodie:dataflow:set_crashed` Jobs that have been in the "running" status for too long will be set in the "crashed" status. + +`code-rhapsodie:dataflow:job_cleanup` Remove old completed or crashed jobs + ### Work with many databases All commands have a `--connection` option to define what Doctrine DBAL connection to use during execution. diff --git a/src/Command/JobCleanupCommand.php b/src/Command/JobCleanupCommand.php new file mode 100644 index 0000000..402106b --- /dev/null +++ b/src/Command/JobCleanupCommand.php @@ -0,0 +1,32 @@ +setHelp('Job retention can be configured with the "job_history.retention" configuration.'); + } + + protected function execute(InputInterface $input, OutputInterface $output): int + { + $this->jobRepository->deleteOld($this->retention); + + return Command::SUCCESS; + } +} diff --git a/src/Command/SetCrashedCommand.php b/src/Command/SetCrashedCommand.php new file mode 100644 index 0000000..bd7925c --- /dev/null +++ b/src/Command/SetCrashedCommand.php @@ -0,0 +1,32 @@ +setHelp('How long jobs have to run before they are set as crashed can be configured with the "job_history.crashed_delay" configuration.'); + } + + protected function execute(InputInterface $input, OutputInterface $output): int + { + $this->jobRepository->crashLongRunning($this->crashedDelay); + + return Command::SUCCESS; + } +} diff --git a/src/DependencyInjection/CodeRhapsodieDataflowExtension.php b/src/DependencyInjection/CodeRhapsodieDataflowExtension.php index ca4bb9c..b09e69c 100644 --- a/src/DependencyInjection/CodeRhapsodieDataflowExtension.php +++ b/src/DependencyInjection/CodeRhapsodieDataflowExtension.php @@ -30,6 +30,8 @@ public function load(array $configs, ContainerBuilder $container): void $container->setParameter('coderhapsodie.dataflow.dbal_default_connection', $config['dbal_default_connection']); $container->setParameter('coderhapsodie.dataflow.default_logger', $config['default_logger']); $container->setParameter('coderhapsodie.dataflow.exceptions_mode.type', $config['exceptions_mode']['type']); + $container->setParameter('coderhapsodie.dataflow.job_history.retention', $config['job_history']['retention']); + $container->setParameter('coderhapsodie.dataflow.job_history.crashed_delay', $config['job_history']['crashed_delay']); if ($config['exceptions_mode']['type'] === 'file') { $container->setParameter('coderhapsodie.dataflow.flysystem_service', $config['exceptions_mode']['flysystem_service']); diff --git a/src/DependencyInjection/Configuration.php b/src/DependencyInjection/Configuration.php index 12a6df0..a4be510 100644 --- a/src/DependencyInjection/Configuration.php +++ b/src/DependencyInjection/Configuration.php @@ -52,6 +52,20 @@ public function getConfigTreeBuilder(): TreeBuilder ->thenInvalid('You need "league/flysystem" to use Dataflow file exception mode.') ->end() ->end() + ->arrayNode('job_history') + ->addDefaultsIfNotSet() + ->children() + ->integerNode('retention') + ->defaultValue(30) + ->min(0) + ->info('How many days completed and crashed jobs are kept when running the cleanup command.') + ->end() + ->integerNode('crashed_delay') + ->defaultValue(24) + ->min(24) + ->info('Jobs running for more than this many hours will be set as crashed when running the cleanup command.') + ->end() + ->end() ->end() ; diff --git a/src/Entity/Job.php b/src/Entity/Job.php index cd931a6..dd721b2 100644 --- a/src/Entity/Job.php +++ b/src/Entity/Job.php @@ -17,6 +17,7 @@ class Job public const STATUS_RUNNING = 1; public const STATUS_COMPLETED = 2; public const STATUS_QUEUED = 3; + public const STATUS_CRASHED = 4; private const KEYS = [ 'id', diff --git a/src/Repository/JobRepository.php b/src/Repository/JobRepository.php index 92d36a6..ae1a7e4 100644 --- a/src/Repository/JobRepository.php +++ b/src/Repository/JobRepository.php @@ -151,6 +151,33 @@ public function createQueryBuilder($alias = null): QueryBuilder return $qb; } + public function crashLongRunning(int $hours): void + { + $qb = $this->connection->createQueryBuilder(); + $qb->update(static::TABLE_NAME, 'j') + ->set('j.status', ':new_status') + ->set('j.end_time', ':now') + ->andWhere('j.status = :status') + ->andWhere('j.start_time < :date') + ->setParameter('status', Job::STATUS_RUNNING) + ->setParameter('date', new \DateTime("- {$hours} hours"), 'datetime') + ->setParameter('new_status', Job::STATUS_CRASHED) + ->setParameter('now', new \DateTime(), 'datetime') + ->executeStatement() + ; + } + + public function deleteOld(int $days): void + { + $qb = $this->connection->createQueryBuilder(); + $qb->delete(static::TABLE_NAME, 'j') + ->andWhere($qb->expr()->in('j.status', [Job::STATUS_COMPLETED, Job::STATUS_CRASHED])) + ->andWhere('j.end_time < :date') + ->setParameter('date', new \DateTime("- {$days} days"), 'datetime') + ->executeStatement() + ; + } + private function returnFirstOrNull(QueryBuilder $qb): ?Job { $stmt = $qb->executeQuery(); diff --git a/src/Resources/config/services.yaml b/src/Resources/config/services.yaml index dce5b62..3f3bb9a 100644 --- a/src/Resources/config/services.yaml +++ b/src/Resources/config/services.yaml @@ -3,7 +3,7 @@ services: public: false CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistry' - CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistry: + CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistry: ~ CodeRhapsodie\DataflowBundle\Command\AddScheduledDataflowCommand: arguments: @@ -100,3 +100,15 @@ services: arguments: $repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository' $exceptionHandler: '@CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface' + + CodeRhapsodie\DataflowBundle\Command\JobCleanupCommand: + arguments: + $jobRepository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository' + $retention: '%coderhapsodie.dataflow.job_history.retention%' + tags: ['console.command'] + + CodeRhapsodie\DataflowBundle\Command\SetCrashedCommand: + arguments: + $jobRepository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository' + $crashedDelay: '%coderhapsodie.dataflow.job_history.crashed_delay%' + tags: ['console.command'] From 83f00e79d33565d347b1ef6f5f9afd1341d2ae95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=A9my=20J?= Date: Mon, 23 Feb 2026 16:59:28 +0100 Subject: [PATCH 2/2] php-cs-fixer --- src/Command/JobShowCommand.php | 2 +- src/Command/SchemaCommand.php | 2 +- src/DataflowType/Dataflow/Dataflow.php | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Command/JobShowCommand.php b/src/Command/JobShowCommand.php index 2863b94..15da603 100644 --- a/src/Command/JobShowCommand.php +++ b/src/Command/JobShowCommand.php @@ -93,7 +93,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int $io->table(['Field', 'Value'], $display); if ($input->getOption('details')) { $io->section('Exceptions'); - $exceptions = array_map(fn (string $exception) => substr($exception, 0, 900).'…', $job->getExceptions()); + $exceptions = array_map(static fn (string $exception) => substr($exception, 0, 900).'…', $job->getExceptions()); $io->write($exceptions); } diff --git a/src/Command/SchemaCommand.php b/src/Command/SchemaCommand.php index 237724c..f774a3e 100644 --- a/src/Command/SchemaCommand.php +++ b/src/Command/SchemaCommand.php @@ -38,7 +38,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int // add -- before each keys $options = array_combine( - array_map(fn ($key) => '--'.$key, array_keys($options)), + array_map(static fn ($key) => '--'.$key, array_keys($options)), array_values($options) ); diff --git a/src/DataflowType/Dataflow/Dataflow.php b/src/DataflowType/Dataflow/Dataflow.php index dbbb761..d2021e3 100644 --- a/src/DataflowType/Dataflow/Dataflow.php +++ b/src/DataflowType/Dataflow/Dataflow.php @@ -67,7 +67,7 @@ public function setCustomExceptionIndex(callable $callable): self */ public function setAfterItemProcessors(array $processors): self { - $this->afterItemProcessors = array_map(fn (callable $callable) => \Closure::fromCallable($callable), $processors); + $this->afterItemProcessors = array_map(static fn (callable $callable) => \Closure::fromCallable($callable), $processors); return $this; }