hasAdequateResources()) { error_log('[Processor] Insufficient resources to start processing'); return; } $ops = $this->storage->fetchRunnable(3); $lastOpId = null; foreach ($ops as $op) { if (!$this->storage->markProcessing($op->id)) { continue; } $lastOpId = $op->id; $this->processOne($op); usleep(10000); } $this->storage->invalidateQueueCache(); } private function processOne(Operation $op): void { $progress = new Progress($op); $executor = $this->registry->getExecutor($op->type) ?? $this->defaultExecutor; $op->startedAt = current_time('mysql'); $op->state = 'processing'; $this->storage->saveProgress($op); //Check to see if we can merge first $mergeable = $this->registry->getMergeable($op->type); if ($mergeable) { $existing = $this->storage->findMergeable( $op->type, $op->userId ); if ($existing && $mergeable->canMerge($existing, $op)) { $this->applyMerge($mergeable, $existing, $op); return; } } try { // Check if this operation should be chunked $chunkKey = $op->metadata['chunk_key'] ?? null; $result = $chunkKey ? $this->executeChunked($op, $executor, $progress, $chunkKey) : $this->storage->withTransaction(fn() => $executor->execute($op, $progress)); $op->state = 'completed'; $op->outcome = $result->outcome; $op->result = $result->result; $op->completedAt = current_time('mysql'); } catch (\Throwable $e) { error_log("[Processor] Exception caught: " . $e->getMessage()); $this->handleFailure($op, $e); } $this->storage->saveFinal($op); } private function executeChunked(Operation $op, Executor $executor, Progress $progress, string|array $chunkKey): Result { $keys = (array) $chunkKey; $chunkSize = $op->metadata['chunk_size'] ?? 10; $offset = $op->metadata['chunk_offset'] ?? 0; $chunks = $this->buildChunks($op->requestData, $keys, $chunkSize); $totalChunks = count($chunks); $allResults = []; foreach ($chunks as $index => $chunk) { if ($index < $offset) { continue; } // Resource check before each chunk if (!$this->checkResourceLimits()) { error_log("[Processor] Resource limits reached, pausing at chunk {$index}"); $op->metadata['chunk_offset'] = $index; $op->state = 'scheduled'; $op->scheduledAt = date('Y-m-d H:i:s', time() + 5); $this->storage->save($op); return new Result( outcome: 'pending', result: [ 'paused_at_chunk' => $index, 'total_chunks' => $totalChunks, 'partial_results' => $allResults, ] ); } try { $chunkResult = $this->storage->withTransaction(function () use ($op, $executor, $progress, $chunk, $keys, $index) { // Clone operation with only this chunk's data $chunkOp = clone $op; $chunkOp->requestData = array_merge( array_diff_key($op->requestData, array_flip($keys)), $chunk['data'] ); // Execute this chunk $result = $executor->execute($chunkOp, $progress); // Update progress $op->metadata['chunk_offset'] = $index + 1; $this->storage->saveProgress($op); return $result; }); // Merge results if (!empty($chunkResult->result)) { if (is_array($chunkResult->result)) { $allResults = array_merge_recursive($allResults, $chunkResult->result); } else { $allResults = $chunkResult->result; } } } catch (\Throwable $e) { error_log("[Processor] Chunk {$index} failed: " . $e->getMessage()); // Record failed items from this chunk foreach ($chunk['data'] as $key => $items) { foreach ($items as $item) { $progress->failItem($item, $e->getMessage()); } } // Continue to next chunk rather than failing entire operation // Remove this if you want fail-fast behavior } // Delay between chunks (skip after last chunk) if ($index < $totalChunks - 1) { usleep(50000); // 50ms } } // Determine final outcome $outcome = 'success'; if (!empty($op->failedItems)) { $failedCount = count($op->failedItems); $outcome = $failedCount === $op->totalItems ? 'failed' : 'partial'; } return new Result( outcome: $outcome, result: $allResults ); } private function buildChunks(array $data, array $keys, int $chunkSize): array { $chunks = []; foreach ($keys as $key) { if (!isset($data[$key]) || !is_array($data[$key])) { continue; } $items = $data[$key]; $itemChunks = array_chunk($items, $chunkSize, true); // preserve keys foreach ($itemChunks as $index => $chunkItems) { if (!isset($chunks[$index])) { $chunks[$index] = [ 'data' => [], 'count' => 0 ]; } $chunks[$index]['data'][$key] = $chunkItems; $chunks[$index]['count'] += count($chunkItems); } } return $chunks; } private function handleFailure(Operation $op, \Throwable $e): void { $hash = md5($e->getMessage()); if ($op->lastErrorHash === $hash) { // Same error twice → permanent failure $op->state = 'completed'; $op->outcome = 'failed_permanent'; $op->completedAt = current_time('mysql'); } else { // New error → schedule retry $op->retries++; $op->lastErrorHash = $hash; $op->state = 'scheduled'; $op->scheduledAt = $this->calculateBackoff($op->retries); } $op->errorMessage = $e->getMessage(); JVB()->error()->log( '[Queue]:processOne', $e->getMessage(), [ 'operation_id' => $op->id, 'type' => $op->type, 'user_id' => $op->userId, 'retries' => $op->retries, ], $op->outcome === 'failed_permanent' ? 'critical' : 'warning' ); } private function calculateBackoff(int $attempt): string { $delay = min(30 * pow(2, $attempt - 1), 3600); $jitter = rand(0, (int)($delay * 0.1)); return date('Y-m-d H:i:s', time() + $delay + $jitter); } private function applyMerge( Mergeable $mergeable, Operation $target, Operation $incoming ): void { // Safety: only merge into actively processing ops if ($target->state !== 'processing') { return; } $this->storage->withTransaction(function () use ($mergeable, $target, $incoming) { $mergeable->merge($target, $incoming); $target->dependencies[] = $incoming->id; $target->dependencies = array_values(array_unique($target->dependencies)); $this->storage->saveProgress($target); $incoming->state = 'completed'; $incoming->outcome = 'success'; $incoming->completedAt = current_time('mysql'); $incoming->result = ['merged_into' => $target->id]; $this->storage->saveFinal($incoming); }); } private function checkResourceLimits(): bool { // Check memory (leave 20% buffer) $memoryLimit = $this->getMemoryLimitBytes(); $memoryUsage = memory_get_usage(true); if ($memoryUsage > $memoryLimit * 0.8) { error_log('[Processor] Memory limit approaching, pausing'); return false; } // Check time (leave 30s buffer for cleanup) $maxTime = (int) ini_get('max_execution_time'); if ($maxTime > 0) { $elapsed = microtime(true) - $_SERVER['REQUEST_TIME_FLOAT']; if ($elapsed > $maxTime - 30) { error_log('[Processor] Time limit approaching, pausing'); return false; } } return true; } private function getMemoryLimitBytes(): int { $limit = ini_get('memory_limit'); if ($limit === '-1') { return PHP_INT_MAX; } $unit = strtolower(substr($limit, -1)); $value = (int) $limit; return match($unit) { 'g' => $value * 1024 * 1024 * 1024, 'm' => $value * 1024 * 1024, 'k' => $value * 1024, default => $value, }; } private function hasAdequateResources(): bool { // Stricter thresholds for starting (50% memory, 60s minimum time) $memoryLimit = $this->getMemoryLimitBytes(); $memoryUsage = memory_get_usage(true); if ($memoryUsage > $memoryLimit * 0.5) { return false; } $maxTime = (int) ini_get('max_execution_time'); if ($maxTime > 0) { $elapsed = microtime(true) - $_SERVER['REQUEST_TIME_FLOAT']; if ($maxTime - $elapsed < 60) { // Need at least 60s return false; } } return true; } }