| | |
| | | |
/**
 * Claim at most one runnable operation and process it.
 *
 * Concurrency guard: if the BASE.'queue_running' transient is already set
 * the method returns immediately. Otherwise it sets the transient for this
 * run and clears it again on every exit path; the 60-second expiry stays in
 * place as a fallback should the process die before the cleanup runs.
 *
 * Selection: inside storage->withTransaction() the runnable operations are
 * scanned in the order fetchRunnable() yields them. Operations whose state
 * is 'completed' or whose dependencies are not satisfied are skipped; the
 * first one for which markProcessing() returns a truthy value is claimed.
 *
 * The claimed operation is handed to processOne() after withTransaction()
 * has returned, and the queue cache is invalidated afterwards. When nothing
 * could be claimed the method returns without touching the cache.
 *
 * NOTE(review): releasing the guard in `finally` assumes the transient is
 * meant as a mutex rather than a once-per-minute throttle — confirm.
 *
 * @return void
 */
public function run(): void
{
    $lockKey = BASE.'queue_running';

    if (get_transient($lockKey)) {
        return;
    }
    set_transient($lockKey, true, 60);

    try {
        if (!$this->hasAdequateResources()) {
            error_log('[Processor] Insufficient resources to start processing');
            return;
        }

        // Pre-check outside the transaction so an empty queue never opens one.
        if (empty($this->storage->fetchRunnable())) {
            return;
        }

        // Filled in by reference from inside the transaction callback.
        $op = null;
        $this->storage->withTransaction(function () use (&$op) {
            // Re-read the queue inside the transaction: the pre-check result
            // above may be stale by the time the transaction starts.
            foreach ($this->storage->fetchRunnable() as $candidate) {
                if ($candidate->state === 'completed') {
                    continue;
                }
                if (!$this->dependenciesSatisfied($candidate)) {
                    continue;
                }
                // A falsy result means the claim did not succeed; try the next one.
                if ($this->storage->markProcessing($candidate->id)) {
                    $op = $candidate;
                    break;
                }
            }
        });

        if ($op === null) {
            return;
        }

        $this->processOne($op);
        usleep(10000); // 10 ms pause after processing

        $this->storage->invalidateQueueCache();
    } finally {
        // Always release the guard so an early return or an exception does
        // not block subsequent runs until the transient expires.
        delete_transient($lockKey);
    }
}
| | | |
| | | |
| | | private function processOne(Operation $op): void |
| | | { |
| | | if (get_transient(BASE.$op->id)) { |
| | | return; |
| | | } |
| | | set_transient(BASE.$op->id, true, 500); |
| | | $progress = new Progress($op); |
| | | |
| | | $executor = $this->registry->getExecutor($op->type) ?? $this->defaultExecutor; |
| | | $op->startedAt = current_time('mysql'); |
| | | $op->state = 'processing'; |
| | | |
| | | $this->storage->saveProgress($op); |
| | | |
| | | try { |