storage = new Storage();
        $this->registry = new TypeRegistry();
        $this->locker = new Locker('queue_processor', 300);

        $executor = new FilteredExecutor($this->storage);
        $this->processor = new Processor($this->storage, $executor, $this->registry, $this->locker);

        add_action('jvb_process_queue', [$this, 'checkQueue']);
        add_action('jvb_queue_maintenance', [$this, 'maintenance']);

        // Register each recurring cron event only if it is not already scheduled.
        // NOTE(review): 'every-minute' is not a core WordPress recurrence; presumably it is
        // registered through the `cron_schedules` filter elsewhere — confirm.
        if (!wp_next_scheduled('jvb_process_queue')) {
            wp_schedule_event(time(), 'every-minute', 'jvb_process_queue');
        }

        if (!wp_next_scheduled('jvb_queue_maintenance')) {
            wp_schedule_event(time(), 'hourly', 'jvb_queue_maintenance');
        }
    }

    /**
     * Access type registry for registering operation configs
     */
    public function registry(): TypeRegistry
    {
        return $this->registry;
    }

    /**
     * Queue a new operation or merge into existing
     *
     * Resolution order:
     *  1. An operation with the same ID exists, is still pending/scheduled and the type is
     *     mergeable: the incoming operation is merged into it.
     *  2. An operation with the same ID exists but cannot be merged: a fresh ID is generated
     *     for the incoming operation and a warning is logged.
     *  3. The type is mergeable and storage returns a merge candidate for this type/user that
     *     the mergeable accepts: the incoming operation is merged into that candidate.
     *  4. Otherwise the incoming operation is inserted as a new row.
     *
     * Every successful path also arranges for the queue to be checked on `shutdown`.
     *
     * @param string $type    Operation type
     * @param int    $userId  User ID
     * @param array  $data    Request data
     * @param array  $options {
     *     @type string       $operation_id Explicit operation ID (generated when empty)
     *     @type string       $priority     'low', 'normal', 'high'
     *     @type int          $delay        Seconds to delay processing
     *     @type string       $scheduled    Specific datetime to process
     *     @type array|string $depends_on   Operation IDs this depends on
     *     @type string|array $chunk_key    Key(s) to chunk (overrides registry)
     *     @type int          $chunk_size   Chunk size (overrides registry)
     * }
     * @return array|WP_Error Array with keys `success`, `operation_id`, `updated_existing`;
     *                        WP_Error with code `queue_failed` when an \Exception was thrown.
     */
    public function add(string $type, int $userId, array $data, array $options = []): array|WP_Error
    {
        try {
            $incoming = $this->buildOperation($type, $userId, $data, $options);
            $mergeable = $this->registry->getMergeable($type);

            $existingById = $this->storage->find($incoming->id);

            if ($existingById) {
                // Operation with this ID already exists
                if ($mergeable && in_array($existingById->state, ['pending', 'scheduled'], true)) {
                    // Still pending and mergeable, merge into it
                    $merged = $mergeable->merge($existingById, $incoming);
                    $this->storage->save($merged);
                    $this->runQueueOnShutdown();

                    return [
                        'success' => true,
                        'operation_id' => $merged->id,
                        'updated_existing' => true,
                    ];
                }

                // Already processing/completed, or not mergeable - generate new ID
                $incoming->id = 'u' . $userId . '_' . time() . '_' . uniqid();

                JVB()->error()->log(
                    '[Queue]:add',
                    'Duplicate ID for non-mergeable operation, generated new ID',
                    [
                        'type' => $type,
                        'existing_state' => $existingById->state,
                    ],
                    'warning'
                );
            }

            if ($mergeable) {
                $existing = $this->storage->findMergeable($type, $userId);

                if ($existing && $mergeable->canMerge($existing, $incoming)) {
                    $merged = $mergeable->merge($existing, $incoming);
                    $this->storage->save($merged);
                    $this->runQueueOnShutdown();

                    return [
                        'success' => true,
                        'operation_id' => $merged->id,
                        'updated_existing' => true,
                    ];
                }
            }

            $this->storage->insert($incoming);
            $this->runQueueOnShutdown();

            return [
                'success' => true,
                'operation_id' => $incoming->id,
                'updated_existing' => false,
            ];
        } catch (\Exception $e) {
            JVB()->error()->log('queue', $e->getMessage(), $data, 'high');

            return new WP_Error('queue_failed', $e->getMessage());
        }
    }

    /**
     * Alias for add() - backwards compatibility
     */
    public function queueOperation(string $type, int $userId, array $data, array $options = []): array|WP_Error
    {
        return $this->add($type, $userId, $data, $options);
    }

    /**
     * Run the processor when storage reports queued items.
     *
     * Hooked to the `jvb_process_queue` cron action and also invoked from
     * processQueueOnShutdown().
     */
    public function checkQueue(): void
    {
        // empty() also covers a missing 'has_items' key without raising a warning.
        if (empty($this->storage->getQueueInfo()['has_items'])) {
            return;
        }

        $this->processor->run();
    }

    /**
     * Maintenance pass (hooked to `jvb_queue_maintenance`).
     *
     * Does nothing while the processor lock is held; otherwise reschedules stuck operations.
     */
    public function maintenance(): void
    {
        if ($this->locker->isLocked()) {
            return;
        }

        $this->cleanupStuck();
    }

    /**
     * Reschedule operations that have been in the `processing` state for more than 30 minutes.
     *
     * Each stuck operation is moved back to `scheduled`, due 60 seconds from now, and its
     * retry counter is incremented.
     */
    private function cleanupStuck(): void
    {
        // Operations stuck in processing > 30 min → reschedule
        global $wpdb;

        $table = $wpdb->prefix . BASE . '_operation_queue';

        // NOTE(review): the cutoff uses date()/strtotime() while scheduledAt values in this
        // class are derived from current_time(); confirm which clock `started_at` is written in.
        $cutoff = date('Y-m-d H:i:s', strtotime('-30 minutes'));

        $stuck = $wpdb->get_results($wpdb->prepare(" SELECT id FROM {$table} WHERE state = 'processing' AND started_at < %s ", $cutoff));

        // get_results() may return null; normalise so the loop below never iterates a non-array.
        if (empty($stuck)) {
            return;
        }

        foreach ($stuck as $row) {
            $op = $this->storage->find($row->id);

            if (!$op) {
                continue;
            }

            $op->state = 'scheduled';
            // Same clock as retry() and calculateScheduledTime(), so every scheduledAt written
            // by this class is comparable.
            $op->scheduledAt = date('Y-m-d H:i:s', current_time('timestamp') + 60);
            $op->retries++;

            $this->storage->save($op);
        }
    }

    // === Public Getters ===

    /**
     * Look up an operation by ID.
     *
     * @param string $id Operation ID
     * @return Operation|null Whatever storage returns for this ID
     */
    public function get(string $id): ?Operation
    {
        return $this->storage->find($id);
    }

    /**
     * Alias for get() - backwards compatibility
     */
    public function getOperation(string $id): ?Operation
    {
        return $this->get($id);
    }

    /**
     * Get a specific value from an operation - backwards compatibility
     *
     * @param string $id         Operation ID
     * @param string $column     One of: result, request_data, state, outcome, type, user_id,
     *                           metadata, dependencies
     * @param bool   $decodeJson Unused by this implementation; kept so existing callers
     *                           passing a third argument keep working
     * @return mixed The property value, or null when the operation is not found or the
     *               column name is not recognised
     */
    public function getOperationValue(string $id, string $column, bool $decodeJson = true): mixed
    {
        $op = $this->get($id);

        if (!$op) {
            return null;
        }

        return match ($column) {
            'result' => $op->result,
            'request_data' => $op->requestData,
            'state' => $op->state,
            'outcome' => $op->outcome,
            'type' => $op->type,
            'user_id' => $op->userId,
            'metadata' => $op->metadata,
            'dependencies' => $op->dependencies,
            default => null,
        };
    }

    /**
     * List a user's operations; delegates to storage.
     *
     * @param int   $userId  User ID
     * @param array $filters Passed through to storage unchanged
     */
    public function getUserOperations(int $userId, array $filters = []): array
    {
        return $this->storage->getUserOperations($userId, $filters);
    }

    /**
     * Queue status as reported by storage.
     */
    public function getStatus(): array
    {
        return $this->storage->getQueueStatus();
    }

    /**
     * Per-user statistics as reported by storage.
     */
    public function getUserStats(int $userId): array
    {
        return $this->storage->getUserStats($userId);
    }

    /**
     * Queue info as reported by storage (checkQueue() reads its `has_items` entry).
     */
    public function getInfo(): array
    {
        return $this->storage->getQueueInfo();
    }

    /**
     * Dismiss an operation; delegates to storage.
     */
    public function dismiss(string $id): bool
    {
        return $this->storage->dismiss($id);
    }

    /**
     * Cancel a pending/scheduled operation (deletes it)
     *
     * @param string $id     Operation ID
     * @param int    $userId User ID (for ownership verification)
     * @return bool True if cancelled
     */
    public function cancel(string $id, int $userId): bool
    {
        $op = $this->get($id);

        if (!$op || $op->userId !== $userId) {
            return false;
        }

        // Can only cancel pending or scheduled operations
        if (!in_array($op->state, ['pending', 'scheduled'], true)) {
            return false;
        }

        return $this->storage->delete($id);
    }

    /**
     * Retry a failed operation
     *
     * Resets state/outcome to pending, clears the stored error, schedules the operation for
     * now, increments the retry counter and, when the save succeeds, arranges a queue check
     * on `shutdown`.
     *
     * @param string $id     Operation ID
     * @param int    $userId User ID (for ownership verification)
     * @return bool True if reset for retry
     */
    public function retry(string $id, int $userId): bool
    {
        $op = $this->get($id);

        if (!$op || $op->userId !== $userId) {
            return false;
        }

        // Can only retry completed operations with failed outcomes
        $hasFailed = in_array($op->outcome, ['failed', 'failed_permanent'], true);

        if ($op->state !== 'completed' || !$hasFailed) {
            return false;
        }

        $op->state = 'pending';
        $op->outcome = 'pending';
        $op->errorMessage = null;
        $op->lastErrorHash = null;
        $op->scheduledAt = current_time('mysql');
        $op->retries++;

        $saved = $this->storage->save($op);

        if ($saved) {
            $this->runQueueOnShutdown();
        }

        return $saved;
    }

    /**
     * Update an operation's data or metadata
     *
     * Recognised keys in $updates: requestData, metadata, priority, state, outcome, result.
     * `metadata` is merged into the existing metadata with array_merge() (so its value must be
     * an array); every other recognised key replaces the current value. Unknown keys are ignored.
     *
     * @param string   $id      Operation ID
     * @param array    $updates Fields to update (requestData, metadata, etc.)
     * @param int|null $userId  Optional user ID for ownership verification
     * @return bool True if updated
     */
    public function update(string $id, array $updates, ?int $userId = null): bool
    {
        $op = $this->get($id);

        if (!$op) {
            return false;
        }

        if ($userId !== null && $op->userId !== $userId) {
            return false;
        }

        // Apply allowed updates
        foreach ($updates as $field => $value) {
            match ($field) {
                'requestData' => $op->requestData = $value,
                'metadata' => $op->metadata = array_merge($op->metadata, $value),
                'priority' => $op->priority = $value,
                'state' => $op->state = $value,
                'outcome' => $op->outcome = $value,
                'result' => $op->result = $value,
                default => null,
            };
        }

        return $this->storage->save($op);
    }

    /**
     * Whether the processor lock is currently held.
     */
    public function isLocked(): bool
    {
        return $this->locker->isLocked();
    }

    // === Private Helpers ===

    /**
     * Build an Operation from the arguments given to add().
     *
     * @param string $type    Operation type
     * @param int    $userId  User ID
     * @param array  $data    Request data stored on the operation
     * @param array  $options Same option set as add()
     */
    private function buildOperation(string $type, int $userId, array $data, array $options): Operation
    {
        $op = new Operation();

        // Use provided operation_id or generate one
        $op->id = !empty($options['operation_id'])
            ? $options['operation_id']
            : 'u' . $userId . '_' . uniqid('op_');

        $op->type = $type;
        $op->userId = $userId;
        $op->requestData = $data;
        $op->priority = $options['priority'] ?? 'normal';

        $isDeferred = !empty($options['delay']) || !empty($options['scheduled']);
        $op->state = $isDeferred ? 'scheduled' : 'pending';
        $op->scheduledAt = $this->calculateScheduledTime($options);

        // Chunk config: explicit options override registry
        if (!empty($options['chunk_key'])) {
            $chunkConfig = [
                'key' => $options['chunk_key'],
                'size' => $options['chunk_size'] ?? 10,
            ];
        } else {
            $chunkConfig = $this->registry->getChunkConfig($type);
        }

        if ($chunkConfig) {
            $op->metadata['chunk_key'] = $chunkConfig['key'];
            $op->metadata['chunk_size'] = $chunkConfig['size'];
            $op->totalItems = $this->countItems($data, $chunkConfig['key']);
        }

        // Dependencies
        if (!empty($options['depends_on'])) {
            $dependsOn = $options['depends_on'];

            if (is_string($dependsOn)) {
                // Accept "a,b" as well as "a, b": trim each ID and drop empty segments.
                $dependsOn = array_values(array_filter(
                    array_map('trim', explode(',', $dependsOn)),
                    static fn (string $dependencyId): bool => $dependencyId !== ''
                ));
            }

            if ($dependsOn !== []) {
                $op->dependencies = $dependsOn;
            }
        }

        return $op;
    }

    /**
     * Count the items found under the given key(s) of $data.
     *
     * Only keys that are set and hold arrays contribute.
     *
     * @param array        $data Request data
     * @param string|array $keys One key or a list of keys
     * @return int Total element count, never less than 1
     */
    private function countItems(array $data, string|array $keys): int
    {
        $total = 0;

        foreach ((array) $keys as $key) {
            if (isset($data[$key]) && is_array($data[$key])) {
                $total += count($data[$key]);
            }
        }

        return max(1, $total);
    }

    /**
     * Resolve the datetime string at which an operation becomes due.
     *
     * `delay` takes precedence over `scheduled`; with neither, the operation is due now.
     *
     * @param array $options Options passed to add()
     * @return string `Y-m-d H:i:s` string, or the caller's `scheduled` value as given
     */
    private function calculateScheduledTime(array $options): string
    {
        if (!empty($options['delay'])) {
            return date('Y-m-d H:i:s', current_time('timestamp') + (int) $options['delay']);
        }

        if (!empty($options['scheduled'])) {
            return $options['scheduled'];
        }

        return current_time('mysql');
    }

    /**
     * Register processQueueOnShutdown() on the `shutdown` action, at most once.
     */
    private function runQueueOnShutdown(): void
    {
        // has_action() returns the registered priority or false, so compare strictly.
        if (has_action('shutdown', [$this, 'processQueueOnShutdown']) === false) {
            add_action('shutdown', [$this, 'processQueueOnShutdown'], 100);
        }
    }

    /**
     * `shutdown` callback: unhook itself, flush the response when running under FastCGI,
     * then check the queue.
     */
    public function processQueueOnShutdown(): void
    {
        // The priority must match the one used in add_action(), otherwise nothing is removed.
        remove_action('shutdown', [$this, 'processQueueOnShutdown'], 100);

        if (function_exists('fastcgi_finish_request')) {
            fastcgi_finish_request();
        }

        $this->checkQueue();
    }
}