| | |
| | | } |
| | | |
| | | use WP_Error; |
| | | use WP_REST_Request; |
| | | use WP_REST_Response; |
| | | |
| | | class Queue |
| | | { |
| | | // Persistence layer for queued operations (instantiated in the constructor). |
| | | private Storage $storage; |
| | | // Runs the queued operations; invoked from checkQueue(). |
| | | private Processor $processor; |
| | | // Consulted for the per-type merge handler via getMergeable(). |
| | | private TypeRegistry $registry; |
| | | // NOTE(review): the only public property in this group — confirm external code needs direct access. |
| | | public Executor $executor; |
| | | // Lock used around processing and maintenance (withLock / isLocked / unlock). |
| | | private Locker $locker; |
| | | |
| | | /** |
| | | * Wire up the queue's collaborators and register its WordPress hooks. |
| | | * |
| | | * NOTE(review): $this->locker and $this->processor are each assigned twice below, and the |
| | | * local $executor is only passed to the first Processor, which is then discarded. This looks |
| | | * like two revisions of the wiring left side by side — confirm which variant is intended |
| | | * and delete the other. |
| | | */ |
| | | public function __construct() |
| | | { |
| | | $this->storage = new Storage(); |
| | | $this->registry = new TypeRegistry(); |
| | | $this->locker = new Locker('queue_processor', 300); |
| | | // NOTE(review): overwrites the Locker('queue_processor', 300) created on the previous line. |
| | | $this->locker = new Locker(); |
| | | |
| | | $executor = new FilteredExecutor($this->storage); |
| | | $this->processor = new Processor($this->storage, $executor, $this->registry, $this->locker); |
| | | $this->executor = new FilteredExecutor(); |
| | | // NOTE(review): replaces the Processor built above; this one is not given the locker. |
| | | $this->processor = new Processor($this->storage, $this->executor, $this->registry); |
| | | |
| | | // Hook the queue runner and the maintenance routine to their named actions. |
| | | add_action('jvb_process_queue', [$this, 'checkQueue']); |
| | | add_action('jvb_queue_maintenance', [$this, 'maintenance']); |
| | |
| | | // Schedule the hourly maintenance event only when none is scheduled yet. |
| | | if (!wp_next_scheduled('jvb_queue_maintenance')) { |
| | | wp_schedule_event(time(), 'hourly', 'jvb_queue_maintenance'); |
| | | } |
| | | |
| | | // presumably runs registerAdminAction() a single time under the given key — verify against jvb_register_do_once(). |
| | | jvb_register_do_once('queue_admin_action_registered', [$this, 'registerAdminAction']); |
| | | // The filter callback receives three arguments: response, request and action slug. |
| | | add_filter(BASE.'admin_action_filter', [$this, 'adminActionFilter'], 10, 3); |
| | | } |
| | | |
| | | /** |
| | |
| | | return $this->registry; |
| | | } |
| | | |
| | | /** |
| | | * Return the Storage instance this queue was constructed with. |
| | | */ |
| | | public function storage(): Storage |
| | | { |
| | | return $this->storage; |
| | | } |
| | | |
| | | /** |
| | | * Queue a new operation or merge into existing |
| | | * |
| | |
| | | { |
| | | try { |
| | | $incoming = $this->buildOperation($type, $userId, $data, $options); |
| | | $mergeable = $this->registry->getMergeable($type); |
| | | |
| | | if ($mergeable) { |
| | | $existing = $this->storage->findMergeable($type, $userId); |
| | | |
| | | if ($existing && $mergeable->canMerge($existing, $incoming)) { |
| | | $merged = $mergeable->merge($existing, $incoming); |
| | | $this->storage->save($merged); |
| | | $this->runQueueOnShutdown(); |
| | | |
| | | return [ |
| | | 'success' => true, |
| | | 'operation_id' => $merged->id, |
| | | 'updated_existing' => true, |
| | | ]; |
| | | } |
| | | // Attempt pre-insert merge |
| | | $merged = $this->tryMerge($incoming); |
| | | if ($merged) { |
| | | $this->runQueueOnShutdown(); |
| | | return $merged; |
| | | } |
| | | |
| | | $this->storage->insert($incoming); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Try to merge incoming operation into an existing pending/scheduled one. |
| | | * |
| | | * Looks up the merge handler registered for the operation's type, asks storage for a |
| | | * candidate matching the handler's criteria and, when the handler accepts the pair, |
| | | * performs the merge inside a storage transaction. The incoming operation is then |
| | | * stored as completed with outcome "merged" and a pointer to the surviving operation. |
| | | * |
| | | * @param Operation $incoming Operation that is about to be queued. |
| | | * |
| | | * @return array|null Result array if merged, null if not. |
| | | * @throws \Throwable |
| | | */ |
| | | private function tryMerge(Operation $incoming): ?array |
| | | { |
| | | $mergeable = $this->registry->getMergeable($incoming->type); |
| | | if (!$mergeable) { |
| | | return null; |
| | | } |
| | | |
| | | $criteria = $mergeable->matchCriteria($incoming); |
| | | |
| | | $existing = $this->storage->findMergeable($incoming->type, $incoming->userId, $criteria); |
| | | if (!$existing || !$mergeable->canMerge($existing, $incoming)) { |
| | | return null; |
| | | } |
| | | |
| | | $this->storage->withTransaction(function () use ($incoming, $existing, $mergeable) { |
| | | // NOTE(review): merge()'s return value is ignored; this relies on merge() updating $existing in place — confirm. |
| | | $mergeable->merge($existing, $incoming); |
| | | |
| | | // presumably re-points dependants of the incoming operation at the surviving one — verify against Storage. |
| | | $this->storage->replaceDependency( |
| | | $incoming->id, |
| | | $existing->id |
| | | ); |
| | | |
| | | // Union of both dependency lists without a self-reference. array_unique() and |
| | | // array_diff() both preserve keys, so re-index last to keep a gap-free list. |
| | | $existing->dependencies = array_values(array_diff( |
| | | array_unique(array_merge($existing->dependencies, $incoming->dependencies)), |
| | | [$existing->id] |
| | | )); |
| | | |
| | | $this->storage->save($existing); |
| | | |
| | | // Close out the incoming operation and record where it was merged. |
| | | $incoming->state = 'completed'; |
| | | $incoming->outcome = 'merged'; |
| | | $incoming->merged_into = $existing->id; |
| | | $this->storage->saveFinal($incoming); |
| | | }); |
| | | |
| | | return [ |
| | | 'success' => true, |
| | | 'operation_id' => $existing->id, |
| | | 'updated_existing' => true, |
| | | ]; |
| | | } |
| | | |
| | | /** |
| | | * Alias for add() - backwards compatibility |
| | | */ |
| | | public function queueOperation(string $type, int $userId, array $data, array $options = []): array|WP_Error |
| | |
| | | |
| | | /** |
| | | * Process the queue when it has items, running the processor under the queue lock. |
| | | */ |
| | | public function checkQueue(): void |
| | | { |
| | | if (!$this->storage->getQueueInfo()['has_items']) { |
| | | return; |
| | | } |
| | | |
| | | // The processor runs only inside withLock(); an additional unguarded run() here |
| | | // would process the queue a second time and bypass the lock. |
| | | $this->locker->withLock(function () { |
| | | $this->processor->run(); |
| | | }); |
| | | } |
| | | |
| | | /** |
| | | * Maintenance hook: clean up stuck operations unless the queue lock is currently held. |
| | | */ |
| | | public function maintenance(): void |
| | | { |
| | | if (!$this->locker->isLocked()) { |
| | | $this->cleanupStuck(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Recover operations that have been in the 'processing' state for over 30 minutes. |
| | | * |
| | | * Each such operation is moved back to 'scheduled' one minute in the future with its retry |
| | | * counter incremented; afterwards storage is asked to reset stuck operations under the lock |
| | | * and a queue run is requested. |
| | | * |
| | | * NOTE(review): the hand-written query/loop and the resetStuckOperations(30) call appear to |
| | | * do overlapping work — possibly two revisions left side by side. Confirm which is intended. |
| | | */ |
| | | private function cleanupStuck(): void |
| | | { |
| | | // Operations stuck in processing > 30 min → reschedule |
| | | global $wpdb; |
| | | $table = $wpdb->prefix . BASE . '_operation_queue'; |
| | | |
| | | // NOTE(review): the cutoff is built with date(), while retry() stamps scheduledAt with |
| | | // current_time('mysql') — confirm both use the same timezone as the started_at column. |
| | | $stuck = $wpdb->get_results($wpdb->prepare(" |
| | | SELECT id FROM {$table} |
| | | WHERE state = 'processing' |
| | | AND started_at < %s |
| | | ", date('Y-m-d H:i:s', strtotime('-30 minutes')))); |
| | | |
| | | foreach ($stuck as $row) { |
| | | // Reload through storage so the change is saved via the normal save() path. |
| | | $op = $this->storage->find($row->id); |
| | | if ($op) { |
| | | $op->state = 'scheduled'; |
| | | // Retry one minute from now. |
| | | $op->scheduledAt = date('Y-m-d H:i:s', time() + 60); |
| | | $op->retries++; |
| | | $this->storage->save($op); |
| | | } |
| | | } |
| | | // presumably 30 is the same threshold in minutes as above — verify against Storage::resetStuckOperations(). |
| | | $this->locker->withLock(function () { |
| | | $this->storage->resetStuckOperations(30); |
| | | }); |
| | | $this->runQueueOnShutdown(); |
| | | } |
| | | |
| | | // === Public Getters === |
| | |
| | | 'user_id' => $op->userId, |
| | | 'metadata' => $op->metadata, |
| | | 'dependencies' => $op->dependencies, |
| | | 'merged_into' => $op->merged_into, |
| | | default => null, |
| | | }; |
| | | } |
| | |
| | | */ |
| | | public function retry(string $id, int $userId): bool |
| | | { |
| | | |
| | | $op = $this->get($id); |
| | | if (!$op || $op->userId !== $userId) { |
| | | return false; |
| | |
| | | $op->lastErrorHash = null; |
| | | $op->scheduledAt = current_time('mysql'); |
| | | $op->retries++; |
| | | |
| | | error_log('[Queue]Retrying operation '.print_r($op->id, true)); |
| | | $saved = $this->storage->save($op); |
| | | |
| | | if ($saved) { |
| | |
| | | default => null, |
| | | }; |
| | | } |
| | | |
| | | return $this->storage->save($op); |
| | | error_log('[Queue]: updating operation '.print_r($op->id, true)); |
| | | return $this->storage->saveProgress($op); |
| | | } |
| | | |
| | | /** |
| | | * Report whether the queue's Locker currently considers the lock held. |
| | | */ |
| | | public function isLocked(): bool |
| | | { |
| | | return $this->locker->isLocked(); |
| | | } |
| | | |
| | | // === Private Helpers === |
| | | |
| | | private function buildOperation(string $type, int $userId, array $data, array $options): Operation |
| | |
| | | |
| | | $this->checkQueue(); |
| | | } |
| | | |
| | | /** |
| | | * Register the queue's two admin actions (restart stuck operations, unlock queue). |
| | | */ |
| | | public function registerAdminAction():void |
| | | { |
| | | $admin = JVB()->admin(); |
| | | |
| | | // One row per action, in registerAction() argument order. |
| | | // presumably [label, slug, capability, icon] — verify against registerAction(). |
| | | $actions = [ |
| | | ['Restart Stuck Operations', 'restart-stuck-operations', 'manage_options', 'arrows-clockwise'], |
| | | ['Unlock Queue', 'unlock-operation-queue', 'manage_options', 'infinity'], |
| | | ]; |
| | | |
| | | foreach ($actions as $arguments) { |
| | | $admin->registerAction(...$arguments); |
| | | } |
| | | } |
| | | /** |
| | | * Handle the queue's admin actions on the admin_action_filter hook. |
| | | * |
| | | * @param WP_REST_Response $response Response passed in by the filter; returned unchanged for actions not handled here. |
| | | * @param WP_REST_Request $request Current request (not read by this handler). |
| | | * @param string $action Slug of the admin action being dispatched. |
| | | * |
| | | * @return bool|WP_REST_Response |
| | | */ |
| | | public function adminActionFilter(WP_REST_Response $response, WP_REST_Request $request, string $action):WP_REST_Response|bool |
| | | { |
| | | if ($action === 'unlock-operation-queue') { |
| | | error_log('Unlocking Queue'); |
| | | $this->locker->unlock(); |
| | | |
| | | $payload = [ |
| | | 'success' => true, |
| | | 'message' => 'Unlocked Queue' |
| | | ]; |
| | | |
| | | return new WP_REST_Response($payload); |
| | | } |
| | | |
| | | if ($action === 'restart-stuck-operations') { |
| | | error_log('Restarting stuck operations'); |
| | | // NOTE(review): maintenance() returns without doing anything while the lock is held, yet success is still reported. |
| | | $this->maintenance(); |
| | | |
| | | $payload = [ |
| | | 'success' => true, |
| | | 'message' => 'Restarted Stuck Operations' |
| | | ]; |
| | | |
| | | return new WP_REST_Response($payload); |
| | | } |
| | | |
| | | // Not one of this queue's actions: hand the incoming response back untouched. |
| | | return $response; |
| | | } |
| | | } |