<?php
|
namespace JVBase\managers\queue;
|
// Direct-access guard: stop immediately unless the ABSPATH constant is defined.
defined('ABSPATH') || exit;
|
|
/**
 * Runs queued operations.
 *
 * Fetches a batch of runnable operations from storage, executes each one with
 * the executor registered for its type (or the default executor), and saves
 * the resulting state. When execution throws, the operation is either
 * rescheduled with exponential backoff or marked as permanently failed.
 */
final class Processor
{
    /**
     * @param Storage      $storage         Provides runnable operations and persists their state.
     * @param Executor     $defaultExecutor Used when the registry returns null for an operation type.
     * @param TypeRegistry $registry        Queried per operation type for a dedicated executor.
     * @param Locker       $locker          Wraps the whole batch run via withLock().
     */
    public function __construct(
        private Storage $storage,
        private Executor $defaultExecutor,
        private TypeRegistry $registry,
        private Locker $locker,
    ) {}

    /**
     * Processes up to 10 runnable operations inside the locker's withLock() callback.
     *
     * An operation is skipped when markProcessing() returns a falsy value.
     * The queue cache is invalidated in a `finally` block: if processing aborts
     * with an exception, operations handled earlier in the batch have already
     * been saved with a new state, so the cache must not be left stale. The
     * exception itself still propagates to the caller.
     */
    public function run(): void
    {
        $this->locker->withLock(function (): void {
            $batch = $this->storage->fetchRunnable(10);

            try {
                foreach ($batch as $op) {
                    if (!$this->storage->markProcessing($op->id)) {
                        continue;
                    }

                    $this->processOne($op);
                }
            } finally {
                $this->storage->invalidateQueueCache();
            }
        });
    }

    /**
     * Executes a single operation and saves its resulting state.
     *
     * On success the operation becomes 'completed' and receives the executor's
     * outcome and result. Any Throwable raised by the executor is routed to
     * handleFailure(). The operation is saved in both cases.
     */
    private function processOne(Operation $op): void
    {
        $progress = new Progress($op);
        // Fall back to the default executor when the registry yields null.
        $executor = $this->registry->getExecutor($op->type) ?? $this->defaultExecutor;

        try {
            $result = $executor->execute($op, $progress);

            $op->state = 'completed';
            $op->outcome = $result->outcome;
            $op->result = $result->result;
            $op->completedAt = current_time('mysql');
        } catch (\Throwable $e) {
            $this->handleFailure($op, $e);
        }

        $this->storage->save($op);
    }

    /**
     * Updates an operation after its execution threw, then logs the error.
     *
     * If the MD5 of the exception message equals the hash recorded for the
     * previous failure, the operation is closed as 'failed_permanent'.
     * Otherwise the retry counter is incremented, the new hash is stored and
     * the operation is rescheduled using calculateBackoff().
     *
     * NOTE(review): this method has no retry ceiling. An operation whose error
     * message differs on every attempt keeps being rescheduled (the delay is
     * capped, the number of attempts is not).
     */
    private function handleFailure(Operation $op, \Throwable $e): void
    {
        $message = $e->getMessage();
        $hash = md5($message);
        $isPermanent = $op->lastErrorHash === $hash;

        if ($isPermanent) {
            // Same error twice → permanent failure
            $op->state = 'completed';
            $op->outcome = 'failed_permanent';
            $op->completedAt = current_time('mysql');
        } else {
            // New error → schedule retry
            $op->retries++;
            $op->lastErrorHash = $hash;
            $op->state = 'scheduled';
            $op->scheduledAt = $this->calculateBackoff($op->retries);
        }

        $op->errorMessage = $message;

        JVB()->error()->log(
            '[Queue]:processOne',
            $message,
            [
                'operation_id' => $op->id,
                'type' => $op->type,
                'user_id' => $op->userId,
                'retries' => $op->retries,
            ],
            // Severity follows the decision made above rather than whatever
            // outcome the operation may still carry from an earlier run.
            $isPermanent ? 'critical' : 'warning'
        );
    }

    /**
     * Returns the time of the next attempt as a 'Y-m-d H:i:s' string.
     *
     * The delay starts at 30 seconds, doubles with every attempt and is capped
     * at 3600 seconds; a random jitter of up to 10% of the delay is added.
     *
     * NOTE(review): the result is formatted with date()/time(), whereas
     * completedAt is produced by current_time('mysql'). Confirm that Storage
     * compares scheduledAt against a clock in the same timezone.
     *
     * @param int $attempt Attempt number; handleFailure() passes the already incremented retry count.
     */
    private function calculateBackoff(int $attempt): string
    {
        // The int cast keeps the timestamp integral: the power is a float for
        // non-positive exponents and once it overflows the integer range.
        $delay = (int) min(30 * 2 ** ($attempt - 1), 3600);
        $jitter = rand(0, (int) ($delay * 0.1));

        return date('Y-m-d H:i:s', time() + $delay + $jitter);
    }
}
|