| | |
| | | ); |
| | | } |
| | | |
| | | public function fetchRunnable(int $limit = 10): array |
| | | public function fetchRunnable(int $offset = 0): array |
| | | { |
| | | $now = current_time('mysql'); |
| | | |
| | | $rows = $this->wpdb->get_results($this->wpdb->prepare(" |
| | | SELECT oq.* FROM {$this->table} oq |
| | | WHERE oq.state IN ('pending', 'scheduled') |
| | | AND oq.scheduled_at <= %s |
| | | AND NOT EXISTS ( |
| | | SELECT 1 |
| | | FROM JSON_TABLE( |
| | | COALESCE(NULLIF(oq.dependencies, 'null'), '[]'), |
| | | '\$[*]' COLUMNS (dep_id VARCHAR(64) PATH '\$') |
| | | ) AS deps |
| | | JOIN {$this->table} dep ON dep.id = deps.dep_id |
| | | WHERE dep.state != 'completed' |
| | | OR dep.outcome NOT IN ('success', 'partial') |
| | | ) |
| | | ORDER BY FIELD(oq.priority, 'high', 'normal', 'low'), oq.scheduled_at |
| | | LIMIT %d |
| | | FOR UPDATE SKIP LOCKED |
| | | ", $now, $limit)); |
| | | $rows = $this->wpdb->get_results( |
| | | $this->wpdb->prepare(" |
| | | SELECT * |
| | | FROM {$this->table} |
| | | WHERE state IN ('pending', 'scheduled') |
| | | AND scheduled_at <= %s |
| | | ORDER BY |
| | | FIELD(priority, 'high', 'normal', 'low'), |
| | | scheduled_at |
| | | LIMIT 10 OFFSET %d |
| | | FOR UPDATE SKIP LOCKED |
| | | ", $now, $offset) |
| | | ); |
| | | |
| | | return array_map([$this, 'rowToOperation'], $rows ?: []); |
| | | $total = count($rows); |
| | | foreach ($rows as $row) { |
| | | $dependencies = json_decode($row->dependencies ?? '[]', true) ?: []; |
| | | if (empty($dependencies)) { |
| | | return [$this->rowToOperation($row)]; |
| | | } |
| | | $totalDep = count($dependencies); |
| | | $completed = []; |
| | | $notCompleted = []; |
| | | foreach ($dependencies as $dep) { |
| | | $dependency = $this->find($dep); |
| | | if ($dependency) { |
| | | if ($dependency->state === 'completed') { |
| | | $completed[] = $dep; |
| | | } else { |
| | | $notCompleted[] = $dep; |
| | | } |
| | | } |
| | | } |
| | | if (count($completed) === $totalDep) { |
| | | return [$this->rowToOperation($row)]; |
| | | } |
| | | } |
| | | //If we didn't find any operations from that 10 that are ready to go, or their dependencies aren't met, try the next 10 |
| | | if ($total === 10) { |
| | | return $this->fetchRunnable($offset + 10); |
| | | } |
| | | |
| | | //If, for whatever reason, nothing still is found, there likely are none |
| | | return []; |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | public function markProcessing(string $id): bool |
| | | { |
| | | $now = current_time('mysql'); |
| | |
| | | 'metadata' => json_encode($op->metadata), |
| | | 'result' => $op->result ? json_encode($op->result) : null, |
| | | 'dependencies' => json_encode($op->dependencies), |
| | | 'merged_into' => $op->merged_into, |
| | | 'user_dismissed' => $op->userDismissed ? 1 : 0, |
| | | 'updated_at' => current_time('mysql'), |
| | | ]; |
| | |
| | | return false; |
| | | } |
| | | |
| | | |
| | | $this->invalidateUser($op->userId); |
| | | |
| | | return true; |
| | | } |
| | | |
| | |
| | | 'state' => 'completed', |
| | | 'outcome' => $op->outcome?? 'success', |
| | | 'processed_items'=> $op->processedItems ?? 0, |
| | | 'failed_items' => $op->failedItems ?? null, |
| | | 'failed_items' => $op->failedItems ? json_encode($op->failedItems) : null, |
| | | 'result' => isset($op->result) ? wp_json_encode($op->result) : null, |
| | | 'completed_at' => $op->completedAt ?? current_time('mysql'), |
| | | 'updated_at' => current_time('mysql'), |
| | |
| | | ]; |
| | | |
| | | $updated = $wpdb->update($table, $data, $where); |
| | | $this->invalidateQueueCache(); |
| | | |
| | | if ($updated === 0) { |
| | | return true; |
| | |
| | | error_log('[Storage::saveFinal] DB error: ' . $wpdb->last_error); |
| | | return false; |
| | | } |
| | | $this->invalidateQueueCache(); |
| | | $this->invalidateUser($op->userId); |
| | | |
| | | return true; |
| | | } |
| | |
| | | 'result' => null, |
| | | 'dependencies' => json_encode($op->dependencies), |
| | | 'user_dismissed' => 0, |
| | | 'merged_into' => null, |
| | | 'created_at' => current_time('mysql'), |
| | | 'updated_at' => current_time('mysql'), |
| | | ]); |
| | |
| | | return $row ? $this->rowToOperation($row) : null; |
| | | } |
| | | |
| | | public function findMergeable(string $type, int $userId): ?Operation |
| | | public function findMergeable(string $type, int $userId, array $criteria = []): ?Operation |
| | | { |
| | | $row = $this->wpdb->get_row($this->wpdb->prepare( |
| | | "SELECT * FROM {$this->table} |
| | | WHERE type = %s AND user_id = %d AND state IN ('pending', 'scheduled') |
| | | ORDER BY created_at DESC LIMIT 1", |
| | | $type, $userId |
| | | )); |
| | | $sql = "SELECT * FROM {$this->table} |
| | | WHERE type = %s AND user_id = %d AND state IN ('pending', 'scheduled')"; |
| | | $params = [$type, $userId]; |
| | | |
| | | foreach ($criteria as $key => $value) { |
| | | if ($value === null) { |
| | | continue; |
| | | } |
| | | $sql .= " AND JSON_UNQUOTE(JSON_EXTRACT(request_data, %s)) = %s"; |
| | | $params[] = '$.' . $key; |
| | | $params[] = (string) $value; |
| | | } |
| | | |
| | | $sql .= " ORDER BY created_at DESC LIMIT 1"; |
| | | |
| | | $row = $this->wpdb->get_row($this->wpdb->prepare($sql, ...$params)); |
| | | |
| | | $this->invalidateUser($userId); |
| | | |
| | | return $row ? $this->rowToOperation($row) : null; |
| | | } |
| | |
| | | $op->startedAt = $row->started_at; |
| | | $op->completedAt = $row->completed_at; |
| | | $op->result = $row->result ? json_decode($row->result, true) : null; |
| | | $op->merged_into = $row->merged_into; |
| | | $op->userDismissed = (bool) $row->user_dismissed; |
| | | |
| | | return $op; |
| | |
| | | |
| | | private function invalidateUser(int $userId): void |
| | | { |
| | | $this->cache->forget($userId); |
| | | Cache::for($userId.'_queue')->flush(); |
| | | } |
| | | public function getLastError(): string |
| | | { |
| | |
| | | return (int) $affected; |
| | | }); |
| | | } |
| | | |
| | | /** |
| | | * @throws \Throwable |
| | | */ |
| | | public function replaceDependency(string $fromId, string $toId): int |
| | | { |
| | | return $this->withTransaction(function () use ($fromId, $toId) { |
| | | |
| | | // Only affect pending/scheduled operations |
| | | $affected = $this->wpdb->query($this->wpdb->prepare(" |
| | | UPDATE {$this->table} |
| | | SET dependencies = REPLACE(dependencies, %s, %s), |
| | | updated_at = %s |
| | | WHERE state IN ('pending', 'scheduled') |
| | | AND dependencies LIKE %s |
| | | ", |
| | | '"' . $fromId . '"', |
| | | '"' . $toId . '"', |
| | | current_time('mysql'), |
| | | '%"' . $fromId . '"%' |
| | | )); |
| | | |
| | | return (int) $affected; |
| | | }); |
| | | } |
| | | |
| | | } |