Jake Vanderwerf
2026-02-17 a24a06002081ad71a78ffeff9072725ba39cf121
inc/managers/queue/Queue.php
@@ -5,12 +5,15 @@
}
use WP_Error;
use WP_REST_Request;
use WP_REST_Response;
class Queue
{
   private Storage $storage;
   private Processor $processor;
   private TypeRegistry $registry;
   public Executor $executor;
   private Locker $locker;
   public function __construct()
@@ -19,8 +22,8 @@
      $this->registry = new TypeRegistry();
      $this->locker = new Locker();
      $executor = new FilteredExecutor($this->storage);
      $this->processor = new Processor($this->storage, $executor, $this->registry);
      $this->executor = new FilteredExecutor();
      $this->processor = new Processor($this->storage, $this->executor, $this->registry);
      add_action('jvb_process_queue', [$this, 'checkQueue']);
      add_action('jvb_queue_maintenance', [$this, 'maintenance']);
@@ -31,6 +34,9 @@
      if (!wp_next_scheduled('jvb_queue_maintenance')) {
         wp_schedule_event(time(), 'hourly', 'jvb_queue_maintenance');
      }
      jvb_register_do_once('queue_admin_action_registered', [$this, 'registerAdminAction']);
      add_filter(BASE.'admin_action_filter', [$this, 'adminActionFilter'], 10, 3);
   }
   /**
@@ -66,6 +72,13 @@
      try {
         $incoming = $this->buildOperation($type, $userId, $data, $options);
         // Attempt pre-insert merge
         $merged = $this->tryMerge($incoming);
         if ($merged) {
            $this->runQueueOnShutdown();
            return $merged;
         }
         $this->storage->insert($incoming);
         $this->runQueueOnShutdown();
@@ -82,6 +95,59 @@
   }
   /**
    * Try to merge incoming operation into an existing pending/scheduled one.
    * Returns result array if merged, null if not.
    * @throws \Throwable
    */
   private function tryMerge(Operation $incoming): ?array
   {
      $mergeable = $this->registry->getMergeable($incoming->type);
      if (!$mergeable) {
         return null;
      }
      $criteria = $mergeable->matchCriteria($incoming);
      $existing = $this->storage->findMergeable($incoming->type, $incoming->userId, $criteria);
      if (!$existing || !$mergeable->canMerge($existing, $incoming)) {
         return null;
      }
      $this->storage->withTransaction(function () use ($incoming, $existing, $mergeable) {
         $mergeable->merge($existing, $incoming);
         $this->storage->replaceDependency(
            $incoming->id,
            $existing->id
         );
         // Merge dependency arrays safely
         $existing->dependencies = array_values(array_unique(
            array_merge($existing->dependencies, $incoming->dependencies)
         ));
         // Prevent self dependency
         $existing->dependencies = array_diff(
            $existing->dependencies,
            [$existing->id]
         );
         $this->storage->save($existing);
         $incoming->state = 'completed';
         $incoming->outcome = 'merged';
         $incoming->merged_into = $existing->id;
         $this->storage->saveFinal($incoming);
      });
      return [
         'success'          => true,
         'operation_id'     => $existing->id,
         'updated_existing' => true,
      ];
   }
   /**
    * Alias for add() - backwards compatibility
    */
   public function queueOperation(string $type, int $userId, array $data, array $options = []): array|WP_Error
@@ -102,7 +168,7 @@
      $this->locker->withLock(function () {
         $this->storage->resetStuckOperations(30);
      });
      $this->runQueueOnShutdown();
   }
   // === Public Getters ===
@@ -139,6 +205,7 @@
         'user_id'      => $op->userId,
         'metadata'     => $op->metadata,
         'dependencies' => $op->dependencies,
         'merged_into'  => $op->merged_into,
         default        => null,
      };
   }
@@ -345,4 +412,48 @@
      $this->checkQueue();
   }
   public function registerAdminAction():void
   {
      $admin = JVB()->admin();
      $admin->registerAction(
         'Restart Stuck Operations',
         'restart-stuck-operations',
         'manage_options',
         'arrows-clockwise'
      );
      $admin->registerAction(
         'Unlock Queue',
         'unlock-operation-queue',
         'manage_options',
         'infinity'
      );
   }
   /**
    * @param WP_REST_Response $response
    * @param string $action
    *
    * @return bool|WP_REST_Response
    */
   public function adminActionFilter(WP_REST_Response $response, WP_REST_Request $request, string $action):WP_REST_Response|bool
   {
      switch ($action) {
         case 'unlock-operation-queue':
            error_log('Unlocking Queue');
            $this->locker->unlock();
            return new WP_REST_Response([
               'success'   => true,
               'message'   => 'Unlocked Queue'
            ]);
         case 'restart-stuck-operations':
            error_log('Restarting stuck operations');
            $this->maintenance();
            return new WP_REST_Response([
               'success'   => true,
               'message'   => 'Restarted Stuck Operations'
            ]);
         default:
            return $response;
      }
   }
}