hasAdequateResources()) {
            error_log('[Processor] Insufficient resources to start processing');
            return;
        }

        $ops = $this->storage->fetchRunnable();
        if (empty($ops)) {
            return;
        }

        foreach ($ops as $op) {
            // NOTE(review): this `return` ends the whole run on the first already-completed
            // operation, skipping the remaining operations and invalidateQueueCache() below.
            // `continue` may have been intended — confirm before changing.
            if ($op->state === 'completed') {
                return;
            }
            if (!$this->dependenciesSatisfied($op)) {
                continue;
            }
            if (!$this->storage->markProcessing($op->id)) {
                continue;
            }
            $this->processOne($op);
            usleep(10000);
        }

        $this->storage->invalidateQueueCache();
    }

    /**
     * Runs one operation through its executor and persists the resulting state.
     *
     * A transient keyed by BASE . $op->id is used as a lock: when it is already set the
     * call returns immediately. The lock is released once the operation has been saved,
     * so a rescheduled run (chunk pause or retry) is not rejected by a stale lock.
     *
     * Exceptions thrown by the executor are logged and turned into a retry or a permanent
     * failure by handleFailure(); they do not propagate to the caller.
     *
     * @param Operation $op Operation to execute; mutated in place.
     */
    private function processOne(Operation $op): void
    {
        $lockKey = BASE . $op->id;
        if (get_transient($lockKey)) {
            return;
        }
        set_transient($lockKey, true, 500);

        try {
            $progress = new Progress($op);
            $executor = $this->registry->getExecutor($op->type) ?? $this->defaultExecutor;

            $op->startedAt = current_time('mysql');
            $op->state = 'processing';
            $this->storage->saveProgress($op);

            try {
                $chunkKey = $op->metadata['chunk_key'] ?? null;

                // No transaction wrapping — executor handles its own
                $result = $chunkKey
                    ? $this->executeChunked($op, $executor, $progress, $chunkKey)
                    : $executor->execute($op, $progress);

                // executeChunked() signals a pause by rescheduling the operation and
                // returning a 'pending' result; such an operation must not be finalized,
                // otherwise the remaining chunks would never run.
                $paused = $result->outcome === 'pending' && $op->state === 'scheduled';
                if (!$paused) {
                    $op->state = 'completed';
                    $op->outcome = $result->outcome;
                    $op->result = $result->result;
                    $op->completedAt = current_time('mysql');
                }
            } catch (\Throwable $e) {
                error_log("[Processor] Exception caught: " . $e->getMessage());
                $this->handleFailure($op, $e);
            }

            $this->saveOperation($op);
        } finally {
            // Release the lock only after the new state has been persisted (or an
            // unexpected error escaped), so the next scheduled run can pick the op up.
            delete_transient($lockKey);
        }
    }

    /**
     * Persists an operation, using the final-save path only for completed operations.
     *
     * @param Operation $op Operation whose current state should be stored.
     */
    private function saveOperation(Operation $op): void
    {
        if ($op->state !== 'completed') {
            // Retryable failure — save as scheduled/failed without requiring 'completed' state
            $this->storage->save($op);
            return;
        }

        $this->storage->saveFinal($op);
    }

    /**
     * Executes an operation in chunks built from one or more list-valued request keys.
     *
     * Reads 'chunk_size' (default 10) and 'chunk_offset' (default 0) from $op->metadata.
     * Chunks with an index below the offset are skipped. After each successful chunk the
     * offset is advanced and saved via saveProgress(). Before each chunk the resource
     * limits are checked; when they are exceeded the operation is rescheduled 5 seconds
     * ahead, saved, and a 'pending' result is returned.
     *
     * A chunk that throws is logged and every item in it is reported through
     * $progress->failItem(); processing then continues with the next chunk.
     *
     * Results are accumulated from an empty array on every call, so results of chunks
     * skipped because of the offset are not part of the returned result.
     *
     * @param Operation    $op       Operation being processed; metadata/state are mutated.
     * @param Executor     $executor Executor invoked once per chunk with a clone of $op.
     * @param Progress     $progress Progress tracker passed to the executor.
     * @param string|array $chunkKey Request-data key(s) whose arrays are split into chunks.
     *
     * @return Result 'pending' when paused; otherwise 'success', 'partial' or 'failed'.
     */
    private function executeChunked(Operation $op, Executor $executor, Progress $progress, string|array $chunkKey): Result
    {
        $keys = (array) $chunkKey;
        // array_chunk() rejects sizes below 1, so clamp whatever the metadata holds.
        $chunkSize = max(1, (int) ($op->metadata['chunk_size'] ?? 10));
        $offset = max(0, (int) ($op->metadata['chunk_offset'] ?? 0));

        $chunks = $this->buildChunks($op->requestData, $keys, $chunkSize);
        $totalChunks = count($chunks);
        $allResults = [];

        // Request data shared by every chunk: everything except the chunked keys.
        $sharedData = array_diff_key($op->requestData, array_flip($keys));

        foreach ($chunks as $index => $chunk) {
            if ($index < $offset) {
                continue;
            }

            // Resource check before each chunk
            if (!$this->checkResourceLimits()) {
                error_log("[Processor] Resource limits reached, pausing at chunk {$index}");

                $op->metadata['chunk_offset'] = $index;
                $op->state = 'scheduled';
                $op->scheduledAt = date('Y-m-d H:i:s', time() + 5);
                $this->storage->save($op);

                return new Result(
                    outcome: 'pending',
                    result: [
                        'paused_at_chunk' => $index,
                        'total_chunks' => $totalChunks,
                        'partial_results' => $allResults,
                    ]
                );
            }

            try {
                $chunkOp = clone $op;
                $chunkOp->requestData = array_merge($sharedData, $chunk['data']);

                $chunkResult = $executor->execute($chunkOp, $progress);

                // Record the resume point as soon as the chunk has succeeded.
                $op->metadata['chunk_offset'] = $index + 1;
                $this->storage->saveProgress($op);

                // Merge results
                if (!empty($chunkResult->result)) {
                    if (is_array($chunkResult->result)) {
                        // Cast guards against an earlier chunk having stored a non-array
                        // result, which would make array_merge_recursive() throw and
                        // wrongly mark this (successful) chunk as failed.
                        $allResults = array_merge_recursive((array) $allResults, $chunkResult->result);
                    } else {
                        $allResults = $chunkResult->result;
                    }
                }
            } catch (\Throwable $e) {
                error_log("[Processor] Chunk {$index} failed: " . $e->getMessage());

                // Record failed items from this chunk
                foreach ($chunk['data'] as $items) {
                    foreach ($items as $item) {
                        $progress->failItem($item, $e->getMessage());
                    }
                }
                // Continue to next chunk rather than failing entire operation
                // Remove this if you want fail-fast behavior
            }

            // Delay between chunks (skip after last chunk)
            if ($index < $totalChunks - 1) {
                usleep(50000); // 50ms
            }
        }

        // Determine final outcome
        // NOTE(review): presumably $op->failedItems is filled by Progress::failItem() — confirm.
        $outcome = 'success';
        if (!empty($op->failedItems)) {
            $failedCount = count($op->failedItems);
            $outcome = $failedCount === $op->totalItems ? 'failed' : 'partial';
        }

        return new Result(
            outcome: $outcome,
            result: $allResults
        );
    }

    /**
     * Splits the arrays found under the given keys into aligned chunks.
     *
     * Keys that are missing from $data or whose value is not an array are ignored.
     * Chunk N of the result holds the N-th slice of every chunked key, with the
     * original item keys preserved.
     *
     * @param array $data      Request data containing the lists to split.
     * @param array $keys      Keys of $data to chunk.
     * @param int   $chunkSize Maximum items per key per chunk; values below 1 are treated as 1.
     *
     * @return array List of ['data' => array<key, items>, 'count' => int] entries.
     */
    private function buildChunks(array $data, array $keys, int $chunkSize): array
    {
        // array_chunk() throws for a length below 1.
        $chunkSize = max(1, $chunkSize);
        $chunks = [];

        foreach ($keys as $key) {
            if (!isset($data[$key]) || !is_array($data[$key])) {
                continue;
            }

            $slices = array_chunk($data[$key], $chunkSize, true); // preserve keys
            foreach ($slices as $index => $slice) {
                $chunks[$index] ??= [
                    'data' => [],
                    'count' => 0,
                ];
                $chunks[$index]['data'][$key] = $slice;
                $chunks[$index]['count'] += count($slice);
            }
        }

        return $chunks;
    }

    /**
     * Converts an exception into either a scheduled retry or a permanent failure.
     *
     * The md5 of the exception message is compared with $op->lastErrorHash: a repeat of
     * the same message marks the operation completed with outcome 'failed_permanent';
     * a different message increments the retry counter and reschedules with backoff.
     * The error is also reported through the JVB() error logger.
     *
     * @param Operation  $op Operation that failed; mutated in place (not saved here).
     * @param \Throwable $e  The failure that was caught.
     */
    private function handleFailure(Operation $op, \Throwable $e): void
    {
        $message = $e->getMessage();
        $hash = md5($message);

        if ($op->lastErrorHash === $hash) {
            // Same error twice → permanent failure
            $op->state = 'completed';
            $op->outcome = 'failed_permanent';
            $op->completedAt = current_time('mysql');
        } else {
            // New error → schedule retry
            $op->retries++;
            $op->lastErrorHash = $hash;
            $op->state = 'scheduled';
            $op->scheduledAt = $this->calculateBackoff($op->retries);
        }

        $op->errorMessage = $message;

        JVB()->error()->log(
            '[Queue]:processOne',
            $message,
            [
                'operation_id' => $op->id,
                'type' => $op->type,
                'user_id' => $op->userId,
                'retries' => $op->retries,
            ],
            $op->outcome === 'failed_permanent' ? 'critical' : 'warning'
        );
    }

    /**
     * Computes the timestamp of the next retry using exponential backoff with jitter.
     *
     * The delay is 30s doubled per attempt, capped at 3600s, plus up to 10% random jitter.
     *
     * @param int $attempt 1-based retry number; values below 1 are treated as 1.
     *
     * @return string Timestamp formatted as 'Y-m-d H:i:s'.
     */
    private function calculateBackoff(int $attempt): string
    {
        // Clamp so the exponent is never negative (which would yield a fractional delay).
        $attempt = max(1, $attempt);
        // Large attempts overflow to float/INF; min() then selects the 3600 cap.
        $delay = (int) min(30 * 2 ** ($attempt - 1), 3600);
        $jitter = rand(0, intdiv($delay, 10));

        return date('Y-m-d H:i:s', time() + $delay + $jitter);
    }

    /**
     * Checks whether enough memory and execution time remain to process another chunk.
     *
     * @return bool False when real memory usage exceeds 80% of the limit, or fewer than
     *              30 seconds of max_execution_time remain; true otherwise.
     */
    private function checkResourceLimits(): bool
    {
        // Check memory (leave 20% buffer)
        $memoryLimit = $this->getMemoryLimitBytes();
        $memoryUsage = memory_get_usage(true);
        if ($memoryUsage > $memoryLimit * 0.8) {
            error_log('[Processor] Memory limit approaching, pausing');
            return false;
        }

        // Check time (leave 30s buffer for cleanup)
        // NOTE(review): with max_execution_time <= 30 this check fails on every call,
        // so chunked operations would pause immediately each run — confirm this is intended.
        $maxTime = (int) ini_get('max_execution_time');
        if ($maxTime > 0) {
            $elapsed = microtime(true) - $_SERVER['REQUEST_TIME_FLOAT'];
            if ($elapsed > $maxTime - 30) {
                error_log('[Processor] Time limit approaching, pausing');
                return false;
            }
        }

        return true;
    }

    /**
     * Returns the configured PHP memory limit in bytes.
     *
     * Understands the K/M/G shorthand suffixes. An unlimited ('-1') or unreadable
     * setting is reported as PHP_INT_MAX so that percentage checks never trip on it.
     *
     * @return int Memory limit in bytes.
     */
    private function getMemoryLimitBytes(): int
    {
        $limit = ini_get('memory_limit');
        // ini_get() yields false/'' when the option is unavailable; treating that as 0
        // would make every memory check fail, so handle it like "no limit".
        if ($limit === false || $limit === '' || $limit === '-1') {
            return PHP_INT_MAX;
        }

        $unit = strtolower(substr($limit, -1));
        $value = (int) $limit;

        return match ($unit) {
            'g' => $value * 1024 * 1024 * 1024,
            'm' => $value * 1024 * 1024,
            'k' => $value * 1024,
            default => $value,
        };
    }

    /**
     * Checks whether a processing run may start at all.
     *
     * @return bool False when real memory usage exceeds 50% of the limit, or fewer than
     *              60 seconds of max_execution_time remain; true otherwise.
     */
    private function hasAdequateResources(): bool
    {
        // Stricter thresholds for starting (50% memory, 60s minimum time)
        if (memory_get_usage(true) > $this->getMemoryLimitBytes() * 0.5) {
            return false;
        }

        $maxTime = (int) ini_get('max_execution_time');
        if ($maxTime <= 0) {
            // No execution time limit configured.
            return true;
        }

        $elapsed = microtime(true) - $_SERVER['REQUEST_TIME_FLOAT'];

        // Need at least 60s
        return !($maxTime - $elapsed < 60);
    }

    /**
     * Tells whether every dependency of an operation has finished acceptably.
     *
     * A dependency counts as satisfied only if it can be found, its state is
     * 'completed', and its outcome is 'success' or 'partial'.
     *
     * @param Operation $op Operation whose dependencies are checked.
     *
     * @return bool True when there are no dependencies or all of them are satisfied.
     */
    private function dependenciesSatisfied(Operation $op): bool
    {
        if (empty($op->dependencies)) {
            return true;
        }

        foreach ($op->dependencies as $depId) {
            $dep = $this->storage->find($depId);

            // Missing dependency = block (or decide to ignore; your call)
            $satisfied = $dep
                && $dep->state === 'completed'
                && in_array($dep->outcome, ['success', 'partial'], true);

            if (!$satisfied) {
                return false;
            }
        }

        return true;
    }
}