From 2127b1bdd73ecd2423e443992da4b442f5a3c1a3 Mon Sep 17 00:00:00 2001
From: Jake Vanderwerf <get@jakevanderwerf.ca>
Date: Wed, 04 Feb 2026 21:19:25 +0000
Subject: [PATCH] =Major overhaul of MetaManager.php -> Meta.php and RestRouteManager.php -> Rest.php. Seems to work for JakeVan

---
 inc/managers/queue/Storage.php |  182 +++++++++++++++++++++++++++++++++++++++------
 1 files changed, 157 insertions(+), 25 deletions(-)

diff --git a/inc/managers/queue/Storage.php b/inc/managers/queue/Storage.php
index 2aa9ab6..99cd8c3 100644
--- a/inc/managers/queue/Storage.php
+++ b/inc/managers/queue/Storage.php
@@ -4,15 +4,15 @@
 	exit;
 }
 
-use JVBase\managers\CacheManager;
+use JVBase\managers\Cache;
+use LogicException;
 
 class Storage
 {
 	private \wpdb $wpdb;
 	private string $table;
-	private CacheManager $cache;
+	private Cache $cache;
 
-	private const CACHE_USER_PREFIX = 'user_queue_';
 	private const CACHE_QUEUE_INFO = 'queue_info';
 
 	public function __construct()
@@ -20,7 +20,7 @@
 		global $wpdb;
 		$this->wpdb = $wpdb;
 		$this->table = $wpdb->prefix . BASE . '_operation_queue';
-		$this->cache = CacheManager::for('queue', DAY_IN_SECONDS);
+		$this->cache = Cache::for('queue', DAY_IN_SECONDS);
 	}
 
 	public function hasProcessingOperations(): bool
