From 97e7c319d656a5f05489ca996e249e7359303d4d Mon Sep 17 00:00:00 2001
From: Jake Vanderwerf <get@jakevanderwerf.ca>
Date: Sun, 31 May 2026 22:42:33 +0000
Subject: [PATCH] =Jakevan edits done?

---
 inc/managers/queue/Queue.php |  224 +++++++++++++++++++++++++++++++++----------------------
 1 files changed, 135 insertions(+), 89 deletions(-)

diff --git a/inc/managers/queue/Queue.php b/inc/managers/queue/Queue.php
index ce213f2..89a8ceb 100644
--- a/inc/managers/queue/Queue.php
+++ b/inc/managers/queue/Queue.php
@@ -4,26 +4,31 @@
 	exit;
 }
 
+use JVBase\managers\CustomTable;
 use WP_Error;
+use WP_REST_Request;
+use WP_REST_Response;
 
 class Queue
 {
 	private Storage $storage;
 	private Processor $processor;
 	private TypeRegistry $registry;
+	public Executor $executor;
 	private Locker $locker;
 
 	public function __construct()
 	{
 		$this->storage = new Storage();
 		$this->registry = new TypeRegistry();
-		$this->locker = new Locker('queue_processor', 300);
+		$this->locker = new Locker();
 
-		$executor = new FilteredExecutor($this->storage);
-		$this->processor = new Processor($this->storage, $executor, $this->registry, $this->locker);
+		$this->executor = new FilteredExecutor();
+		$this->processor = new Processor($this->storage, $this->executor, $this->registry);
 
 		add_action('jvb_process_queue', [$this, 'checkQueue']);
 		add_action('jvb_queue_maintenance', [$this, 'maintenance']);
+		add_action('jvb_daily_snapshot', [$this->storage, 'snapshotDaily']);
 
 		if (!wp_next_scheduled('jvb_process_queue')) {
 			wp_schedule_event(time(), 'every-minute', 'jvb_process_queue');
@@ -31,6 +36,14 @@
 		if (!wp_next_scheduled('jvb_queue_maintenance')) {
 			wp_schedule_event(time(), 'hourly', 'jvb_queue_maintenance');
 		}
+		if (!wp_next_scheduled('jvb_daily_snapshot')) {
+			// Schedule for next 3am
+			$next3am = strtotime('tomorrow 3am', current_time('timestamp'));
+			wp_schedule_event($next3am, 'daily', 'jvb_daily_snapshot');
+		}
+
+		jvb_register_do_once('queue_admin_action_registered', [$this, 'registerAdminAction']);
+		add_filter(BASE.'admin_action_filter', [$this, 'adminActionFilter'], 10, 3);
 	}
 
 	/**
@@ -41,6 +54,11 @@
 		return $this->registry;
 	}
 
+	public function storage(): Storage
+	{
+		return $this->storage;
+	}
+
 	/**
 	 * Queue a new operation or merge into existing
 	 *
@@ -60,52 +78,12 @@
 	{
 		try {
 			$incoming = $this->buildOperation($type, $userId, $data, $options);
-			$mergeable = $this->registry->getMergeable($type);
-			$existingById = $this->storage->find($incoming->id);
 
-			if ($existingById) {
-				// Operation with this ID already exists
-				if (in_array($existingById->state, ['pending', 'scheduled']) && $mergeable) {
-					// Still pending and mergeable, merge into it
-					$merged = $mergeable->merge($existingById, $incoming);
-					$this->storage->save($merged);
-					$this->runQueueOnShutdown();
-
-					return [
-						'success'          => true,
-						'operation_id'     => $merged->id,
-						'updated_existing' => true,
-					];
-				} else {
-					// Already processing/completed, or not mergeable - generate new ID
-					$incoming->id = 'u' . $userId . '_' . time() . '_' . uniqid();
-
-					JVB()->error()->log(
-						'[Queue]:add',
-						'Duplicate ID for non-mergeable operation, generated new ID',
-						[
-							'type' => $type,
-							'existing_state' => $existingById->state,
-						],
-						'warning'
-					);
-				}
-			}
-
-			if ($mergeable) {
-				$existing = $this->storage->findMergeable($type, $userId);
-
-				if ($existing && $mergeable->canMerge($existing, $incoming)) {
-					$merged = $mergeable->merge($existing, $incoming);
-					$this->storage->save($merged);
-					$this->runQueueOnShutdown();
-
-					return [
-						'success'          => true,
-						'operation_id'     => $merged->id,
-						'updated_existing' => true,
-					];
-				}
+			// Attempt pre-insert merge
+			$merged = $this->tryMerge($incoming);
+			if ($merged) {
+				$this->runQueueOnShutdown();
+				return $merged;
 			}
 
 			$this->storage->insert($incoming);
@@ -124,6 +102,59 @@
 	}
 
 	/**
+	 * Try to merge incoming operation into an existing pending/scheduled one.
+	 * Returns result array if merged, null if not.
+	 * @throws \Throwable
+	 */
+	private function tryMerge(Operation $incoming): ?array
+	{
+		$mergeable = $this->registry->getMergeable($incoming->type);
+		if (!$mergeable) {
+			return null;
+		}
+
+		$criteria = $mergeable->matchCriteria($incoming);
+
+		$existing = $this->storage->findMergeable($incoming->type, $incoming->userId, $criteria);
+		if (!$existing || !$mergeable->canMerge($existing, $incoming)) {
+			return null;
+		}
+
+		$this->storage->withTransaction(function () use ($incoming, $existing, $mergeable) {
+			$mergeable->merge($existing, $incoming);
+
+			$this->storage->replaceDependency(
+				$incoming->id,
+				$existing->id
+			);
+
+			// Merge dependency arrays safely
+			$existing->dependencies = array_values(array_unique(
+				array_merge($existing->dependencies, $incoming->dependencies)
+			));
+
+			// Prevent self dependency
+			$existing->dependencies = array_diff(
+				$existing->dependencies,
+				[$existing->id]
+			);
+
+			$this->storage->save($existing);
+
+			$incoming->state = 'completed';
+			$incoming->outcome = 'merged';
+			$incoming->merged_into = $existing->id;
+			$this->storage->saveFinal($incoming);
+		});
+
+		return [
+			'success'          => true,
+			'operation_id'     => $existing->id,
+			'updated_existing' => true,
+		];
+	}
+
+	/**
 	 * Alias for add() - backwards compatibility
 	 */
 	public function queueOperation(string $type, int $userId, array $data, array $options = []): array|WP_Error
@@ -133,43 +164,18 @@
 
 	public function checkQueue(): void
 	{
-		if (!$this->storage->getQueueInfo()['has_items']) {
-			return;
-		}
+		$this->locker->withLock(function () {
+			$this->processor->run();
+		});
 
-		$this->processor->run();
 	}
 
 	public function maintenance(): void
 	{
-		if ($this->locker->isLocked()) {
-			return;
-		}
-
-		$this->cleanupStuck();
-	}
-
-	private function cleanupStuck(): void
-	{
-		// Operations stuck in processing > 30 min → reschedule
-		global $wpdb;
-		$table = $wpdb->prefix . BASE . '_operation_queue';
-
-		$stuck = $wpdb->get_results($wpdb->prepare("
-            SELECT id FROM {$table}
-            WHERE state = 'processing'
-            AND started_at < %s
-        ", date('Y-m-d H:i:s', strtotime('-30 minutes'))));
-
-		foreach ($stuck as $row) {
-			$op = $this->storage->find($row->id);
-			if ($op) {
-				$op->state = 'scheduled';
-				$op->scheduledAt = date('Y-m-d H:i:s', time() + 60);
-				$op->retries++;
-				$this->storage->save($op);
-			}
-		}
+		$this->locker->withLock(function () {
+			$this->storage->resetStuckOperations(30);
+		});
+		$this->runQueueOnShutdown();
 	}
 
 	// === Public Getters ===
@@ -206,6 +212,7 @@
 			'user_id'      => $op->userId,
 			'metadata'     => $op->metadata,
 			'dependencies' => $op->dependencies,
+			'merged_into'  => $op->merged_into,
 			default        => null,
 		};
 	}
@@ -266,6 +273,7 @@
 	 */
 	public function retry(string $id, int $userId): bool
 	{
+
 		$op = $this->get($id);
 		if (!$op || $op->userId !== $userId) {
 			return false;
@@ -282,7 +290,7 @@
 		$op->lastErrorHash = null;
 		$op->scheduledAt = current_time('mysql');
 		$op->retries++;
-
+		error_log('[Queue]Retrying operation '.print_r($op->id, true));
 		$saved = $this->storage->save($op);
 
 		if ($saved) {
@@ -323,15 +331,9 @@
 				default       => null,
 			};
 		}
-
-		return $this->storage->save($op);
+		error_log('[Queue]: updating operation '.print_r($op->id, true));
+		return $this->storage->saveProgress($op);
 	}
-
-	public function isLocked(): bool
-	{
-		return $this->locker->isLocked();
-	}
-
 	// === Private Helpers ===
 
 	private function buildOperation(string $type, int $userId, array $data, array $options): Operation
@@ -417,4 +419,48 @@
 
 		$this->checkQueue();
 	}
+
+	public function registerAdminAction():void
+	{
+		$admin = JVB()->admin();
+		$admin->registerAction(
+			'Restart Stuck Operations',
+			'restart-stuck-operations',
+			'manage_options',
+			'arrows-clockwise'
+		);
+		$admin->registerAction(
+			'Unlock Queue',
+			'unlock-operation-queue',
+			'manage_options',
+			'infinity'
+		);
+	}
+	/**
+	 * @param WP_REST_Response $response
+	 * @param string $action
+	 *
+	 * @return bool|WP_REST_Response
+	 */
+	public function adminActionFilter(WP_REST_Response $response, WP_REST_Request $request, string $action):WP_REST_Response|bool
+	{
+		switch ($action) {
+			case 'unlock-operation-queue':
+				error_log('Unlocking Queue');
+				$this->locker->unlock();
+				return new WP_REST_Response([
+					'success'   => true,
+					'message'   => 'Unlocked Queue'
+				]);
+			case 'restart-stuck-operations':
+				error_log('Restarting stuck operations');
+				$this->maintenance();
+				return new WP_REST_Response([
+					'success'   => true,
+					'message'   => 'Restarted Stuck Operations'
+				]);
+			default:
+				return $response;
+		}
+	}
 }

--
Gitblit v1.10.0