wpdb = $wpdb; $this->table = $wpdb->prefix . BASE . '_operation_queue'; $this->cache = CacheManager::for('queue', DAY_IN_SECONDS); } public function hasProcessingOperations(): bool { return (bool) $this->wpdb->get_var( "SELECT 1 FROM {$this->table} WHERE state = 'processing' LIMIT 1" ); } public function fetchRunnable(int $limit = 10): array { $now = current_time('mysql'); $rows = $this->wpdb->get_results($this->wpdb->prepare(" SELECT oq.* FROM {$this->table} oq WHERE oq.state IN ('pending', 'scheduled') AND oq.scheduled_at <= %s AND NOT EXISTS ( SELECT 1 FROM JSON_TABLE( COALESCE(NULLIF(oq.dependencies, 'null'), '[]'), '\$[*]' COLUMNS (dep_id VARCHAR(64) PATH '\$') ) AS deps JOIN {$this->table} dep ON dep.id = deps.dep_id WHERE dep.state != 'completed' OR dep.outcome NOT IN ('success', 'partial') ) ORDER BY FIELD(oq.priority, 'high', 'normal', 'low'), oq.scheduled_at LIMIT %d ", $now, $limit)); return array_map([$this, 'rowToOperation'], $rows ?: []); } public function markProcessing(string $id): bool { $now = current_time('mysql'); $affected = $this->wpdb->query($this->wpdb->prepare(" UPDATE {$this->table} SET state = 'processing', started_at = %s, updated_at = %s WHERE id = %s AND state IN ('pending', 'scheduled') ", $now, $now, $id)); if ($affected > 0) { $op = $this->find($id); if ($op) { $this->invalidateUser($op->userId); } } return $affected > 0; } public function save(Operation $op): bool { $data = [ 'request_data' => json_encode($op->requestData), 'total_items' => $op->totalItems, 'processed_items' => $op->processedItems, 'failed_items' => $op->failedItems ? json_encode($op->failedItems) : null, 'priority' => $op->priority, 'state' => $op->state, 'outcome' => $op->outcome, 'retries' => $op->retries, 'last_error_hash' => $op->lastErrorHash, 'error_message' => $op->errorMessage, 'scheduled_at' => $op->scheduledAt, 'started_at' => $op->startedAt, 'completed_at' => $op->completedAt, 'metadata' => json_encode($op->metadata), 'result' => $op->result ? json_encode($op->result) : null, 'dependencies' => json_encode($op->dependencies), 'user_dismissed' => $op->userDismissed ? 1 : 0, 'updated_at' => current_time('mysql'), ]; $result = $this->wpdb->update($this->table, $data, ['id' => $op->id]); if ($result !== false) { $this->invalidateUser($op->userId); } return $result !== false; } public function insert(Operation $op): bool { $result = $this->wpdb->insert($this->table, [ 'id' => $op->id, 'type' => $op->type, 'user_id' => $op->userId, 'request_data' => json_encode($op->requestData), 'total_items' => $op->totalItems, 'processed_items' => $op->processedItems, 'failed_items' => null, 'priority' => $op->priority, 'state' => $op->state, 'outcome' => $op->outcome, 'retries' => 0, 'last_error_hash' => null, 'error_message' => null, 'scheduled_at' => $op->scheduledAt ?? current_time('mysql'), 'started_at' => null, 'completed_at' => null, 'metadata' => json_encode($op->metadata), 'result' => null, 'dependencies' => json_encode($op->dependencies), 'user_dismissed' => 0, 'created_at' => current_time('mysql'), 'updated_at' => current_time('mysql'), ]); if ($result) { $this->invalidateUser($op->userId); } return $result !== false; } public function find(string $id): ?Operation { $row = $this->wpdb->get_row($this->wpdb->prepare( "SELECT * FROM {$this->table} WHERE id = %s", $id )); return $row ? $this->rowToOperation($row) : null; } public function findMergeable(string $type, int $userId): ?Operation { $row = $this->wpdb->get_row($this->wpdb->prepare( "SELECT * FROM {$this->table} WHERE type = %s AND user_id = %d AND state IN ('pending', 'scheduled') ORDER BY created_at DESC LIMIT 1", $type, $userId )); return $row ? $this->rowToOperation($row) : null; } public function getUserOperations(int $userId, array $filters = []): array { $where = ['user_id = %d']; $params = [$userId]; if (!empty($filters['state'])) { $where[] = 'state = %s'; $params[] = $filters['state']; } if (!empty($filters['type'])) { $where[] = 'type = %s'; $params[] = $filters['type']; } if (!empty($filters['outcome'])) { $outcomes = (array) $filters['outcome']; $placeholders = implode(',', array_fill(0, count($outcomes), '%s')); $where[] = "outcome IN ($placeholders)"; $params = array_merge($params, $outcomes); } if (!empty($filters['not_dismissed'])) { $where[] = 'user_dismissed = 0'; } if (!empty($filters['active'])) { $where[] = "state IN ('pending', 'scheduled', 'processing')"; } if (!empty($filters['has_errors'])) { $where[] = "(error_message IS NOT NULL OR failed_items IS NOT NULL)"; } if (!empty($filters['ids'])) { $ids = (array) $filters['ids']; $placeholders = implode(',', array_fill(0, count($ids), '%s')); $where[] = "id IN ($placeholders)"; $params = array_merge($params, $ids); } // Order by state priority, then created_at $orderBy = $filters['order_by'] ?? "FIELD(state, 'processing', 'pending', 'scheduled', 'completed'), created_at DESC"; $limit = $filters['limit'] ?? 50; $params[] = $limit; $rows = $this->wpdb->get_results($this->wpdb->prepare( "SELECT * FROM {$this->table} WHERE " . implode(' AND ', $where) . " ORDER BY {$orderBy} LIMIT %d", ...$params )); return array_map([$this, 'rowToOperation'], $rows ?: []); } public function getQueueInfo(): array { $cached = $this->cache->get(self::CACHE_QUEUE_INFO); if ($cached !== false) { return $cached; } $now = current_time('mysql'); $row = $this->wpdb->get_row($this->wpdb->prepare(" SELECT COUNT(*) as total, SUM(IF(state IN ('pending', 'processing'), 1, 0)) as active, SUM(IF(state = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready_scheduled FROM {$this->table} WHERE state IN ('pending', 'processing', 'scheduled') ", $now)); $info = [ 'total' => (int) ($row->total ?? 0), 'active' => (int) ($row->active ?? 0), 'ready' => (int) ($row->active ?? 0) + (int) ($row->ready_scheduled ?? 0), 'has_items' => ((int) ($row->active ?? 0) + (int) ($row->ready_scheduled ?? 0)) > 0, ]; $this->cache->set(self::CACHE_QUEUE_INFO, $info, 30); return $info; } private function rowToOperation(object $row): Operation { $op = new Operation(); $op->id = $row->id; $op->type = $row->type; $op->userId = (int) $row->user_id; $op->requestData = json_decode($row->request_data ?? '{}', true) ?: []; $op->metadata = json_decode($row->metadata ?? '{}', true) ?: []; $op->dependencies = json_decode($row->dependencies ?? '[]', true) ?: []; $op->totalItems = (int) $row->total_items; $op->processedItems = (int) $row->processed_items; $op->failedItems = $row->failed_items ? json_decode($row->failed_items, true) : null; $op->priority = $row->priority; $op->state = $row->state; $op->outcome = $row->outcome; $op->retries = (int) $row->retries; $op->lastErrorHash = $row->last_error_hash; $op->errorMessage = $row->error_message; $op->scheduledAt = $row->scheduled_at; $op->startedAt = $row->started_at; $op->completedAt = $row->completed_at; $op->result = $row->result ? json_decode($row->result, true) : null; $op->userDismissed = (bool) $row->user_dismissed; return $op; } public function getQueueStatus(): array { $now = current_time('mysql'); $rows = $this->wpdb->get_results($this->wpdb->prepare(" SELECT state, COUNT(*) as count, SUM(IF(state = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready FROM {$this->table} GROUP BY state ", $now), OBJECT_K); return [ 'pending' => (int) ($rows['pending']->count ?? 0), 'scheduled' => (int) ($rows['scheduled']->count ?? 0), 'scheduled_ready' => (int) ($rows['scheduled']->ready ?? 0), 'processing' => (int) ($rows['processing']->count ?? 0), 'completed' => (int) ($rows['completed']->count ?? 0), ]; } public function getUserStats(int $userId): array { $rows = $this->wpdb->get_results($this->wpdb->prepare(" SELECT state, outcome, COUNT(*) as count FROM {$this->table} WHERE user_id = %d GROUP BY state, outcome ", $userId)); $stats = [ 'pending' => 0, 'scheduled' => 0, 'processing' => 0, 'completed' => 0, 'failed' => 0, 'failed_permanent' => 0, ]; foreach ($rows as $row) { if ($row->state === 'completed') { // Map outcome to frontend status match($row->outcome) { 'failed' => $stats['failed'] += (int) $row->count, 'failed_permanent' => $stats['failed_permanent'] += (int) $row->count, default => $stats['completed'] += (int) $row->count, }; } else { // Combine pending/scheduled for frontend $key = $row->state === 'scheduled' ? 'pending' : $row->state; if (isset($stats[$key])) { $stats[$key] += (int) $row->count; } } } return $stats; } public function dismiss(string $id): bool { $result = $this->wpdb->update( $this->table, ['user_dismissed' => 1, 'updated_at' => current_time('mysql')], ['id' => $id] ); return $result !== false; } /** * Delete an operation from the queue */ public function delete(string $id): bool { $op = $this->find($id); $userId = $op?->userId; $result = $this->wpdb->delete($this->table, ['id' => $id]); if ($result && $userId) { $this->invalidateUser($userId); } return $result !== false; } public function invalidateQueueCache(): void { $this->cache->delete(self::CACHE_QUEUE_INFO); $this->cache->touch(); } private function invalidateUser(int $userId): void { CacheManager::invalidateAll("user_{$userId}"); $this->cache->delete(self::CACHE_QUEUE_INFO); } }