locker->withLock(function () {
            $ops = $this->storage->fetchRunnable(10);
            foreach ($ops as $op) {
                if (!$this->storage->markProcessing($op->id)) {
                    continue;
                }
                $this->processOne($op);
            }
            $this->storage->invalidateQueueCache();
        });
    }

    /**
     * Runs one operation through its executor and persists the resulting state.
     *
     * The executor is looked up in the registry by the operation's type, falling
     * back to the default executor when the registry returns null. On success the
     * operation is marked 'completed' and receives the executor's outcome/result
     * plus a completion timestamp. Any Throwable raised by the executor is routed
     * to handleFailure(), which decides between a retry and a permanent failure.
     *
     * @param Operation $op Operation to execute; mutated in place and then saved.
     */
    private function processOne(Operation $op): void
    {
        $progress = new Progress($op);
        $executor = $this->registry->getExecutor($op->type) ?? $this->defaultExecutor;

        try {
            $result = $executor->execute($op, $progress);

            $op->state = 'completed';
            $op->outcome = $result->outcome;
            $op->result = $result->result;
            $op->completedAt = current_time('mysql');
        } catch (\Throwable $e) {
            $this->handleFailure($op, $e);
        } finally {
            // Persist even when failure handling itself throws (e.g. the logger),
            // so the state chosen by handleFailure() is not lost.
            $this->storage->save($op);
        }
    }

    /**
     * Records a failed execution on the operation and logs it.
     *
     * The error message is fingerprinted with md5 (used only to compare messages).
     * When the fingerprint equals the one stored from the previous failure, the
     * operation is closed as 'completed' with outcome 'failed_permanent'.
     * Otherwise the retry counter is incremented, the fingerprint is stored and
     * the operation is put back to 'scheduled' with a backoff-based scheduledAt.
     *
     * This method does not save the operation; the caller does.
     *
     * NOTE(review): no upper bound on retries is visible in this method. Errors
     * whose message differs from the previous attempt (timestamps, ids, two
     * alternating messages) never match the stored hash - consider a retry cap.
     *
     * @param Operation  $op Operation that failed; mutated in place.
     * @param \Throwable $e  Error raised while executing the operation.
     */
    private function handleFailure(Operation $op, \Throwable $e): void
    {
        $message = $e->getMessage();
        $hash = md5($message);
        $isPermanent = $op->lastErrorHash === $hash;

        if ($isPermanent) {
            // Same error as the previous attempt: give up permanently.
            $op->state = 'completed';
            $op->outcome = 'failed_permanent';
            $op->completedAt = current_time('mysql');
        } else {
            // Different error from the previous attempt: schedule a retry.
            $op->retries++;
            $op->lastErrorHash = $hash;
            $op->state = 'scheduled';
            $op->scheduledAt = $this->calculateBackoff($op->retries);
        }

        $op->errorMessage = $message;

        JVB()->error()->log(
            '[Queue]:processOne',
            $message,
            [
                'operation_id' => $op->id,
                'type' => $op->type,
                'user_id' => $op->userId,
                'retries' => $op->retries,
            ],
            // Severity follows the decision taken above rather than $op->outcome,
            // which the retry branch leaves untouched and may therefore be stale.
            $isPermanent ? 'critical' : 'warning'
        );
    }

    /**
     * Computes the timestamp at which a retry should run.
     *
     * The delay starts at 30 seconds, doubles with each attempt and is capped at
     * 3600 seconds; a random jitter of up to 10% of the delay is added on top.
     *
     * NOTE(review): this formats time() with date(), whereas completion
     * timestamps in this class come from current_time('mysql'). Confirm both use
     * the same timezone basis as whatever compares scheduledAt.
     *
     * @param int $attempt 1-based retry number; values below 1 are treated as 1.
     *
     * @return string Timestamp formatted as 'Y-m-d H:i:s'.
     */
    private function calculateBackoff(int $attempt): string
    {
        // 30 * 2^7 already exceeds the 3600 s cap, so capping the exponent at 7
        // yields the same delays while keeping the arithmetic in integers
        // (no float from negative or very large exponents reaches date()).
        $exponent = min(max($attempt, 1) - 1, 7);
        $delay = min(30 * (2 ** $exponent), 3600);
        $jitter = rand(0, (int) ($delay * 0.1));

        return date('Y-m-d H:i:s', time() + $delay + $jitter);
    }
}