| | |
| | | private Storage $storage, |
| | | private Executor $defaultExecutor, |
| | | private TypeRegistry $registry, |
| | | private Locker $locker |
| | | ) {} |
| | | |
/**
 * Claims at most one runnable operation while holding the processor lock and executes it.
 *
 * Flow inside the lock:
 *  1. Bail out (with a log line) when hasAdequateResources() reports false,
 *     before any storage access is made.
 *  2. Inside a storage transaction, walk the runnable candidates and claim the
 *     first one that is not already 'completed', has all dependencies satisfied,
 *     and for which markProcessing() succeeds.
 *  3. If nothing was claimed, return without touching the queue cache.
 *  4. Otherwise process the operation, pause 10ms, and invalidate the queue cache.
 *
 * NOTE(review): the previous text of this method interleaved a batch loop over
 * fetchRunnable(10) with this single-claim flow and did not parse. The
 * single-claim flow was kept because it is the only caller of
 * dependenciesSatisfied(); confirm against version control.
 */
public function run(): void
{
    $this->locker->withLock(function (): void {
        if (!$this->hasAdequateResources()) {
            error_log('[Processor] Insufficient resources to start processing');
            return;
        }

        // Filled in by reference from inside the transaction callback.
        $op = null;

        $this->storage->withTransaction(function () use (&$op): void {
            foreach ($this->storage->fetchRunnable() as $candidate) {
                if ($candidate->state === 'completed') {
                    continue;
                }

                if (!$this->dependenciesSatisfied($candidate)) {
                    continue;
                }

                // markProcessing() returning falsy means the claim did not succeed;
                // fall through to the next candidate in that case.
                if ($this->storage->markProcessing($candidate->id)) {
                    $op = $candidate;
                    break;
                }
            }
        });

        if ($op === null) {
            return;
        }

        $this->processOne($op);
        usleep(10000); // 10ms

        $this->storage->invalidateQueueCache();
    });
}
| | | |
| | | |
/**
 * Executes one claimed operation and persists the resulting state.
 *
 * The operation is stamped as 'processing' and saved before execution. When
 * its metadata carries a truthy 'chunk_key' the work is delegated to
 * executeChunked(); otherwise the executor is called once on the whole
 * operation. Any Throwable is logged and handed to handleFailure(). The
 * operation is persisted exactly once at the end via saveOperation().
 *
 * Timestamps come from current_time('mysql'), which is not defined in this
 * file (presumably WordPress's helper — TODO confirm).
 *
 * @param Operation $op The operation to execute; mutated in place.
 */
private function processOne(Operation $op): void
{
    $progress = new Progress($op);
    $executor = $this->registry->getExecutor($op->type) ?? $this->defaultExecutor;
    $op->startedAt = current_time('mysql');
    $op->state = 'processing';
    $this->storage->saveProgress($op);

    try {
        $chunkKey = $op->metadata['chunk_key'] ?? null;

        // No transaction wrapping — executor handles its own.
        // The executor runs exactly once: either chunk by chunk or as a whole.
        $result = $chunkKey
            ? $this->executeChunked($op, $executor, $progress, $chunkKey)
            : $executor->execute($op, $progress);

        // executeChunked() signals a pause by rescheduling the operation and
        // returning a 'pending' result. Such a run is not finished, so its
        // 'scheduled' state must not be overwritten with 'completed'.
        $paused = $result->outcome === 'pending' && $op->state === 'scheduled';

        if (!$paused) {
            $op->state = 'completed';
            $op->outcome = $result->outcome;
            $op->completedAt = current_time('mysql');
        }
    } catch (\Throwable $e) {
        error_log("[Processor] Exception caught: " . $e->getMessage());
        $this->handleFailure($op, $e);
    }

    // Single persistence point; saveOperation() picks the storage call by state.
    $this->saveOperation($op);
}
/**
 * Persists an operation through the storage call that matches its state.
 *
 * Operations whose state is 'completed' are written with saveFinal(); every
 * other state goes through the regular save().
 *
 * @param Operation $op The operation to persist.
 */
private function saveOperation(Operation $op): void
{
    if ($op->state !== 'completed') {
        // Retryable failure — save as scheduled/failed without requiring 'completed' state
        $this->storage->save($op);

        return;
    }

    $this->storage->saveFinal($op);
}
| | | |
/**
 * Executes an operation chunk by chunk, recording the resume offset after each chunk.
 *
 * The lists stored in requestData under $chunkKey are sliced by buildChunks().
 * For every chunk at or after metadata['chunk_offset'] a clone of the
 * operation is executed whose requestData has the chunked keys replaced by
 * that chunk's slices. After a successful chunk the offset is advanced and
 * saved with saveProgress().
 *
 * Before each chunk checkResourceLimits() is consulted. When it reports false
 * the operation is rescheduled 5 seconds ahead, saved, and a 'pending' result
 * carrying the partial results is returned.
 *
 * A chunk that throws is logged and each of its items is reported through
 * $progress->failItem(); processing then continues with the next chunk.
 *
 * NOTE(review): when buildChunks() yields no chunks (key missing from
 * requestData, or not an array) the executor is never called and the result
 * is 'success' unless $op->failedItems is already non-empty — confirm this
 * is intended.
 *
 * @param Operation    $op       Operation being processed; metadata/state are mutated on pause and progress.
 * @param Executor     $executor Executor invoked once per chunk.
 * @param Progress     $progress Progress tracker passed to the executor and used to record failed items.
 * @param string|array $chunkKey One requestData key, or a list of keys, whose lists are chunked.
 *
 * @return Result 'pending' when paused; otherwise 'success', 'partial' or 'failed'
 *                depending on $op->failedItems versus $op->totalItems.
 */
