wpdb = $wpdb; $this->table = $wpdb->prefix . BASE . '_operation_queue'; $this->cache = Cache::for('queue', DAY_IN_SECONDS); } public function hasProcessingOperations(): bool { return (bool) $this->wpdb->get_var( "SELECT 1 FROM {$this->table} WHERE state = 'processing' LIMIT 1" ); } public function fetchRunnable(int $offset = 0): array { $now = current_time('mysql'); $rows = $this->wpdb->get_results( $this->wpdb->prepare(" SELECT * FROM {$this->table} WHERE state IN ('pending', 'scheduled') AND scheduled_at <= %s ORDER BY FIELD(priority, 'high', 'normal', 'low'), scheduled_at LIMIT 10 OFFSET %d FOR UPDATE SKIP LOCKED ", $now, $offset) ); $total = count($rows); foreach ($rows as $row) { $dependencies = json_decode($row->dependencies ?? '[]', true) ?: []; if (empty($dependencies)) { return [$this->rowToOperation($row)]; } $totalDep = count($dependencies); $completed = []; $notCompleted = []; foreach ($dependencies as $dep) { $dependency = $this->find($dep); if ($dependency) { if ($dependency->state === 'completed') { $completed[] = $dep; } else { $notCompleted[] = $dep; } } } if (count($completed) === $totalDep) { return [$this->rowToOperation($row)]; } } //If we didn't find any operations from that 10 that are ready to go, or their dependencies aren't met, try the next 10 if ($total === 10) { return $this->fetchRunnable($offset + 10); } //If, for whatever reason, nothing still is found, there likely are none return []; } public function markProcessing(string $id): bool { $now = current_time('mysql'); $affected = $this->wpdb->query($this->wpdb->prepare(" UPDATE {$this->table} SET state = 'processing', started_at = %s, updated_at = %s WHERE id = %s AND state IN ('pending', 'scheduled') ", $now, $now, $id)); if ($affected > 0) { $op = $this->find($id); if ($op) { $this->invalidateUser($op->userId); } } return $affected > 0; } public function save(Operation $op): bool { $data = [ 'request_data' => json_encode($op->requestData), 'total_items' => $op->totalItems, 'processed_items' => $op->processedItems, 'failed_items' => $op->failedItems ? json_encode($op->failedItems) : null, 'priority' => $op->priority, 'state' => $op->state, 'outcome' => $op->outcome, 'retries' => $op->retries, 'last_error_hash' => $op->lastErrorHash, 'error_message' => $op->errorMessage, 'scheduled_at' => $op->scheduledAt, 'started_at' => $op->startedAt, 'completed_at' => $op->completedAt, 'metadata' => json_encode($op->metadata), 'result' => $op->result ? json_encode($op->result) : null, 'dependencies' => json_encode($op->dependencies), 'merged_into' => $op->merged_into, 'user_dismissed' => $op->userDismissed ? 1 : 0, 'updated_at' => current_time('mysql'), ]; $result = $this->wpdb->update($this->table, $data, ['id' => $op->id]); if ($result !== false) { $this->invalidateUser($op->userId); } return $result !== false; } /** * For saving a 'step' in an operation; usually after each chunk * @param Operation $op * @return bool */ public function saveProgress(Operation $op): bool { global $wpdb; $table = $this->table; $data = [ 'processed_items' => $op->processedItems ?? 0, 'failed_items' => $op->failedItems ? json_encode($op->failedItems) : null, 'metadata' => ($op->metadata) ? json_encode($op->metadata) : null, 'result' => ($op->result) ? json_encode($op->result) :null, 'updated_at' => current_time('mysql'), ]; // IMPORTANT: never touch terminal state $where = [ 'id' => $op->id, 'state' => 'processing', ]; $updated = $wpdb->update($table, $data, $where); if ($updated === false) { error_log('[Storage::saveProgress] DB error: ' . $wpdb->last_error); return false; } $this->invalidateUser($op->userId); return true; } /** * The operation is complete, let's progress to 'completed' * @param Operation $op * @return bool */ public function saveFinal(Operation $op): bool { global $wpdb; if (($op->state?? null) !== 'completed') { throw new LogicException('saveFinal called without completed state'); } $table = $this->table; $data = [ 'state' => 'completed', 'outcome' => $op->outcome?? 'success', 'processed_items'=> $op->processedItems ?? 0, 'failed_items' => $op->failedItems ? json_encode($op->failedItems) : null, 'result' => isset($op->result) ? wp_json_encode($op->result) : null, 'completed_at' => $op->completedAt ?? current_time('mysql'), 'updated_at' => current_time('mysql'), ]; // HARD GUARD: cannot overwrite completed $where = [ 'id' => $op->id, 'state' => 'processing', ]; $updated = $wpdb->update($table, $data, $where); if ($updated === 0) { return true; } if ($updated === false) { error_log('[Storage::saveFinal] DB error: ' . $wpdb->last_error); return false; } $this->invalidateQueueCache(); $this->invalidateUser($op->userId); return true; } public function insert(Operation $op): bool { $result = $this->wpdb->insert($this->table, [ 'id' => $op->id, 'type' => $op->type, 'user_id' => $op->userId, 'request_data' => json_encode($op->requestData), 'total_items' => $op->totalItems, 'processed_items' => $op->processedItems, 'failed_items' => null, 'priority' => $op->priority, 'state' => $op->state, 'outcome' => $op->outcome, 'retries' => 0, 'last_error_hash' => null, 'error_message' => null, 'scheduled_at' => $op->scheduledAt ?? current_time('mysql'), 'started_at' => null, 'completed_at' => null, 'metadata' => json_encode($op->metadata), 'result' => null, 'dependencies' => json_encode($op->dependencies), 'user_dismissed' => 0, 'merged_into' => null, 'created_at' => current_time('mysql'), 'updated_at' => current_time('mysql'), ]); if ($result) { $this->invalidateUser($op->userId); } return $result !== false; } public function find(string $id): ?Operation { $row = $this->wpdb->get_row($this->wpdb->prepare( "SELECT * FROM {$this->table} WHERE id = %s", $id )); return $row ? $this->rowToOperation($row) : null; } public function findMergeable(string $type, int $userId): ?Operation { $row = $this->wpdb->get_row($this->wpdb->prepare( "SELECT * FROM {$this->table} WHERE type = %s AND user_id = %d AND state IN ('pending', 'scheduled') ORDER BY created_at DESC LIMIT 1", $type, $userId )); $this->invalidateUser($userId); return $row ? $this->rowToOperation($row) : null; } public function getUserOperations(int $userId, array $filters = []): array { $where = ['user_id = %d']; $params = [$userId]; if (!empty($filters['state'])) { $where[] = 'state = %s'; $params[] = $filters['state']; } if (!empty($filters['type'])) { $where[] = 'type = %s'; $params[] = $filters['type']; } if (!empty($filters['outcome'])) { $outcomes = (array) $filters['outcome']; $placeholders = implode(',', array_fill(0, count($outcomes), '%s')); $where[] = "outcome IN ($placeholders)"; $params = array_merge($params, $outcomes); } if (!empty($filters['not_dismissed'])) { $where[] = 'user_dismissed = 0'; } if (!empty($filters['active'])) { $where[] = "state IN ('pending', 'scheduled', 'processing')"; } if (!empty($filters['has_errors'])) { $where[] = "(error_message IS NOT NULL OR failed_items IS NOT NULL)"; } if (!empty($filters['ids'])) { $ids = (array) $filters['ids']; $placeholders = implode(',', array_fill(0, count($ids), '%s')); $where[] = "id IN ($placeholders)"; $params = array_merge($params, $ids); } // Order by state priority, then created_at $orderBy = $filters['order_by'] ?? "FIELD(state, 'processing', 'pending', 'scheduled', 'completed'), created_at DESC"; $limit = $filters['limit'] ?? 50; $params[] = $limit; $rows = $this->wpdb->get_results($this->wpdb->prepare( "SELECT * FROM {$this->table} WHERE " . implode(' AND ', $where) . " ORDER BY {$orderBy} LIMIT %d", ...$params )); return array_map([$this, 'rowToOperation'], $rows ?: []); } public function getQueueInfo(): array { $cached = $this->cache->get(self::CACHE_QUEUE_INFO); if ($cached !== false) { return $cached; } $now = current_time('mysql'); $row = $this->wpdb->get_row($this->wpdb->prepare(" SELECT COUNT(*) as total, SUM(IF(state IN ('pending', 'processing'), 1, 0)) as active, SUM(IF(state = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready_scheduled FROM {$this->table} WHERE state IN ('pending', 'processing', 'scheduled') ", $now)); $info = [ 'total' => (int) ($row->total ?? 0), 'active' => (int) ($row->active ?? 0), 'ready' => (int) ($row->active ?? 0) + (int) ($row->ready_scheduled ?? 0), 'has_items' => ((int) ($row->active ?? 0) + (int) ($row->ready_scheduled ?? 0)) > 0, ]; $this->cache->set(self::CACHE_QUEUE_INFO, $info, 30); return $info; } private function rowToOperation(object $row): Operation { $op = new Operation(); $op->id = $row->id; $op->type = $row->type; $op->userId = (int) $row->user_id; $op->requestData = json_decode($row->request_data ?? '{}', true) ?: []; $op->metadata = json_decode($row->metadata ?? '{}', true) ?: []; $op->dependencies = json_decode($row->dependencies ?? '[]', true) ?: []; $op->totalItems = (int) $row->total_items; $op->processedItems = (int) $row->processed_items; $op->failedItems = $row->failed_items ? json_decode($row->failed_items, true) : null; $op->priority = $row->priority; $op->state = $row->state; $op->outcome = $row->outcome; $op->retries = (int) $row->retries; $op->lastErrorHash = $row->last_error_hash; $op->errorMessage = $row->error_message; $op->scheduledAt = $row->scheduled_at; $op->startedAt = $row->started_at; $op->completedAt = $row->completed_at; $op->result = $row->result ? json_decode($row->result, true) : null; $op->merged_into = $row->merged_into; $op->userDismissed = (bool) $row->user_dismissed; return $op; } public function getQueueStatus(): array { $now = current_time('mysql'); $rows = $this->wpdb->get_results($this->wpdb->prepare(" SELECT state, COUNT(*) as count, SUM(IF(state = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready FROM {$this->table} GROUP BY state ", $now), OBJECT_K); return [ 'pending' => (int) ($rows['pending']->count ?? 0), 'scheduled' => (int) ($rows['scheduled']->count ?? 0), 'scheduled_ready' => (int) ($rows['scheduled']->ready ?? 0), 'processing' => (int) ($rows['processing']->count ?? 0), 'completed' => (int) ($rows['completed']->count ?? 0), ]; } public function getUserStats(int $userId): array { $rows = $this->wpdb->get_results($this->wpdb->prepare(" SELECT state, outcome, COUNT(*) as count FROM {$this->table} WHERE user_id = %d GROUP BY state, outcome ", $userId)); $stats = [ 'pending' => 0, 'scheduled' => 0, 'processing' => 0, 'completed' => 0, 'failed' => 0, 'failed_permanent' => 0, ]; foreach ($rows as $row) { if ($row->state === 'completed') { // Map outcome to frontend status match($row->outcome) { 'failed' => $stats['failed'] += (int) $row->count, 'failed_permanent' => $stats['failed_permanent'] += (int) $row->count, default => $stats['completed'] += (int) $row->count, }; } else { // Combine pending/scheduled for frontend $key = $row->state === 'scheduled' ? 'pending' : $row->state; if (isset($stats[$key])) { $stats[$key] += (int) $row->count; } } } return $stats; } public function dismiss(string $id): bool { $result = $this->wpdb->update( $this->table, ['user_dismissed' => 1, 'updated_at' => current_time('mysql')], ['id' => $id] ); return $result !== false; } /** * Delete an operation from the queue */ public function delete(string $id): bool { $op = $this->find($id); $userId = $op?->userId; $result = $this->wpdb->delete($this->table, ['id' => $id]); if ($result && $userId) { $this->invalidateUser($userId); } return $result !== false; } public function invalidateQueueCache(): void { $this->cache->forget(self::CACHE_QUEUE_INFO); } private function invalidateUser(int $userId): void { Cache::for($userId.'_queue')->flush(); } public function getLastError(): string { return $this->wpdb->last_error; } public function withTransaction(callable $callback): mixed { $this->wpdb->query('START TRANSACTION'); try { $result = $callback(); $this->wpdb->query('COMMIT'); return $result; } catch (\Throwable $e) { $this->wpdb->query('ROLLBACK'); error_log('[Storage] Transaction rolled back: ' . $e->getMessage()); throw $e; } } public function resetStuckOperations(int $stuckMinutes = 30): int { return $this->withTransaction(function () use ($stuckMinutes) { $cutoff = date('Y-m-d H:i:s', strtotime("-{$stuckMinutes} minutes")); // Get IDs first for logging $stuckIds = $this->wpdb->get_col($this->wpdb->prepare(" SELECT id FROM {$this->table} WHERE state = 'processing' AND started_at < %s FOR UPDATE ", $cutoff)); if (empty($stuckIds)) { return 0; } // Reset them $affected = $this->wpdb->query($this->wpdb->prepare(" UPDATE {$this->table} SET state = 'scheduled', scheduled_at = %s, retries = retries + 1, updated_at = %s WHERE id IN (" . implode(',', array_fill(0, count($stuckIds), '%s')) . ") ", date('Y-m-d H:i:s', time() + 60), current_time('mysql'), ...$stuckIds )); // Optional: Log to audit table // $this->logStuckReset($stuckIds); return (int) $affected; }); } /** * @throws \Throwable */ public function replaceDependency(string $fromId, string $toId): int { return $this->withTransaction(function () use ($fromId, $toId) { // Only affect pending/scheduled operations $affected = $this->wpdb->query($this->wpdb->prepare(" UPDATE {$this->table} SET dependencies = REPLACE(dependencies, %s, %s), updated_at = %s WHERE state IN ('pending', 'scheduled') AND dependencies LIKE %s ", '"' . $fromId . '"', '"' . $toId . '"', current_time('mysql'), '%"' . $fromId . '"%' )); return (int) $affected; }); } }