From ed57c386db34d8693ca75311972d0929ebe5f488 Mon Sep 17 00:00:00 2001
From: Jake Vanderwerf <get@jakevanderwerf.ca>
Date: Mon, 01 Jun 2026 22:23:19 +0000
Subject: [PATCH] =Added some more Schema classes, allowed for override of  array in outputSchema for complex schema, as for timeline post types

---
 inc/managers/queue/Queue.php |  119 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 118 insertions(+), 1 deletions(-)

diff --git a/inc/managers/queue/Queue.php b/inc/managers/queue/Queue.php
index 08f3212..89a8ceb 100644
--- a/inc/managers/queue/Queue.php
+++ b/inc/managers/queue/Queue.php
@@ -4,7 +4,10 @@
 	exit;
 }
 
+use JVBase\managers\CustomTable;
 use WP_Error;
+use WP_REST_Request;
+use WP_REST_Response;
 
 class Queue
 {
@@ -25,6 +28,7 @@
 
 		add_action('jvb_process_queue', [$this, 'checkQueue']);
 		add_action('jvb_queue_maintenance', [$this, 'maintenance']);
+		add_action('jvb_daily_snapshot', [$this->storage, 'snapshotDaily']);
 
 		if (!wp_next_scheduled('jvb_process_queue')) {
 			wp_schedule_event(time(), 'every-minute', 'jvb_process_queue');
@@ -32,6 +36,14 @@
 		if (!wp_next_scheduled('jvb_queue_maintenance')) {
 			wp_schedule_event(time(), 'hourly', 'jvb_queue_maintenance');
 		}
+		if (!wp_next_scheduled('jvb_daily_snapshot')) {
+			// Schedule for next 3am
+			$next3am = strtotime('tomorrow 3am', current_time('timestamp'));
+			wp_schedule_event($next3am, 'daily', 'jvb_daily_snapshot');
+		}
+
+		jvb_register_do_once('queue_admin_action_registered', [$this, 'registerAdminAction']);
+		add_filter(BASE.'admin_action_filter', [$this, 'adminActionFilter'], 10, 3);
 	}
 
 	/**
@@ -67,6 +79,13 @@
 		try {
 			$incoming = $this->buildOperation($type, $userId, $data, $options);
 
+			// Attempt pre-insert merge
+			$merged = $this->tryMerge($incoming);
+			if ($merged) {
+				$this->runQueueOnShutdown();
+				return $merged;
+			}
+
 			$this->storage->insert($incoming);
 			$this->runQueueOnShutdown();
 
@@ -83,6 +102,59 @@
 	}
 
 	/**
+	 * Try to merge incoming operation into an existing pending/scheduled one.
+	 * Returns result array if merged, null if not.
+	 * @throws \Throwable
+	 */
+	private function tryMerge(Operation $incoming): ?array
+	{
+		$mergeable = $this->registry->getMergeable($incoming->type);
+		if (!$mergeable) {
+			return null;
+		}
+
+		$criteria = $mergeable->matchCriteria($incoming);
+
+		$existing = $this->storage->findMergeable($incoming->type, $incoming->userId, $criteria);
+		if (!$existing || !$mergeable->canMerge($existing, $incoming)) {
+			return null;
+		}
+
+		$this->storage->withTransaction(function () use ($incoming, $existing, $mergeable) {
+			$mergeable->merge($existing, $incoming);
+
+			$this->storage->replaceDependency(
+				$incoming->id,
+				$existing->id
+			);
+
+			// Merge dependency arrays safely
+			$existing->dependencies = array_values(array_unique(
+				array_merge($existing->dependencies, $incoming->dependencies)
+			));
+
+			// Prevent self dependency
+			$existing->dependencies = array_diff(
+				$existing->dependencies,
+				[$existing->id]
+			);
+
+			$this->storage->save($existing);
+
+			$incoming->state = 'completed';
+			$incoming->outcome = 'merged';
+			$incoming->merged_into = $existing->id;
+			$this->storage->saveFinal($incoming);
+		});
+
+		return [
+			'success'          => true,
+			'operation_id'     => $existing->id,
+			'updated_existing' => true,
+		];
+	}
+
+	/**
 	 * Alias for add() - backwards compatibility
 	 */
 	public function queueOperation(string $type, int $userId, array $data, array $options = []): array|WP_Error
@@ -103,7 +175,7 @@
 		$this->locker->withLock(function () {
 			$this->storage->resetStuckOperations(30);
 		});
-
+		$this->runQueueOnShutdown();
 	}
 
 	// === Public Getters ===
@@ -140,6 +212,7 @@
 			'user_id'      => $op->userId,
 			'metadata'     => $op->metadata,
 			'dependencies' => $op->dependencies,
+			'merged_into'  => $op->merged_into,
 			default        => null,
 		};
 	}
@@ -346,4 +419,48 @@
 
 		$this->checkQueue();
 	}
+
+	public function registerAdminAction():void
+	{
+		$admin = JVB()->admin();
+		$admin->registerAction(
+			'Restart Stuck Operations',
+			'restart-stuck-operations',
+			'manage_options',
+			'arrows-clockwise'
+		);
+		$admin->registerAction(
+			'Unlock Queue',
+			'unlock-operation-queue',
+			'manage_options',
+			'infinity'
+		);
+	}
+	/**
+	 * @param WP_REST_Response $response
+	 * @param string $action
+	 *
+	 * @return bool|WP_REST_Response
+	 */
+	public function adminActionFilter(WP_REST_Response $response, WP_REST_Request $request, string $action):WP_REST_Response|bool
+	{
+		switch ($action) {
+			case 'unlock-operation-queue':
+				error_log('Unlocking Queue');
+				$this->locker->unlock();
+				return new WP_REST_Response([
+					'success'   => true,
+					'message'   => 'Unlocked Queue'
+				]);
+			case 'restart-stuck-operations':
+				error_log('Restarting stuck operations');
+				$this->maintenance();
+				return new WP_REST_Response([
+					'success'   => true,
+					'message'   => 'Restarted Stuck Operations'
+				]);
+			default:
+				return $response;
+		}
+	}
 }

--
Gitblit v1.10.0