private function executeChunked(Operation $op, Executor $executor, Progress $progress, string|array $chunkKey): Result
{
    $keys = (array) $chunkKey;

    // Metadata may hold numeric strings or nonsensical sizes. array_chunk()
    // throws ValueError for a length below 1, so fall back to the default.
    $chunkSize = (int) ($op->metadata['chunk_size'] ?? 10);
    if ($chunkSize < 1) {
        $chunkSize = 10;
    }
    $offset = (int) ($op->metadata['chunk_offset'] ?? 0);

    $chunks = $this->buildChunks($op->requestData, $keys, $chunkSize);
    $totalChunks = count($chunks);

    $allResults = [];

    foreach ($chunks as $index => $chunk) {
        // Skip chunks already handled by an earlier, paused run.
        if ($index < $offset) {
            continue;
        }

        // Resource check before each chunk
        if (!$this->checkResourceLimits()) {
            error_log("[Processor] Resource limits reached, pausing at chunk {$index}");

            $op->metadata['chunk_offset'] = $index;
            $op->state = 'scheduled';
            $op->scheduledAt = date('Y-m-d H:i:s', time() + 5);
            $this->storage->save($op);

            return new Result(
                outcome: 'pending',
                result: [
                    'paused_at_chunk' => $index,
                    'total_chunks' => $totalChunks,
                    'partial_results' => $allResults,
                ]
            );
        }

        try {
            // The executor sees the original request with the chunked keys
            // replaced by just this chunk's slices.
            $chunkOp = clone $op;
            $chunkOp->requestData = array_merge(
                array_diff_key($op->requestData, array_flip($keys)),
                $chunk['data']
            );

            $chunkResult = $executor->execute($chunkOp, $progress);

            // Record the resume point only after the chunk has run.
            $op->metadata['chunk_offset'] = $index + 1;
            $this->storage->saveProgress($op);

            // Merge results
            if (!empty($chunkResult->result)) {
                if (is_array($chunkResult->result)) {
                    // An earlier chunk may have left a non-array accumulator;
                    // wrap it so array_merge_recursive() does not raise a
                    // TypeError that would mark this executed chunk as failed.
                    $base = is_array($allResults) ? $allResults : [$allResults];
                    $allResults = array_merge_recursive($base, $chunkResult->result);
                } else {
                    $allResults = $chunkResult->result;
                }
            }
        } catch (\Throwable $e) {
            error_log("[Processor] Chunk {$index} failed: " . $e->getMessage());

            // Record failed items from this chunk
            foreach ($chunk['data'] as $items) {
                foreach ($items as $item) {
                    $progress->failItem($item, $e->getMessage());
                }
            }

            // Continue to next chunk rather than failing entire operation
            // Remove this if you want fail-fast behavior
        }

        // Delay between chunks (skip after last chunk)
        if ($index < $totalChunks - 1) {
            usleep(50000); // 50ms
        }
    }

    // Determine final outcome
    $outcome = 'success';
    if (!empty($op->failedItems)) {
        $failedCount = count($op->failedItems);
        $outcome = $failedCount === $op->totalItems ? 'failed' : 'partial';
    }

    return new Result(
        outcome: $outcome,
        result: $allResults
    );
}
| | | |
/**
 * Slices the lists found under the given keys into aligned chunks.
 *
 * Each requested key whose value in $data is an array is split with
 * array_chunk() (original item keys preserved). Chunk N of the result
 * combines the N-th slice of every such key:
 *
 *     [N => ['data' => [key => slice, ...], 'count' => total items in the slices]]
 *
 * Keys that are missing from $data, null, or not arrays are ignored.
 *
 * @param array $data      Source data, keyed by name.
 * @param array $keys      Names of the entries in $data to chunk.
 * @param int   $chunkSize Maximum number of items per slice.
 *
 * @return array Chunks indexed by position, as described above.
 */
