From 226b50642af0895948fbaa623a9b7180399a63b6 Mon Sep 17 00:00:00 2001
From: Jake Vanderwerf <get@jakevanderwerf.ca>
Date: Wed, 13 May 2026 19:15:48 +0000
Subject: [PATCH] =Queue fixes

---
 inc/managers/queue/Storage.php |  442 ++++++++++++++++++++++++++----------------------------
 1 files changed, 213 insertions(+), 229 deletions(-)

diff --git a/inc/managers/queue/Storage.php b/inc/managers/queue/Storage.php
index 40426e6..2a5d540 100644
--- a/inc/managers/queue/Storage.php
+++ b/inc/managers/queue/Storage.php
@@ -5,48 +5,39 @@
 }
 
 use JVBase\managers\Cache;
+use JVBase\managers\CustomTable;
 use LogicException;
 
 class Storage
 {
-	private \wpdb $wpdb;
-	private string $table;
+	private CustomTable $table;
+	private CustomTable $stats;
 	private Cache $cache;
 
 	private const CACHE_QUEUE_INFO = 'queue_info';
 
 	public function __construct()
 	{
-		global $wpdb;
-		$this->wpdb = $wpdb;
-		$this->table = $wpdb->prefix . BASE . '_operation_queue';
+		$this->defineTables();
 		$this->cache = Cache::for('queue', DAY_IN_SECONDS);
 	}
 
 	public function hasProcessingOperations(): bool
 	{
-		return (bool) $this->wpdb->get_var(
-			"SELECT 1 FROM {$this->table} WHERE state = 'processing' LIMIT 1"
-		);
+		return (bool) $this->table->queryVar("SELECT 1 FROM {table} WHERE state = 'processing' LIMIT 1");
 	}
 
 	public function fetchRunnable(int $offset = 0): array
 	{
 		$now = current_time('mysql');
-
-		$rows = $this->wpdb->get_results(
-			$this->wpdb->prepare("
-            SELECT *
-            FROM {$this->table}
-            WHERE state IN ('pending', 'scheduled')
-              AND scheduled_at <= %s
-            ORDER BY
-              FIELD(priority, 'high', 'normal', 'low'),
-              scheduled_at
-            LIMIT 10 OFFSET %d
-            FOR UPDATE SKIP LOCKED
-        ", $now, $offset)
-		);
+		$rows = $this->table->queryResults("
+        SELECT * FROM {table}
+        WHERE state IN ('pending', 'scheduled')
+          AND scheduled_at <= %s
+        ORDER BY FIELD(priority, 'high', 'normal', 'low'), scheduled_at
+        LIMIT 10 OFFSET %d
+        FOR UPDATE SKIP LOCKED
+    ", [$now, $offset]);
 
 		$total = count($rows);
 		foreach ($rows as $row) {
@@ -88,26 +79,22 @@
 	public function markProcessing(string $id): bool
 	{
 		$now = current_time('mysql');
-
-		$affected = $this->wpdb->query($this->wpdb->prepare("
-            UPDATE {$this->table}
-            SET state = 'processing', started_at = %s, updated_at = %s
-            WHERE id = %s AND state IN ('pending', 'scheduled')
-        ", $now, $now, $id));
+		$affected = $this->table->query("
+        UPDATE {table}
+        SET state = 'processing', started_at = %s, updated_at = %s
+        WHERE id = %s AND state IN ('pending', 'scheduled')
+    ", [$now, $now, $id]);
 
 		if ($affected > 0) {
 			$op = $this->find($id);
-			if ($op) {
-				$this->invalidateUser($op->userId);
-			}
+			if ($op) $this->invalidateUser($op->userId);
 		}
-
 		return $affected > 0;
 	}
 
 	public function save(Operation $op): bool
 	{
-		$data = [
+		$result = $this->table->update([
 			'request_data'    => json_encode($op->requestData),
 			'total_items'     => $op->totalItems,
 			'processed_items' => $op->processedItems,
@@ -118,23 +105,17 @@
 			'retries'         => $op->retries,
 			'last_error_hash' => $op->lastErrorHash,
 			'error_message'   => $op->errorMessage,
-			'scheduled_at'    => $op->scheduledAt,
-			'started_at'      => $op->startedAt,
 			'completed_at'    => $op->completedAt,
 			'metadata'        => json_encode($op->metadata),
 			'result'          => $op->result ? json_encode($op->result) : null,
 			'dependencies'    => json_encode($op->dependencies),
 			'merged_into'     => $op->merged_into,
 			'user_dismissed'  => $op->userDismissed ? 1 : 0,
-			'updated_at'      => current_time('mysql'),
-		];
+		],
+			['id' => $op->id]
+		);
 
-		$result = $this->wpdb->update($this->table, $data, ['id' => $op->id]);
-
-		if ($result !== false) {
-			$this->invalidateUser($op->userId);
-		}
-
+		if ($result !== false) $this->invalidateUser($op->userId);
 		return $result !== false;
 	}
 
@@ -143,35 +124,17 @@
 	 * @param Operation $op
 	 * @return bool
 	 */
-	public function saveProgress(Operation $op): bool {
-		global $wpdb;
-
-		$table = $this->table;
-
-		$data = [
+	public function saveProgress(Operation $op): bool
+	{
+		$result = $this->table->update([
 			'processed_items' => $op->processedItems ?? 0,
 			'failed_items'    => $op->failedItems ? json_encode($op->failedItems) : null,
-			'metadata'        => ($op->metadata) ? json_encode($op->metadata) : null,
-			'result'          => ($op->result) ? json_encode($op->result) :null,
-			'updated_at'      => current_time('mysql'),
-		];
+			'metadata'        => $op->metadata ? json_encode($op->metadata) : null,
+			'result'          => $op->result ? json_encode($op->result) : null,
+		], ['id' => $op->id, 'state' => 'processing']); // state guard preserved
 
-		// IMPORTANT: never touch terminal state
-		$where = [
-			'id'    => $op->id,
-			'state' => 'processing',
-		];
-
-		$updated = $wpdb->update($table, $data, $where);
-
-		if ($updated === false) {
-			error_log('[Storage::saveProgress] DB error: ' . $wpdb->last_error);
-			return false;
-		}
-
-
+		if ($result === false) return false;
 		$this->invalidateUser($op->userId);
-
 		return true;
 	}
 
@@ -180,50 +143,32 @@
 	 * @param Operation $op
 	 * @return bool
 	 */
-	public function saveFinal(Operation $op): bool {
-		global $wpdb;
-
-		if (($op->state?? null) !== 'completed') {
+	public function saveFinal(Operation $op): bool
+	{
+		if ($op->state !== 'completed') {
 			throw new LogicException('saveFinal called without completed state');
 		}
 
-		$table = $this->table;
+		$result = $this->table->update([
+			'state'           => 'completed',
+			'outcome'         => $op->outcome ?? 'success',
+			'processed_items' => $op->processedItems ?? 0,
+			'failed_items'    => $op->failedItems ? json_encode($op->failedItems) : null,
+			'result'          => isset($op->result) ? wp_json_encode($op->result) : null,
+			'completed_at'    => $op->completedAt ?? current_time('mysql'),
+		], ['id' => $op->id, 'state' => 'processing']); // hard guard preserved
 
-		$data = [
-			'state'          => 'completed',
-			'outcome'        => $op->outcome?? 'success',
-			'processed_items'=> $op->processedItems ?? 0,
-			'failed_items' 	 => $op->failedItems ? json_encode($op->failedItems) : null,
-			'result'         => isset($op->result) ? wp_json_encode($op->result) : null,
-			'completed_at'   => $op->completedAt ?? current_time('mysql'),
-			'updated_at'     => current_time('mysql'),
-		];
+		if ($result === 0) return true; // already completed, not an error
+		if ($result === false) return false;
 
-		// HARD GUARD: cannot overwrite completed
-		$where = [
-			'id'    => $op->id,
-			'state' => 'processing',
-		];
-
-		$updated = $wpdb->update($table, $data, $where);
-
-		if ($updated === 0) {
-			return true;
-		}
-
-		if ($updated === false) {
-			error_log('[Storage::saveFinal] DB error: ' . $wpdb->last_error);
-			return false;
-		}
 		$this->invalidateQueueCache();
 		$this->invalidateUser($op->userId);
-
 		return true;
 	}
 
 	public function insert(Operation $op): bool
 	{
-		$result = $this->wpdb->insert($this->table, [
+		$result = $this->table->insert([
 			'id'              => $op->id,
 			'type'            => $op->type,
 			'user_id'         => $op->userId,
@@ -235,18 +180,10 @@
 			'state'           => $op->state,
 			'outcome'         => $op->outcome,
 			'retries'         => 0,
-			'last_error_hash' => null,
-			'error_message'   => null,
 			'scheduled_at'    => $op->scheduledAt ?? current_time('mysql'),
-			'started_at'      => null,
-			'completed_at'    => null,
 			'metadata'        => json_encode($op->metadata),
-			'result'          => null,
 			'dependencies'    => json_encode($op->dependencies),
-			'user_dismissed'  => 0,
 			'merged_into'  	  => null,
-			'created_at'      => current_time('mysql'),
-			'updated_at'      => current_time('mysql'),
 		]);
 
 		if ($result) {
@@ -258,36 +195,25 @@
 
 	public function find(string $id): ?Operation
 	{
-		$row = $this->wpdb->get_row($this->wpdb->prepare(
-			"SELECT * FROM {$this->table} WHERE id = %s",
-			$id
-		));
-
+		$row = $this->table->get(['id' => $id]);
 		return $row ? $this->rowToOperation($row) : null;
 	}
 
 	public function findMergeable(string $type, int $userId, array $criteria = []): ?Operation
 	{
-		$sql = "SELECT * FROM {$this->table}
-            WHERE type = %s AND user_id = %d AND state IN ('pending', 'scheduled')";
+		$sql = "SELECT * FROM {table} WHERE type = %s AND user_id = %d AND state IN ('pending', 'scheduled')";
 		$params = [$type, $userId];
 
 		foreach ($criteria as $key => $value) {
-			if ($value === null) {
-				continue;
-			}
+			if ($value === null) continue;
 			$sql .= " AND JSON_UNQUOTE(JSON_EXTRACT(request_data, %s)) = %s";
 			$params[] = '$.' . $key;
 			$params[] = (string) $value;
 		}
-
 		$sql .= " ORDER BY created_at DESC LIMIT 1";
 
-		$row = $this->wpdb->get_row($this->wpdb->prepare($sql, ...$params));
-
-		$this->invalidateUser($userId);
-
-		return $row ? $this->rowToOperation($row) : null;
+		$rows = $this->table->queryResults($sql, $params);
+		return !empty($rows) ? $this->rowToOperation($rows[0]) : null;
 	}
 
 	public function getUserOperations(int $userId, array $filters = []): array
@@ -328,14 +254,12 @@
 		// Order by state priority, then created_at
 		$orderBy = $filters['order_by'] ?? "FIELD(state, 'processing', 'pending', 'scheduled', 'completed'), created_at DESC";
 
-		$limit = $filters['limit'] ?? 50;
-		$params[] = $limit;
+		$params[] = $filters['limit'] ?? 50;
 
-		$rows = $this->wpdb->get_results($this->wpdb->prepare(
-			"SELECT * FROM {$this->table} WHERE " . implode(' AND ', $where) .
-			" ORDER BY {$orderBy} LIMIT %d",
-			...$params
-		));
+		$rows = $this->table->queryResults(
+			"SELECT * FROM {table} WHERE " . implode(' AND ', $where) . " ORDER BY {$orderBy} LIMIT %d",
+			$params
+		);
 
 		return array_map([$this, 'rowToOperation'], $rows ?: []);
 	}
@@ -343,19 +267,17 @@
 	public function getQueueInfo(): array
 	{
 		$cached = $this->cache->get(self::CACHE_QUEUE_INFO);
-		if ($cached !== false) {
-			return $cached;
-		}
+		if ($cached !== false) return $cached;
 
 		$now = current_time('mysql');
-		$row = $this->wpdb->get_row($this->wpdb->prepare("
-            SELECT
-                COUNT(*) as total,
-                SUM(IF(state IN ('pending', 'processing'), 1, 0)) as active,
-                SUM(IF(state = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready_scheduled
-            FROM {$this->table}
-            WHERE state IN ('pending', 'processing', 'scheduled')
-        ", $now));
+		$row = $this->table->queryResults("
+        SELECT COUNT(*) as total,
+            SUM(IF(state IN ('pending', 'processing'), 1, 0)) as active,
+            SUM(IF(state = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready_scheduled
+        FROM {table}
+        WHERE state IN ('pending', 'processing', 'scheduled')
+    ", [$now]);
+		$row = $row[0] ?? null;
 
 		$info = [
 			'total'     => (int) ($row->total ?? 0),
@@ -399,33 +321,28 @@
 	public function getQueueStatus(): array
 	{
 		$now = current_time('mysql');
+		$rows = $this->table->queryResults("
+        SELECT state, COUNT(*) as count,
+            SUM(IF(state = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready
+        FROM {table} GROUP BY state
+    ", [$now]);
 
-		$rows = $this->wpdb->get_results($this->wpdb->prepare("
-            SELECT
-                state,
-                COUNT(*) as count,
-                SUM(IF(state = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready
-            FROM {$this->table}
-            GROUP BY state
-        ", $now), OBJECT_K);
-
+		$indexed = array_column($rows, null, 'state');
 		return [
-			'pending'         => (int) ($rows['pending']->count ?? 0),
-			'scheduled'       => (int) ($rows['scheduled']->count ?? 0),
-			'scheduled_ready' => (int) ($rows['scheduled']->ready ?? 0),
-			'processing'      => (int) ($rows['processing']->count ?? 0),
-			'completed'       => (int) ($rows['completed']->count ?? 0),
+			'pending'         => (int) ($indexed['pending']->count ?? 0),
+			'scheduled'       => (int) ($indexed['scheduled']->count ?? 0),
+			'scheduled_ready' => (int) ($indexed['scheduled']->ready ?? 0),
+			'processing'      => (int) ($indexed['processing']->count ?? 0),
+			'completed'       => (int) ($indexed['completed']->count ?? 0),
 		];
 	}
 
 	public function getUserStats(int $userId): array
 	{
-		$rows = $this->wpdb->get_results($this->wpdb->prepare("
-            SELECT state, outcome, COUNT(*) as count
-            FROM {$this->table}
-            WHERE user_id = %d
-            GROUP BY state, outcome
-        ", $userId));
+		$rows = $this->table->queryResults(
+			"SELECT state, outcome, COUNT(*) as count FROM {table} WHERE user_id = %d GROUP BY state, outcome",
+			[$userId]
+		);
 
 		$stats = [
 			'pending' => 0,
@@ -458,13 +375,7 @@
 
 	public function dismiss(string $id): bool
 	{
-		$result = $this->wpdb->update(
-			$this->table,
-			['user_dismissed' => 1, 'updated_at' => current_time('mysql')],
-			['id' => $id]
-		);
-
-		return $result !== false;
+		return $this->table->update(['user_dismissed' => 1], ['id' => $id]) !== false;
 	}
 
 	/**
@@ -473,14 +384,8 @@
 	public function delete(string $id): bool
 	{
 		$op = $this->find($id);
-		$userId = $op?->userId;
-
-		$result = $this->wpdb->delete($this->table, ['id' => $id]);
-
-		if ($result && $userId) {
-			$this->invalidateUser($userId);
-		}
-
+		$result = $this->table->delete(['id' => $id]);
+		if ($result && $op) $this->invalidateUser($op->userId);
 		return $result !== false;
 	}
 
@@ -493,21 +398,16 @@
 	{
 		Cache::for($userId.'_queue')->flush();
 	}
-	public function getLastError(): string
-	{
-		return $this->wpdb->last_error;
-	}
 
 	public function withTransaction(callable $callback): mixed
 	{
-		$this->wpdb->query('START TRANSACTION');
-
+		$this->table->startTransaction();
 		try {
 			$result = $callback();
-			$this->wpdb->query('COMMIT');
+			$this->table->commit();
 			return $result;
 		} catch (\Throwable $e) {
-			$this->wpdb->query('ROLLBACK');
+			$this->table->rollback();
 			error_log('[Storage] Transaction rolled back: ' . $e->getMessage());
 			throw $e;
 		}
@@ -518,36 +418,19 @@
 		return $this->withTransaction(function () use ($stuckMinutes) {
 			$cutoff = date('Y-m-d H:i:s', strtotime("-{$stuckMinutes} minutes"));
 
-			// Get IDs first for logging
-			$stuckIds = $this->wpdb->get_col($this->wpdb->prepare("
-            SELECT id FROM {$this->table}
-            WHERE state = 'processing'
-              AND started_at < %s
-            FOR UPDATE
-        ", $cutoff));
+			$stuckIds = $this->table->queryResults(
+				"SELECT id FROM {table} WHERE state = 'processing' AND started_at < %s FOR UPDATE",
+				[$cutoff]
+			);
+			$stuckIds = array_column($stuckIds, 'id');
 
-			if (empty($stuckIds)) {
-				return 0;
-			}
+			if (empty($stuckIds)) return 0;
 
-			// Reset them
-			$affected = $this->wpdb->query($this->wpdb->prepare("
-            UPDATE {$this->table}
-            SET state = 'scheduled',
-                scheduled_at = %s,
-                retries = retries + 1,
-                updated_at = %s
-            WHERE id IN (" . implode(',', array_fill(0, count($stuckIds), '%s')) . ")
-        ",
-				date('Y-m-d H:i:s', time() + 60),
-				current_time('mysql'),
-				...$stuckIds
-			));
-
-			// Optional: Log to audit table
-			// $this->logStuckReset($stuckIds);
-
-			return (int) $affected;
+			$placeholders = implode(',', array_fill(0, count($stuckIds), '%s'));
+			return (int) $this->table->query(
+				"UPDATE {table} SET state = 'scheduled', scheduled_at = %s, retries = retries + 1, updated_at = %s WHERE id IN ({$placeholders})",
+				array_merge([date('Y-m-d H:i:s', time() + 60), current_time('mysql')], $stuckIds)
+			);
 		});
 	}
 
@@ -557,23 +440,124 @@
 	public function replaceDependency(string $fromId, string $toId): int
 	{
 		return $this->withTransaction(function () use ($fromId, $toId) {
-
-			// Only affect pending/scheduled operations
-			$affected = $this->wpdb->query($this->wpdb->prepare("
-            UPDATE {$this->table}
-            SET dependencies = REPLACE(dependencies, %s, %s),
-                updated_at = %s
-            WHERE state IN ('pending', 'scheduled')
-              AND dependencies LIKE %s
-        ",
-				'"' . $fromId . '"',
-				'"' . $toId . '"',
-				current_time('mysql'),
-				'%"' . $fromId . '"%'
-			));
-
-			return (int) $affected;
+			return (int) $this->table->query(
+				"UPDATE {table} SET dependencies = REPLACE(dependencies, %s, %s), updated_at = %s WHERE state IN ('pending', 'scheduled') AND dependencies LIKE %s",
+				['"'.$fromId.'"', '"'.$toId.'"', current_time('mysql'), '%"'.$fromId.'"%']
+			);
 		});
 	}
 
+	public function defineTables():void
+	{
+		$queue = CustomTable::for('_operation_queue');
+		$queue->setColumns([
+			'id'			=> 'VARCHAR(64) NOT NULL',
+			'type'			=> 'VARCHAR(50) NOT NULL',
+			'user_id'		=> $queue->getUserIDType().' NOT NULL',
+
+			'request_data'	=> 'JSON NOT NULL CHECK (JSON_VALID(request_data))',
+
+			'total_items'	=> 'INT(11) NOT NULL DEFAULT 1',
+			'processed_items'	=> 'INT(11) DEFAULT 0',
+			'failed_items'	=> 'JSON',
+
+			'priority'		=> 'ENUM(\'high\',\'normal\',\'low\') DEFAULT \'normal\'',
+			'state'			=> 'ENUM(\'pending\', \'scheduled\', \'processing\', \'completed\') DEFAULT \'pending\'',
+			'outcome'		=> 'ENUM(\'pending\', \'success\',\'partial\',\'merged\',\'failed\',\'failed_permanent\') DEFAULT \'pending\'',
+
+			'retries'		=> 'INT(11) DEFAULT 0',
+			'last_error_hash'=> 'CHAR(32) DEFAULT NULL',
+			'error_message'	=> 'TEXT',
+
+			'scheduled_at'	=> 'DATETIME DEFAULT NULL',
+			'started_at'	=> 'DATETIME DEFAULT CURRENT_TIMESTAMP',
+			'completed_at'	=> 'DATETIME DEFAULT NULL',
+
+			'metadata'		=> 'JSON DEFAULT NULL',
+			'result'		=> 'JSON',
+			'dependencies'	=> 'JSON',
+			'merged_into'	=> 'VARCHAR(64) DEFAULT NULL',
+
+			'user_dismissed'=> 'tinyint(1) DEFAULT 0',
+			'created_at'	=> 'DATETIME DEFAULT CURRENT_TIMESTAMP',
+			'updated_at'	=> 'DATETIME DEFAULT CURRENT_TIMESTAMP',
+		]);
+
+		$queue->setKeys([
+			['key' => 'PRIMARY', 'value' => '(`id`)'],
+			'`idx_run_queue` (`state`, `priority`, `scheduled_at`)',
+			'`idx_user_ops` (`user_id`, `state`)',
+			'`idx_user_type_pending` (`user_id`, `type`, `state`)',
+			'`idx_completed_at` (`completed_at`)',
+			'`idx_processing_stuck` (`state`, `started_at`)'
+		]);
+
+		$queue->defineTable();
+		$this->table = $queue;
+
+		$stats = CustomTable::for('stats__operation_queue');
+		$stats->setColumns([
+			'id'            => 'BIGINT unsigned AUTO_INCREMENT',
+			'date'          => 'DATE NOT NULL',
+			'type'          => 'VARCHAR(50) NOT NULL',
+
+			// Only store what can't be queried from the main table later
+			'peak_queue_size'   => 'INT NOT NULL DEFAULT 0',
+			'peak_memory_bytes' => 'BIGINT DEFAULT NULL',
+
+			// Snapshot totals for post-purge historical view
+			'total_operations'          => 'INT NOT NULL DEFAULT 0',
+			'successful_operations'     => 'INT NOT NULL DEFAULT 0',
+			'failed_permanent_operations' => 'INT NOT NULL DEFAULT 0',
+			'total_items_processed'     => 'INT NOT NULL DEFAULT 0',
+
+			'created_at'    => 'TIMESTAMP DEFAULT CURRENT_TIMESTAMP',
+			'updated_at'    => 'TIMESTAMP DEFAULT CURRENT_TIMESTAMP',
+		]);
+
+		$stats->setKeys([
+			['key' => 'PRIMARY', 'value' => '(`id`)'],
+			['key' => 'UNIQUE', 'value' => '(`date`, `type`)'],
+			'`date_idx` (`date`)',
+			'`type_idx` (`type`)'
+		]);
+		$stats->defineTable();
+
+		$this->stats = $stats;
+	}
+
+	public function snapshotDaily(): void
+	{
+		$today = current_time('Y-m-d');
+		$types = $this->table->queryResults(
+			"SELECT DISTINCT type FROM {table} WHERE DATE(completed_at) = %s",
+			[$today]
+		);
+
+		foreach ($types as $row) {
+			$stats = $this->table->queryResults("
+            SELECT
+                COUNT(*) as total,
+                SUM(outcome = 'success') as successful,
+                SUM(outcome = 'failed_permanent') as failed_permanent,
+                SUM(processed_items) as items_processed
+            FROM {table}
+            WHERE type = %s AND DATE(completed_at) = %s
+        ", [$row->type, $today]);
+
+			$s = $stats[0] ?? null;
+			if (!$s) continue;
+
+			$this->stats->table->query("
+            INSERT INTO {table} (date, type, total_operations, successful_operations, failed_permanent_operations, total_items_processed)
+            VALUES (%s, %s, %d, %d, %d, %d)
+            ON DUPLICATE KEY UPDATE
+                total_operations = VALUES(total_operations),
+                successful_operations = VALUES(successful_operations),
+                failed_permanent_operations = VALUES(failed_permanent_operations),
+                total_items_processed = VALUES(total_items_processed),
+                updated_at = NOW()
+        ", [$today, $row->type, $s->total, $s->successful, $s->failed_permanent, $s->items_processed]);
+		}
+	}
 }

--
Gitblit v1.10.0