From d7e7d248cbe41cd7a9ef9c2fb022b6c4831f99a3 Mon Sep 17 00:00:00 2001
From: Jake Vanderwerf <get@jakevanderwerf.ca>
Date: Sun, 31 May 2026 15:22:56 +0000
Subject: [PATCH] =jakevan complete

---
 inc/managers/queue/Processor.php |  260 +++++++++++++++++++++++++++++++++++++++++++++++++--
 1 files changed, 248 insertions(+), 12 deletions(-)

diff --git a/inc/managers/queue/Processor.php b/inc/managers/queue/Processor.php
index 0f02650..f54fadd 100644
--- a/inc/managers/queue/Processor.php
+++ b/inc/managers/queue/Processor.php
@@ -10,33 +10,52 @@
 		private Storage $storage,
 		private Executor $defaultExecutor,
 		private TypeRegistry $registry,
-		private Locker $locker
 	) {}
 
 	public function run(): void
 	{
-		$this->locker->withLock(function () {
-			$ops = $this->storage->fetchRunnable(10);
+		if (!$this->hasAdequateResources()) {
+			error_log('[Processor] Insufficient resources to start processing');
+			return;
+		}
 
-			foreach ($ops as $op) {
-				if (!$this->storage->markProcessing($op->id)) {
-					continue;
+		$op = null;
+		$this->storage->withTransaction(function() use (&$op) {
+			$candidates = $this->storage->fetchRunnable();
+			foreach ($candidates as $candidate) {
+				if ($candidate->state === 'completed') continue;
+				if (!$this->dependenciesSatisfied($candidate)) continue;
+				if ($this->storage->markProcessing($candidate->id)) {
+					$op = $candidate;
+					break;
 				}
-
-				$this->processOne($op);
 			}
-
-			$this->storage->invalidateQueueCache();
 		});
+
+		if (!$op) return;
+
+		$this->processOne($op);
+		usleep(10000);
+
+		$this->storage->invalidateQueueCache();
 	}
 
+
 	private function processOne(Operation $op): void
 	{
 		$progress = new Progress($op);
 		$executor = $this->registry->getExecutor($op->type) ?? $this->defaultExecutor;
+		$op->startedAt = current_time('mysql');
+		$op->state = 'processing';
+		$this->storage->saveProgress($op);
 
 		try {
-			$result = $executor->execute($op, $progress);
+			$chunkKey = $op->metadata['chunk_key'] ?? null;
+
+			// No transaction wrapping — executor handles its own
+			$result = $chunkKey
+				? $this->executeChunked($op, $executor, $progress, $chunkKey)
+				: $executor->execute($op, $progress);
 
 			$op->state = 'completed';
 			$op->outcome = $result->outcome;
@@ -44,10 +63,140 @@
 			$op->completedAt = current_time('mysql');
 
 		} catch (\Throwable $e) {
+			error_log("[Processor] Exception caught: " . $e->getMessage());
 			$this->handleFailure($op, $e);
 		}
 
-		$this->storage->save($op);
+		$this->saveOperation($op);
+	}
+	private function saveOperation(Operation $op): void
+	{
+		if ($op->state === 'completed') {
+			$this->storage->saveFinal($op);
+		} else {
+			// Retryable failure — save as scheduled/failed without requiring 'completed' state
+			$this->storage->save($op);
+		}
+	}
+
+	private function executeChunked(Operation $op, Executor $executor, Progress $progress, string|array $chunkKey): Result
+	{
+		$keys = (array) $chunkKey;
+		$chunkSize = $op->metadata['chunk_size'] ?? 10;
+		$offset = $op->metadata['chunk_offset'] ?? 0;
+		$chunks = $this->buildChunks($op->requestData, $keys, $chunkSize);
+		$totalChunks = count($chunks);
+
+		$allResults = [];
+
+		foreach ($chunks as $index => $chunk) {
+			if ($index < $offset) {
+				continue;
+			}
+
+			// Resource check before each chunk
+			if (!$this->checkResourceLimits()) {
+				error_log("[Processor] Resource limits reached, pausing at chunk {$index}");
+
+				$op->metadata['chunk_offset'] = $index;
+				$op->state = 'scheduled';
+				$op->scheduledAt = date('Y-m-d H:i:s', time() + 5);
+				$this->storage->save($op);
+
+				return new Result(
+					outcome: 'pending',
+					result: [
+						'paused_at_chunk' => $index,
+						'total_chunks' => $totalChunks,
+						'partial_results' => $allResults,
+					]
+				);
+			}
+
+			try {
+				$chunkOp = clone $op;
+				$chunkOp->requestData = array_merge(
+					array_diff_key($op->requestData, array_flip($keys)),
+					$chunk['data']
+				);
+
+				$executeChunk = function () use ($op, $executor, $progress, $chunkOp, $index) {
+					$result = $executor->execute($chunkOp, $progress);
+					$op->metadata['chunk_offset'] = $index + 1;
+					$this->storage->saveProgress($op);
+					return $result;
+				};
+
+				$chunkResult = $executeChunk();
+
+				// Merge results
+				if (!empty($chunkResult->result)) {
+					if (is_array($chunkResult->result)) {
+						$allResults = array_merge_recursive($allResults, $chunkResult->result);
+					} else {
+						$allResults = $chunkResult->result;
+					}
+				}
+
+			} catch (\Throwable $e) {
+				error_log("[Processor] Chunk {$index} failed: " . $e->getMessage());
+
+				// Record failed items from this chunk
+				foreach ($chunk['data'] as $key => $items) {
+					foreach ($items as $item) {
+						$progress->failItem($item, $e->getMessage());
+					}
+				}
+
+				// Continue to next chunk rather than failing entire operation
+				// Remove this if you want fail-fast behavior
+			}
+
+			// Delay between chunks (skip after last chunk)
+			if ($index < $totalChunks - 1) {
+				usleep(50000); // 50ms
+			}
+		}
+
+		// Determine final outcome
+		$outcome = 'success';
+		if (!empty($op->failedItems)) {
+			$failedCount = count($op->failedItems);
+			$outcome = $failedCount === $op->totalItems ? 'failed' : 'partial';
+		}
+
+		return new Result(
+			outcome: $outcome,
+			result: $allResults
+		);
+	}
+
+	private function buildChunks(array $data, array $keys, int $chunkSize): array
+	{
+		$chunks = [];
+
+		foreach ($keys as $key) {
+			if (!isset($data[$key]) || !is_array($data[$key])) {
+				continue;
+			}
+
+			$items = $data[$key];
+			$itemChunks = array_chunk($items, $chunkSize, true); // preserve keys
+
+			foreach ($itemChunks as $index => $chunkItems) {
+				if (!isset($chunks[$index])) {
+					$chunks[$index] = [
+						'data' => [],
+						'count' => 0
+					];
+				}
+
+				$chunks[$index]['data'][$key] = $chunkItems;
+				$chunks[$index]['count'] += count($chunkItems);
+			}
+		}
+
+		return $chunks;
 	}
 
 	private function handleFailure(Operation $op, \Throwable $e): void
@@ -88,4 +237,91 @@
 		$jitter = rand(0, (int)($delay * 0.1));
 		return date('Y-m-d H:i:s', time() + $delay + $jitter);
 	}
+
+	private function checkResourceLimits(): bool
+	{
+		// Check memory (leave 20% buffer)
+		$memoryLimit = $this->getMemoryLimitBytes();
+		$memoryUsage = memory_get_usage(true);
+		if ($memoryUsage > $memoryLimit * 0.8) {
+			error_log('[Processor] Memory limit approaching, pausing');
+			return false;
+		}
+
+		// Check time (leave 30s buffer for cleanup)
+		$maxTime = (int) ini_get('max_execution_time');
+		if ($maxTime > 0) {
+			$elapsed = microtime(true) - $_SERVER['REQUEST_TIME_FLOAT'];
+			if ($elapsed > $maxTime - 30) {
+				error_log('[Processor] Time limit approaching, pausing');
+				return false;
+			}
+		}
+
+		return true;
+	}
+
+	private function getMemoryLimitBytes(): int
+	{
+		$limit = ini_get('memory_limit');
+		if ($limit === '-1') {
+			return PHP_INT_MAX;
+		}
+
+		$unit = strtolower(substr($limit, -1));
+		$value = (int) $limit;
+
+		return match($unit) {
+			'g' => $value * 1024 * 1024 * 1024,
+			'm' => $value * 1024 * 1024,
+			'k' => $value * 1024,
+			default => $value,
+		};
+	}
+
+	private function hasAdequateResources(): bool
+	{
+		// Stricter thresholds for starting (50% memory, 60s minimum time)
+		$memoryLimit = $this->getMemoryLimitBytes();
+		$memoryUsage = memory_get_usage(true);
+		if ($memoryUsage > $memoryLimit * 0.5) {
+			return false;
+		}
+
+		$maxTime = (int) ini_get('max_execution_time');
+		if ($maxTime > 0) {
+			$elapsed = microtime(true) - $_SERVER['REQUEST_TIME_FLOAT'];
+			if ($maxTime - $elapsed < 60) { // Need at least 60s
+				return false;
+			}
+		}
+
+		return true;
+	}
+
+	private function dependenciesSatisfied(Operation $op): bool
+	{
+		if (empty($op->dependencies)) {
+			return true;
+		}
+		foreach ($op->dependencies as $depId) {
+			$dep = $this->storage->find($depId);
+
+			// Missing dependency = block (or decide to ignore; your call)
+			if (!$dep) {
+				return false;
+			}
+
+			if ($dep->state !== 'completed') {
+				return false;
+			}
+
+			if (!in_array($dep->outcome, ['success', 'partial'], true)) {
+				return false;
+			}
+		}
+
+		return true;
+	}
+
 }

--
Gitblit v1.10.0