private function buildChunks(array $data, array $keys, int $chunkSize): array
{
    $chunks = [];

    foreach ($keys as $key) {
        $items = $data[$key] ?? null;
        if (!is_array($items)) {
            continue;
        }

        // `true` keeps the items' original keys inside every slice.
        foreach (array_chunk($items, $chunkSize, true) as $position => $slice) {
            $chunks[$position] ??= ['data' => [], 'count' => 0];

            $chunks[$position]['data'][$key] = $slice;
            $chunks[$position]['count'] += count($slice);
        }
    }

    return $chunks;
}
| | | |
// NOTE(review): this span appears truncated. The handleFailure() signature below
// has no opening brace or body, and the two statements after it read a $delay
// that is never assigned here and return a formatted date string, which does
// not fit a `void` method. They presumably belong to a separate retry-delay
// helper whose header is missing — restore the lost lines from version control.
| | | private function handleFailure(Operation $op, \Throwable $e): void
| | |
| | | $jitter = rand(0, (int)($delay * 0.1));
| | | return date('Y-m-d H:i:s', time() + $delay + $jitter);
| | | }
| | | |
/**
 * Reports whether processing may continue within the current memory and time budget.
 *
 * Returns false, after logging the reason, when either
 *  - memory_get_usage(true) exceeds 80% of getMemoryLimitBytes(), or
 *  - max_execution_time is positive and fewer than 30 seconds of it remain,
 *    measured from $_SERVER['REQUEST_TIME_FLOAT'].
 *
 * @return bool True when both checks pass.
 */
private function checkResourceLimits(): bool
{
    // Check memory (leave 20% buffer)
    $memoryCeiling = $this->getMemoryLimitBytes() * 0.8;
    if (memory_get_usage(true) > $memoryCeiling) {
        error_log('[Processor] Memory limit approaching, pausing');

        return false;
    }

    // A non-positive max_execution_time means there is no time budget to check.
    $timeLimit = (int) ini_get('max_execution_time');
    if ($timeLimit <= 0) {
        return true;
    }

    // Check time (leave 30s buffer for cleanup)
    $elapsed = microtime(true) - $_SERVER['REQUEST_TIME_FLOAT'];
    $cutoff = $timeLimit - 30;
    if ($elapsed > $cutoff) {
        error_log('[Processor] Time limit approaching, pausing');

        return false;
    }

    return true;
}
| | | |
/**
 * Converts the `memory_limit` ini setting to a byte count.
 *
 * The exact string '-1' yields PHP_INT_MAX. Otherwise the leading integer of
 * the setting is multiplied by 1024 once, twice or three times for a trailing
 * 'k', 'm' or 'g' (case-insensitive); any other trailing character leaves the
 * integer as is.
 *
 * @return int The limit in bytes.
 */
private function getMemoryLimitBytes(): int
{
    $setting = ini_get('memory_limit');
    if ($setting === '-1') {
        return PHP_INT_MAX;
    }

    // How many times the numeric part is scaled by 1024 for each suffix.
    $scaleSteps = ['k' => 1, 'm' => 2, 'g' => 3];
    $suffix = strtolower(substr($setting, -1));

    $bytes = (int) $setting;
    for ($step = $scaleSteps[$suffix] ?? 0; $step > 0; $step--) {
        $bytes *= 1024;
    }

    return $bytes;
}
| | | |
/**
 * Reports whether there is enough headroom to start processing.
 *
 * Uses stricter thresholds than the per-chunk check: returns false when
 * memory_get_usage(true) exceeds 50% of getMemoryLimitBytes(), or when
 * max_execution_time is positive and fewer than 60 seconds of it remain,
 * measured from $_SERVER['REQUEST_TIME_FLOAT']. Nothing is logged here.
 *
 * @return bool True when both checks pass.
 */
private function hasAdequateResources(): bool
{
    // Stricter thresholds for starting (50% memory, 60s minimum time)
    $memoryCeiling = $this->getMemoryLimitBytes() * 0.5;
    if (memory_get_usage(true) > $memoryCeiling) {
        return false;
    }

    // A non-positive max_execution_time means there is no time budget to check.
    $timeLimit = (int) ini_get('max_execution_time');
    if ($timeLimit <= 0) {
        return true;
    }

    $elapsed = microtime(true) - $_SERVER['REQUEST_TIME_FLOAT'];
    $remaining = $timeLimit - $elapsed;

    // Need at least 60s
    return $remaining >= 60;
}
| | | |
/**
 * Checks that every dependency of an operation has finished acceptably.
 *
 * An operation with no dependencies is always satisfied. Otherwise each
 * dependency id is looked up through storage, and the check fails as soon as
 * one of them cannot be found, is not in state 'completed', or has an outcome
 * other than 'success' or 'partial'.
 *
 * @param Operation $op The operation whose dependencies are inspected.
 *
 * @return bool True when all dependencies are satisfied.
 */
private function dependenciesSatisfied(Operation $op): bool
{
    if (empty($op->dependencies)) {
        return true;
    }

    $acceptableOutcomes = ['success', 'partial'];

    foreach ($op->dependencies as $dependencyId) {
        $dependency = $this->storage->find($dependencyId);

        // A missing dependency blocks the operation, same as an unfinished
        // or unsuccessfully finished one.
        $satisfied = $dependency
            && $dependency->state === 'completed'
            && in_array($dependency->outcome, $acceptableOutcomes, true);

        if (!$satisfied) {
            return false;
        }
    }

    return true;
}
| | | |
| | | } |