Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# Version 5.5.0
* Added cleanup commands

# Version 5.4.1
* Fix File Exceptions integration

Expand Down
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,6 @@ implementing `DataflowTypeInterface`.

Otherwise, manually add the tag `coderhapsodie.dataflow.type` in your dataflow type service configuration:

```yaml
```yaml
CodeRhapsodie\DataflowExemple\DataflowType\MyFirstDataflowType:
tags:
Expand Down Expand Up @@ -598,6 +597,10 @@ the messenger component instead.

`code-rhapsodie:dataflow:dump-schema` Generates schema create / update SQL queries

`code-rhapsodie:dataflow:set_crashed` Jobs that have been in the "running" status for too long will be set in the "crashed" status.

`code-rhapsodie:dataflow:job_cleanup` Remove old completed or crashed jobs

### Work with many databases

All commands have a `--connection` option to define what Doctrine DBAL connection to use during execution.
Expand Down
32 changes: 32 additions & 0 deletions src/Command/JobCleanupCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

declare(strict_types=1);

namespace CodeRhapsodie\DataflowBundle\Command;

use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

#[AsCommand(name: 'code-rhapsodie:job_cleanup', description: 'Cleanup job history.')]
class JobCleanupCommand extends Command
{
public function __construct(private JobRepository $jobRepository, private int $retention)
{
parent::__construct();
}

protected function configure()
{
$this->setHelp('Job retention can be configured with the "job_history.retention" configuration.');
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->jobRepository->deleteOld($this->retention);

return Command::SUCCESS;
}
}
2 changes: 1 addition & 1 deletion src/Command/JobShowCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$io->table(['Field', 'Value'], $display);
if ($input->getOption('details')) {
$io->section('Exceptions');
$exceptions = array_map(fn (string $exception) => substr($exception, 0, 900).'…', $job->getExceptions());
$exceptions = array_map(static fn (string $exception) => substr($exception, 0, 900).'…', $job->getExceptions());

$io->write($exceptions);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Command/SchemaCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int

// add -- before each keys
$options = array_combine(
array_map(fn ($key) => '--'.$key, array_keys($options)),
array_map(static fn ($key) => '--'.$key, array_keys($options)),
array_values($options)
);

Expand Down
32 changes: 32 additions & 0 deletions src/Command/SetCrashedCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

declare(strict_types=1);

namespace CodeRhapsodie\DataflowBundle\Command;

use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

#[AsCommand(name: 'code-rhapsodie:set_crashed', description: 'Set long running jobs as crashed.')]
class SetCrashedCommand extends Command
{
public function __construct(private JobRepository $jobRepository, private int $crashedDelay)
{
parent::__construct();
}

protected function configure()
{
$this->setHelp('How long jobs have to run before they are set as crashed can be configured with the "job_history.crashed_delay" configuration.');
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->jobRepository->crashLongRunning($this->crashedDelay);

return Command::SUCCESS;
}
}
2 changes: 1 addition & 1 deletion src/DataflowType/Dataflow/Dataflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public function setCustomExceptionIndex(callable $callable): self
*/
public function setAfterItemProcessors(array $processors): self
{
$this->afterItemProcessors = array_map(fn (callable $callable) => \Closure::fromCallable($callable), $processors);
$this->afterItemProcessors = array_map(static fn (callable $callable) => \Closure::fromCallable($callable), $processors);

return $this;
}
Expand Down
2 changes: 2 additions & 0 deletions src/DependencyInjection/CodeRhapsodieDataflowExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public function load(array $configs, ContainerBuilder $container): void
$container->setParameter('coderhapsodie.dataflow.dbal_default_connection', $config['dbal_default_connection']);
$container->setParameter('coderhapsodie.dataflow.default_logger', $config['default_logger']);
$container->setParameter('coderhapsodie.dataflow.exceptions_mode.type', $config['exceptions_mode']['type']);
$container->setParameter('coderhapsodie.dataflow.job_history.retention', $config['job_history']['retention']);
$container->setParameter('coderhapsodie.dataflow.job_history.crashed_delay', $config['job_history']['crashed_delay']);

if ($config['exceptions_mode']['type'] === 'file') {
$container->setParameter('coderhapsodie.dataflow.flysystem_service', $config['exceptions_mode']['flysystem_service']);
Expand Down
14 changes: 14 additions & 0 deletions src/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,20 @@ public function getConfigTreeBuilder(): TreeBuilder
->thenInvalid('You need "league/flysystem" to use Dataflow file exception mode.')
->end()
->end()
->arrayNode('job_history')
->addDefaultsIfNotSet()
->children()
->integerNode('retention')
->defaultValue(30)
->min(0)
->info('How many days completed and crashed jobs are kept when running the cleanup command.')
->end()
->integerNode('crashed_delay')
->defaultValue(24)
->min(24)
->info('Jobs running for more than this many hours will be set as crashed when running the cleanup command.')
->end()
->end()
->end()
;

Expand Down
1 change: 1 addition & 0 deletions src/Entity/Job.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class Job
public const STATUS_RUNNING = 1;
public const STATUS_COMPLETED = 2;
public const STATUS_QUEUED = 3;
public const STATUS_CRASHED = 4;

private const KEYS = [
'id',
Expand Down
27 changes: 27 additions & 0 deletions src/Repository/JobRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,33 @@ public function createQueryBuilder($alias = null): QueryBuilder
return $qb;
}

public function crashLongRunning(int $hours): void
{
$qb = $this->connection->createQueryBuilder();
$qb->update(static::TABLE_NAME, 'j')
->set('j.status', ':new_status')
->set('j.end_time', ':now')
->andWhere('j.status = :status')
->andWhere('j.start_time < :date')
->setParameter('status', Job::STATUS_RUNNING)
->setParameter('date', new \DateTime("- {$hours} hours"), 'datetime')
->setParameter('new_status', Job::STATUS_CRASHED)
->setParameter('now', new \DateTime(), 'datetime')
->executeStatement()
;
}

public function deleteOld(int $days): void
{
$qb = $this->connection->createQueryBuilder();
$qb->delete(static::TABLE_NAME, 'j')
->andWhere($qb->expr()->in('j.status', [Job::STATUS_COMPLETED, Job::STATUS_CRASHED]))
->andWhere('j.end_time < :date')
->setParameter('date', new \DateTime("- {$days} days"), 'datetime')
->executeStatement()
;
}

private function returnFirstOrNull(QueryBuilder $qb): ?Job
{
$stmt = $qb->executeQuery();
Expand Down
14 changes: 13 additions & 1 deletion src/Resources/config/services.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ services:
public: false

CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistry'
CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistry:
CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistry: ~

CodeRhapsodie\DataflowBundle\Command\AddScheduledDataflowCommand:
arguments:
Expand Down Expand Up @@ -100,3 +100,15 @@ services:
arguments:
$repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$exceptionHandler: '@CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface'

CodeRhapsodie\DataflowBundle\Command\JobCleanupCommand:
arguments:
$jobRepository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$retention: '%coderhapsodie.dataflow.job_history.retention%'
tags: ['console.command']

CodeRhapsodie\DataflowBundle\Command\SetCrashedCommand:
arguments:
$jobRepository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$crashedDelay: '%coderhapsodie.dataflow.job_history.crashed_delay%'
tags: ['console.command']