From 226b50642af0895948fbaa623a9b7180399a63b6 Mon Sep 17 00:00:00 2001
From: Jake Vanderwerf <get@jakevanderwerf.ca>
Date: Wed, 13 May 2026 19:15:48 +0000
Subject: [PATCH] =Queue fixes
---
inc/managers/queue/Storage.php | 442 ++++++++++++++++++++++++++----------------------------
1 files changed, 213 insertions(+), 229 deletions(-)
diff --git a/inc/managers/queue/Storage.php b/inc/managers/queue/Storage.php
index 40426e6..2a5d540 100644
--- a/inc/managers/queue/Storage.php
+++ b/inc/managers/queue/Storage.php
@@ -5,48 +5,39 @@
}
use JVBase\managers\Cache;
+use JVBase\managers\CustomTable;
use LogicException;
class Storage
{
- private \wpdb $wpdb;
- private string $table;
+ private CustomTable $table;
+ private CustomTable $stats;
private Cache $cache;
private const CACHE_QUEUE_INFO = 'queue_info';
public function __construct()
{
- global $wpdb;
- $this->wpdb = $wpdb;
- $this->table = $wpdb->prefix . BASE . '_operation_queue';
+ $this->defineTables();
$this->cache = Cache::for('queue', DAY_IN_SECONDS);
}
public function hasProcessingOperations(): bool
{
- return (bool) $this->wpdb->get_var(
- "SELECT 1 FROM {$this->table} WHERE state = 'processing' LIMIT 1"
- );
+ return (bool) $this->table->queryVar("SELECT 1 FROM {table} WHERE state = 'processing' LIMIT 1");
}
public function fetchRunnable(int $offset = 0): array
{
$now = current_time('mysql');
-
- $rows = $this->wpdb->get_results(
- $this->wpdb->prepare("
- SELECT *
- FROM {$this->table}
- WHERE state IN ('pending', 'scheduled')
- AND scheduled_at <= %s
- ORDER BY
- FIELD(priority, 'high', 'normal', 'low'),
- scheduled_at
- LIMIT 10 OFFSET %d
- FOR UPDATE SKIP LOCKED
- ", $now, $offset)
- );
+ $rows = $this->table->queryResults("
+ SELECT * FROM {table}
+ WHERE state IN ('pending', 'scheduled')
+ AND scheduled_at <= %s
+ ORDER BY FIELD(priority, 'high', 'normal', 'low'), scheduled_at
+ LIMIT 10 OFFSET %d
+ FOR UPDATE SKIP LOCKED
+ ", [$now, $offset]);
$total = count($rows);
foreach ($rows as $row) {
@@ -88,26 +79,22 @@
public function markProcessing(string $id): bool
{
$now = current_time('mysql');
-
- $affected = $this->wpdb->query($this->wpdb->prepare("
- UPDATE {$this->table}
- SET state = 'processing', started_at = %s, updated_at = %s
- WHERE id = %s AND state IN ('pending', 'scheduled')
- ", $now, $now, $id));
+ $affected = $this->table->query("
+ UPDATE {table}
+ SET state = 'processing', started_at = %s, updated_at = %s
+ WHERE id = %s AND state IN ('pending', 'scheduled')
+ ", [$now, $now, $id]);
if ($affected > 0) {
$op = $this->find($id);
- if ($op) {
- $this->invalidateUser($op->userId);
- }
+ if ($op) $this->invalidateUser($op->userId);
}
-
return $affected > 0;
}
public function save(Operation $op): bool
{
- $data = [
+ $result = $this->table->update([
'request_data' => json_encode($op->requestData),
'total_items' => $op->totalItems,
'processed_items' => $op->processedItems,
@@ -118,23 +105,17 @@
'retries' => $op->retries,
'last_error_hash' => $op->lastErrorHash,
'error_message' => $op->errorMessage,
- 'scheduled_at' => $op->scheduledAt,
- 'started_at' => $op->startedAt,
'completed_at' => $op->completedAt,
'metadata' => json_encode($op->metadata),
'result' => $op->result ? json_encode($op->result) : null,
'dependencies' => json_encode($op->dependencies),
'merged_into' => $op->merged_into,
'user_dismissed' => $op->userDismissed ? 1 : 0,
- 'updated_at' => current_time('mysql'),
- ];
+ ],
+ ['id' => $op->id]
+ );
- $result = $this->wpdb->update($this->table, $data, ['id' => $op->id]);
-
- if ($result !== false) {
- $this->invalidateUser($op->userId);
- }
-
+ if ($result !== false) $this->invalidateUser($op->userId);
return $result !== false;
}
@@ -143,35 +124,17 @@
* @param Operation $op
* @return bool
*/
- public function saveProgress(Operation $op): bool {
- global $wpdb;
-
- $table = $this->table;
-
- $data = [
+ public function saveProgress(Operation $op): bool
+ {
+ $result = $this->table->update([
'processed_items' => $op->processedItems ?? 0,
'failed_items' => $op->failedItems ? json_encode($op->failedItems) : null,
- 'metadata' => ($op->metadata) ? json_encode($op->metadata) : null,
- 'result' => ($op->result) ? json_encode($op->result) :null,
- 'updated_at' => current_time('mysql'),
- ];
+ 'metadata' => $op->metadata ? json_encode($op->metadata) : null,
+ 'result' => $op->result ? json_encode($op->result) : null,
+ ], ['id' => $op->id, 'state' => 'processing']); // state guard preserved
- // IMPORTANT: never touch terminal state
- $where = [
- 'id' => $op->id,
- 'state' => 'processing',
- ];
-
- $updated = $wpdb->update($table, $data, $where);
-
- if ($updated === false) {
- error_log('[Storage::saveProgress] DB error: ' . $wpdb->last_error);
- return false;
- }
-
-
+ if ($result === false) return false;
$this->invalidateUser($op->userId);
-
return true;
}
@@ -180,50 +143,32 @@
* @param Operation $op
* @return bool
*/
- public function saveFinal(Operation $op): bool {
- global $wpdb;
-
- if (($op->state?? null) !== 'completed') {
+ public function saveFinal(Operation $op): bool
+ {
+ if ($op->state !== 'completed') {
throw new LogicException('saveFinal called without completed state');
}
- $table = $this->table;
+ $result = $this->table->update([
+ 'state' => 'completed',
+ 'outcome' => $op->outcome ?? 'success',
+ 'processed_items' => $op->processedItems ?? 0,
+ 'failed_items' => $op->failedItems ? json_encode($op->failedItems) : null,
+ 'result' => isset($op->result) ? wp_json_encode($op->result) : null,
+ 'completed_at' => $op->completedAt ?? current_time('mysql'),
+ ], ['id' => $op->id, 'state' => 'processing']); // hard guard preserved
- $data = [
- 'state' => 'completed',
- 'outcome' => $op->outcome?? 'success',
- 'processed_items'=> $op->processedItems ?? 0,
- 'failed_items' => $op->failedItems ? json_encode($op->failedItems) : null,
- 'result' => isset($op->result) ? wp_json_encode($op->result) : null,
- 'completed_at' => $op->completedAt ?? current_time('mysql'),
- 'updated_at' => current_time('mysql'),
- ];
+ if ($result === 0) return true; // already completed, not an error
+ if ($result === false) return false;
- // HARD GUARD: cannot overwrite completed
- $where = [
- 'id' => $op->id,
- 'state' => 'processing',
- ];
-
- $updated = $wpdb->update($table, $data, $where);
-
- if ($updated === 0) {
- return true;
- }
-
- if ($updated === false) {
- error_log('[Storage::saveFinal] DB error: ' . $wpdb->last_error);
- return false;
- }
$this->invalidateQueueCache();
$this->invalidateUser($op->userId);
-
return true;
}
public function insert(Operation $op): bool
{
- $result = $this->wpdb->insert($this->table, [
+ $result = $this->table->insert([
'id' => $op->id,
'type' => $op->type,
'user_id' => $op->userId,
@@ -235,18 +180,10 @@
'state' => $op->state,
'outcome' => $op->outcome,
'retries' => 0,
- 'last_error_hash' => null,
- 'error_message' => null,
'scheduled_at' => $op->scheduledAt ?? current_time('mysql'),
- 'started_at' => null,
- 'completed_at' => null,
'metadata' => json_encode($op->metadata),
- 'result' => null,
'dependencies' => json_encode($op->dependencies),
- 'user_dismissed' => 0,
'merged_into' => null,
- 'created_at' => current_time('mysql'),
- 'updated_at' => current_time('mysql'),
]);
if ($result) {
@@ -258,36 +195,25 @@
public function find(string $id): ?Operation
{
- $row = $this->wpdb->get_row($this->wpdb->prepare(
- "SELECT * FROM {$this->table} WHERE id = %s",
- $id
- ));
-
+ $row = $this->table->get(['id' => $id]);
return $row ? $this->rowToOperation($row) : null;
}
public function findMergeable(string $type, int $userId, array $criteria = []): ?Operation
{
- $sql = "SELECT * FROM {$this->table}
- WHERE type = %s AND user_id = %d AND state IN ('pending', 'scheduled')";
+ $sql = "SELECT * FROM {table} WHERE type = %s AND user_id = %d AND state IN ('pending', 'scheduled')";
$params = [$type, $userId];
foreach ($criteria as $key => $value) {
- if ($value === null) {
- continue;
- }
+ if ($value === null) continue;
$sql .= " AND JSON_UNQUOTE(JSON_EXTRACT(request_data, %s)) = %s";
$params[] = '$.' . $key;
$params[] = (string) $value;
}
-
$sql .= " ORDER BY created_at DESC LIMIT 1";
- $row = $this->wpdb->get_row($this->wpdb->prepare($sql, ...$params));
-
- $this->invalidateUser($userId);
-
- return $row ? $this->rowToOperation($row) : null;
+ $rows = $this->table->queryResults($sql, $params);
+ return !empty($rows) ? $this->rowToOperation($rows[0]) : null;
}
public function getUserOperations(int $userId, array $filters = []): array
@@ -328,14 +254,12 @@
// Order by state priority, then created_at
$orderBy = $filters['order_by'] ?? "FIELD(state, 'processing', 'pending', 'scheduled', 'completed'), created_at DESC";
- $limit = $filters['limit'] ?? 50;
- $params[] = $limit;
+ $params[] = $filters['limit'] ?? 50;
- $rows = $this->wpdb->get_results($this->wpdb->prepare(
- "SELECT * FROM {$this->table} WHERE " . implode(' AND ', $where) .
- " ORDER BY {$orderBy} LIMIT %d",
- ...$params
- ));
+ $rows = $this->table->queryResults(
+ "SELECT * FROM {table} WHERE " . implode(' AND ', $where) . " ORDER BY {$orderBy} LIMIT %d",
+ $params
+ );
return array_map([$this, 'rowToOperation'], $rows ?: []);
}
@@ -343,19 +267,17 @@
public function getQueueInfo(): array
{
$cached = $this->cache->get(self::CACHE_QUEUE_INFO);
- if ($cached !== false) {
- return $cached;
- }
+ if ($cached !== false) return $cached;
$now = current_time('mysql');
- $row = $this->wpdb->get_row($this->wpdb->prepare("
- SELECT
- COUNT(*) as total,
- SUM(IF(state IN ('pending', 'processing'), 1, 0)) as active,
- SUM(IF(state = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready_scheduled
- FROM {$this->table}
- WHERE state IN ('pending', 'processing', 'scheduled')
- ", $now));
+ $row = $this->table->queryResults("
+ SELECT COUNT(*) as total,
+ SUM(IF(state IN ('pending', 'processing'), 1, 0)) as active,
+ SUM(IF(state = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready_scheduled
+ FROM {table}
+ WHERE state IN ('pending', 'processing', 'scheduled')
+ ", [$now]);
+ $row = $row[0] ?? null;
$info = [
'total' => (int) ($row->total ?? 0),
@@ -399,33 +321,28 @@
public function getQueueStatus(): array
{
$now = current_time('mysql');
+ $rows = $this->table->queryResults("
+ SELECT state, COUNT(*) as count,
+ SUM(IF(state = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready
+ FROM {table} GROUP BY state
+ ", [$now]);
- $rows = $this->wpdb->get_results($this->wpdb->prepare("
- SELECT
- state,
- COUNT(*) as count,
- SUM(IF(state = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready
- FROM {$this->table}
- GROUP BY state
- ", $now), OBJECT_K);
-
+ $indexed = array_column($rows, null, 'state');
return [
- 'pending' => (int) ($rows['pending']->count ?? 0),
- 'scheduled' => (int) ($rows['scheduled']->count ?? 0),
- 'scheduled_ready' => (int) ($rows['scheduled']->ready ?? 0),
- 'processing' => (int) ($rows['processing']->count ?? 0),
- 'completed' => (int) ($rows['completed']->count ?? 0),
+ 'pending' => (int) ($indexed['pending']->count ?? 0),
+ 'scheduled' => (int) ($indexed['scheduled']->count ?? 0),
+ 'scheduled_ready' => (int) ($indexed['scheduled']->ready ?? 0),
+ 'processing' => (int) ($indexed['processing']->count ?? 0),
+ 'completed' => (int) ($indexed['completed']->count ?? 0),
];
}
public function getUserStats(int $userId): array
{
- $rows = $this->wpdb->get_results($this->wpdb->prepare("
- SELECT state, outcome, COUNT(*) as count
- FROM {$this->table}
- WHERE user_id = %d
- GROUP BY state, outcome
- ", $userId));
+ $rows = $this->table->queryResults(
+ "SELECT state, outcome, COUNT(*) as count FROM {table} WHERE user_id = %d GROUP BY state, outcome",
+ [$userId]
+ );
$stats = [
'pending' => 0,
@@ -458,13 +375,7 @@
public function dismiss(string $id): bool
{
- $result = $this->wpdb->update(
- $this->table,
- ['user_dismissed' => 1, 'updated_at' => current_time('mysql')],
- ['id' => $id]
- );
-
- return $result !== false;
+ return $this->table->update(['user_dismissed' => 1], ['id' => $id]) !== false;
}
/**
@@ -473,14 +384,8 @@
public function delete(string $id): bool
{
$op = $this->find($id);
- $userId = $op?->userId;
-
- $result = $this->wpdb->delete($this->table, ['id' => $id]);
-
- if ($result && $userId) {
- $this->invalidateUser($userId);
- }
-
+ $result = $this->table->delete(['id' => $id]);
+ if ($result && $op) $this->invalidateUser($op->userId);
return $result !== false;
}
@@ -493,21 +398,16 @@
{
Cache::for($userId.'_queue')->flush();
}
- public function getLastError(): string
- {
- return $this->wpdb->last_error;
- }
public function withTransaction(callable $callback): mixed
{
- $this->wpdb->query('START TRANSACTION');
-
+ $this->table->startTransaction();
try {
$result = $callback();
- $this->wpdb->query('COMMIT');
+ $this->table->commit();
return $result;
} catch (\Throwable $e) {
- $this->wpdb->query('ROLLBACK');
+ $this->table->rollback();
error_log('[Storage] Transaction rolled back: ' . $e->getMessage());
throw $e;
}
@@ -518,36 +418,19 @@
return $this->withTransaction(function () use ($stuckMinutes) {
$cutoff = date('Y-m-d H:i:s', strtotime("-{$stuckMinutes} minutes"));
- // Get IDs first for logging
- $stuckIds = $this->wpdb->get_col($this->wpdb->prepare("
- SELECT id FROM {$this->table}
- WHERE state = 'processing'
- AND started_at < %s
- FOR UPDATE
- ", $cutoff));
+ $stuckIds = $this->table->queryResults(
+ "SELECT id FROM {table} WHERE state = 'processing' AND started_at < %s FOR UPDATE",
+ [$cutoff]
+ );
+ $stuckIds = array_column($stuckIds, 'id');
- if (empty($stuckIds)) {
- return 0;
- }
+ if (empty($stuckIds)) return 0;
- // Reset them
- $affected = $this->wpdb->query($this->wpdb->prepare("
- UPDATE {$this->table}
- SET state = 'scheduled',
- scheduled_at = %s,
- retries = retries + 1,
- updated_at = %s
- WHERE id IN (" . implode(',', array_fill(0, count($stuckIds), '%s')) . ")
- ",
- date('Y-m-d H:i:s', time() + 60),
- current_time('mysql'),
- ...$stuckIds
- ));
-
- // Optional: Log to audit table
- // $this->logStuckReset($stuckIds);
-
- return (int) $affected;
+ $placeholders = implode(',', array_fill(0, count($stuckIds), '%s'));
+ return (int) $this->table->query(
+ "UPDATE {table} SET state = 'scheduled', scheduled_at = %s, retries = retries + 1, updated_at = %s WHERE id IN ({$placeholders})",
+ array_merge([date('Y-m-d H:i:s', time() + 60), current_time('mysql')], $stuckIds)
+ );
});
}
@@ -557,23 +440,124 @@
public function replaceDependency(string $fromId, string $toId): int
{
return $this->withTransaction(function () use ($fromId, $toId) {
-
- // Only affect pending/scheduled operations
- $affected = $this->wpdb->query($this->wpdb->prepare("
- UPDATE {$this->table}
- SET dependencies = REPLACE(dependencies, %s, %s),
- updated_at = %s
- WHERE state IN ('pending', 'scheduled')
- AND dependencies LIKE %s
- ",
- '"' . $fromId . '"',
- '"' . $toId . '"',
- current_time('mysql'),
- '%"' . $fromId . '"%'
- ));
-
- return (int) $affected;
+ return (int) $this->table->query(
+ "UPDATE {table} SET dependencies = REPLACE(dependencies, %s, %s), updated_at = %s WHERE state IN ('pending', 'scheduled') AND dependencies LIKE %s",
+ ['"'.$fromId.'"', '"'.$toId.'"', current_time('mysql'), '%"'.$fromId.'"%']
+ );
});
}
+ public function defineTables():void
+ {
+ $queue = CustomTable::for('_operation_queue');
+ $queue->setColumns([
+ 'id' => 'VARCHAR(64) NOT NULL',
+ 'type' => 'VARCHAR(50) NOT NULL',
+ 'user_id' => $queue->getUserIDType().' NOT NULL',
+
+ 'request_data' => 'JSON NOT NULL CHECK (JSON_VALID(request_data))',
+
+ 'total_items' => 'INT(11) NOT NULL DEFAULT 1',
+ 'processed_items' => 'INT(11) DEFAULT 0',
+ 'failed_items' => 'JSON',
+
+ 'priority' => 'ENUM(\'high\',\'normal\',\'low\') DEFAULT \'normal\'',
+ 'state' => 'ENUM(\'pending\', \'scheduled\', \'processing\', \'completed\') DEFAULT \'pending\'',
+ 'outcome' => 'ENUM(\'pending\', \'success\',\'partial\',\'merged\',\'failed\',\'failed_permanent\') DEFAULT \'pending\'',
+
+ 'retries' => 'INT(11) DEFAULT 0',
+ 'last_error_hash'=> 'CHAR(32) DEFAULT NULL',
+ 'error_message' => 'TEXT',
+
+ 'scheduled_at' => 'DATETIME DEFAULT NULL',
+ 'started_at' => 'DATETIME DEFAULT CURRENT_TIMESTAMP',
+ 'completed_at' => 'DATETIME DEFAULT NULL',
+
+ 'metadata' => 'JSON DEFAULT NULL',
+ 'result' => 'JSON',
+ 'dependencies' => 'JSON',
+ 'merged_into' => 'VARCHAR(64) DEFAULT NULL',
+
+ 'user_dismissed'=> 'tinyint(1) DEFAULT 0',
+ 'created_at' => 'DATETIME DEFAULT CURRENT_TIMESTAMP',
+ 'updated_at' => 'DATETIME DEFAULT CURRENT_TIMESTAMP',
+ ]);
+
+ $queue->setKeys([
+ ['key' => 'PRIMARY', 'value' => '(`id`)'],
+ '`idx_run_queue` (`state`, `priority`, `scheduled_at`)',
+ '`idx_user_ops` (`user_id`, `state`)',
+ '`idx_user_type_pending` (`user_id`, `type`, `state`)',
+ '`idx_completed_at` (`completed_at`)',
+ '`idx_processing_stuck` (`state`, `started_at`)'
+ ]);
+
+ $queue->defineTable();
+ $this->table = $queue;
+
+ $stats = CustomTable::for('stats__operation_queue');
+ $stats->setColumns([
+ 'id' => 'BIGINT unsigned AUTO_INCREMENT',
+ 'date' => 'DATE NOT NULL',
+ 'type' => 'VARCHAR(50) NOT NULL',
+
+ // Only store what can't be queried from the main table later
+ 'peak_queue_size' => 'INT NOT NULL DEFAULT 0',
+ 'peak_memory_bytes' => 'BIGINT DEFAULT NULL',
+
+ // Snapshot totals for post-purge historical view
+ 'total_operations' => 'INT NOT NULL DEFAULT 0',
+ 'successful_operations' => 'INT NOT NULL DEFAULT 0',
+ 'failed_permanent_operations' => 'INT NOT NULL DEFAULT 0',
+ 'total_items_processed' => 'INT NOT NULL DEFAULT 0',
+
+ 'created_at' => 'TIMESTAMP DEFAULT CURRENT_TIMESTAMP',
+ 'updated_at' => 'TIMESTAMP DEFAULT CURRENT_TIMESTAMP',
+ ]);
+
+ $stats->setKeys([
+ ['key' => 'PRIMARY', 'value' => '(`id`)'],
+ ['key' => 'UNIQUE', 'value' => '(`date`, `type`)'],
+ '`date_idx` (`date`)',
+ '`type_idx` (`type`)'
+ ]);
+ $stats->defineTable();
+
+ $this->stats = $stats;
+ }
+
+ public function snapshotDaily(): void
+ {
+ $today = current_time('Y-m-d');
+ $types = $this->table->queryResults(
+ "SELECT DISTINCT type FROM {table} WHERE DATE(completed_at) = %s",
+ [$today]
+ );
+
+ foreach ($types as $row) {
+ $stats = $this->table->queryResults("
+ SELECT
+ COUNT(*) as total,
+ SUM(outcome = 'success') as successful,
+ SUM(outcome = 'failed_permanent') as failed_permanent,
+ SUM(processed_items) as items_processed
+ FROM {table}
+ WHERE type = %s AND DATE(completed_at) = %s
+ ", [$row->type, $today]);
+
+ $s = $stats[0] ?? null;
+ if (!$s) continue;
+
+ $this->stats->table->query("
+ INSERT INTO {table} (date, type, total_operations, successful_operations, failed_permanent_operations, total_items_processed)
+ VALUES (%s, %s, %d, %d, %d, %d)
+ ON DUPLICATE KEY UPDATE
+ total_operations = VALUES(total_operations),
+ successful_operations = VALUES(successful_operations),
+ failed_permanent_operations = VALUES(failed_permanent_operations),
+ total_items_processed = VALUES(total_items_processed),
+ updated_at = NOW()
+ ", [$today, $row->type, $s->total, $s->successful, $s->failed_permanent, $s->items_processed]);
+ }
+ }
}
--
Gitblit v1.10.0