Jake Vanderwerf
2026-02-04 2127b1bdd73ecd2423e443992da4b442f5a3c1a3
inc/managers/queue/Storage.php
@@ -4,15 +4,15 @@
   exit;
}
use JVBase\managers\CacheManager;
use JVBase\managers\Cache;
use LogicException;
class Storage
{
   private \wpdb $wpdb;
   private string $table;
   private CacheManager $cache;
   private Cache $cache;
   private const CACHE_USER_PREFIX = 'user_queue_';
   private const CACHE_QUEUE_INFO = 'queue_info';
   public function __construct()
@@ -20,7 +20,7 @@
      global $wpdb;
      $this->wpdb = $wpdb;
      $this->table = $wpdb->prefix . BASE . '_operation_queue';
      $this->cache = CacheManager::for('queue', DAY_IN_SECONDS);
      $this->cache = Cache::for('queue', DAY_IN_SECONDS);
   }
   public function hasProcessingOperations(): bool
@@ -34,29 +34,27 @@
   {
      $now = current_time('mysql');
      $rows = $this->wpdb->get_results($this->wpdb->prepare("
        SELECT oq.* FROM {$this->table} oq
        WHERE oq.state IN ('pending', 'scheduled')
          AND oq.scheduled_at <= %s
          AND NOT EXISTS (
              SELECT 1
              FROM JSON_TABLE(
                  COALESCE(NULLIF(oq.dependencies, 'null'), '[]'),
                  '\$[*]' COLUMNS (dep_id VARCHAR(64) PATH '\$')
              ) AS deps
              JOIN {$this->table} dep ON dep.id = deps.dep_id
              WHERE dep.state != 'completed'
                 OR dep.outcome NOT IN ('success', 'partial')
          )
        ORDER BY FIELD(oq.priority, 'high', 'normal', 'low'), oq.scheduled_at
        LIMIT %d
    ", $now, $limit));
      $rows = $this->wpdb->get_results(
         $this->wpdb->prepare("
            SELECT *
            FROM {$this->table}
            WHERE state IN ('pending', 'scheduled')
              AND scheduled_at <= %s
            ORDER BY
              FIELD(priority, 'high', 'normal', 'low'),
              scheduled_at
            LIMIT %d
            FOR UPDATE SKIP LOCKED
        ", $now, $limit)
      );
      return array_map([$this, 'rowToOperation'], $rows ?: []);
   }
   public function markProcessing(string $id): bool
   {
      $now = current_time('mysql');
@@ -109,6 +107,85 @@
      return $result !== false;
   }
   /**
    * For saving a 'step' in an operation; usually after each chunk
    * @param Operation $op
    * @return bool
    */
   public function saveProgress(Operation $op): bool {
      global $wpdb;
      $table = $this->table;
      $data = [
         'processed_items' => $op->processedItems ?? 0,
         'failed_items'    => $op->failedItems ? json_encode($op->failedItems) : null,
         'metadata'        => ($op->metadata) ? json_encode($op->metadata) : null,
         'result'          => ($op->result) ? json_encode($op->result) :null,
         'updated_at'      => current_time('mysql'),
      ];
      // IMPORTANT: never touch terminal state
      $where = [
         'id'    => $op->id,
         'state' => 'processing',
      ];
      $updated = $wpdb->update($table, $data, $where);
      if ($updated === false) {
         error_log('[Storage::saveProgress] DB error: ' . $wpdb->last_error);
         return false;
      }
      return true;
   }
   /**
    * The operation is complete, let's progress to 'completed'
    * @param Operation $op
    * @return bool
    */
   public function saveFinal(Operation $op): bool {
      global $wpdb;
      if (($op->state?? null) !== 'completed') {
         throw new LogicException('saveFinal called without completed state');
      }
      $table = $this->table;
      $data = [
         'state'          => 'completed',
         'outcome'        => $op->outcome?? 'success',
         'processed_items'=> $op->processedItems ?? 0,
         'failed_items'   => $op->failedItems    ?? null,
         'result'         => isset($op->result) ? wp_json_encode($op->result) : null,
         'completed_at'   => $op->completedAt ?? current_time('mysql'),
         'updated_at'     => current_time('mysql'),
      ];
      // HARD GUARD: cannot overwrite completed
      $where = [
         'id'    => $op->id,
         'state' => 'processing',
      ];
      $updated = $wpdb->update($table, $data, $where);
      $this->invalidateQueueCache();
      if ($updated === 0) {
         return true;
      }
      if ($updated === false) {
         error_log('[Storage::saveFinal] DB error: ' . $wpdb->last_error);
         return false;
      }
      return true;
   }
   public function insert(Operation $op): bool
   {
      $result = $this->wpdb->insert($this->table, [
@@ -360,13 +437,68 @@
   public function invalidateQueueCache(): void
   {
      $this->cache->delete(self::CACHE_QUEUE_INFO);
      $this->cache->touch();
      $this->cache->forget(self::CACHE_QUEUE_INFO);
   }
   private function invalidateUser(int $userId): void
   {
      CacheManager::invalidateAll("user_{$userId}");
      $this->cache->delete(self::CACHE_QUEUE_INFO);
      $this->cache->forget($userId);
   }
   public function getLastError(): string
   {
      return $this->wpdb->last_error;
   }
   public function withTransaction(callable $callback): mixed
   {
      $this->wpdb->query('START TRANSACTION');
      try {
         $result = $callback();
         $this->wpdb->query('COMMIT');
         return $result;
      } catch (\Throwable $e) {
         $this->wpdb->query('ROLLBACK');
         error_log('[Storage] Transaction rolled back: ' . $e->getMessage());
         throw $e;
      }
   }
   public function resetStuckOperations(int $stuckMinutes = 30): int
   {
      return $this->withTransaction(function () use ($stuckMinutes) {
         $cutoff = date('Y-m-d H:i:s', strtotime("-{$stuckMinutes} minutes"));
         // Get IDs first for logging
         $stuckIds = $this->wpdb->get_col($this->wpdb->prepare("
            SELECT id FROM {$this->table}
            WHERE state = 'processing'
              AND started_at < %s
            FOR UPDATE
        ", $cutoff));
         if (empty($stuckIds)) {
            return 0;
         }
         // Reset them
         $affected = $this->wpdb->query($this->wpdb->prepare("
            UPDATE {$this->table}
            SET state = 'scheduled',
                scheduled_at = %s,
                retries = retries + 1,
                updated_at = %s
            WHERE id IN (" . implode(',', array_fill(0, count($stuckIds), '%s')) . ")
        ",
            date('Y-m-d H:i:s', time() + 60),
            current_time('mysql'),
            ...$stuckIds
         ));
         // Optional: Log to audit table
         // $this->logStuckReset($stuckIds);
         return (int) $affected;
      });
   }
}