Jake Vanderwerf
2026-02-04 2127b1bdd73ecd2423e443992da4b442f5a3c1a3
inc/managers/queue/Queue.php
@@ -17,10 +17,10 @@
   {
      $this->storage = new Storage();
      $this->registry = new TypeRegistry();
      $this->locker = new Locker('queue_processor', 300);
      $this->locker = new Locker();
      $executor = new FilteredExecutor($this->storage);
      $this->processor = new Processor($this->storage, $executor, $this->registry, $this->locker);
      $this->processor = new Processor($this->storage, $executor, $this->registry);
      add_action('jvb_process_queue', [$this, 'checkQueue']);
      add_action('jvb_queue_maintenance', [$this, 'maintenance']);
@@ -41,6 +41,11 @@
      return $this->registry;
   }
   public function storage(): Storage
   {
      return $this->storage;
   }
   /**
    * Queue a new operation or merge into existing
    *
@@ -60,53 +65,6 @@
   {
      try {
         $incoming = $this->buildOperation($type, $userId, $data, $options);
         $mergeable = $this->registry->getMergeable($type);
         $existingById = $this->storage->find($incoming->id);
         if ($existingById) {
            // Operation with this ID already exists
            if (in_array($existingById->state, ['pending', 'scheduled']) && $mergeable) {
               // Still pending and mergeable, merge into it
               $merged = $mergeable->merge($existingById, $incoming);
               $this->storage->save($merged);
               $this->runQueueOnShutdown();
               return [
                  'success'          => true,
                  'operation_id'     => $merged->id,
                  'updated_existing' => true,
               ];
            } else {
               // Already processing/completed, or not mergeable - generate new ID
               $incoming->id = 'u' . $userId . '_' . time() . '_' . uniqid();
               JVB()->error()->log(
                  '[Queue]:add',
                  'Duplicate ID for non-mergeable operation, generated new ID',
                  [
                     'type' => $type,
                     'existing_state' => $existingById->state,
                  ],
                  'warning'
               );
            }
         }
         if ($mergeable) {
            $existing = $this->storage->findMergeable($type, $userId);
            if ($existing && $mergeable->canMerge($existing, $incoming)) {
               $merged = $mergeable->merge($existing, $incoming);
               $this->storage->save($merged);
               $this->runQueueOnShutdown();
               return [
                  'success'          => true,
                  'operation_id'     => $merged->id,
                  'updated_existing' => true,
               ];
            }
         }
         $this->storage->insert($incoming);
         $this->runQueueOnShutdown();
@@ -133,43 +91,18 @@
   public function checkQueue(): void
   {
      if (!$this->storage->getQueueInfo()['has_items']) {
         return;
      }
      $this->locker->withLock(function () {
         $this->processor->run();
      });
      $this->processor->run();
   }
   public function maintenance(): void
   {
      if ($this->locker->isLocked()) {
         return;
      }
      $this->locker->withLock(function () {
         $this->storage->resetStuckOperations(30);
      });
      $this->cleanupStuck();
   }
   private function cleanupStuck(): void
   {
      // Operations stuck in processing > 30 min → reschedule
      global $wpdb;
      $table = $wpdb->prefix . BASE . '_operation_queue';
      $stuck = $wpdb->get_results($wpdb->prepare("
            SELECT id FROM {$table}
            WHERE state = 'processing'
            AND started_at < %s
        ", date('Y-m-d H:i:s', strtotime('-30 minutes'))));
      foreach ($stuck as $row) {
         $op = $this->storage->find($row->id);
         if ($op) {
            $op->state = 'scheduled';
            $op->scheduledAt = date('Y-m-d H:i:s', time() + 60);
            $op->retries++;
            $this->storage->save($op);
         }
      }
   }
   // === Public Getters ===
@@ -266,6 +199,7 @@
    */
   public function retry(string $id, int $userId): bool
   {
      $op = $this->get($id);
      if (!$op || $op->userId !== $userId) {
         return false;
@@ -282,7 +216,7 @@
      $op->lastErrorHash = null;
      $op->scheduledAt = current_time('mysql');
      $op->retries++;
      error_log('[Queue]Retrying operation '.print_r($op->id, true));
      $saved = $this->storage->save($op);
      if ($saved) {
@@ -323,15 +257,9 @@
            default       => null,
         };
      }
      return $this->storage->save($op);
      error_log('[Queue]: updating operation '.print_r($op->id, true));
      return $this->storage->saveProgress($op);
   }
   public function isLocked(): bool
   {
      return $this->locker->isLocked();
   }
   // === Private Helpers ===
   private function buildOperation(string $type, int $userId, array $data, array $options): Operation