Jake Vanderwerf
2026-02-17 a24a06002081ad71a78ffeff9072725ba39cf121
inc/managers/queue/Storage.php
@@ -30,7 +30,7 @@
      );
   }
   public function fetchRunnable(int $limit = 10): array
   public function fetchRunnable(int $offset = 0): array
   {
      $now = current_time('mysql');
@@ -43,18 +43,48 @@
            ORDER BY
              FIELD(priority, 'high', 'normal', 'low'),
              scheduled_at
            LIMIT %d
            LIMIT 10 OFFSET %d
            FOR UPDATE SKIP LOCKED
        ", $now, $limit)
        ", $now, $offset)
      );
      return array_map([$this, 'rowToOperation'], $rows ?: []);
      $total = count($rows);
      foreach ($rows as $row) {
         $dependencies = json_decode($row->dependencies ?? '[]', true) ?: [];
         if (empty($dependencies)) {
            return [$this->rowToOperation($row)];
         }
         $totalDep = count($dependencies);
         $completed = [];
         $notCompleted = [];
         foreach ($dependencies as $dep) {
            $dependency = $this->find($dep);
            if ($dependency) {
               if ($dependency->state === 'completed') {
                  $completed[] = $dep;
               } else {
                  $notCompleted[] = $dep;
               }
            }
         }
         if (count($completed) === $totalDep) {
            return [$this->rowToOperation($row)];
         }
      }
      //If we didn't find any operations from that 10 that are ready to go, or their dependencies aren't met, try the next 10
      if ($total === 10) {
         return $this->fetchRunnable($offset + 10);
      }
      //If, for whatever reason, nothing still is found, there likely are none
      return [];
   }
   public function markProcessing(string $id): bool
   {
      $now = current_time('mysql');
@@ -94,6 +124,7 @@
         'metadata'        => json_encode($op->metadata),
         'result'          => $op->result ? json_encode($op->result) : null,
         'dependencies'    => json_encode($op->dependencies),
         'merged_into'     => $op->merged_into,
         'user_dismissed'  => $op->userDismissed ? 1 : 0,
         'updated_at'      => current_time('mysql'),
      ];
@@ -162,7 +193,7 @@
         'state'          => 'completed',
         'outcome'        => $op->outcome?? 'success',
         'processed_items'=> $op->processedItems ?? 0,
         'failed_items'   => $op->failedItems    ?? null,
         'failed_items'     => $op->failedItems ? json_encode($op->failedItems) : null,
         'result'         => isset($op->result) ? wp_json_encode($op->result) : null,
         'completed_at'   => $op->completedAt ?? current_time('mysql'),
         'updated_at'     => current_time('mysql'),
@@ -213,6 +244,7 @@
         'result'          => null,
         'dependencies'    => json_encode($op->dependencies),
         'user_dismissed'  => 0,
         'merged_into'       => null,
         'created_at'      => current_time('mysql'),
         'updated_at'      => current_time('mysql'),
      ]);
@@ -234,14 +266,24 @@
      return $row ? $this->rowToOperation($row) : null;
   }
   public function findMergeable(string $type, int $userId): ?Operation
   public function findMergeable(string $type, int $userId, array $criteria = []): ?Operation
   {
      $row = $this->wpdb->get_row($this->wpdb->prepare(
         "SELECT * FROM {$this->table}
             WHERE type = %s AND user_id = %d AND state IN ('pending', 'scheduled')
             ORDER BY created_at DESC LIMIT 1",
         $type, $userId
      ));
      $sql = "SELECT * FROM {$this->table}
            WHERE type = %s AND user_id = %d AND state IN ('pending', 'scheduled')";
      $params = [$type, $userId];
      foreach ($criteria as $key => $value) {
         if ($value === null) {
            continue;
         }
         $sql .= " AND JSON_UNQUOTE(JSON_EXTRACT(request_data, %s)) = %s";
         $params[] = '$.' . $key;
         $params[] = (string) $value;
      }
      $sql .= " ORDER BY created_at DESC LIMIT 1";
      $row = $this->wpdb->get_row($this->wpdb->prepare($sql, ...$params));
      $this->invalidateUser($userId);
@@ -348,6 +390,7 @@
      $op->startedAt      = $row->started_at;
      $op->completedAt    = $row->completed_at;
      $op->result         = $row->result ? json_decode($row->result, true) : null;
      $op->merged_into    = $row->merged_into;
      $op->userDismissed  = (bool) $row->user_dismissed;
      return $op;
@@ -507,4 +550,30 @@
         return (int) $affected;
      });
   }
   /**
    * @throws \Throwable
    */
   public function replaceDependency(string $fromId, string $toId): int
   {
      return $this->withTransaction(function () use ($fromId, $toId) {
         // Only affect pending/scheduled operations
         $affected = $this->wpdb->query($this->wpdb->prepare("
            UPDATE {$this->table}
            SET dependencies = REPLACE(dependencies, %s, %s),
                updated_at = %s
            WHERE state IN ('pending', 'scheduled')
              AND dependencies LIKE %s
        ",
            '"' . $fromId . '"',
            '"' . $toId . '"',
            current_time('mysql'),
            '%"' . $fromId . '"%'
         ));
         return (int) $affected;
      });
   }
}