| | |
| | | ); |
| | | } |
| | | |
| | | public function fetchRunnable(int $limit = 10): array |
| | | public function fetchRunnable(int $offset = 0): array |
| | | { |
| | | $now = current_time('mysql'); |
| | | |
| | |
| | | ORDER BY |
| | | FIELD(priority, 'high', 'normal', 'low'), |
| | | scheduled_at |
| | | LIMIT %d |
| | | LIMIT 10 OFFSET %d |
| | | FOR UPDATE SKIP LOCKED |
| | | ", $now, $limit) |
| | | ", $now, $offset) |
| | | ); |
| | | |
| | | return array_map([$this, 'rowToOperation'], $rows ?: []); |
| | | $total = count($rows); |
| | | foreach ($rows as $row) { |
| | | $dependencies = json_decode($row->dependencies ?? '[]', true) ?: []; |
| | | if (empty($dependencies)) { |
| | | return [$this->rowToOperation($row)]; |
| | | } |
| | | $totalDep = count($dependencies); |
| | | $completed = []; |
| | | $notCompleted = []; |
| | | foreach ($dependencies as $dep) { |
| | | $dependency = $this->find($dep); |
| | | if ($dependency) { |
| | | if ($dependency->state === 'completed') { |
| | | $completed[] = $dep; |
| | | } else { |
| | | $notCompleted[] = $dep; |
| | | } |
| | | } |
| | | } |
| | | if (count($completed) === $totalDep) { |
| | | return [$this->rowToOperation($row)]; |
| | | } |
| | | } |
| | | //If we didn't find any operations from that 10 that are ready to go, or their dependencies aren't met, try the next 10 |
| | | if ($total === 10) { |
| | | return $this->fetchRunnable($offset + 10); |
| | | } |
| | | |
| | | //If, for whatever reason, nothing still is found, there likely are none |
| | | return []; |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | public function markProcessing(string $id): bool |
| | | { |
| | | $now = current_time('mysql'); |
| | |
| | | 'metadata' => json_encode($op->metadata), |
| | | 'result' => $op->result ? json_encode($op->result) : null, |
| | | 'dependencies' => json_encode($op->dependencies), |
| | | 'merged_into' => $op->merged_into, |
| | | 'user_dismissed' => $op->userDismissed ? 1 : 0, |
| | | 'updated_at' => current_time('mysql'), |
| | | ]; |
| | |
| | | 'state' => 'completed', |
| | | 'outcome' => $op->outcome?? 'success', |
| | | 'processed_items'=> $op->processedItems ?? 0, |
| | | 'failed_items' => $op->failedItems ?? null, |
| | | 'failed_items' => $op->failedItems ? json_encode($op->failedItems) : null, |
| | | 'result' => isset($op->result) ? wp_json_encode($op->result) : null, |
| | | 'completed_at' => $op->completedAt ?? current_time('mysql'), |
| | | 'updated_at' => current_time('mysql'), |
| | |
| | | 'result' => null, |
| | | 'dependencies' => json_encode($op->dependencies), |
| | | 'user_dismissed' => 0, |
| | | 'merged_into' => null, |
| | | 'created_at' => current_time('mysql'), |
| | | 'updated_at' => current_time('mysql'), |
| | | ]); |
| | |
| | | $op->startedAt = $row->started_at; |
| | | $op->completedAt = $row->completed_at; |
| | | $op->result = $row->result ? json_decode($row->result, true) : null; |
| | | $op->merged_into = $row->merged_into; |
| | | $op->userDismissed = (bool) $row->user_dismissed; |
| | | |
| | | return $op; |
| | |
| | | return (int) $affected; |
| | | }); |
| | | } |
| | | |
/**
 * Re-points dependency references from one operation id to another.
 *
 * For every row whose state is 'pending' or 'scheduled' and whose
 * `dependencies` column contains the double-quoted $fromId, each occurrence
 * of that quoted id is replaced with the double-quoted $toId and
 * `updated_at` is refreshed. The statement is executed through
 * withTransaction().
 *
 * @param string $fromId Operation id currently referenced as a dependency.
 * @param string $toId   Operation id that should be referenced instead.
 *
 * @return int Row count reported by $wpdb->query(); a `false` result is cast to 0.
 *
 * @throws \Throwable
 */
public function replaceDependency(string $fromId, string $toId): int
{
    return $this->withTransaction(function () use ($fromId, $toId) {
        // Match the id together with its surrounding quotes so that an id which
        // is only a prefix or suffix of another id is left untouched.
        $quotedFrom = '"' . $fromId . '"';
        $quotedTo = '"' . $toId . '"';

        // "%", "_" and "\" are special inside a LIKE pattern. Without escaping,
        // an id such as "op_1" would also select rows containing "opX1"; REPLACE()
        // would leave them unchanged, but updated_at would still be bumped and the
        // returned row count inflated. esc_like() makes the id match literally.
        $likePattern = '%' . $this->wpdb->esc_like($quotedFrom) . '%';

        // Only affect pending/scheduled operations
        $affected = $this->wpdb->query($this->wpdb->prepare(
            "
            UPDATE {$this->table}
            SET dependencies = REPLACE(dependencies, %s, %s),
                updated_at = %s
            WHERE state IN ('pending', 'scheduled')
            AND dependencies LIKE %s
            ",
            $quotedFrom,
            $quotedTo,
            current_time('mysql'),
            $likePattern
        ));

        return (int) $affected;
    });
}
| | | |
| | | } |