From ed57c386db34d8693ca75311972d0929ebe5f488 Mon Sep 17 00:00:00 2001
From: Jake Vanderwerf <get@jakevanderwerf.ca>
Date: Mon, 01 Jun 2026 22:23:19 +0000
Subject: [PATCH] =Added some more Schema classes, allowed for override of  array in outputSchema for complex schema, as for timeline post types

---
 inc/managers/queue/Processor.php |  133 +++++++++++++++++++++-----------------------
 1 files changed, 64 insertions(+), 69 deletions(-)

diff --git a/inc/managers/queue/Processor.php b/inc/managers/queue/Processor.php
index d743f77..f54fadd 100644
--- a/inc/managers/queue/Processor.php
+++ b/inc/managers/queue/Processor.php
@@ -19,53 +19,43 @@
 			return;
 		}
 
-		$ops = $this->storage->fetchRunnable(3);
-
-		$lastOpId = null;
-		foreach ($ops as $op) {
-			if (!$this->storage->markProcessing($op->id)) {
-				continue;
+		$op = null;
+		$this->storage->withTransaction(function() use (&$op) {
+			$candidates = $this->storage->fetchRunnable();
+			foreach ($candidates as $candidate) {
+				if ($candidate->state === 'completed') continue;
+				if (!$this->dependenciesSatisfied($candidate)) continue;
+				if ($this->storage->markProcessing($candidate->id)) {
+					$op = $candidate;
+					break;
+				}
 			}
-			$lastOpId = $op->id;
-			$this->processOne($op);
-			usleep(10000);
-		}
+		});
+
+		if (!$op) return;
+
+		$this->processOne($op);
+		usleep(10000);
 
 		$this->storage->invalidateQueueCache();
 	}
 
+
 	private function processOne(Operation $op): void
 	{
 		$progress = new Progress($op);
-
 		$executor = $this->registry->getExecutor($op->type) ?? $this->defaultExecutor;
 		$op->startedAt = current_time('mysql');
 		$op->state = 'processing';
-
 		$this->storage->saveProgress($op);
 
-		//Check to see if we can merge first
-		$mergeable = $this->registry->getMergeable($op->type);
-
-		if ($mergeable) {
-			$existing = $this->storage->findMergeable(
-				$op->type,
-				$op->userId
-			);
-
-			if ($existing && $mergeable->canMerge($existing, $op)) {
-				$this->applyMerge($mergeable, $existing, $op);
-				return;
-			}
-		}
-
 		try {
-			// Check if this operation should be chunked
 			$chunkKey = $op->metadata['chunk_key'] ?? null;
 
+			// No transaction wrapping — executor handles its own
 			$result = $chunkKey
 				? $this->executeChunked($op, $executor, $progress, $chunkKey)
-				: $this->storage->withTransaction(fn() => $executor->execute($op, $progress));
+				: $executor->execute($op, $progress);
 
 			$op->state = 'completed';
 			$op->outcome = $result->outcome;
@@ -77,7 +67,16 @@
 			$this->handleFailure($op, $e);
 		}
 
-		$this->storage->saveFinal($op);
+		$this->saveOperation($op);
+	}
+	private function saveOperation(Operation $op): void
+	{
+		if ($op->state === 'completed') {
+			$this->storage->saveFinal($op);
+		} else {
+			// Retryable failure — save as scheduled/failed without requiring 'completed' state
+			$this->storage->save($op);
+		}
 	}
 
 	private function executeChunked(Operation $op, Executor $executor, Progress $progress, string|array $chunkKey): Result
@@ -115,23 +114,20 @@
 			}
 
 			try {
-				$chunkResult = $this->storage->withTransaction(function () use ($op, $executor, $progress, $chunk, $keys, $index) {
-					// Clone operation with only this chunk's data
-					$chunkOp = clone $op;
-					$chunkOp->requestData = array_merge(
-						array_diff_key($op->requestData, array_flip($keys)),
-						$chunk['data']
-					);
+				$chunkOp = clone $op;
+				$chunkOp->requestData = array_merge(
+					array_diff_key($op->requestData, array_flip($keys)),
+					$chunk['data']
+				);
 
-					// Execute this chunk
+				$executeChunk = function () use ($op, $executor, $progress, $chunkOp, $index) {
 					$result = $executor->execute($chunkOp, $progress);
-
-					// Update progress
 					$op->metadata['chunk_offset'] = $index + 1;
 					$this->storage->saveProgress($op);
-
 					return $result;
-				});
+				};
+
+				$chunkResult = $executeChunk();
 
 				// Merge results
 				if (!empty($chunkResult->result)) {
@@ -242,33 +238,6 @@
 		return date('Y-m-d H:i:s', time() + $delay + $jitter);
 	}
 
-	private function applyMerge(
-		Mergeable $mergeable,
-		Operation $target,
-		Operation $incoming
-	): void {
-		// Safety: only merge into actively processing ops
-		if ($target->state !== 'processing') {
-			return;
-		}
-
-		$this->storage->withTransaction(function () use ($mergeable, $target, $incoming) {
-			$mergeable->merge($target, $incoming);
-
-			$target->dependencies[] = $incoming->id;
-			$target->dependencies = array_values(array_unique($target->dependencies));
-
-			$this->storage->saveProgress($target);
-
-			$incoming->state = 'completed';
-			$incoming->outcome = 'success';
-			$incoming->completedAt = current_time('mysql');
-			$incoming->result = ['merged_into' => $target->id];
-
-			$this->storage->saveFinal($incoming);
-		});
-	}
-
 	private function checkResourceLimits(): bool
 	{
 		// Check memory (leave 20% buffer)
@@ -329,4 +298,30 @@
 
 		return true;
 	}
+
+	private function dependenciesSatisfied(Operation $op): bool
+	{
+		if (empty($op->dependencies)) {
+			return true;
+		}
+		foreach ($op->dependencies as $depId) {
+			$dep = $this->storage->find($depId);
+
+			// Missing dependency = block (or decide to ignore; your call)
+			if (!$dep) {
+				return false;
+			}
+
+			if ($dep->state !== 'completed') {
+				return false;
+			}
+
+			if (!in_array($dep->outcome, ['success', 'partial'], true)) {
+				return false;
+			}
+		}
+
+		return true;
+	}
+
 }

--
Gitblit v1.10.0