defineTables();
        $this->cache = Cache::for('queue', DAY_IN_SECONDS);
    }

    /**
     * Whether at least one operation is currently in the 'processing' state.
     *
     * @return bool
     */
    public function hasProcessingOperations(): bool
    {
        return (bool) $this->table->queryVar("SELECT 1 FROM {table} WHERE state = 'processing' LIMIT 1");
    }

    /**
     * Find the next operation that is due and whose dependencies are all completed.
     *
     * Candidates are read in pages of 10, ordered by priority and then by
     * scheduled_at. The first candidate whose dependencies are satisfied is
     * returned; if a full page yields nothing, the next page is examined.
     *
     * @param int $offset Row offset at which to start scanning candidates.
     * @return array Either a single-element array holding the runnable Operation, or an empty array.
     */
    public function fetchRunnable(int $offset = 0): array
    {
        $pageSize = 10;
        $now = current_time('mysql');

        while (true) {
            $rows = $this->table->queryResults("
                SELECT * FROM {table}
                WHERE state IN ('pending', 'scheduled')
                  AND scheduled_at <= %s
                ORDER BY FIELD(priority, 'high', 'normal', 'low'), scheduled_at
                LIMIT %d OFFSET %d
                FOR UPDATE SKIP LOCKED
            ", [$now, $pageSize, $offset]) ?: [];

            foreach ($rows as $row) {
                $dependencies = json_decode($row->dependencies ?? '[]', true) ?: [];
                if ($this->dependenciesCompleted($dependencies)) {
                    return [$this->rowToOperation($row)];
                }
            }

            // A short page means there are no further candidates to look at.
            if (count($rows) < $pageSize) {
                return [];
            }

            // None of these candidates were ready; try the next page.
            $offset += $pageSize;
        }
    }

    /**
     * Check that every listed dependency exists and is in the 'completed' state.
     *
     * A dependency that cannot be found counts as not completed, so an
     * operation referencing a missing dependency stays blocked.
     *
     * @param array $dependencies Operation ids this operation depends on.
     * @return bool True when the list is empty or every dependency is completed.
     */
    private function dependenciesCompleted(array $dependencies): bool
    {
        foreach ($dependencies as $dependencyId) {
            $dependency = $this->find($dependencyId);
            if (!$dependency || $dependency->state !== 'completed') {
                return false;
            }
        }

        return true;
    }

    /**
     * Move an operation from 'pending'/'scheduled' to 'processing'.
     *
     * The state condition in the WHERE clause means the update only applies
     * while the operation is still waiting.
     *
     * @param string $id Operation id.
     * @return bool True when a row was actually updated.
     */
    public function markProcessing(string $id): bool
    {
        $now = current_time('mysql');

        $affected = $this->table->query("
            UPDATE {table}
            SET state = 'processing', started_at = %s, updated_at = %s
            WHERE id = %s AND state IN ('pending', 'scheduled')
        ", [$now, $now, $id]);

        if ($affected > 0) {
            $op = $this->find($id);
            if ($op) {
                $this->invalidateUser($op->userId);
            }
        }

        return $affected > 0;
    }

    /**
     * Persist the mutable fields of an operation, matched by id only.
     *
     * @param Operation $op
     * @return bool False only when the underlying update reports failure.
     */
    public function save(Operation $op): bool
    {
        $result = $this->table->update(
            [
                'request_data' => json_encode($op->requestData),
                'total_items' => $op->totalItems,
                'processed_items' => $op->processedItems,
                'failed_items' => $op->failedItems ? json_encode($op->failedItems) : null,
                'priority' => $op->priority,
                'state' => $op->state,
                'outcome' => $op->outcome,
                'retries' => $op->retries,
                'last_error_hash' => $op->lastErrorHash,
                'error_message' => $op->errorMessage,
                'completed_at' => $op->completedAt,
                'metadata' => json_encode($op->metadata),
                'result' => $op->result ? json_encode($op->result) : null,
                'dependencies' => json_encode($op->dependencies),
                'merged_into' => $op->merged_into,
                'user_dismissed' => $op->userDismissed ? 1 : 0,
            ],
            ['id' => $op->id]
        );

        if ($result !== false) {
            $this->invalidateUser($op->userId);
        }

        return $result !== false;
    }

    /**
     * For saving a 'step' in an operation; usually after each chunk.
     *
     * Only rows still in the 'processing' state are updated.
     *
     * @param Operation $op
     * @return bool False only when the underlying update reports failure.
     */
    public function saveProgress(Operation $op): bool
    {
        $result = $this->table->update(
            [
                'processed_items' => $op->processedItems ?? 0,
                'failed_items' => $op->failedItems ? json_encode($op->failedItems) : null,
                'metadata' => $op->metadata ? json_encode($op->metadata) : null,
                'result' => $op->result ? json_encode($op->result) : null,
            ],
            ['id' => $op->id, 'state' => 'processing'] // state guard preserved
        );

        if ($result === false) {
            return false;
        }

        $this->invalidateUser($op->userId);

        return true;
    }

    /**
     * The operation is complete, let's progress to 'completed'.
     *
     * Only rows still in the 'processing' state are updated; zero affected
     * rows is treated as "already completed" rather than as an error.
     *
     * @param Operation $op Must already carry state 'completed'.
     * @return bool False only when the underlying update reports failure.
     * @throws LogicException When $op->state is not 'completed'.
     */
    public function saveFinal(Operation $op): bool
    {
        if ($op->state !== 'completed') {
            throw new LogicException('saveFinal called without completed state');
        }

        $result = $this->table->update(
            [
                'state' => 'completed',
                'outcome' => $op->outcome ?? 'success',
                'processed_items' => $op->processedItems ?? 0,
                'failed_items' => $op->failedItems ? json_encode($op->failedItems) : null,
                'result' => isset($op->result) ? wp_json_encode($op->result) : null,
                'completed_at' => $op->completedAt ?? current_time('mysql'),
            ],
            ['id' => $op->id, 'state' => 'processing'] // hard guard preserved
        );

        if ($result === 0) {
            return true; // already completed, not an error
        }
        if ($result === false) {
            return false;
        }

        $this->invalidateQueueCache();
        $this->invalidateUser($op->userId);

        return true;
    }

    /**
     * Insert a new operation row.
     *
     * @param Operation $op
     * @return bool False only when the underlying insert reports failure.
     */
    public function insert(Operation $op): bool
    {
        $result = $this->table->insert([
            'id' => $op->id,
            'type' => $op->type,
            'user_id' => $op->userId,
            'request_data' => json_encode($op->requestData),
            'total_items' => $op->totalItems,
            'processed_items' => $op->processedItems,
            'failed_items' => null,
            'priority' => $op->priority,
            'state' => $op->state,
            'outcome' => $op->outcome,
            'retries' => 0,
            'scheduled_at' => $op->scheduledAt ?? current_time('mysql'),
            'metadata' => json_encode($op->metadata),
            'dependencies' => json_encode($op->dependencies),
            'merged_into' => null,
        ]);

        if ($result) {
            $this->invalidateUser($op->userId);
        }

        return $result !== false;
    }

    /**
     * Load a single operation by id.
     *
     * @param string $id
     * @return Operation|null Null when no row matches.
     */
    public function find(string $id): ?Operation
    {
        $row = $this->table->get(['id' => $id]);

        return $row ? $this->rowToOperation($row) : null;
    }

    /**
     * Find the newest waiting operation of a type for a user that matches the given request_data values.
     *
     * @param string $type
     * @param int $userId
     * @param array $criteria Map of request_data key => expected value; null values are ignored.
     *                        Values are compared as strings.
     * @return Operation|null
     */
    public function findMergeable(string $type, int $userId, array $criteria = []): ?Operation
    {
        $sql = "SELECT * FROM {table} WHERE type = %s AND user_id = %d AND state IN ('pending', 'scheduled')";
        $params = [$type, $userId];

        foreach ($criteria as $key => $value) {
            if ($value === null) {
                continue;
            }
            $sql .= " AND JSON_UNQUOTE(JSON_EXTRACT(request_data, %s)) = %s";
            $params[] = '$.' . $key;
            $params[] = (string) $value;
        }

        $sql .= " ORDER BY created_at DESC LIMIT 1";
        $rows = $this->table->queryResults($sql, $params);

        return !empty($rows) ? $this->rowToOperation($rows[0]) : null;
    }

    /**
     * List a user's operations, optionally filtered.
     *
     * Supported filter keys: state, type, outcome (string or list),
     * not_dismissed, active, has_errors, ids (string or list), order_by, limit.
     *
     * 'order_by' is spliced into the SQL text, so it is only honoured when it
     * is a comma-separated list of known column names, each optionally
     * followed by ASC or DESC. Anything else falls back to the default order.
     *
     * @param int $userId
     * @param array $filters
     * @return array List of Operation objects.
     */
    public function getUserOperations(int $userId, array $filters = []): array
    {
        $where = ['user_id = %d'];
        $params = [$userId];

        if (!empty($filters['state'])) {
            $where[] = 'state = %s';
            $params[] = $filters['state'];
        }

        if (!empty($filters['type'])) {
            $where[] = 'type = %s';
            $params[] = $filters['type'];
        }

        if (!empty($filters['outcome'])) {
            $outcomes = (array) $filters['outcome'];
            $placeholders = implode(',', array_fill(0, count($outcomes), '%s'));
            $where[] = "outcome IN ($placeholders)";
            $params = array_merge($params, $outcomes);
        }

        if (!empty($filters['not_dismissed'])) {
            $where[] = 'user_dismissed = 0';
        }

        if (!empty($filters['active'])) {
            $where[] = "state IN ('pending', 'scheduled', 'processing')";
        }

        if (!empty($filters['has_errors'])) {
            $where[] = "(error_message IS NOT NULL OR failed_items IS NOT NULL)";
        }

        if (!empty($filters['ids'])) {
            $ids = (array) $filters['ids'];
            $placeholders = implode(',', array_fill(0, count($ids), '%s'));
            $where[] = "id IN ($placeholders)";
            $params = array_merge($params, $ids);
        }

        // Order by state priority, then created_at
        $defaultOrder = "FIELD(state, 'processing', 'pending', 'scheduled', 'completed'), created_at DESC";
        $orderBy = $this->sanitizeOrderBy($filters['order_by'] ?? null) ?? $defaultOrder;

        // A negative LIMIT is a SQL error, so clamp at zero.
        $params[] = max(0, (int) ($filters['limit'] ?? 50));

        $rows = $this->table->queryResults(
            "SELECT * FROM {table} WHERE " . implode(' AND ', $where) . " ORDER BY {$orderBy} LIMIT %d",
            $params
        );

        return array_map([$this, 'rowToOperation'], $rows ?: []);
    }

    /**
     * Turn a caller-supplied ORDER BY value into a safe clause.
     *
     * @param mixed $orderBy Raw 'order_by' filter value.
     * @return string|null The normalised clause, or null when the value is
     *                     missing or contains anything other than known
     *                     columns with an optional ASC/DESC.
     */
    private function sanitizeOrderBy(mixed $orderBy): ?string
    {
        if (!is_string($orderBy) || trim($orderBy) === '') {
            return null;
        }

        $sortable = [
            'id', 'type', 'state', 'outcome', 'priority', 'retries',
            'total_items', 'processed_items', 'user_dismissed',
            'scheduled_at', 'started_at', 'completed_at', 'created_at', 'updated_at',
        ];

        $clauses = [];
        foreach (explode(',', $orderBy) as $part) {
            if (!preg_match('/^\s*`?([A-Za-z_]+)`?(?:\s+(ASC|DESC))?\s*$/i', $part, $matches)) {
                return null;
            }

            $column = strtolower($matches[1]);
            if (!in_array($column, $sortable, true)) {
                return null;
            }

            $direction = !empty($matches[2]) ? ' ' . strtoupper($matches[2]) : '';
            $clauses[] = "`{$column}`" . $direction;
        }

        return implode(', ', $clauses);
    }

    /**
     * Summarise the waiting/processing part of the queue.
     *
     * The result is cached under CACHE_QUEUE_INFO with a TTL argument of 30.
     *
     * @return array{total:int, active:int, ready:int, has_items:bool}
     */
    public function getQueueInfo(): array
    {
        $cached = $this->cache->get(self::CACHE_QUEUE_INFO);
        if ($cached !== false) {
            return $cached;
        }

        $now = current_time('mysql');

        $rows = $this->table->queryResults("
            SELECT
                COUNT(*) as total,
                SUM(IF(state IN ('pending', 'processing'), 1, 0)) as active,
                SUM(IF(state = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready_scheduled
            FROM {table}
            WHERE state IN ('pending', 'processing', 'scheduled')
        ", [$now]);

        $row = $rows[0] ?? null;

        $active = (int) ($row->active ?? 0);
        $ready = $active + (int) ($row->ready_scheduled ?? 0);

        $info = [
            'total' => (int) ($row->total ?? 0),
            'active' => $active,
            'ready' => $ready,
            'has_items' => $ready > 0,
        ];

        $this->cache->set(self::CACHE_QUEUE_INFO, $info, 30);

        return $info;
    }

    /**
     * Hydrate an Operation from a database row, decoding the JSON columns.
     *
     * @param object $row
     * @return Operation
     */
    private function rowToOperation(object $row): Operation
    {
        $op = new Operation();
        $op->id = $row->id;
        $op->type = $row->type;
        $op->userId = (int) $row->user_id;
        $op->requestData = json_decode($row->request_data ?? '{}', true) ?: [];
        $op->metadata = json_decode($row->metadata ?? '{}', true) ?: [];
        $op->dependencies = json_decode($row->dependencies ?? '[]', true) ?: [];
        $op->totalItems = (int) $row->total_items;
        $op->processedItems = (int) $row->processed_items;
        $op->failedItems = $row->failed_items ? json_decode($row->failed_items, true) : null;
        $op->priority = $row->priority;
        $op->state = $row->state;
        $op->outcome = $row->outcome;
        $op->retries = (int) $row->retries;
        $op->lastErrorHash = $row->last_error_hash;
        $op->errorMessage = $row->error_message;
        $op->scheduledAt = $row->scheduled_at;
        $op->startedAt = $row->started_at;
        $op->completedAt = $row->completed_at;
        $op->result = $row->result ? json_decode($row->result, true) : null;
        $op->merged_into = $row->merged_into;
        $op->userDismissed = (bool) $row->user_dismissed;

        return $op;
    }

    /**
     * Count operations per state, plus how many scheduled ones are already due.
     *
     * @return array{pending:int, scheduled:int, scheduled_ready:int, processing:int, completed:int}
     */
    public function getQueueStatus(): array
    {
        $now = current_time('mysql');

        $rows = $this->table->queryResults("
            SELECT
                state,
                COUNT(*) as count,
                SUM(IF(state = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready
            FROM {table}
            GROUP BY state
        ", [$now]) ?: [];

        $indexed = array_column($rows, null, 'state');

        return [
            'pending' => (int) ($indexed['pending']->count ?? 0),
            'scheduled' => (int) ($indexed['scheduled']->count ?? 0),
            'scheduled_ready' => (int) ($indexed['scheduled']->ready ?? 0),
            'processing' => (int) ($indexed['processing']->count ?? 0),
            'completed' => (int) ($indexed['completed']->count ?? 0),
        ];
    }

    /**
     * Per-user counters keyed by frontend status.
     *
     * Completed rows are split by outcome ('failed', 'failed_permanent',
     * everything else counts as 'completed'); 'scheduled' is folded into
     * 'pending'.
     *
     * @param int $userId
     * @return array<string, int>
     */
    public function getUserStats(int $userId): array
    {
        $rows = $this->table->queryResults(
            "SELECT state, outcome, COUNT(*) as count FROM {table} WHERE user_id = %d GROUP BY state, outcome",
            [$userId]
        ) ?: [];

        $stats = [
            'pending' => 0,
            'scheduled' => 0,
            'processing' => 0,
            'completed' => 0,
            'failed' => 0,
            'failed_permanent' => 0,
        ];

        foreach ($rows as $row) {
            if ($row->state === 'completed') {
                // Map outcome to frontend status
                match ($row->outcome) {
                    'failed' => $stats['failed'] += (int) $row->count,
                    'failed_permanent' => $stats['failed_permanent'] += (int) $row->count,
                    default => $stats['completed'] += (int) $row->count,
                };
            } else {
                // Combine pending/scheduled for frontend
                $key = $row->state === 'scheduled' ? 'pending' : $row->state;
                if (isset($stats[$key])) {
                    $stats[$key] += (int) $row->count;
                }
            }
        }

        return $stats;
    }

    /**
     * Flag an operation as dismissed by the user.
     *
     * The owning user's cache is flushed afterwards, in line with the other
     * write methods of this class.
     *
     * @param string $id
     * @return bool False only when the underlying update reports failure.
     */
    public function dismiss(string $id): bool
    {
        $result = $this->table->update(['user_dismissed' => 1], ['id' => $id]);
        if ($result === false) {
            return false;
        }

        $op = $this->find($id);
        if ($op) {
            $this->invalidateUser($op->userId);
        }

        return true;
    }

    /**
     * Delete an operation from the queue
     *
     * @param string $id
     * @return bool False only when the underlying delete reports failure.
     */
    public function delete(string $id): bool
    {
        // Look the row up first: its user id is needed after it is gone.
        $op = $this->find($id);
        $result = $this->table->delete(['id' => $id]);

        if ($result && $op) {
            $this->invalidateUser($op->userId);
        }

        return $result !== false;
    }

    /**
     * Drop the cached getQueueInfo() result.
     */
    public function invalidateQueueCache(): void
    {
        $this->cache->forget(self::CACHE_QUEUE_INFO);
    }

    /**
     * Flush the per-user queue cache.
     *
     * @param int $userId
     */
    private function invalidateUser(int $userId): void
    {
        Cache::for($userId.'_queue')->flush();
    }

    /**
     * Run a callback inside a transaction on the queue table.
     *
     * Commits when the callback returns; on any Throwable the transaction is
     * rolled back, the message is logged, and the Throwable is rethrown.
     *
     * @param callable $callback
     * @return mixed Whatever the callback returns.
     * @throws \Throwable
     */
    public function withTransaction(callable $callback): mixed
    {
        $this->table->startTransaction();

        try {
            $result = $callback();
            $this->table->commit();

            return $result;
        } catch (\Throwable $e) {
            $this->table->rollback();
            error_log('[Storage] Transaction rolled back: ' . $e->getMessage());

            throw $e;
        }
    }

    /**
     * Re-schedule operations that have been 'processing' for too long.
     *
     * Matching rows are moved back to 'scheduled', due 60 seconds from now,
     * with their retry counter incremented.
     *
     * All timestamps are derived from current_time('mysql') so they share a
     * clock with the started_at / scheduled_at values written and compared
     * elsewhere in this class.
     *
     * @param int $stuckMinutes Minutes in 'processing' after which an operation counts as stuck.
     * @return int Number of operations that were reset.
     * @throws \Throwable
     */
    public function resetStuckOperations(int $stuckMinutes = 30): int
    {
        return $this->withTransaction(function () use ($stuckMinutes) {
            $nowString = current_time('mysql');
            $now = new \DateTimeImmutable($nowString);
            $cutoff = $now->modify("-{$stuckMinutes} minutes")->format('Y-m-d H:i:s');
            $retryAt = $now->modify('+60 seconds')->format('Y-m-d H:i:s');

            $stuckRows = $this->table->queryResults(
                "SELECT id FROM {table} WHERE state = 'processing' AND started_at < %s FOR UPDATE",
                [$cutoff]
            ) ?: [];

            $stuckIds = array_column($stuckRows, 'id');
            if (empty($stuckIds)) {
                return 0;
            }

            $placeholders = implode(',', array_fill(0, count($stuckIds), '%s'));

            return (int) $this->table->query(
                "UPDATE {table}
                 SET state = 'scheduled', scheduled_at = %s, retries = retries + 1, updated_at = %s
                 WHERE id IN ({$placeholders})",
                array_merge([$retryAt, $nowString], $stuckIds)
            );
        });
    }

    /**
     * Rewrite dependency lists of waiting operations so they point at $toId instead of $fromId.
     *
     * The replacement is textual on the JSON column, matching the id
     * including its surrounding double quotes.
     *
     * @param string $fromId
     * @param string $toId
     * @return int Number of rows updated.
     * @throws \Throwable
     */
    public function replaceDependency(string $fromId, string $toId): int
    {
        return $this->withTransaction(function () use ($fromId, $toId) {
            // Escape LIKE wildcards so an id containing % or _ only matches itself.
            $likeId = addcslashes($fromId, '_%\\');

            return (int) $this->table->query(
                "UPDATE {table}
                 SET dependencies = REPLACE(dependencies, %s, %s), updated_at = %s
                 WHERE state IN ('pending', 'scheduled') AND dependencies LIKE %s",
                ['"'.$fromId.'"', '"'.$toId.'"', current_time('mysql'), '%"'.$likeId.'"%']
            );
        });
    }

    /**
     * Declare the queue table and the daily stats table and keep handles to both.
     */
    public function defineTables(): void
    {
        $queue = CustomTable::for('_operation_queue');
        $queue->setColumns([
            'id' => 'VARCHAR(64) NOT NULL',
            'type' => 'VARCHAR(50) NOT NULL',
            'user_id' => $queue->getUserIDType().' NOT NULL',
            'request_data' => 'JSON NOT NULL CHECK (JSON_VALID(request_data))',
            'total_items' => 'INT(11) NOT NULL DEFAULT 1',
            'processed_items' => 'INT(11) DEFAULT 0',
            'failed_items' => 'JSON',
            'priority' => 'ENUM(\'high\',\'normal\',\'low\') DEFAULT \'normal\'',
            'state' => 'ENUM(\'pending\', \'scheduled\', \'processing\', \'completed\') DEFAULT \'pending\'',
            'outcome' => 'ENUM(\'pending\', \'success\',\'partial\',\'merged\',\'failed\',\'failed_permanent\') DEFAULT \'pending\'',
            'retries' => 'INT(11) DEFAULT 0',
            'last_error_hash'=> 'CHAR(32) DEFAULT NULL',
            'error_message' => 'TEXT',
            'scheduled_at' => 'DATETIME DEFAULT NULL',
            'started_at' => 'DATETIME DEFAULT CURRENT_TIMESTAMP',
            'completed_at' => 'DATETIME DEFAULT NULL',
            'metadata' => 'JSON DEFAULT NULL',
            'result' => 'JSON',
            'dependencies' => 'JSON',
            'merged_into' => 'VARCHAR(64) DEFAULT NULL',
            'user_dismissed'=> 'tinyint(1) DEFAULT 0',
            'created_at' => 'DATETIME DEFAULT CURRENT_TIMESTAMP',
            'updated_at' => 'DATETIME DEFAULT CURRENT_TIMESTAMP',
        ]);
        $queue->setKeys([
            ['key' => 'PRIMARY', 'value' => '(`id`)'],
            '`idx_run_queue` (`state`, `priority`, `scheduled_at`)',
            '`idx_user_ops` (`user_id`, `state`)',
            '`idx_user_type_pending` (`user_id`, `type`, `state`)',
            '`idx_completed_at` (`completed_at`)',
            '`idx_processing_stuck` (`state`, `started_at`)'
        ]);
        $queue->defineTable();
        $this->table = $queue;

        $stats = CustomTable::for('stats__operation_queue');
        $stats->setColumns([
            'id' => 'BIGINT unsigned AUTO_INCREMENT',
            'date' => 'DATE NOT NULL',
            'type' => 'VARCHAR(50) NOT NULL',
            // Only store what can't be queried from the main table later
            'peak_queue_size' => 'INT NOT NULL DEFAULT 0',
            'peak_memory_bytes' => 'BIGINT DEFAULT NULL',
            // Snapshot totals for post-purge historical view
            'total_operations' => 'INT NOT NULL DEFAULT 0',
            'successful_operations' => 'INT NOT NULL DEFAULT 0',
            'failed_permanent_operations' => 'INT NOT NULL DEFAULT 0',
            'total_items_processed' => 'INT NOT NULL DEFAULT 0',
            'created_at' => 'TIMESTAMP DEFAULT CURRENT_TIMESTAMP',
            'updated_at' => 'TIMESTAMP DEFAULT CURRENT_TIMESTAMP',
        ]);
        $stats->setKeys([
            ['key' => 'PRIMARY', 'value' => '(`id`)'],
            ['key' => 'UNIQUE', 'value' => '(`date`, `type`)'],
            '`date_idx` (`date`)',
            '`type_idx` (`type`)'
        ]);
        $stats->defineTable();
        $this->stats = $stats;
    }

    /**
     * Upsert today's per-type totals into the stats table.
     *
     * One row per operation type that has at least one operation completed
     * today; an existing (date, type) row is overwritten with fresh totals.
     */
    public function snapshotDaily(): void
    {
        $today = current_time('Y-m-d');

        $totals = $this->table->queryResults("
            SELECT
                type,
                COUNT(*) as total,
                SUM(outcome = 'success') as successful,
                SUM(outcome = 'failed_permanent') as failed_permanent,
                SUM(processed_items) as items_processed
            FROM {table}
            WHERE DATE(completed_at) = %s
            GROUP BY type
        ", [$today]) ?: [];

        foreach ($totals as $row) {
            // $this->stats is the stats table handle itself, so query it directly.
            $this->stats->query("
                INSERT INTO {table}
                    (date, type, total_operations, successful_operations, failed_permanent_operations, total_items_processed)
                VALUES (%s, %s, %d, %d, %d, %d)
                ON DUPLICATE KEY UPDATE
                    total_operations = VALUES(total_operations),
                    successful_operations = VALUES(successful_operations),
                    failed_permanent_operations = VALUES(failed_permanent_operations),
                    total_items_processed = VALUES(total_items_processed),
                    updated_at = NOW()
            ", [
                $today,
                $row->type,
                (int) $row->total,
                (int) $row->successful,
                (int) $row->failed_permanent,
                (int) $row->items_processed,
            ]);
        }
    }
}