Jake Vanderwerf
2026-01-28 8c6502de2f8ec2bd8382cd6945c327d7be400e14
inc/managers/queue/Storage.php
@@ -5,6 +5,7 @@
}
use JVBase\managers\Cache;
use LogicException;
class Storage
{
@@ -34,28 +35,31 @@
      $now = current_time('mysql');
      $rows = $this->wpdb->get_results($this->wpdb->prepare("
        SELECT oq.* FROM {$this->table} oq
        WHERE oq.state IN ('pending', 'scheduled')
          AND oq.scheduled_at <= %s
          AND NOT EXISTS (
              SELECT 1
              FROM JSON_TABLE(
                  COALESCE(NULLIF(oq.dependencies, 'null'), '[]'),
                  '\$[*]' COLUMNS (dep_id VARCHAR(64) PATH '\$')
              ) AS deps
              JOIN {$this->table} dep ON dep.id = deps.dep_id
              WHERE dep.state != 'completed'
                 OR dep.outcome NOT IN ('success', 'partial')
          )
        ORDER BY FIELD(oq.priority, 'high', 'normal', 'low'), oq.scheduled_at
        LIMIT %d
    ", $now, $limit));
         SELECT oq.* FROM {$this->table} oq
         WHERE oq.state IN ('pending', 'scheduled')
           AND oq.scheduled_at <= %s
           AND NOT EXISTS (
              SELECT 1
              FROM JSON_TABLE(
                 COALESCE(NULLIF(oq.dependencies, 'null'), '[]'),
                 '\$[*]' COLUMNS (dep_id VARCHAR(64) PATH '\$')
              ) AS deps
              JOIN {$this->table} dep ON dep.id = deps.dep_id
              WHERE dep.state != 'completed'
                OR dep.outcome NOT IN ('success', 'partial')
           )
         ORDER BY FIELD(oq.priority, 'high', 'normal', 'low'), oq.scheduled_at
         LIMIT %d
         FOR UPDATE SKIP LOCKED
      ", $now, $limit));
      return array_map([$this, 'rowToOperation'], $rows ?: []);
   }
   public function markProcessing(string $id): bool
   {
      $now = current_time('mysql');
@@ -108,6 +112,85 @@
      return $result !== false;
   }
   /**
    * For saving a 'step' in an operation; usually after each chunk
    * @param Operation $op
    * @return bool
    */
   public function saveProgress(Operation $op): bool {
      global $wpdb;
      $table = $this->table;
      $data = [
         'processed_items' => $op->processedItems ?? 0,
         'failed_items'    => $op->failedItems ? json_encode($op->failedItems) : null,
         'metadata'        => ($op->metadata) ? json_encode($op->metadata) : null,
         'result'          => ($op->result) ? json_encode($op->result) :null,
         'updated_at'      => current_time('mysql'),
      ];
      // IMPORTANT: never touch terminal state
      $where = [
         'id'    => $op->id,
         'state' => 'processing',
      ];
      $updated = $wpdb->update($table, $data, $where);
      if ($updated === false) {
         error_log('[Storage::saveProgress] DB error: ' . $wpdb->last_error);
         return false;
      }
      return true;
   }
   /**
    * The operation is complete, let's progress to 'completed'
    * @param Operation $op
    * @return bool
    */
   public function saveFinal(Operation $op): bool {
      global $wpdb;
      if (($op->state?? null) !== 'completed') {
         throw new LogicException('saveFinal called without completed state');
      }
      $table = $this->table;
      $data = [
         'state'          => 'completed',
         'outcome'        => $op->outcome?? 'success',
         'processed_items'=> $op->processedItems ?? 0,
         'failed_items'   => $op->failedItems    ?? null,
         'result'         => isset($op->result) ? wp_json_encode($op->result) : null,
         'completed_at'   => $op->completedAt ?? current_time('mysql'),
         'updated_at'     => current_time('mysql'),
      ];
      // HARD GUARD: cannot overwrite completed
      $where = [
         'id'    => $op->id,
         'state' => 'processing',
      ];
      $updated = $wpdb->update($table, $data, $where);
      $this->invalidateQueueCache();
      if ($updated === 0) {
         return true;
      }
      if ($updated === false) {
         error_log('[Storage::saveFinal] DB error: ' . $wpdb->last_error);
         return false;
      }
      return true;
   }
   public function insert(Operation $op): bool
   {
      $result = $this->wpdb->insert($this->table, [
@@ -366,4 +449,61 @@
   {
      $this->cache->forget($userId);
   }
   public function getLastError(): string
   {
      return $this->wpdb->last_error;
   }
   public function withTransaction(callable $callback): mixed
   {
      $this->wpdb->query('START TRANSACTION');
      try {
         $result = $callback();
         $this->wpdb->query('COMMIT');
         return $result;
      } catch (\Throwable $e) {
         $this->wpdb->query('ROLLBACK');
         error_log('[Storage] Transaction rolled back: ' . $e->getMessage());
         throw $e;
      }
   }
   public function resetStuckOperations(int $stuckMinutes = 30): int
   {
      return $this->withTransaction(function () use ($stuckMinutes) {
         $cutoff = date('Y-m-d H:i:s', strtotime("-{$stuckMinutes} minutes"));
         // Get IDs first for logging
         $stuckIds = $this->wpdb->get_col($this->wpdb->prepare("
            SELECT id FROM {$this->table}
            WHERE state = 'processing'
              AND started_at < %s
            FOR UPDATE
        ", $cutoff));
         if (empty($stuckIds)) {
            return 0;
         }
         // Reset them
         $affected = $this->wpdb->query($this->wpdb->prepare("
            UPDATE {$this->table}
            SET state = 'scheduled',
                scheduled_at = %s,
                retries = retries + 1,
                updated_at = %s
            WHERE id IN (" . implode(',', array_fill(0, count($stuckIds), '%s')) . ")
        ",
            date('Y-m-d H:i:s', time() + 60),
            current_time('mysql'),
            ...$stuckIds
         ));
         // Optional: Log to audit table
         // $this->logStuckReset($stuckIds);
         return (int) $affected;
      });
   }
}