Jake Vanderwerf
2026-02-17 a24a06002081ad71a78ffeff9072725ba39cf121
inc/managers/queue/Storage.php
@@ -30,36 +30,61 @@
      );
   }
   public function fetchRunnable(int $limit = 10): array
   public function fetchRunnable(int $offset = 0): array
   {
      $now = current_time('mysql');
      $rows = $this->wpdb->get_results($this->wpdb->prepare("
         SELECT oq.* FROM {$this->table} oq
         WHERE oq.state IN ('pending', 'scheduled')
           AND oq.scheduled_at <= %s
           AND NOT EXISTS (
              SELECT 1
              FROM JSON_TABLE(
                 COALESCE(NULLIF(oq.dependencies, 'null'), '[]'),
                 '\$[*]' COLUMNS (dep_id VARCHAR(64) PATH '\$')
              ) AS deps
              JOIN {$this->table} dep ON dep.id = deps.dep_id
              WHERE dep.state != 'completed'
                OR dep.outcome NOT IN ('success', 'partial')
           )
         ORDER BY FIELD(oq.priority, 'high', 'normal', 'low'), oq.scheduled_at
         LIMIT %d
         FOR UPDATE SKIP LOCKED
      ", $now, $limit));
      $rows = $this->wpdb->get_results(
         $this->wpdb->prepare("
            SELECT *
            FROM {$this->table}
            WHERE state IN ('pending', 'scheduled')
              AND scheduled_at <= %s
            ORDER BY
              FIELD(priority, 'high', 'normal', 'low'),
              scheduled_at
            LIMIT 10 OFFSET %d
            FOR UPDATE SKIP LOCKED
        ", $now, $offset)
      );
      return array_map([$this, 'rowToOperation'], $rows ?: []);
      $total = count($rows);
      foreach ($rows as $row) {
         $dependencies = json_decode($row->dependencies ?? '[]', true) ?: [];
         if (empty($dependencies)) {
            return [$this->rowToOperation($row)];
         }
         $totalDep = count($dependencies);
         $completed = [];
         $notCompleted = [];
         foreach ($dependencies as $dep) {
            $dependency = $this->find($dep);
            if ($dependency) {
               if ($dependency->state === 'completed') {
                  $completed[] = $dep;
               } else {
                  $notCompleted[] = $dep;
               }
            }
         }
         if (count($completed) === $totalDep) {
            return [$this->rowToOperation($row)];
         }
      }
      //If we didn't find any operations from that 10 that are ready to go, or their dependencies aren't met, try the next 10
      if ($total === 10) {
         return $this->fetchRunnable($offset + 10);
      }
      //If, for whatever reason, nothing still is found, there likely are none
      return [];
   }
   public function markProcessing(string $id): bool
   {
      $now = current_time('mysql');
@@ -99,6 +124,7 @@
         'metadata'        => json_encode($op->metadata),
         'result'          => $op->result ? json_encode($op->result) : null,
         'dependencies'    => json_encode($op->dependencies),
         'merged_into'     => $op->merged_into,
         'user_dismissed'  => $op->userDismissed ? 1 : 0,
         'updated_at'      => current_time('mysql'),
      ];
@@ -143,6 +169,9 @@
         return false;
      }
      $this->invalidateUser($op->userId);
      return true;
   }
@@ -164,7 +193,7 @@
         'state'          => 'completed',
         'outcome'        => $op->outcome?? 'success',
         'processed_items'=> $op->processedItems ?? 0,
         'failed_items'   => $op->failedItems    ?? null,
         'failed_items'     => $op->failedItems ? json_encode($op->failedItems) : null,
         'result'         => isset($op->result) ? wp_json_encode($op->result) : null,
         'completed_at'   => $op->completedAt ?? current_time('mysql'),
         'updated_at'     => current_time('mysql'),
@@ -177,7 +206,6 @@
      ];
      $updated = $wpdb->update($table, $data, $where);
      $this->invalidateQueueCache();
      if ($updated === 0) {
         return true;
@@ -187,6 +215,8 @@
         error_log('[Storage::saveFinal] DB error: ' . $wpdb->last_error);
         return false;
      }
      $this->invalidateQueueCache();
      $this->invalidateUser($op->userId);
      return true;
   }
@@ -214,6 +244,7 @@
         'result'          => null,
         'dependencies'    => json_encode($op->dependencies),
         'user_dismissed'  => 0,
         'merged_into'       => null,
         'created_at'      => current_time('mysql'),
         'updated_at'      => current_time('mysql'),
      ]);
@@ -235,14 +266,26 @@
      return $row ? $this->rowToOperation($row) : null;
   }
   public function findMergeable(string $type, int $userId): ?Operation
   public function findMergeable(string $type, int $userId, array $criteria = []): ?Operation
   {
      $row = $this->wpdb->get_row($this->wpdb->prepare(
         "SELECT * FROM {$this->table}
             WHERE type = %s AND user_id = %d AND state IN ('pending', 'scheduled')
             ORDER BY created_at DESC LIMIT 1",
         $type, $userId
      ));
      $sql = "SELECT * FROM {$this->table}
            WHERE type = %s AND user_id = %d AND state IN ('pending', 'scheduled')";
      $params = [$type, $userId];
      foreach ($criteria as $key => $value) {
         if ($value === null) {
            continue;
         }
         $sql .= " AND JSON_UNQUOTE(JSON_EXTRACT(request_data, %s)) = %s";
         $params[] = '$.' . $key;
         $params[] = (string) $value;
      }
      $sql .= " ORDER BY created_at DESC LIMIT 1";
      $row = $this->wpdb->get_row($this->wpdb->prepare($sql, ...$params));
      $this->invalidateUser($userId);
      return $row ? $this->rowToOperation($row) : null;
   }
@@ -347,6 +390,7 @@
      $op->startedAt      = $row->started_at;
      $op->completedAt    = $row->completed_at;
      $op->result         = $row->result ? json_decode($row->result, true) : null;
      $op->merged_into    = $row->merged_into;
      $op->userDismissed  = (bool) $row->user_dismissed;
      return $op;
@@ -447,7 +491,7 @@
   private function invalidateUser(int $userId): void
   {
      $this->cache->forget($userId);
      Cache::for($userId.'_queue')->flush();
   }
   public function getLastError(): string
   {
@@ -506,4 +550,30 @@
         return (int) $affected;
      });
   }
   /**
    * @throws \Throwable
    */
   public function replaceDependency(string $fromId, string $toId): int
   {
      return $this->withTransaction(function () use ($fromId, $toId) {
         // Only affect pending/scheduled operations
         $affected = $this->wpdb->query($this->wpdb->prepare("
            UPDATE {$this->table}
            SET dependencies = REPLACE(dependencies, %s, %s),
                updated_at = %s
            WHERE state IN ('pending', 'scheduled')
              AND dependencies LIKE %s
        ",
            '"' . $fromId . '"',
            '"' . $toId . '"',
            current_time('mysql'),
            '%"' . $fromId . '"%'
         ));
         return (int) $affected;
      });
   }
}