@@ -34,29 +34,27 @@
 	{
 		$now = current_time('mysql');
 
-		$rows = $this->wpdb->get_results($this->wpdb->prepare("
-        SELECT oq.* FROM {$this->table} oq
-        WHERE oq.state IN ('pending', 'scheduled')
-          AND oq.scheduled_at <= %s
-          AND NOT EXISTS (
-              SELECT 1
-              FROM JSON_TABLE(
-                  COALESCE(NULLIF(oq.dependencies, 'null'), '[]'),
-                  '\$[*]' COLUMNS (dep_id VARCHAR(64) PATH '\$')
-              ) AS deps
-              JOIN {$this->table} dep ON dep.id = deps.dep_id
-              WHERE dep.state != 'completed'
-                 OR dep.outcome NOT IN ('success', 'partial')
-          )
-        ORDER BY FIELD(oq.priority, 'high', 'normal', 'low'), oq.scheduled_at
-        LIMIT %d
-    ", $now, $limit));
+		$rows = $this->wpdb->get_results(
+			$this->wpdb->prepare("
+            SELECT *
+            FROM {$this->table}
+            WHERE state IN ('pending', 'scheduled')
+              AND scheduled_at <= %s
+            ORDER BY
+              FIELD(priority, 'high', 'normal', 'low'),
+              scheduled_at
+            LIMIT %d
+            FOR UPDATE SKIP LOCKED
+        ", $now, $limit)
+		);
 
 		return array_map([$this, 'rowToOperation'], $rows ?: []);
 	}
 
 
 
+
+
 	public function markProcessing(string $id): bool
 	{
 		$now = current_time('mysql');
@@ -109,6 +107,85 @@
 		return $result !== false;
 	}
 
+	/**
+	 * For saving a 'step' in an operation; usually after each chunk
+	 * @param Operation $op
+	 * @return bool
+	 */
+	public function saveProgress(Operation $op): bool {
+		global $wpdb;
+
+		$table = $this->table;
+
+		$data = [
+			'processed_items' => $op->processedItems ?? 0,
+			'failed_items'    => $op->failedItems ? json_encode($op->failedItems) : null,
+			'metadata'        => ($op->metadata) ? json_encode($op->metadata) : null,
+			'result'          => ($op->result) ? json_encode($op->result) :null,
+			'updated_at'      => current_time('mysql'),
+		];
+
+		// IMPORTANT: never touch terminal state
+		$where = [
+			'id'    => $op->id,
+			'state' => 'processing',
+		];
+
+		$updated = $wpdb->update($table, $data, $where);
+
+		if ($updated === false) {
+			error_log('[Storage::saveProgress] DB error: ' . $wpdb->last_error);
+			return false;
+		}
+
+		return true;
+	}
+
+	/**
+	 * The operation is complete, let's progress to 'completed'
+	 * @param Operation $op
+	 * @return bool
+	 */
+	public function saveFinal(Operation $op): bool {
+		global $wpdb;
+
+		if (($op->state?? null) !== 'completed') {
+			throw new LogicException('saveFinal called without completed state');
+		}
+
+		$table = $this->table;
+
+		$data = [
+			'state'          => 'completed',
+			'outcome'        => $op->outcome?? 'success',
+			'processed_items'=> $op->processedItems ?? 0,
+			'failed_items'   => $op->failedItems    ?? null,
+			'result'         => isset($op->result) ? wp_json_encode($op->result) : null,
+			'completed_at'   => $op->completedAt ?? current_time('mysql'),
+			'updated_at'     => current_time('mysql'),
+		];
+
+		// HARD GUARD: cannot overwrite completed
+		$where = [
+			'id'    => $op->id,
+			'state' => 'processing',
+		];
+
+		$updated = $wpdb->update($table, $data, $where);
+		$this->invalidateQueueCache();
+
+		if ($updated === 0) {
+			return true;
+		}
+
+		if ($updated === false) {
+			error_log('[Storage::saveFinal] DB error: ' . $wpdb->last_error);
+			return false;
+		}
+
+		return true;
+	}
+
 	public function insert(Operation $op): bool
 	{
 		$result = $this->wpdb->insert($this->table, [
@@ -360,13 +437,68 @@
 
 	public function invalidateQueueCache(): void
 	{
-		$this->cache->delete(self::CACHE_QUEUE_INFO);
-		$this->cache->touch();
+		$this->cache->forget(self::CACHE_QUEUE_INFO);
 	}
 
 	private function invalidateUser(int $userId): void
 	{
-		CacheManager::invalidateAll("user_{$userId}");
-		$this->cache->delete(self::CACHE_QUEUE_INFO);
+		$this->cache->forget($userId);
+	}
+	public function getLastError(): string
+	{
+		return $this->wpdb->last_error;
+	}
+
+	public function withTransaction(callable $callback): mixed
+	{
+		$this->wpdb->query('START TRANSACTION');
+
+		try {
+			$result = $callback();
+			$this->wpdb->query('COMMIT');
+			return $result;
+		} catch (\Throwable $e) {
+			$this->wpdb->query('ROLLBACK');
+			error_log('[Storage] Transaction rolled back: ' . $e->getMessage());
+			throw $e;
+		}
+	}
+
+	public function resetStuckOperations(int $stuckMinutes = 30): int
+	{
+		return $this->withTransaction(function () use ($stuckMinutes) {
+			$cutoff = date('Y-m-d H:i:s', strtotime("-{$stuckMinutes} minutes"));
+
+			// Get IDs first for logging
+			$stuckIds = $this->wpdb->get_col($this->wpdb->prepare("
+            SELECT id FROM {$this->table}
+            WHERE state = 'processing'
+              AND started_at < %s
+            FOR UPDATE
+        ", $cutoff));
+
+			if (empty($stuckIds)) {
+				return 0;
+			}
+
+			// Reset them
+			$affected = $this->wpdb->query($this->wpdb->prepare("
+            UPDATE {$this->table}
+            SET state = 'scheduled',
+                scheduled_at = %s,
+                retries = retries + 1,
+                updated_at = %s
+            WHERE id IN (" . implode(',', array_fill(0, count($stuckIds), '%s')) . ")
+        ",
+				date('Y-m-d H:i:s', time() + 60),
+				current_time('mysql'),
+				...$stuckIds
+			));
+
+			// Optional: Log to audit table
+			// $this->logStuckReset($stuckIds);
+
+			return (int) $affected;
+		});
 	}
 }

--
Gitblit v1.10.0