Jake Vanderwerf
2026-05-13 226b50642af0895948fbaa623a9b7180399a63b6
inc/managers/queue/Processor.php
@@ -14,48 +14,39 @@
   public function run(): void
   {
      if (get_transient(BASE.'queue_running')) {
         return;
      }
      set_transient(BASE.'queue_running', true, 60);
      if (!$this->hasAdequateResources()) {
         error_log('[Processor] Insufficient resources to start processing');
         return;
      }
      $ops = $this->storage->fetchRunnable();
      if (empty($ops)) {
         return;
      }
      foreach ($ops as $op) {
         if ($op->state === 'completed') {
            return;
      $op = null;
      $this->storage->withTransaction(function() use (&$op) {
         $candidates = $this->storage->fetchRunnable();
         foreach ($candidates as $candidate) {
            if ($candidate->state === 'completed') continue;
            if (!$this->dependenciesSatisfied($candidate)) continue;
            if ($this->storage->markProcessing($candidate->id)) {
               $op = $candidate;
               break;
            }
         }
         if (!$this->dependenciesSatisfied($op)) {
            continue;
         }
         if (!$this->storage->markProcessing($op->id)) {
            continue;
         }
         $this->processOne($op);
         usleep(10000);
      }
      });
      if (!$op) return;
      $this->processOne($op);
      usleep(10000);
      $this->storage->invalidateQueueCache();
   }
   private function processOne(Operation $op): void
   {
      if (get_transient(BASE.$op->id)) {
         return;
      }
      set_transient(BASE.$op->id, true, 500);
      $progress = new Progress($op);
      $executor = $this->registry->getExecutor($op->type) ?? $this->defaultExecutor;
      $op->startedAt = current_time('mysql');
      $op->state = 'processing';
      $this->storage->saveProgress($op);
      try {