Jake Vanderwerf
2026-02-17 a24a06002081ad71a78ffeff9072725ba39cf121
inc/managers/queue/Queue.php
@@ -5,22 +5,25 @@
}
use WP_Error;
use WP_REST_Request;
use WP_REST_Response;
class Queue
{
   private Storage $storage;
   private Processor $processor;
   private TypeRegistry $registry;
   public Executor $executor;
   private Locker $locker;
   public function __construct()
   {
      $this->storage = new Storage();
      $this->registry = new TypeRegistry();
      $this->locker = new Locker('queue_processor', 300);
      $this->locker = new Locker();
      $executor = new FilteredExecutor($this->storage);
      $this->processor = new Processor($this->storage, $executor, $this->registry, $this->locker);
      $this->executor = new FilteredExecutor();
      $this->processor = new Processor($this->storage, $this->executor, $this->registry);
      add_action('jvb_process_queue', [$this, 'checkQueue']);
      add_action('jvb_queue_maintenance', [$this, 'maintenance']);
@@ -31,6 +34,9 @@
      if (!wp_next_scheduled('jvb_queue_maintenance')) {
         wp_schedule_event(time(), 'hourly', 'jvb_queue_maintenance');
      }
      jvb_register_do_once('queue_admin_action_registered', [$this, 'registerAdminAction']);
      add_filter(BASE.'admin_action_filter', [$this, 'adminActionFilter'], 10, 3);
   }
   /**
@@ -41,6 +47,11 @@
      return $this->registry;
   }
   /**
    * Expose the Storage instance this queue works with.
    *
    * @return Storage The object held in $this->storage.
    */
   public function storage(): Storage
   {
      return $this->storage;
   }
   /**
    * Queue a new operation or merge into existing
    *
@@ -60,22 +71,12 @@
   {
      try {
         $incoming = $this->buildOperation($type, $userId, $data, $options);
         $mergeable = $this->registry->getMergeable($type);
         if ($mergeable) {
            $existing = $this->storage->findMergeable($type, $userId);
            if ($existing && $mergeable->canMerge($existing, $incoming)) {
               $merged = $mergeable->merge($existing, $incoming);
               $this->storage->save($merged);
               $this->runQueueOnShutdown();
               return [
                  'success'          => true,
                  'operation_id'     => $merged->id,
                  'updated_existing' => true,
               ];
            }
         // Attempt pre-insert merge
         $merged = $this->tryMerge($incoming);
         if ($merged) {
            $this->runQueueOnShutdown();
            return $merged;
         }
         $this->storage->insert($incoming);
@@ -94,6 +95,59 @@
   }
   /**
    * Try to merge incoming operation into an existing pending/scheduled one.
    * Returns result array if merged, null if not.
    * @throws \Throwable
    */
   private function tryMerge(Operation $incoming): ?array
   {
      $mergeable = $this->registry->getMergeable($incoming->type);
      if (!$mergeable) {
         return null;
      }
      $criteria = $mergeable->matchCriteria($incoming);
      $existing = $this->storage->findMergeable($incoming->type, $incoming->userId, $criteria);
      if (!$existing || !$mergeable->canMerge($existing, $incoming)) {
         return null;
      }
      $this->storage->withTransaction(function () use ($incoming, $existing, $mergeable) {
         $mergeable->merge($existing, $incoming);
         $this->storage->replaceDependency(
            $incoming->id,
            $existing->id
         );
         // Merge dependency arrays safely
         $existing->dependencies = array_values(array_unique(
            array_merge($existing->dependencies, $incoming->dependencies)
         ));
         // Prevent self dependency
         $existing->dependencies = array_diff(
            $existing->dependencies,
            [$existing->id]
         );
         $this->storage->save($existing);
         $incoming->state = 'completed';
         $incoming->outcome = 'merged';
         $incoming->merged_into = $existing->id;
         $this->storage->saveFinal($incoming);
      });
      return [
         'success'          => true,
         'operation_id'     => $existing->id,
         'updated_existing' => true,
      ];
   }
   /**
    * Alias for add() - backwards compatibility
    */
   public function queueOperation(string $type, int $userId, array $data, array $options = []): array|WP_Error
@@ -103,43 +157,18 @@
   /**
    * Run the processor when storage reports that the queue has items.
    *
    * Returns immediately when getQueueInfo()['has_items'] is falsy.
    *
    * NOTE(review): as written the processor runs twice — once inside
    * Locker::withLock() and once more outside it. This region looks like diff
    * hunks with the +/- markers stripped, so one of the two calls is probably a
    * removed line; confirm which one is meant to remain.
    */
   public function checkQueue(): void
   {
      if (!$this->storage->getQueueInfo()['has_items']) {
         return;
      }
      // Run under the queue lock.
      $this->locker->withLock(function () {
         $this->processor->run();
      });
      // Second, unlocked run (see the review note in the docblock).
      $this->processor->run();
   }
   public function maintenance(): void
   {
      if ($this->locker->isLocked()) {
         return;
      }
      $this->cleanupStuck();
   }
   /**
    * Reschedule operations that have been in the 'processing' state for more than 30 minutes.
    *
    * As written the work happens twice: first through a direct $wpdb query with a
    * per-row save, then through Storage::resetStuckOperations(30) under the queue
    * lock. Finally a queue run is requested via runQueueOnShutdown().
    *
    * NOTE(review): this region looks like diff hunks with the +/- markers
    * stripped, so the two halves may be the old and the new implementation —
    * confirm which one is meant to remain.
    */
   private function cleanupStuck(): void
   {
      // Operations stuck in processing > 30 min → reschedule
      global $wpdb;
      $table = $wpdb->prefix . BASE . '_operation_queue';
      // Cutoff is built with date()/strtotime().
      // NOTE(review): retry() in this class uses current_time('mysql') for timestamps —
      // confirm started_at is stored in the timezone date() produces here.
      $stuck = $wpdb->get_results($wpdb->prepare("
            SELECT id FROM {$table}
            WHERE state = 'processing'
            AND started_at < %s
        ", date('Y-m-d H:i:s', strtotime('-30 minutes'))));
      foreach ($stuck as $row) {
         $op = $this->storage->find($row->id);
         if ($op) {
            // Put the operation back in line, due one minute from now, and count the attempt.
            $op->state = 'scheduled';
            $op->scheduledAt = date('Y-m-d H:i:s', time() + 60);
            $op->retries++;
            $this->storage->save($op);
         }
      }
      // Storage-level reset, executed under the queue lock.
      // Presumably 30 is a threshold in minutes — TODO confirm against Storage.
      $this->locker->withLock(function () {
         $this->storage->resetStuckOperations(30);
      });
      $this->runQueueOnShutdown();
   }
   // === Public Getters ===
@@ -176,6 +205,7 @@
         'user_id'      => $op->userId,
         'metadata'     => $op->metadata,
         'dependencies' => $op->dependencies,
         'merged_into'  => $op->merged_into,
         default        => null,
      };
   }
@@ -236,6 +266,7 @@
    */
   public function retry(string $id, int $userId): bool
   {
      $op = $this->get($id);
      if (!$op || $op->userId !== $userId) {
         return false;
@@ -252,7 +283,7 @@
      $op->lastErrorHash = null;
      $op->scheduledAt = current_time('mysql');
      $op->retries++;
      error_log('[Queue]Retrying operation '.print_r($op->id, true));
      $saved = $this->storage->save($op);
      if ($saved) {
@@ -293,15 +324,9 @@
            default       => null,
         };
      }
      return $this->storage->save($op);
      error_log('[Queue]: updating operation '.print_r($op->id, true));
      return $this->storage->saveProgress($op);
   }
   /**
    * Report whether the queue lock is currently held.
    *
    * @return bool Whatever Locker::isLocked() returns for this queue's locker.
    */
   public function isLocked(): bool
   {
      return $this->locker->isLocked();
   }
   // === Private Helpers ===
   private function buildOperation(string $type, int $userId, array $data, array $options): Operation
@@ -387,4 +412,48 @@
      $this->checkQueue();
   }
   public function registerAdminAction():void
   {
      $admin = JVB()->admin();
      $admin->registerAction(
         'Restart Stuck Operations',
         'restart-stuck-operations',
         'manage_options',
         'arrows-clockwise'
      );
      $admin->registerAction(
         'Unlock Queue',
         'unlock-operation-queue',
         'manage_options',
         'infinity'
      );
   }
   /**
    * @param WP_REST_Response $response
    * @param string $action
    *
    * @return bool|WP_REST_Response
    */
   public function adminActionFilter(WP_REST_Response $response, WP_REST_Request $request, string $action):WP_REST_Response|bool
   {
      switch ($action) {
         case 'unlock-operation-queue':
            error_log('Unlocking Queue');
            $this->locker->unlock();
            return new WP_REST_Response([
               'success'   => true,
               'message'   => 'Unlocked Queue'
            ]);
         case 'restart-stuck-operations':
            error_log('Restarting stuck operations');
            $this->maintenance();
            return new WP_REST_Response([
               'success'   => true,
               'message'   => 'Restarted Stuck Operations'
            ]);
         default:
            return $response;
      }
   }
}