Jake Vanderwerf
9 days ago 47e77f9fac1155c536b2b87fec552c7fcce66fa6
inc/managers/queue/Storage.php
@@ -4,82 +4,97 @@
   exit;
}
use JVBase\managers\CacheManager;
use JVBase\managers\Cache;
use JVBase\managers\CustomTable;
use LogicException;
class Storage
{
   private \wpdb $wpdb;
   private string $table;
   private CacheManager $cache;
   private CustomTable $table;
   private CustomTable $stats;
   private Cache $cache;
   private const CACHE_USER_PREFIX = 'user_queue_';
   private const CACHE_QUEUE_INFO = 'queue_info';
   public function __construct()
   {
      global $wpdb;
      $this->wpdb = $wpdb;
      $this->table = $wpdb->prefix . BASE . '_operation_queue';
      $this->cache = CacheManager::for('queue', DAY_IN_SECONDS);
      $this->defineTables();
      $this->cache = Cache::for('queue', DAY_IN_SECONDS);
   }
   public function hasProcessingOperations(): bool
   {
      return (bool) $this->wpdb->get_var(
         "SELECT 1 FROM {$this->table} WHERE state = 'processing' LIMIT 1"
      );
      return (bool) $this->table->queryVar("SELECT 1 FROM {table} WHERE state = 'processing' LIMIT 1");
   }
   public function fetchRunnable(int $limit = 10): array
   public function fetchRunnable(int $offset = 0): array
   {
      $now = current_time('mysql');
      $rows = $this->table->queryResults("
        SELECT * FROM {table}
        WHERE state IN ('pending', 'scheduled')
          AND scheduled_at <= %s
        ORDER BY FIELD(priority, 'high', 'normal', 'low'), scheduled_at
        LIMIT 10 OFFSET %d
        FOR UPDATE SKIP LOCKED
    ", [$now, $offset]);
      $rows = $this->wpdb->get_results($this->wpdb->prepare("
        SELECT oq.* FROM {$this->table} oq
        WHERE oq.state IN ('pending', 'scheduled')
          AND oq.scheduled_at <= %s
          AND NOT EXISTS (
              SELECT 1
              FROM JSON_TABLE(
                  COALESCE(NULLIF(oq.dependencies, 'null'), '[]'),
                  '\$[*]' COLUMNS (dep_id VARCHAR(64) PATH '\$')
              ) AS deps
              JOIN {$this->table} dep ON dep.id = deps.dep_id
              WHERE dep.state != 'completed'
                 OR dep.outcome NOT IN ('success', 'partial')
          )
        ORDER BY FIELD(oq.priority, 'high', 'normal', 'low'), oq.scheduled_at
        LIMIT %d
    ", $now, $limit));
      $total = count($rows);
      foreach ($rows as $row) {
         $dependencies = json_decode($row->dependencies ?? '[]', true) ?: [];
         if (empty($dependencies)) {
            return [$this->rowToOperation($row)];
         }
         $totalDep = count($dependencies);
         $completed = [];
         $notCompleted = [];
         foreach ($dependencies as $dep) {
            $dependency = $this->find($dep);
            if ($dependency) {
               if ($dependency->state === 'completed') {
                  $completed[] = $dep;
               } else {
                  $notCompleted[] = $dep;
               }
            }
         }
         if (count($completed) === $totalDep) {
            return [$this->rowToOperation($row)];
         }
      }
      //If we didn't find any operations from that 10 that are ready to go, or their dependencies aren't met, try the next 10
      if ($total === 10) {
         return $this->fetchRunnable($offset + 10);
      }
      return array_map([$this, 'rowToOperation'], $rows ?: []);
      //If, for whatever reason, nothing still is found, there likely are none
      return [];
   }
   public function markProcessing(string $id): bool
   {
      $now = current_time('mysql');
      $affected = $this->wpdb->query($this->wpdb->prepare("
            UPDATE {$this->table}
            SET state = 'processing', started_at = %s, updated_at = %s
            WHERE id = %s AND state IN ('pending', 'scheduled')
        ", $now, $now, $id));
      $affected = $this->table->query("
        UPDATE {table}
        SET state = 'processing', started_at = %s, updated_at = %s
        WHERE id = %s AND state IN ('pending', 'scheduled')
    ", [$now, $now, $id]);
      if ($affected > 0) {
         $op = $this->find($id);
         if ($op) {
            $this->invalidateUser($op->userId);
         }
         if ($op) $this->invalidateUser($op->userId);
      }
      return $affected > 0;
   }
   public function save(Operation $op): bool
   {
      $data = [
      $result = $this->table->update([
         'request_data'    => json_encode($op->requestData),
         'total_items'     => $op->totalItems,
         'processed_items' => $op->processedItems,
@@ -90,28 +105,70 @@
         'retries'         => $op->retries,
         'last_error_hash' => $op->lastErrorHash,
         'error_message'   => $op->errorMessage,
         'scheduled_at'    => $op->scheduledAt,
         'started_at'      => $op->startedAt,
         'completed_at'    => $op->completedAt,
         'metadata'        => json_encode($op->metadata),
         'result'          => $op->result ? json_encode($op->result) : null,
         'dependencies'    => json_encode($op->dependencies),
         'merged_into'     => $op->merged_into,
         'user_dismissed'  => $op->userDismissed ? 1 : 0,
         'updated_at'      => current_time('mysql'),
      ];
      ],
         ['id' => $op->id]
      );
      $result = $this->wpdb->update($this->table, $data, ['id' => $op->id]);
      if ($result !== false) $this->invalidateUser($op->userId);
      return $result !== false;
   }
      if ($result !== false) {
         $this->invalidateUser($op->userId);
   /**
    * For saving a 'step' in an operation; usually after each chunk
    * @param Operation $op
    * @return bool
    */
   public function saveProgress(Operation $op): bool
   {
      $result = $this->table->update([
         'processed_items' => $op->processedItems ?? 0,
         'failed_items'    => $op->failedItems ? json_encode($op->failedItems) : null,
         'metadata'        => $op->metadata ? json_encode($op->metadata) : null,
         'result'          => $op->result ? json_encode($op->result) : null,
      ], ['id' => $op->id, 'state' => 'processing']); // state guard preserved
      if ($result === false) return false;
      $this->invalidateUser($op->userId);
      return true;
   }
   /**
    * The operation is complete, let's progress to 'completed'
    * @param Operation $op
    * @return bool
    */
   public function saveFinal(Operation $op): bool
   {
      if ($op->state !== 'completed') {
         throw new LogicException('saveFinal called without completed state');
      }
      return $result !== false;
      $result = $this->table->update([
         'state'           => 'completed',
         'outcome'         => $op->outcome ?? 'success',
         'processed_items' => $op->processedItems ?? 0,
         'failed_items'    => $op->failedItems ? json_encode($op->failedItems) : null,
         'result'          => isset($op->result) ? wp_json_encode($op->result) : null,
         'completed_at'    => $op->completedAt ?? current_time('mysql'),
      ], ['id' => $op->id, 'state' => 'processing']); // hard guard preserved
      if ($result === 0) return true; // already completed, not an error
      if ($result === false) return false;
      $this->invalidateQueueCache();
      $this->invalidateUser($op->userId);
      return true;
   }
   public function insert(Operation $op): bool
   {
      $result = $this->wpdb->insert($this->table, [
      $result = $this->table->insert([
         'id'              => $op->id,
         'type'            => $op->type,
         'user_id'         => $op->userId,
@@ -123,17 +180,10 @@
         'state'           => $op->state,
         'outcome'         => $op->outcome,
         'retries'         => 0,
         'last_error_hash' => null,
         'error_message'   => null,
         'scheduled_at'    => $op->scheduledAt ?? current_time('mysql'),
         'started_at'      => null,
         'completed_at'    => null,
         'metadata'        => json_encode($op->metadata),
         'result'          => null,
         'dependencies'    => json_encode($op->dependencies),
         'user_dismissed'  => 0,
         'created_at'      => current_time('mysql'),
         'updated_at'      => current_time('mysql'),
         'merged_into'       => null,
      ]);
      if ($result) {
@@ -145,24 +195,25 @@
   public function find(string $id): ?Operation
   {
      $row = $this->wpdb->get_row($this->wpdb->prepare(
         "SELECT * FROM {$this->table} WHERE id = %s",
         $id
      ));
      $row = $this->table->get(['id' => $id]);
      return $row ? $this->rowToOperation($row) : null;
   }
   public function findMergeable(string $type, int $userId): ?Operation
   public function findMergeable(string $type, int $userId, array $criteria = []): ?Operation
   {
      $row = $this->wpdb->get_row($this->wpdb->prepare(
         "SELECT * FROM {$this->table}
             WHERE type = %s AND user_id = %d AND state IN ('pending', 'scheduled')
             ORDER BY created_at DESC LIMIT 1",
         $type, $userId
      ));
      $sql = "SELECT * FROM {table} WHERE type = %s AND user_id = %d AND state IN ('pending', 'scheduled')";
      $params = [$type, $userId];
      return $row ? $this->rowToOperation($row) : null;
      foreach ($criteria as $key => $value) {
         if ($value === null) continue;
         $sql .= " AND JSON_UNQUOTE(JSON_EXTRACT(request_data, %s)) = %s";
         $params[] = '$.' . $key;
         $params[] = (string) $value;
      }
      $sql .= " ORDER BY created_at DESC LIMIT 1";
      $rows = $this->table->queryResults($sql, $params);
      return !empty($rows) ? $this->rowToOperation($rows[0]) : null;
   }
   public function getUserOperations(int $userId, array $filters = []): array
@@ -203,14 +254,12 @@
      // Order by state priority, then created_at
      $orderBy = $filters['order_by'] ?? "FIELD(state, 'processing', 'pending', 'scheduled', 'completed'), created_at DESC";
      $limit = $filters['limit'] ?? 50;
      $params[] = $limit;
      $params[] = $filters['limit'] ?? 50;
      $rows = $this->wpdb->get_results($this->wpdb->prepare(
         "SELECT * FROM {$this->table} WHERE " . implode(' AND ', $where) .
         " ORDER BY {$orderBy} LIMIT %d",
         ...$params
      ));
      $rows = $this->table->queryResults(
         "SELECT * FROM {table} WHERE " . implode(' AND ', $where) . " ORDER BY {$orderBy} LIMIT %d",
         $params
      );
      return array_map([$this, 'rowToOperation'], $rows ?: []);
   }
@@ -218,19 +267,17 @@
   public function getQueueInfo(): array
   {
      $cached = $this->cache->get(self::CACHE_QUEUE_INFO);
      if ($cached !== false) {
         return $cached;
      }
      if ($cached !== false) return $cached;
      $now = current_time('mysql');
      $row = $this->wpdb->get_row($this->wpdb->prepare("
            SELECT
                COUNT(*) as total,
                SUM(IF(state IN ('pending', 'processing'), 1, 0)) as active,
                SUM(IF(state = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready_scheduled
            FROM {$this->table}
            WHERE state IN ('pending', 'processing', 'scheduled')
        ", $now));
      $row = $this->table->queryResults("
        SELECT COUNT(*) as total,
            SUM(IF(state IN ('pending', 'processing'), 1, 0)) as active,
            SUM(IF(state = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready_scheduled
        FROM {table}
        WHERE state IN ('pending', 'processing', 'scheduled')
    ", [$now]);
      $row = $row[0] ?? null;
      $info = [
         'total'     => (int) ($row->total ?? 0),
@@ -265,6 +312,7 @@
      $op->startedAt      = $row->started_at;
      $op->completedAt    = $row->completed_at;
      $op->result         = $row->result ? json_decode($row->result, true) : null;
      $op->merged_into    = $row->merged_into;
      $op->userDismissed  = (bool) $row->user_dismissed;
      return $op;
@@ -273,33 +321,28 @@
   public function getQueueStatus(): array
   {
      $now = current_time('mysql');
      $rows = $this->table->queryResults("
        SELECT state, COUNT(*) as count,
            SUM(IF(state = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready
        FROM {table} GROUP BY state
    ", [$now]);
      $rows = $this->wpdb->get_results($this->wpdb->prepare("
            SELECT
                state,
                COUNT(*) as count,
                SUM(IF(state = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready
            FROM {$this->table}
            GROUP BY state
        ", $now), OBJECT_K);
      $indexed = array_column($rows, null, 'state');
      return [
         'pending'         => (int) ($rows['pending']->count ?? 0),
         'scheduled'       => (int) ($rows['scheduled']->count ?? 0),
         'scheduled_ready' => (int) ($rows['scheduled']->ready ?? 0),
         'processing'      => (int) ($rows['processing']->count ?? 0),
         'completed'       => (int) ($rows['completed']->count ?? 0),
         'pending'         => (int) ($indexed['pending']->count ?? 0),
         'scheduled'       => (int) ($indexed['scheduled']->count ?? 0),
         'scheduled_ready' => (int) ($indexed['scheduled']->ready ?? 0),
         'processing'      => (int) ($indexed['processing']->count ?? 0),
         'completed'       => (int) ($indexed['completed']->count ?? 0),
      ];
   }
   public function getUserStats(int $userId): array
   {
      $rows = $this->wpdb->get_results($this->wpdb->prepare("
            SELECT state, outcome, COUNT(*) as count
            FROM {$this->table}
            WHERE user_id = %d
            GROUP BY state, outcome
        ", $userId));
      $rows = $this->table->queryResults(
         "SELECT state, outcome, COUNT(*) as count FROM {table} WHERE user_id = %d GROUP BY state, outcome",
         [$userId]
      );
      $stats = [
         'pending' => 0,
@@ -332,13 +375,7 @@
   public function dismiss(string $id): bool
   {
      $result = $this->wpdb->update(
         $this->table,
         ['user_dismissed' => 1, 'updated_at' => current_time('mysql')],
         ['id' => $id]
      );
      return $result !== false;
      return $this->table->update(['user_dismissed' => 1], ['id' => $id]) !== false;
   }
   /**
@@ -347,26 +384,180 @@
   public function delete(string $id): bool
   {
      $op = $this->find($id);
      $userId = $op?->userId;
      $result = $this->wpdb->delete($this->table, ['id' => $id]);
      if ($result && $userId) {
         $this->invalidateUser($userId);
      }
      $result = $this->table->delete(['id' => $id]);
      if ($result && $op) $this->invalidateUser($op->userId);
      return $result !== false;
   }
   public function invalidateQueueCache(): void
   {
      $this->cache->delete(self::CACHE_QUEUE_INFO);
      $this->cache->touch();
      $this->cache->forget(self::CACHE_QUEUE_INFO);
   }
   private function invalidateUser(int $userId): void
   {
      CacheManager::invalidateAll("user_{$userId}");
      $this->cache->delete(self::CACHE_QUEUE_INFO);
      Cache::for($userId.'_queue')->flush();
   }
   public function withTransaction(callable $callback): mixed
   {
      $this->table->startTransaction();
      try {
         $result = $callback();
         $this->table->commit();
         return $result;
      } catch (\Throwable $e) {
         $this->table->rollback();
         error_log('[Storage] Transaction rolled back: ' . $e->getMessage());
         throw $e;
      }
   }
   public function resetStuckOperations(int $stuckMinutes = 30): int
   {
      return $this->withTransaction(function () use ($stuckMinutes) {
         $cutoff = date('Y-m-d H:i:s', strtotime("-{$stuckMinutes} minutes"));
         $stuckIds = $this->table->queryResults(
            "SELECT id FROM {table} WHERE state = 'processing' AND started_at < %s FOR UPDATE",
            [$cutoff]
         );
         $stuckIds = array_column($stuckIds, 'id');
         if (empty($stuckIds)) return 0;
         $placeholders = implode(',', array_fill(0, count($stuckIds), '%s'));
         return (int) $this->table->query(
            "UPDATE {table} SET state = 'scheduled', scheduled_at = %s, retries = retries + 1, updated_at = %s WHERE id IN ({$placeholders})",
            array_merge([date('Y-m-d H:i:s', time() + 60), current_time('mysql')], $stuckIds)
         );
      });
   }
   /**
    * @throws \Throwable
    */
   public function replaceDependency(string $fromId, string $toId): int
   {
      return $this->withTransaction(function () use ($fromId, $toId) {
         return (int) $this->table->query(
            "UPDATE {table} SET dependencies = REPLACE(dependencies, %s, %s), updated_at = %s WHERE state IN ('pending', 'scheduled') AND dependencies LIKE %s",
            ['"'.$fromId.'"', '"'.$toId.'"', current_time('mysql'), '%"'.$fromId.'"%']
         );
      });
   }
   public function defineTables():void
   {
      $queue = CustomTable::for('_operation_queue');
      $queue->setColumns([
         'id'        => 'VARCHAR(64) NOT NULL',
         'type'         => 'VARCHAR(50) NOT NULL',
         'user_id'      => $queue->getUserIDType().' NOT NULL',
         'request_data' => 'JSON NOT NULL CHECK (JSON_VALID(request_data))',
         'total_items'  => 'INT(11) NOT NULL DEFAULT 1',
         'processed_items' => 'INT(11) DEFAULT 0',
         'failed_items' => 'JSON',
         'priority'     => 'ENUM(\'high\',\'normal\',\'low\') DEFAULT \'normal\'',
         'state'        => 'ENUM(\'pending\', \'scheduled\', \'processing\', \'completed\') DEFAULT \'pending\'',
         'outcome'      => 'ENUM(\'pending\', \'success\',\'partial\',\'merged\',\'failed\',\'failed_permanent\') DEFAULT \'pending\'',
         'retries'      => 'INT(11) DEFAULT 0',
         'last_error_hash'=> 'CHAR(32) DEFAULT NULL',
         'error_message'   => 'TEXT',
         'scheduled_at' => 'DATETIME DEFAULT NULL',
         'started_at'   => 'DATETIME DEFAULT CURRENT_TIMESTAMP',
         'completed_at' => 'DATETIME DEFAULT NULL',
         'metadata'     => 'JSON DEFAULT NULL',
         'result'    => 'JSON',
         'dependencies' => 'JSON',
         'merged_into'  => 'VARCHAR(64) DEFAULT NULL',
         'user_dismissed'=> 'tinyint(1) DEFAULT 0',
         'created_at'   => 'DATETIME DEFAULT CURRENT_TIMESTAMP',
         'updated_at'   => 'DATETIME DEFAULT CURRENT_TIMESTAMP',
      ]);
      $queue->setKeys([
         ['key' => 'PRIMARY', 'value' => '(`id`)'],
         '`idx_run_queue` (`state`, `priority`, `scheduled_at`)',
         '`idx_user_ops` (`user_id`, `state`)',
         '`idx_user_type_pending` (`user_id`, `type`, `state`)',
         '`idx_completed_at` (`completed_at`)',
         '`idx_processing_stuck` (`state`, `started_at`)'
      ]);
      $queue->defineTable();
      $this->table = $queue;
      $stats = CustomTable::for('stats__operation_queue');
      $stats->setColumns([
         'id'            => 'BIGINT unsigned AUTO_INCREMENT',
         'date'          => 'DATE NOT NULL',
         'type'          => 'VARCHAR(50) NOT NULL',
         // Only store what can't be queried from the main table later
         'peak_queue_size'   => 'INT NOT NULL DEFAULT 0',
         'peak_memory_bytes' => 'BIGINT DEFAULT NULL',
         // Snapshot totals for post-purge historical view
         'total_operations'          => 'INT NOT NULL DEFAULT 0',
         'successful_operations'     => 'INT NOT NULL DEFAULT 0',
         'failed_permanent_operations' => 'INT NOT NULL DEFAULT 0',
         'total_items_processed'     => 'INT NOT NULL DEFAULT 0',
         'created_at'    => 'TIMESTAMP DEFAULT CURRENT_TIMESTAMP',
         'updated_at'    => 'TIMESTAMP DEFAULT CURRENT_TIMESTAMP',
      ]);
      $stats->setKeys([
         ['key' => 'PRIMARY', 'value' => '(`id`)'],
         ['key' => 'UNIQUE', 'value' => '(`date`, `type`)'],
         '`date_idx` (`date`)',
         '`type_idx` (`type`)'
      ]);
      $stats->defineTable();
      $this->stats = $stats;
   }
   public function snapshotDaily(): void
   {
      $today = current_time('Y-m-d');
      $types = $this->table->queryResults(
         "SELECT DISTINCT type FROM {table} WHERE DATE(completed_at) = %s",
         [$today]
      );
      foreach ($types as $row) {
         $stats = $this->table->queryResults("
            SELECT
                COUNT(*) as total,
                SUM(outcome = 'success') as successful,
                SUM(outcome = 'failed_permanent') as failed_permanent,
                SUM(processed_items) as items_processed
            FROM {table}
            WHERE type = %s AND DATE(completed_at) = %s
        ", [$row->type, $today]);
         $s = $stats[0] ?? null;
         if (!$s) continue;
         $this->stats->table->query("
            INSERT INTO {table} (date, type, total_operations, successful_operations, failed_permanent_operations, total_items_processed)
            VALUES (%s, %s, %d, %d, %d, %d)
            ON DUPLICATE KEY UPDATE
                total_operations = VALUES(total_operations),
                successful_operations = VALUES(successful_operations),
                failed_permanent_operations = VALUES(failed_permanent_operations),
                total_items_processed = VALUES(total_items_processed),
                updated_at = NOW()
        ", [$today, $row->type, $s->total, $s->successful, $s->failed_permanent, $s->items_processed]);
      }
   }
}