From 2127b1bdd73ecd2423e443992da4b442f5a3c1a3 Mon Sep 17 00:00:00 2001
From: Jake Vanderwerf <get@jakevanderwerf.ca>
Date: Wed, 04 Feb 2026 21:19:25 +0000
Subject: [PATCH] =Major overhaul of MetaManager.php -> Meta.php and RestRouteManager.php -> Rest.php. Seems to work for JakeVan
---
inc/managers/queue/Storage.php | 182 +++++++++++++++++++++++++++++++++++++++------
1 files changed, 157 insertions(+), 25 deletions(-)
diff --git a/inc/managers/queue/Storage.php b/inc/managers/queue/Storage.php
index 2aa9ab6..99cd8c3 100644
--- a/inc/managers/queue/Storage.php
+++ b/inc/managers/queue/Storage.php
@@ -4,15 +4,15 @@
exit;
}
-use JVBase\managers\CacheManager;
+use JVBase\managers\Cache;
+use LogicException;
class Storage
{
private \wpdb $wpdb;
private string $table;
- private CacheManager $cache;
+ private Cache $cache;
- private const CACHE_USER_PREFIX = 'user_queue_';
private const CACHE_QUEUE_INFO = 'queue_info';
public function __construct()
@@ -20,7 +20,7 @@
global $wpdb;
$this->wpdb = $wpdb;
$this->table = $wpdb->prefix . BASE . '_operation_queue';
- $this->cache = CacheManager::for('queue', DAY_IN_SECONDS);
+ $this->cache = Cache::for('queue', DAY_IN_SECONDS);
}
public function hasProcessingOperations(): bool
@@ -34,29 +34,27 @@
{
$now = current_time('mysql');
- $rows = $this->wpdb->get_results($this->wpdb->prepare("
- SELECT oq.* FROM {$this->table} oq
- WHERE oq.state IN ('pending', 'scheduled')
- AND oq.scheduled_at <= %s
- AND NOT EXISTS (
- SELECT 1
- FROM JSON_TABLE(
- COALESCE(NULLIF(oq.dependencies, 'null'), '[]'),
- '\$[*]' COLUMNS (dep_id VARCHAR(64) PATH '\$')
- ) AS deps
- JOIN {$this->table} dep ON dep.id = deps.dep_id
- WHERE dep.state != 'completed'
- OR dep.outcome NOT IN ('success', 'partial')
- )
- ORDER BY FIELD(oq.priority, 'high', 'normal', 'low'), oq.scheduled_at
- LIMIT %d
- ", $now, $limit));
+ $rows = $this->wpdb->get_results(
+ $this->wpdb->prepare("
+ SELECT *
+ FROM {$this->table}
+ WHERE state IN ('pending', 'scheduled')
+ AND scheduled_at <= %s
+ ORDER BY
+ FIELD(priority, 'high', 'normal', 'low'),
+ scheduled_at
+ LIMIT %d
+ FOR UPDATE SKIP LOCKED
+ ", $now, $limit)
+ );
return array_map([$this, 'rowToOperation'], $rows ?: []);
}
+
+
public function markProcessing(string $id): bool
{
$now = current_time('mysql');
@@ -109,6 +107,85 @@
return $result !== false;
}
+ /**
+ * For saving a 'step' in an operation; usually after each chunk
+ * @param Operation $op
+ * @return bool
+ */
+ public function saveProgress(Operation $op): bool {
+ global $wpdb;
+
+ $table = $this->table;
+
+ $data = [
+ 'processed_items' => $op->processedItems ?? 0,
+ 'failed_items' => $op->failedItems ? json_encode($op->failedItems) : null,
+ 'metadata' => ($op->metadata) ? json_encode($op->metadata) : null,
+ 'result' => ($op->result) ? json_encode($op->result) :null,
+ 'updated_at' => current_time('mysql'),
+ ];
+
+ // IMPORTANT: never touch terminal state
+ $where = [
+ 'id' => $op->id,
+ 'state' => 'processing',
+ ];
+
+ $updated = $wpdb->update($table, $data, $where);
+
+ if ($updated === false) {
+ error_log('[Storage::saveProgress] DB error: ' . $wpdb->last_error);
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * The operation is complete, let's progress to 'completed'
+ * @param Operation $op
+ * @return bool
+ */
+ public function saveFinal(Operation $op): bool {
+ global $wpdb;
+
+ if (($op->state?? null) !== 'completed') {
+ throw new LogicException('saveFinal called without completed state');
+ }
+
+ $table = $this->table;
+
+ $data = [
+ 'state' => 'completed',
+ 'outcome' => $op->outcome?? 'success',
+ 'processed_items'=> $op->processedItems ?? 0,
+ 'failed_items' => $op->failedItems ?? null,
+ 'result' => isset($op->result) ? wp_json_encode($op->result) : null,
+ 'completed_at' => $op->completedAt ?? current_time('mysql'),
+ 'updated_at' => current_time('mysql'),
+ ];
+
+ // HARD GUARD: cannot overwrite completed
+ $where = [
+ 'id' => $op->id,
+ 'state' => 'processing',
+ ];
+
+ $updated = $wpdb->update($table, $data, $where);
+ $this->invalidateQueueCache();
+
+ if ($updated === 0) {
+ return true;
+ }
+
+ if ($updated === false) {
+ error_log('[Storage::saveFinal] DB error: ' . $wpdb->last_error);
+ return false;
+ }
+
+ return true;
+ }
+
public function insert(Operation $op): bool
{
$result = $this->wpdb->insert($this->table, [
@@ -360,13 +437,68 @@
public function invalidateQueueCache(): void
{
- $this->cache->delete(self::CACHE_QUEUE_INFO);
- $this->cache->touch();
+ $this->cache->forget(self::CACHE_QUEUE_INFO);
}
private function invalidateUser(int $userId): void
{
- CacheManager::invalidateAll("user_{$userId}");
- $this->cache->delete(self::CACHE_QUEUE_INFO);
+ $this->cache->forget($userId);
+ }
+ public function getLastError(): string
+ {
+ return $this->wpdb->last_error;
+ }
+
+ public function withTransaction(callable $callback): mixed
+ {
+ $this->wpdb->query('START TRANSACTION');
+
+ try {
+ $result = $callback();
+ $this->wpdb->query('COMMIT');
+ return $result;
+ } catch (\Throwable $e) {
+ $this->wpdb->query('ROLLBACK');
+ error_log('[Storage] Transaction rolled back: ' . $e->getMessage());
+ throw $e;
+ }
+ }
+
+ public function resetStuckOperations(int $stuckMinutes = 30): int
+ {
+ return $this->withTransaction(function () use ($stuckMinutes) {
+ $cutoff = date('Y-m-d H:i:s', strtotime("-{$stuckMinutes} minutes"));
+
+ // Get IDs first for logging
+ $stuckIds = $this->wpdb->get_col($this->wpdb->prepare("
+ SELECT id FROM {$this->table}
+ WHERE state = 'processing'
+ AND started_at < %s
+ FOR UPDATE
+ ", $cutoff));
+
+ if (empty($stuckIds)) {
+ return 0;
+ }
+
+ // Reset them
+ $affected = $this->wpdb->query($this->wpdb->prepare("
+ UPDATE {$this->table}
+ SET state = 'scheduled',
+ scheduled_at = %s,
+ retries = retries + 1,
+ updated_at = %s
+ WHERE id IN (" . implode(',', array_fill(0, count($stuckIds), '%s')) . ")
+ ",
+ date('Y-m-d H:i:s', time() + 60),
+ current_time('mysql'),
+ ...$stuckIds
+ ));
+
+ // Optional: Log to audit table
+ // $this->logStuckReset($stuckIds);
+
+ return (int) $affected;
+ });
}
}
--
Gitblit v1.10.0