| | |
| | | exit; |
| | | } |
| | | |
| | | use JVBase\managers\CustomTable; |
| | | use WP_Error; |
| | | use WP_REST_Request; |
| | | use WP_REST_Response; |
| | | |
| | | class Queue |
| | | { |
| | | private Storage $storage; |
| | | private Processor $processor; |
| | | private TypeRegistry $registry; |
| | | public Executor $executor; |
| | | private Locker $locker; |
| | | |
| | | public function __construct() |
| | |
| | | $this->registry = new TypeRegistry(); |
| | | $this->locker = new Locker(); |
| | | |
| | | $executor = new FilteredExecutor(); |
| | | $this->processor = new Processor($this->storage, $executor, $this->registry); |
| | | $this->executor = new FilteredExecutor(); |
| | | $this->processor = new Processor($this->storage, $this->executor, $this->registry); |
| | | |
| | | add_action('jvb_process_queue', [$this, 'checkQueue']); |
| | | add_action('jvb_queue_maintenance', [$this, 'maintenance']); |
| | | add_action('jvb_daily_snapshot', [$this->storage, 'snapshotDaily']); |
| | | |
| | | if (!wp_next_scheduled('jvb_process_queue')) { |
| | | wp_schedule_event(time(), 'every-minute', 'jvb_process_queue'); |
| | |
| | | if (!wp_next_scheduled('jvb_queue_maintenance')) { |
| | | wp_schedule_event(time(), 'hourly', 'jvb_queue_maintenance'); |
| | | } |
| | | if (!wp_next_scheduled('jvb_daily_snapshot')) { |
| | | // Schedule for next 3am |
| | | $next3am = strtotime('tomorrow 3am', current_time('timestamp')); |
| | | wp_schedule_event($next3am, 'daily', 'jvb_daily_snapshot'); |
| | | } |
| | | |
| | | jvb_register_do_once('queue_admin_action_registered', [$this, 'registerAdminAction']); |
| | | add_filter(BASE.'admin_action_filter', [$this, 'adminActionFilter'], 10, 3); |
| | | } |
| | | |
| | | /** |
| | |
| | | try { |
| | | $incoming = $this->buildOperation($type, $userId, $data, $options); |
| | | |
| | | // Attempt pre-insert merge |
| | | $merged = $this->tryMerge($incoming); |
| | | if ($merged) { |
| | | $this->runQueueOnShutdown(); |
| | | return $merged; |
| | | } |
| | | |
| | | $this->storage->insert($incoming); |
| | | $this->runQueueOnShutdown(); |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Try to merge incoming operation into an existing pending/scheduled one. |
| | | * Returns result array if merged, null if not. |
| | | * @throws \Throwable |
| | | */ |
| | | private function tryMerge(Operation $incoming): ?array |
| | | { |
| | | $mergeable = $this->registry->getMergeable($incoming->type); |
| | | if (!$mergeable) { |
| | | return null; |
| | | } |
| | | |
| | | $criteria = $mergeable->matchCriteria($incoming); |
| | | |
| | | $existing = $this->storage->findMergeable($incoming->type, $incoming->userId, $criteria); |
| | | if (!$existing || !$mergeable->canMerge($existing, $incoming)) { |
| | | return null; |
| | | } |
| | | |
| | | $this->storage->withTransaction(function () use ($incoming, $existing, $mergeable) { |
| | | $mergeable->merge($existing, $incoming); |
| | | |
| | | $this->storage->replaceDependency( |
| | | $incoming->id, |
| | | $existing->id |
| | | ); |
| | | |
| | | // Merge dependency arrays safely |
| | | $existing->dependencies = array_values(array_unique( |
| | | array_merge($existing->dependencies, $incoming->dependencies) |
| | | )); |
| | | |
| | | // Prevent self dependency |
| | | $existing->dependencies = array_diff( |
| | | $existing->dependencies, |
| | | [$existing->id] |
| | | ); |
| | | |
| | | $this->storage->save($existing); |
| | | |
| | | $incoming->state = 'completed'; |
| | | $incoming->outcome = 'merged'; |
| | | $incoming->merged_into = $existing->id; |
| | | $this->storage->saveFinal($incoming); |
| | | }); |
| | | |
| | | return [ |
| | | 'success' => true, |
| | | 'operation_id' => $existing->id, |
| | | 'updated_existing' => true, |
| | | ]; |
| | | } |
| | | |
| | | /** |
| | | * Alias for add() - backwards compatibility |
| | | */ |
| | | public function queueOperation(string $type, int $userId, array $data, array $options = []): array|WP_Error |
| | |
| | | $this->locker->withLock(function () { |
| | | $this->storage->resetStuckOperations(30); |
| | | }); |
| | | |
| | | $this->runQueueOnShutdown(); |
| | | } |
| | | |
| | | // === Public Getters === |
| | |
| | | 'user_id' => $op->userId, |
| | | 'metadata' => $op->metadata, |
| | | 'dependencies' => $op->dependencies, |
| | | 'merged_into' => $op->merged_into, |
| | | default => null, |
| | | }; |
| | | } |
| | |
| | | |
| | | $this->checkQueue(); |
| | | } |
| | | |
| | | public function registerAdminAction():void |
| | | { |
| | | $admin = JVB()->admin(); |
| | | $admin->registerAction( |
| | | 'Restart Stuck Operations', |
| | | 'restart-stuck-operations', |
| | | 'manage_options', |
| | | 'arrows-clockwise' |
| | | ); |
| | | $admin->registerAction( |
| | | 'Unlock Queue', |
| | | 'unlock-operation-queue', |
| | | 'manage_options', |
| | | 'infinity' |
| | | ); |
| | | } |
| | | /** |
| | | * @param WP_REST_Response $response |
| | | * @param string $action |
| | | * |
| | | * @return bool|WP_REST_Response |
| | | */ |
| | | public function adminActionFilter(WP_REST_Response $response, WP_REST_Request $request, string $action):WP_REST_Response|bool |
| | | { |
| | | switch ($action) { |
| | | case 'unlock-operation-queue': |
| | | error_log('Unlocking Queue'); |
| | | $this->locker->unlock(); |
| | | return new WP_REST_Response([ |
| | | 'success' => true, |
| | | 'message' => 'Unlocked Queue' |
| | | ]); |
| | | case 'restart-stuck-operations': |
| | | error_log('Restarting stuck operations'); |
| | | $this->maintenance(); |
| | | return new WP_REST_Response([ |
| | | 'success' => true, |
| | | 'message' => 'Restarted Stuck Operations' |
| | | ]); |
| | | default: |
| | | return $response; |
| | | } |
| | | } |
| | | } |