| | |
| | | } |
| | | |
| | | use JVBase\managers\Cache; |
| | | use LogicException; |
| | | |
| | | class Storage |
| | | { |
| | |
| | | $now = current_time('mysql'); |
| | | |
| | | $rows = $this->wpdb->get_results($this->wpdb->prepare(" |
| | | SELECT oq.* FROM {$this->table} oq |
| | | WHERE oq.state IN ('pending', 'scheduled') |
| | | AND oq.scheduled_at <= %s |
| | | AND NOT EXISTS ( |
| | | SELECT 1 |
| | | FROM JSON_TABLE( |
| | | COALESCE(NULLIF(oq.dependencies, 'null'), '[]'), |
| | | '\$[*]' COLUMNS (dep_id VARCHAR(64) PATH '\$') |
| | | ) AS deps |
| | | JOIN {$this->table} dep ON dep.id = deps.dep_id |
| | | WHERE dep.state != 'completed' |
| | | OR dep.outcome NOT IN ('success', 'partial') |
| | | ) |
| | | ORDER BY FIELD(oq.priority, 'high', 'normal', 'low'), oq.scheduled_at |
| | | LIMIT %d |
| | | ", $now, $limit)); |
| | | SELECT oq.* FROM {$this->table} oq |
| | | WHERE oq.state IN ('pending', 'scheduled') |
| | | AND oq.scheduled_at <= %s |
| | | AND NOT EXISTS ( |
| | | SELECT 1 |
| | | FROM JSON_TABLE( |
| | | COALESCE(NULLIF(oq.dependencies, 'null'), '[]'), |
| | | '\$[*]' COLUMNS (dep_id VARCHAR(64) PATH '\$') |
| | | ) AS deps |
| | | JOIN {$this->table} dep ON dep.id = deps.dep_id |
| | | WHERE dep.state != 'completed' |
| | | OR dep.outcome NOT IN ('success', 'partial') |
| | | ) |
| | | ORDER BY FIELD(oq.priority, 'high', 'normal', 'low'), oq.scheduled_at |
| | | LIMIT %d |
| | | FOR UPDATE SKIP LOCKED |
| | | ", $now, $limit)); |
| | | |
| | | return array_map([$this, 'rowToOperation'], $rows ?: []); |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | public function markProcessing(string $id): bool |
| | | { |
| | | $now = current_time('mysql'); |
| | |
| | | return $result !== false; |
| | | } |
| | | |
| | | /** |
| | | * For saving a 'step' in an operation; usually after each chunk |
| | | * @param Operation $op |
| | | * @return bool |
| | | */ |
| | | public function saveProgress(Operation $op): bool { |
| | | global $wpdb; |
| | | |
| | | $table = $this->table; |
| | | |
| | | $data = [ |
| | | 'processed_items' => $op->processedItems ?? 0, |
| | | 'failed_items' => $op->failedItems ? json_encode($op->failedItems) : null, |
| | | 'metadata' => ($op->metadata) ? json_encode($op->metadata) : null, |
| | | 'result' => ($op->result) ? json_encode($op->result) :null, |
| | | 'updated_at' => current_time('mysql'), |
| | | ]; |
| | | |
| | | // IMPORTANT: never touch terminal state |
| | | $where = [ |
| | | 'id' => $op->id, |
| | | 'state' => 'processing', |
| | | ]; |
| | | |
| | | $updated = $wpdb->update($table, $data, $where); |
| | | |
| | | if ($updated === false) { |
| | | error_log('[Storage::saveProgress] DB error: ' . $wpdb->last_error); |
| | | return false; |
| | | } |
| | | |
| | | return true; |
| | | } |
| | | |
| | | /** |
| | | * The operation is complete, let's progress to 'completed' |
| | | * @param Operation $op |
| | | * @return bool |
| | | */ |
| | | public function saveFinal(Operation $op): bool { |
| | | global $wpdb; |
| | | |
| | | if (($op->state?? null) !== 'completed') { |
| | | throw new LogicException('saveFinal called without completed state'); |
| | | } |
| | | |
| | | $table = $this->table; |
| | | |
| | | $data = [ |
| | | 'state' => 'completed', |
| | | 'outcome' => $op->outcome?? 'success', |
| | | 'processed_items'=> $op->processedItems ?? 0, |
| | | 'failed_items' => $op->failedItems ?? null, |
| | | 'result' => isset($op->result) ? wp_json_encode($op->result) : null, |
| | | 'completed_at' => $op->completedAt ?? current_time('mysql'), |
| | | 'updated_at' => current_time('mysql'), |
| | | ]; |
| | | |
| | | // HARD GUARD: cannot overwrite completed |
| | | $where = [ |
| | | 'id' => $op->id, |
| | | 'state' => 'processing', |
| | | ]; |
| | | |
| | | $updated = $wpdb->update($table, $data, $where); |
| | | $this->invalidateQueueCache(); |
| | | |
| | | if ($updated === 0) { |
| | | return true; |
| | | } |
| | | |
| | | if ($updated === false) { |
| | | error_log('[Storage::saveFinal] DB error: ' . $wpdb->last_error); |
| | | return false; |
| | | } |
| | | |
| | | return true; |
| | | } |
| | | |
| | | public function insert(Operation $op): bool |
| | | { |
| | | $result = $this->wpdb->insert($this->table, [ |
| | |
| | | { |
| | | $this->cache->forget($userId); |
| | | } |
| | | public function getLastError(): string |
| | | { |
| | | return $this->wpdb->last_error; |
| | | } |
| | | |
| | | public function withTransaction(callable $callback): mixed |
| | | { |
| | | $this->wpdb->query('START TRANSACTION'); |
| | | |
| | | try { |
| | | $result = $callback(); |
| | | $this->wpdb->query('COMMIT'); |
| | | return $result; |
| | | } catch (\Throwable $e) { |
| | | $this->wpdb->query('ROLLBACK'); |
| | | error_log('[Storage] Transaction rolled back: ' . $e->getMessage()); |
| | | throw $e; |
| | | } |
| | | } |
| | | |
| | | public function resetStuckOperations(int $stuckMinutes = 30): int |
| | | { |
| | | return $this->withTransaction(function () use ($stuckMinutes) { |
| | | $cutoff = date('Y-m-d H:i:s', strtotime("-{$stuckMinutes} minutes")); |
| | | |
| | | // Get IDs first for logging |
| | | $stuckIds = $this->wpdb->get_col($this->wpdb->prepare(" |
| | | SELECT id FROM {$this->table} |
| | | WHERE state = 'processing' |
| | | AND started_at < %s |
| | | FOR UPDATE |
| | | ", $cutoff)); |
| | | |
| | | if (empty($stuckIds)) { |
| | | return 0; |
| | | } |
| | | |
| | | // Reset them |
| | | $affected = $this->wpdb->query($this->wpdb->prepare(" |
| | | UPDATE {$this->table} |
| | | SET state = 'scheduled', |
| | | scheduled_at = %s, |
| | | retries = retries + 1, |
| | | updated_at = %s |
| | | WHERE id IN (" . implode(',', array_fill(0, count($stuckIds), '%s')) . ") |
| | | ", |
| | | date('Y-m-d H:i:s', time() + 60), |
| | | current_time('mysql'), |
| | | ...$stuckIds |
| | | )); |
| | | |
| | | // Optional: Log to audit table |
| | | // $this->logStuckReset($stuckIds); |
| | | |
| | | return (int) $affected; |
| | | }); |
| | | } |
| | | } |