| | |
| | | exit; |
| | | } |
| | | |
| | | use JVBase\managers\CacheManager; |
| | | use JVBase\managers\Cache; |
| | | use LogicException; |
| | | |
| | | class Storage |
| | | { |
| | | private \wpdb $wpdb; |
| | | private string $table; |
| | | private CacheManager $cache; |
| | | private Cache $cache; |
| | | |
| | | private const CACHE_USER_PREFIX = 'user_queue_'; |
| | | private const CACHE_QUEUE_INFO = 'queue_info'; |
| | | |
| | | public function __construct() |
| | |
| | | global $wpdb; |
| | | $this->wpdb = $wpdb; |
| | | $this->table = $wpdb->prefix . BASE . '_operation_queue'; |
| | | $this->cache = CacheManager::for('queue', DAY_IN_SECONDS); |
| | | $this->cache = Cache::for('queue', DAY_IN_SECONDS); |
| | | } |
| | | |
| | | public function hasProcessingOperations(): bool |
| | |
| | | ); |
| | | } |
| | | |
| | | public function fetchRunnable(int $limit = 10): array |
| | | public function fetchRunnable(int $offset = 0): array |
| | | { |
| | | $now = current_time('mysql'); |
| | | |
| | | $rows = $this->wpdb->get_results($this->wpdb->prepare(" |
| | | SELECT oq.* FROM {$this->table} oq |
| | | WHERE oq.state IN ('pending', 'scheduled') |
| | | AND oq.scheduled_at <= %s |
| | | AND NOT EXISTS ( |
| | | SELECT 1 |
| | | FROM JSON_TABLE( |
| | | COALESCE(NULLIF(oq.dependencies, 'null'), '[]'), |
| | | '\$[*]' COLUMNS (dep_id VARCHAR(64) PATH '\$') |
| | | ) AS deps |
| | | JOIN {$this->table} dep ON dep.id = deps.dep_id |
| | | WHERE dep.state != 'completed' |
| | | OR dep.outcome NOT IN ('success', 'partial') |
| | | ) |
| | | ORDER BY FIELD(oq.priority, 'high', 'normal', 'low'), oq.scheduled_at |
| | | LIMIT %d |
| | | ", $now, $limit)); |
| | | $rows = $this->wpdb->get_results( |
| | | $this->wpdb->prepare(" |
| | | SELECT * |
| | | FROM {$this->table} |
| | | WHERE state IN ('pending', 'scheduled') |
| | | AND scheduled_at <= %s |
| | | ORDER BY |
| | | FIELD(priority, 'high', 'normal', 'low'), |
| | | scheduled_at |
| | | LIMIT 10 OFFSET %d |
| | | FOR UPDATE SKIP LOCKED |
| | | ", $now, $offset) |
| | | ); |
| | | |
| | | return array_map([$this, 'rowToOperation'], $rows ?: []); |
| | | $total = count($rows); |
| | | foreach ($rows as $row) { |
| | | $dependencies = json_decode($row->dependencies ?? '[]', true) ?: []; |
| | | if (empty($dependencies)) { |
| | | return [$this->rowToOperation($row)]; |
| | | } |
| | | $totalDep = count($dependencies); |
| | | $completed = []; |
| | | $notCompleted = []; |
| | | foreach ($dependencies as $dep) { |
| | | $dependency = $this->find($dep); |
| | | if ($dependency) { |
| | | if ($dependency->state === 'completed') { |
| | | $completed[] = $dep; |
| | | } else { |
| | | $notCompleted[] = $dep; |
| | | } |
| | | } |
| | | } |
| | | if (count($completed) === $totalDep) { |
| | | return [$this->rowToOperation($row)]; |
| | | } |
| | | } |
| | | //If we didn't find any operations from that 10 that are ready to go, or their dependencies aren't met, try the next 10 |
| | | if ($total === 10) { |
| | | return $this->fetchRunnable($offset + 10); |
| | | } |
| | | |
| | | //If, for whatever reason, nothing still is found, there likely are none |
| | | return []; |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | public function markProcessing(string $id): bool |
| | | { |
| | | $now = current_time('mysql'); |
| | |
| | | 'metadata' => json_encode($op->metadata), |
| | | 'result' => $op->result ? json_encode($op->result) : null, |
| | | 'dependencies' => json_encode($op->dependencies), |
| | | 'merged_into' => $op->merged_into, |
| | | 'user_dismissed' => $op->userDismissed ? 1 : 0, |
| | | 'updated_at' => current_time('mysql'), |
| | | ]; |
| | |
| | | return $result !== false; |
| | | } |
| | | |
| | | /** |
| | | * For saving a 'step' in an operation; usually after each chunk |
| | | * @param Operation $op |
| | | * @return bool |
| | | */ |
| | | public function saveProgress(Operation $op): bool { |
| | | global $wpdb; |
| | | |
| | | $table = $this->table; |
| | | |
| | | $data = [ |
| | | 'processed_items' => $op->processedItems ?? 0, |
| | | 'failed_items' => $op->failedItems ? json_encode($op->failedItems) : null, |
| | | 'metadata' => ($op->metadata) ? json_encode($op->metadata) : null, |
| | | 'result' => ($op->result) ? json_encode($op->result) :null, |
| | | 'updated_at' => current_time('mysql'), |
| | | ]; |
| | | |
| | | // IMPORTANT: never touch terminal state |
| | | $where = [ |
| | | 'id' => $op->id, |
| | | 'state' => 'processing', |
| | | ]; |
| | | |
| | | $updated = $wpdb->update($table, $data, $where); |
| | | |
| | | if ($updated === false) { |
| | | error_log('[Storage::saveProgress] DB error: ' . $wpdb->last_error); |
| | | return false; |
| | | } |
| | | |
| | | |
| | | $this->invalidateUser($op->userId); |
| | | |
| | | return true; |
| | | } |
| | | |
| | | /** |
| | | * The operation is complete, let's progress to 'completed' |
| | | * @param Operation $op |
| | | * @return bool |
| | | */ |
| | | public function saveFinal(Operation $op): bool { |
| | | global $wpdb; |
| | | |
| | | if (($op->state?? null) !== 'completed') { |
| | | throw new LogicException('saveFinal called without completed state'); |
| | | } |
| | | |
| | | $table = $this->table; |
| | | |
| | | $data = [ |
| | | 'state' => 'completed', |
| | | 'outcome' => $op->outcome?? 'success', |
| | | 'processed_items'=> $op->processedItems ?? 0, |
| | | 'failed_items' => $op->failedItems ? json_encode($op->failedItems) : null, |
| | | 'result' => isset($op->result) ? wp_json_encode($op->result) : null, |
| | | 'completed_at' => $op->completedAt ?? current_time('mysql'), |
| | | 'updated_at' => current_time('mysql'), |
| | | ]; |
| | | |
| | | // HARD GUARD: cannot overwrite completed |
| | | $where = [ |
| | | 'id' => $op->id, |
| | | 'state' => 'processing', |
| | | ]; |
| | | |
| | | $updated = $wpdb->update($table, $data, $where); |
| | | |
| | | if ($updated === 0) { |
| | | return true; |
| | | } |
| | | |
| | | if ($updated === false) { |
| | | error_log('[Storage::saveFinal] DB error: ' . $wpdb->last_error); |
| | | return false; |
| | | } |
| | | $this->invalidateQueueCache(); |
| | | $this->invalidateUser($op->userId); |
| | | |
| | | return true; |
| | | } |
| | | |
| | | public function insert(Operation $op): bool |
| | | { |
| | | $result = $this->wpdb->insert($this->table, [ |
| | |
| | | 'result' => null, |
| | | 'dependencies' => json_encode($op->dependencies), |
| | | 'user_dismissed' => 0, |
| | | 'merged_into' => null, |
| | | 'created_at' => current_time('mysql'), |
| | | 'updated_at' => current_time('mysql'), |
| | | ]); |
| | |
| | | return $row ? $this->rowToOperation($row) : null; |
| | | } |
| | | |
| | | public function findMergeable(string $type, int $userId): ?Operation |
| | | public function findMergeable(string $type, int $userId, array $criteria = []): ?Operation |
| | | { |
| | | $row = $this->wpdb->get_row($this->wpdb->prepare( |
| | | "SELECT * FROM {$this->table} |
| | | WHERE type = %s AND user_id = %d AND state IN ('pending', 'scheduled') |
| | | ORDER BY created_at DESC LIMIT 1", |
| | | $type, $userId |
| | | )); |
| | | $sql = "SELECT * FROM {$this->table} |
| | | WHERE type = %s AND user_id = %d AND state IN ('pending', 'scheduled')"; |
| | | $params = [$type, $userId]; |
| | | |
| | | foreach ($criteria as $key => $value) { |
| | | if ($value === null) { |
| | | continue; |
| | | } |
| | | $sql .= " AND JSON_UNQUOTE(JSON_EXTRACT(request_data, %s)) = %s"; |
| | | $params[] = '$.' . $key; |
| | | $params[] = (string) $value; |
| | | } |
| | | |
| | | $sql .= " ORDER BY created_at DESC LIMIT 1"; |
| | | |
| | | $row = $this->wpdb->get_row($this->wpdb->prepare($sql, ...$params)); |
| | | |
| | | $this->invalidateUser($userId); |
| | | |
| | | return $row ? $this->rowToOperation($row) : null; |
| | | } |
| | |
| | | $op->startedAt = $row->started_at; |
| | | $op->completedAt = $row->completed_at; |
| | | $op->result = $row->result ? json_decode($row->result, true) : null; |
| | | $op->merged_into = $row->merged_into; |
| | | $op->userDismissed = (bool) $row->user_dismissed; |
| | | |
| | | return $op; |
| | |
| | | |
| | | public function invalidateQueueCache(): void |
| | | { |
| | | $this->cache->delete(self::CACHE_QUEUE_INFO); |
| | | $this->cache->touch(); |
| | | $this->cache->forget(self::CACHE_QUEUE_INFO); |
| | | } |
| | | |
| | | private function invalidateUser(int $userId): void |
| | | { |
| | | CacheManager::invalidateAll("user_{$userId}"); |
| | | $this->cache->delete(self::CACHE_QUEUE_INFO); |
| | | Cache::for($userId.'_queue')->flush(); |
| | | } |
| | | public function getLastError(): string |
| | | { |
| | | return $this->wpdb->last_error; |
| | | } |
| | | |
| | | public function withTransaction(callable $callback): mixed |
| | | { |
| | | $this->wpdb->query('START TRANSACTION'); |
| | | |
| | | try { |
| | | $result = $callback(); |
| | | $this->wpdb->query('COMMIT'); |
| | | return $result; |
| | | } catch (\Throwable $e) { |
| | | $this->wpdb->query('ROLLBACK'); |
| | | error_log('[Storage] Transaction rolled back: ' . $e->getMessage()); |
| | | throw $e; |
| | | } |
| | | } |
| | | |
| | | public function resetStuckOperations(int $stuckMinutes = 30): int |
| | | { |
| | | return $this->withTransaction(function () use ($stuckMinutes) { |
| | | $cutoff = date('Y-m-d H:i:s', strtotime("-{$stuckMinutes} minutes")); |
| | | |
| | | // Get IDs first for logging |
| | | $stuckIds = $this->wpdb->get_col($this->wpdb->prepare(" |
| | | SELECT id FROM {$this->table} |
| | | WHERE state = 'processing' |
| | | AND started_at < %s |
| | | FOR UPDATE |
| | | ", $cutoff)); |
| | | |
| | | if (empty($stuckIds)) { |
| | | return 0; |
| | | } |
| | | |
| | | // Reset them |
| | | $affected = $this->wpdb->query($this->wpdb->prepare(" |
| | | UPDATE {$this->table} |
| | | SET state = 'scheduled', |
| | | scheduled_at = %s, |
| | | retries = retries + 1, |
| | | updated_at = %s |
| | | WHERE id IN (" . implode(',', array_fill(0, count($stuckIds), '%s')) . ") |
| | | ", |
| | | date('Y-m-d H:i:s', time() + 60), |
| | | current_time('mysql'), |
| | | ...$stuckIds |
| | | )); |
| | | |
| | | // Optional: Log to audit table |
| | | // $this->logStuckReset($stuckIds); |
| | | |
| | | return (int) $affected; |
| | | }); |
| | | } |
| | | |
| | | /** |
| | | * @throws \Throwable |
| | | */ |
| | | public function replaceDependency(string $fromId, string $toId): int |
| | | { |
| | | return $this->withTransaction(function () use ($fromId, $toId) { |
| | | |
| | | // Only affect pending/scheduled operations |
| | | $affected = $this->wpdb->query($this->wpdb->prepare(" |
| | | UPDATE {$this->table} |
| | | SET dependencies = REPLACE(dependencies, %s, %s), |
| | | updated_at = %s |
| | | WHERE state IN ('pending', 'scheduled') |
| | | AND dependencies LIKE %s |
| | | ", |
| | | '"' . $fromId . '"', |
| | | '"' . $toId . '"', |
| | | current_time('mysql'), |
| | | '%"' . $fromId . '"%' |
| | | )); |
| | | |
| | | return (int) $affected; |
| | | }); |
| | | } |
| | | |
| | | } |