| | |
| | | return; |
| | | } |
| | | |
| | | $ops = $this->storage->fetchRunnable(3); |
| | | |
| | | $lastOpId = null; |
| | | foreach ($ops as $op) { |
| | | if (!$this->storage->markProcessing($op->id)) { |
| | | continue; |
| | | $op = null; |
| | | $this->storage->withTransaction(function() use (&$op) { |
| | | $candidates = $this->storage->fetchRunnable(); |
| | | foreach ($candidates as $candidate) { |
| | | if ($candidate->state === 'completed') continue; |
| | | if (!$this->dependenciesSatisfied($candidate)) continue; |
| | | if ($this->storage->markProcessing($candidate->id)) { |
| | | $op = $candidate; |
| | | break; |
| | | } |
| | | } |
| | | $lastOpId = $op->id; |
| | | $this->processOne($op); |
| | | usleep(10000); |
| | | } |
| | | }); |
| | | |
| | | if (!$op) return; |
| | | |
| | | $this->processOne($op); |
| | | usleep(10000); |
| | | |
| | | $this->storage->invalidateQueueCache(); |
| | | } |
| | | |
| | | |
| | | private function processOne(Operation $op): void |
| | | { |
| | | $progress = new Progress($op); |
| | | |
| | | $executor = $this->registry->getExecutor($op->type) ?? $this->defaultExecutor; |
| | | $op->startedAt = current_time('mysql'); |
| | | $op->state = 'processing'; |
| | | |
| | | $this->storage->saveProgress($op); |
| | | |
| | | //Check to see if we can merge first |
| | | $mergeable = $this->registry->getMergeable($op->type); |
| | | |
| | | if ($mergeable) { |
| | | $existing = $this->storage->findMergeable( |
| | | $op->type, |
| | | $op->userId |
| | | ); |
| | | |
| | | if ($existing && $mergeable->canMerge($existing, $op)) { |
| | | $this->applyMerge($mergeable, $existing, $op); |
| | | return; |
| | | } |
| | | } |
| | | |
| | | try { |
| | | // Check if this operation should be chunked |
| | | $chunkKey = $op->metadata['chunk_key'] ?? null; |
| | | |
| | | // No transaction wrapping — executor handles its own |
| | | $result = $chunkKey |
| | | ? $this->executeChunked($op, $executor, $progress, $chunkKey) |
| | | : $this->storage->withTransaction(fn() => $executor->execute($op, $progress)); |
| | | : $executor->execute($op, $progress); |
| | | |
| | | $op->state = 'completed'; |
| | | $op->outcome = $result->outcome; |
| | |
| | | $this->handleFailure($op, $e); |
| | | } |
| | | |
| | | $this->storage->saveFinal($op); |
| | | $this->saveOperation($op); |
| | | } |
| | | /** |
| | |  * Persists an operation through the storage call that matches its state. |
| | |  * |
| | |  * Operations whose state is 'completed' are written with saveFinal(); |
| | |  * every other state is written with the plain save(). |
| | |  */ |
| | | private function saveOperation(Operation $op): void |
| | | { |
| | |     // Not completed (e.g. a retryable failure left as scheduled/failed): plain save, |
| | |     // which does not require the 'completed' state. |
| | |     if ($op->state !== 'completed') { |
| | |         $this->storage->save($op); |
| | | |
| | |         return; |
| | |     } |
| | | |
| | |     $this->storage->saveFinal($op); |
| | | } |
| | | |
| | | private function executeChunked(Operation $op, Executor $executor, Progress $progress, string|array $chunkKey): Result |
| | |
| | | } |
| | | |
| | | try { |
| | | $chunkResult = $this->storage->withTransaction(function () use ($op, $executor, $progress, $chunk, $keys, $index) { |
| | | // Clone operation with only this chunk's data |
| | | $chunkOp = clone $op; |
| | | $chunkOp->requestData = array_merge( |
| | | array_diff_key($op->requestData, array_flip($keys)), |
| | | $chunk['data'] |
| | | ); |
| | | $chunkOp = clone $op; |
| | | $chunkOp->requestData = array_merge( |
| | | array_diff_key($op->requestData, array_flip($keys)), |
| | | $chunk['data'] |
| | | ); |
| | | |
| | | // Execute this chunk |
| | | $executeChunk = function () use ($op, $executor, $progress, $chunkOp, $index) { |
| | | $result = $executor->execute($chunkOp, $progress); |
| | | |
| | | // Update progress |
| | | $op->metadata['chunk_offset'] = $index + 1; |
| | | $this->storage->saveProgress($op); |
| | | |
| | | return $result; |
| | | }); |
| | | }; |
| | | |
| | | $chunkResult = $executeChunk(); |
| | | |
| | | // Merge results |
| | | if (!empty($chunkResult->result)) { |
| | |
| | | return date('Y-m-d H:i:s', time() + $delay + $jitter); |
| | | } |
| | | |
| | | /** |
| | |  * Folds $incoming into $target and closes $incoming as merged. |
| | |  * |
| | |  * Nothing happens unless $target is in the 'processing' state. Otherwise, inside a |
| | |  * single storage transaction: the merge itself is delegated to $mergeable, $incoming's |
| | |  * id is added to $target's dependency list (de-duplicated and re-indexed), $target's |
| | |  * progress is saved, and $incoming is marked completed/success with a 'merged_into' |
| | |  * result pointing at $target before being saved as final. |
| | |  */ |
| | | private function applyMerge( |
| | |     Mergeable $mergeable, |
| | |     Operation $target, |
| | |     Operation $incoming |
| | | ): void { |
| | |     // Safety guard: merging is only allowed into an op that is actively processing. |
| | |     if ($target->state === 'processing') { |
| | |         $this->storage->withTransaction(function () use ($mergeable, $target, $incoming) { |
| | |             $mergeable->merge($target, $incoming); |
| | | |
| | |             // Record the incoming op on the target, keeping the list unique and 0-indexed. |
| | |             $dependencyIds = $target->dependencies; |
| | |             $dependencyIds[] = $incoming->id; |
| | |             $target->dependencies = array_values(array_unique($dependencyIds)); |
| | | |
| | |             $this->storage->saveProgress($target); |
| | | |
| | |             // The incoming op is finished: its work now lives on the target. |
| | |             $incoming->state = 'completed'; |
| | |             $incoming->outcome = 'success'; |
| | |             $incoming->completedAt = current_time('mysql'); |
| | |             $incoming->result = ['merged_into' => $target->id]; |
| | | |
| | |             $this->storage->saveFinal($incoming); |
| | |         }); |
| | |     } |
| | | } |
| | | |
| | | private function checkResourceLimits(): bool |
| | | { |
| | | // Check memory (leave 20% buffer) |
| | |
| | | |
| | | return true; |
| | | } |
| | | |
| | | /** |
| | |  * Tells whether every dependency of $op has finished acceptably. |
| | |  * |
| | |  * An operation with no dependencies is always satisfied. Otherwise each dependency id |
| | |  * is looked up through storage, and the first one that is missing, not in the |
| | |  * 'completed' state, or completed with an outcome other than 'success' or 'partial' |
| | |  * makes the result false. |
| | |  */ |
| | | private function dependenciesSatisfied(Operation $op): bool |
| | | { |
| | |     if (empty($op->dependencies)) { |
| | |         return true; |
| | |     } |
| | | |
| | |     foreach ($op->dependencies as $dependencyId) { |
| | |         $dependency = $this->storage->find($dependencyId); |
| | | |
| | |         // A dependency that cannot be found blocks the operation, just like an |
| | |         // unfinished or unsuccessful one. |
| | |         $satisfied = $dependency |
| | |             && $dependency->state === 'completed' |
| | |             && in_array($dependency->outcome, ['success', 'partial'], true); |
| | | |
| | |         if (!$satisfied) { |
| | |             return false; |
| | |         } |
| | |     } |
| | | |
| | |     return true; |
| | | } |
| | | |
| | | } |