Jake Vanderwerf
2026-02-17 a24a06002081ad71a78ffeff9072725ba39cf121
inc/managers/queue/Storage.php
@@ -5,6 +5,7 @@
}
use JVBase\managers\Cache;
use LogicException;
class Storage
{
@@ -29,33 +30,61 @@
      );
   }
   public function fetchRunnable(int $limit = 10): array
   public function fetchRunnable(int $offset = 0): array
   {
      $now = current_time('mysql');
      $rows = $this->wpdb->get_results($this->wpdb->prepare("
        SELECT oq.* FROM {$this->table} oq
        WHERE oq.state IN ('pending', 'scheduled')
          AND oq.scheduled_at <= %s
          AND NOT EXISTS (
              SELECT 1
              FROM JSON_TABLE(
                  COALESCE(NULLIF(oq.dependencies, 'null'), '[]'),
                  '\$[*]' COLUMNS (dep_id VARCHAR(64) PATH '\$')
              ) AS deps
              JOIN {$this->table} dep ON dep.id = deps.dep_id
              WHERE dep.state != 'completed'
                 OR dep.outcome NOT IN ('success', 'partial')
          )
        ORDER BY FIELD(oq.priority, 'high', 'normal', 'low'), oq.scheduled_at
        LIMIT %d
    ", $now, $limit));
      $rows = $this->wpdb->get_results(
         $this->wpdb->prepare("
            SELECT *
            FROM {$this->table}
            WHERE state IN ('pending', 'scheduled')
              AND scheduled_at <= %s
            ORDER BY
              FIELD(priority, 'high', 'normal', 'low'),
              scheduled_at
            LIMIT 10 OFFSET %d
            FOR UPDATE SKIP LOCKED
        ", $now, $offset)
      );
      return array_map([$this, 'rowToOperation'], $rows ?: []);
      $total = count($rows);
      foreach ($rows as $row) {
         $dependencies = json_decode($row->dependencies ?? '[]', true) ?: [];
         if (empty($dependencies)) {
            return [$this->rowToOperation($row)];
         }
         $totalDep = count($dependencies);
         $completed = [];
         $notCompleted = [];
         foreach ($dependencies as $dep) {
            $dependency = $this->find($dep);
            if ($dependency) {
               if ($dependency->state === 'completed') {
                  $completed[] = $dep;
               } else {
                  $notCompleted[] = $dep;
               }
            }
         }
         if (count($completed) === $totalDep) {
            return [$this->rowToOperation($row)];
         }
      }
      //If we didn't find any operations from that 10 that are ready to go, or their dependencies aren't met, try the next 10
      if ($total === 10) {
         return $this->fetchRunnable($offset + 10);
      }
      //If, for whatever reason, nothing still is found, there likely are none
      return [];
   }
   public function markProcessing(string $id): bool
   {
      $now = current_time('mysql');
@@ -95,6 +124,7 @@
         'metadata'        => json_encode($op->metadata),
         'result'          => $op->result ? json_encode($op->result) : null,
         'dependencies'    => json_encode($op->dependencies),
         'merged_into'     => $op->merged_into,
         'user_dismissed'  => $op->userDismissed ? 1 : 0,
         'updated_at'      => current_time('mysql'),
      ];
@@ -108,6 +138,89 @@
      return $result !== false;
   }
   /**
    * For saving a 'step' in an operation; usually after each chunk
    * @param Operation $op
    * @return bool
    */
   public function saveProgress(Operation $op): bool {
      global $wpdb;
      $table = $this->table;
      $data = [
         'processed_items' => $op->processedItems ?? 0,
         'failed_items'    => $op->failedItems ? json_encode($op->failedItems) : null,
         'metadata'        => ($op->metadata) ? json_encode($op->metadata) : null,
         'result'          => ($op->result) ? json_encode($op->result) :null,
         'updated_at'      => current_time('mysql'),
      ];
      // IMPORTANT: never touch terminal state
      $where = [
         'id'    => $op->id,
         'state' => 'processing',
      ];
      $updated = $wpdb->update($table, $data, $where);
      if ($updated === false) {
         error_log('[Storage::saveProgress] DB error: ' . $wpdb->last_error);
         return false;
      }
      $this->invalidateUser($op->userId);
      return true;
   }
   /**
    * The operation is complete, let's progress to 'completed'
    * @param Operation $op
    * @return bool
    */
   public function saveFinal(Operation $op): bool {
      global $wpdb;
      if (($op->state?? null) !== 'completed') {
         throw new LogicException('saveFinal called without completed state');
      }
      $table = $this->table;
      $data = [
         'state'          => 'completed',
         'outcome'        => $op->outcome?? 'success',
         'processed_items'=> $op->processedItems ?? 0,
         'failed_items'     => $op->failedItems ? json_encode($op->failedItems) : null,
         'result'         => isset($op->result) ? wp_json_encode($op->result) : null,
         'completed_at'   => $op->completedAt ?? current_time('mysql'),
         'updated_at'     => current_time('mysql'),
      ];
      // HARD GUARD: cannot overwrite completed
      $where = [
         'id'    => $op->id,
         'state' => 'processing',
      ];
      $updated = $wpdb->update($table, $data, $where);
      if ($updated === 0) {
         return true;
      }
      if ($updated === false) {
         error_log('[Storage::saveFinal] DB error: ' . $wpdb->last_error);
         return false;
      }
      $this->invalidateQueueCache();
      $this->invalidateUser($op->userId);
      return true;
   }
   public function insert(Operation $op): bool
   {
      $result = $this->wpdb->insert($this->table, [
@@ -131,6 +244,7 @@
         'result'          => null,
         'dependencies'    => json_encode($op->dependencies),
         'user_dismissed'  => 0,
         'merged_into'       => null,
         'created_at'      => current_time('mysql'),
         'updated_at'      => current_time('mysql'),
      ]);
@@ -152,14 +266,26 @@
      return $row ? $this->rowToOperation($row) : null;
   }
   public function findMergeable(string $type, int $userId): ?Operation
   public function findMergeable(string $type, int $userId, array $criteria = []): ?Operation
   {
      $row = $this->wpdb->get_row($this->wpdb->prepare(
         "SELECT * FROM {$this->table}
             WHERE type = %s AND user_id = %d AND state IN ('pending', 'scheduled')
             ORDER BY created_at DESC LIMIT 1",
         $type, $userId
      ));
      $sql = "SELECT * FROM {$this->table}
            WHERE type = %s AND user_id = %d AND state IN ('pending', 'scheduled')";
      $params = [$type, $userId];
      foreach ($criteria as $key => $value) {
         if ($value === null) {
            continue;
         }
         $sql .= " AND JSON_UNQUOTE(JSON_EXTRACT(request_data, %s)) = %s";
         $params[] = '$.' . $key;
         $params[] = (string) $value;
      }
      $sql .= " ORDER BY created_at DESC LIMIT 1";
      $row = $this->wpdb->get_row($this->wpdb->prepare($sql, ...$params));
      $this->invalidateUser($userId);
      return $row ? $this->rowToOperation($row) : null;
   }
@@ -264,6 +390,7 @@
      $op->startedAt      = $row->started_at;
      $op->completedAt    = $row->completed_at;
      $op->result         = $row->result ? json_decode($row->result, true) : null;
      $op->merged_into    = $row->merged_into;
      $op->userDismissed  = (bool) $row->user_dismissed;
      return $op;
@@ -364,6 +491,89 @@
   private function invalidateUser(int $userId): void
   {
      $this->cache->forget($userId);
      Cache::for($userId.'_queue')->flush();
   }
   public function getLastError(): string
   {
      return $this->wpdb->last_error;
   }
   public function withTransaction(callable $callback): mixed
   {
      $this->wpdb->query('START TRANSACTION');
      try {
         $result = $callback();
         $this->wpdb->query('COMMIT');
         return $result;
      } catch (\Throwable $e) {
         $this->wpdb->query('ROLLBACK');
         error_log('[Storage] Transaction rolled back: ' . $e->getMessage());
         throw $e;
      }
   }
   public function resetStuckOperations(int $stuckMinutes = 30): int
   {
      return $this->withTransaction(function () use ($stuckMinutes) {
         $cutoff = date('Y-m-d H:i:s', strtotime("-{$stuckMinutes} minutes"));
         // Get IDs first for logging
         $stuckIds = $this->wpdb->get_col($this->wpdb->prepare("
            SELECT id FROM {$this->table}
            WHERE state = 'processing'
              AND started_at < %s
            FOR UPDATE
        ", $cutoff));
         if (empty($stuckIds)) {
            return 0;
         }
         // Reset them
         $affected = $this->wpdb->query($this->wpdb->prepare("
            UPDATE {$this->table}
            SET state = 'scheduled',
                scheduled_at = %s,
                retries = retries + 1,
                updated_at = %s
            WHERE id IN (" . implode(',', array_fill(0, count($stuckIds), '%s')) . ")
        ",
            date('Y-m-d H:i:s', time() + 60),
            current_time('mysql'),
            ...$stuckIds
         ));
         // Optional: Log to audit table
         // $this->logStuckReset($stuckIds);
         return (int) $affected;
      });
   }
   /**
    * @throws \Throwable
    */
   public function replaceDependency(string $fromId, string $toId): int
   {
      return $this->withTransaction(function () use ($fromId, $toId) {
         // Only affect pending/scheduled operations
         $affected = $this->wpdb->query($this->wpdb->prepare("
            UPDATE {$this->table}
            SET dependencies = REPLACE(dependencies, %s, %s),
                updated_at = %s
            WHERE state IN ('pending', 'scheduled')
              AND dependencies LIKE %s
        ",
            '"' . $fromId . '"',
            '"' . $toId . '"',
            current_time('mysql'),
            '%"' . $fromId . '"%'
         ));
         return (int) $affected;
      });
   }
}