<?php
|
namespace JVBase\managers\queue;
|
if (!defined('ABSPATH')) {
|
exit;
|
}
|
|
use JVBase\managers\Cache;
|
use JVBase\managers\CustomTable;
|
use LogicException;
|
|
class Storage
|
{
|
private CustomTable $table;
|
private CustomTable $stats;
|
private Cache $cache;
|
|
private const CACHE_QUEUE_INFO = 'queue_info';
|
|
public function __construct()
|
{
|
$this->defineTables();
|
$this->cache = Cache::for('queue', DAY_IN_SECONDS);
|
}
|
|
public function hasProcessingOperations(): bool
|
{
|
return (bool) $this->table->queryVar("SELECT 1 FROM {table} WHERE state = 'processing' LIMIT 1");
|
}
|
|
public function fetchRunnable(int $offset = 0): array
|
{
|
$now = current_time('mysql');
|
$rows = $this->table->queryResults("
|
SELECT * FROM {table}
|
WHERE state IN ('pending', 'scheduled')
|
AND scheduled_at <= %s
|
ORDER BY FIELD(priority, 'high', 'normal', 'low'), scheduled_at
|
LIMIT 10 OFFSET %d
|
FOR UPDATE SKIP LOCKED
|
", [$now, $offset]);
|
|
$total = count($rows);
|
foreach ($rows as $row) {
|
$dependencies = json_decode($row->dependencies ?? '[]', true) ?: [];
|
if (empty($dependencies)) {
|
return [$this->rowToOperation($row)];
|
}
|
$totalDep = count($dependencies);
|
$completed = [];
|
$notCompleted = [];
|
foreach ($dependencies as $dep) {
|
$dependency = $this->find($dep);
|
if ($dependency) {
|
if ($dependency->state === 'completed') {
|
$completed[] = $dep;
|
} else {
|
$notCompleted[] = $dep;
|
}
|
}
|
}
|
if (count($completed) === $totalDep) {
|
return [$this->rowToOperation($row)];
|
}
|
}
|
//If we didn't find any operations from that 10 that are ready to go, or their dependencies aren't met, try the next 10
|
if ($total === 10) {
|
return $this->fetchRunnable($offset + 10);
|
}
|
|
//If, for whatever reason, nothing still is found, there likely are none
|
return [];
|
}
|
|
|
|
|
|
|
public function markProcessing(string $id): bool
|
{
|
$now = current_time('mysql');
|
$affected = $this->table->query("
|
UPDATE {table}
|
SET state = 'processing', started_at = %s, updated_at = %s
|
WHERE id = %s AND state IN ('pending', 'scheduled')
|
", [$now, $now, $id]);
|
|
if ($affected > 0) {
|
$op = $this->find($id);
|
if ($op) $this->invalidateUser($op->userId);
|
}
|
return $affected > 0;
|
}
|
|
public function save(Operation $op): bool
|
{
|
$result = $this->table->update([
|
'request_data' => json_encode($op->requestData),
|
'total_items' => $op->totalItems,
|
'processed_items' => $op->processedItems,
|
'failed_items' => $op->failedItems ? json_encode($op->failedItems) : null,
|
'priority' => $op->priority,
|
'state' => $op->state,
|
'outcome' => $op->outcome,
|
'retries' => $op->retries,
|
'last_error_hash' => $op->lastErrorHash,
|
'error_message' => $op->errorMessage,
|
'completed_at' => $op->completedAt,
|
'metadata' => json_encode($op->metadata),
|
'result' => $op->result ? json_encode($op->result) : null,
|
'dependencies' => json_encode($op->dependencies),
|
'merged_into' => $op->merged_into,
|
'user_dismissed' => $op->userDismissed ? 1 : 0,
|
],
|
['id' => $op->id]
|
);
|
|
if ($result !== false) $this->invalidateUser($op->userId);
|
return $result !== false;
|
}
|
|
/**
|
* For saving a 'step' in an operation; usually after each chunk
|
* @param Operation $op
|
* @return bool
|
*/
|
public function saveProgress(Operation $op): bool
|
{
|
$result = $this->table->update([
|
'processed_items' => $op->processedItems ?? 0,
|
'failed_items' => $op->failedItems ? json_encode($op->failedItems) : null,
|
'metadata' => $op->metadata ? json_encode($op->metadata) : null,
|
'result' => $op->result ? json_encode($op->result) : null,
|
], ['id' => $op->id, 'state' => 'processing']); // state guard preserved
|
|
if ($result === false) return false;
|
$this->invalidateUser($op->userId);
|
return true;
|
}
|
|
/**
|
* The operation is complete, let's progress to 'completed'
|
* @param Operation $op
|
* @return bool
|
*/
|
public function saveFinal(Operation $op): bool
|
{
|
if ($op->state !== 'completed') {
|
throw new LogicException('saveFinal called without completed state');
|
}
|
|
$result = $this->table->update([
|
'state' => 'completed',
|
'outcome' => $op->outcome ?? 'success',
|
'processed_items' => $op->processedItems ?? 0,
|
'failed_items' => $op->failedItems ? json_encode($op->failedItems) : null,
|
'result' => isset($op->result) ? wp_json_encode($op->result) : null,
|
'completed_at' => $op->completedAt ?? current_time('mysql'),
|
], ['id' => $op->id, 'state' => 'processing']); // hard guard preserved
|
|
if ($result === 0) return true; // already completed, not an error
|
if ($result === false) return false;
|
|
$this->invalidateQueueCache();
|
$this->invalidateUser($op->userId);
|
return true;
|
}
|
|
public function insert(Operation $op): bool
|
{
|
$result = $this->table->insert([
|
'id' => $op->id,
|
'type' => $op->type,
|
'user_id' => $op->userId,
|
'request_data' => json_encode($op->requestData),
|
'total_items' => $op->totalItems,
|
'processed_items' => $op->processedItems,
|
'failed_items' => null,
|
'priority' => $op->priority,
|
'state' => $op->state,
|
'outcome' => $op->outcome,
|
'retries' => 0,
|
'scheduled_at' => $op->scheduledAt ?? current_time('mysql'),
|
'metadata' => json_encode($op->metadata),
|
'dependencies' => json_encode($op->dependencies),
|
'merged_into' => null,
|
]);
|
|
if ($result) {
|
$this->invalidateUser($op->userId);
|
}
|
|
return $result !== false;
|
}
|
|
public function find(string $id): ?Operation
|
{
|
$row = $this->table->get(['id' => $id]);
|
return $row ? $this->rowToOperation($row) : null;
|
}
|
|
public function findMergeable(string $type, int $userId, array $criteria = []): ?Operation
|
{
|
$sql = "SELECT * FROM {table} WHERE type = %s AND user_id = %d AND state IN ('pending', 'scheduled')";
|
$params = [$type, $userId];
|
|
foreach ($criteria as $key => $value) {
|
if ($value === null) continue;
|
$sql .= " AND JSON_UNQUOTE(JSON_EXTRACT(request_data, %s)) = %s";
|
$params[] = '$.' . $key;
|
$params[] = (string) $value;
|
}
|
$sql .= " ORDER BY created_at DESC LIMIT 1";
|
|
$rows = $this->table->queryResults($sql, $params);
|
return !empty($rows) ? $this->rowToOperation($rows[0]) : null;
|
}
|
|
public function getUserOperations(int $userId, array $filters = []): array
|
{
|
$where = ['user_id = %d'];
|
$params = [$userId];
|
|
if (!empty($filters['state'])) {
|
$where[] = 'state = %s';
|
$params[] = $filters['state'];
|
}
|
if (!empty($filters['type'])) {
|
$where[] = 'type = %s';
|
$params[] = $filters['type'];
|
}
|
if (!empty($filters['outcome'])) {
|
$outcomes = (array) $filters['outcome'];
|
$placeholders = implode(',', array_fill(0, count($outcomes), '%s'));
|
$where[] = "outcome IN ($placeholders)";
|
$params = array_merge($params, $outcomes);
|
}
|
if (!empty($filters['not_dismissed'])) {
|
$where[] = 'user_dismissed = 0';
|
}
|
if (!empty($filters['active'])) {
|
$where[] = "state IN ('pending', 'scheduled', 'processing')";
|
}
|
if (!empty($filters['has_errors'])) {
|
$where[] = "(error_message IS NOT NULL OR failed_items IS NOT NULL)";
|
}
|
if (!empty($filters['ids'])) {
|
$ids = (array) $filters['ids'];
|
$placeholders = implode(',', array_fill(0, count($ids), '%s'));
|
$where[] = "id IN ($placeholders)";
|
$params = array_merge($params, $ids);
|
}
|
|
// Order by state priority, then created_at
|
$orderBy = $filters['order_by'] ?? "FIELD(state, 'processing', 'pending', 'scheduled', 'completed'), created_at DESC";
|
|
$params[] = $filters['limit'] ?? 50;
|
|
$rows = $this->table->queryResults(
|
"SELECT * FROM {table} WHERE " . implode(' AND ', $where) . " ORDER BY {$orderBy} LIMIT %d",
|
$params
|
);
|
|
return array_map([$this, 'rowToOperation'], $rows ?: []);
|
}
|
|
public function getQueueInfo(): array
|
{
|
$cached = $this->cache->get(self::CACHE_QUEUE_INFO);
|
if ($cached !== false) return $cached;
|
|
$now = current_time('mysql');
|
$row = $this->table->queryResults("
|
SELECT COUNT(*) as total,
|
SUM(IF(state IN ('pending', 'processing'), 1, 0)) as active,
|
SUM(IF(state = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready_scheduled
|
FROM {table}
|
WHERE state IN ('pending', 'processing', 'scheduled')
|
", [$now]);
|
$row = $row[0] ?? null;
|
|
$info = [
|
'total' => (int) ($row->total ?? 0),
|
'active' => (int) ($row->active ?? 0),
|
'ready' => (int) ($row->active ?? 0) + (int) ($row->ready_scheduled ?? 0),
|
'has_items' => ((int) ($row->active ?? 0) + (int) ($row->ready_scheduled ?? 0)) > 0,
|
];
|
|
$this->cache->set(self::CACHE_QUEUE_INFO, $info, 30);
|
return $info;
|
}
|
|
private function rowToOperation(object $row): Operation
|
{
|
$op = new Operation();
|
$op->id = $row->id;
|
$op->type = $row->type;
|
$op->userId = (int) $row->user_id;
|
$op->requestData = json_decode($row->request_data ?? '{}', true) ?: [];
|
$op->metadata = json_decode($row->metadata ?? '{}', true) ?: [];
|
$op->dependencies = json_decode($row->dependencies ?? '[]', true) ?: [];
|
$op->totalItems = (int) $row->total_items;
|
$op->processedItems = (int) $row->processed_items;
|
$op->failedItems = $row->failed_items ? json_decode($row->failed_items, true) : null;
|
$op->priority = $row->priority;
|
$op->state = $row->state;
|
$op->outcome = $row->outcome;
|
$op->retries = (int) $row->retries;
|
$op->lastErrorHash = $row->last_error_hash;
|
$op->errorMessage = $row->error_message;
|
$op->scheduledAt = $row->scheduled_at;
|
$op->startedAt = $row->started_at;
|
$op->completedAt = $row->completed_at;
|
$op->result = $row->result ? json_decode($row->result, true) : null;
|
$op->merged_into = $row->merged_into;
|
$op->userDismissed = (bool) $row->user_dismissed;
|
|
return $op;
|
}
|
|
public function getQueueStatus(): array
|
{
|
$now = current_time('mysql');
|
$rows = $this->table->queryResults("
|
SELECT state, COUNT(*) as count,
|
SUM(IF(state = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready
|
FROM {table} GROUP BY state
|
", [$now]);
|
|
$indexed = array_column($rows, null, 'state');
|
return [
|
'pending' => (int) ($indexed['pending']->count ?? 0),
|
'scheduled' => (int) ($indexed['scheduled']->count ?? 0),
|
'scheduled_ready' => (int) ($indexed['scheduled']->ready ?? 0),
|
'processing' => (int) ($indexed['processing']->count ?? 0),
|
'completed' => (int) ($indexed['completed']->count ?? 0),
|
];
|
}
|
|
public function getUserStats(int $userId): array
|
{
|
$rows = $this->table->queryResults(
|
"SELECT state, outcome, COUNT(*) as count FROM {table} WHERE user_id = %d GROUP BY state, outcome",
|
[$userId]
|
);
|
|
$stats = [
|
'pending' => 0,
|
'scheduled' => 0,
|
'processing' => 0,
|
'completed' => 0,
|
'failed' => 0,
|
'failed_permanent' => 0,
|
];
|
|
foreach ($rows as $row) {
|
if ($row->state === 'completed') {
|
// Map outcome to frontend status
|
match($row->outcome) {
|
'failed' => $stats['failed'] += (int) $row->count,
|
'failed_permanent' => $stats['failed_permanent'] += (int) $row->count,
|
default => $stats['completed'] += (int) $row->count,
|
};
|
} else {
|
// Combine pending/scheduled for frontend
|
$key = $row->state === 'scheduled' ? 'pending' : $row->state;
|
if (isset($stats[$key])) {
|
$stats[$key] += (int) $row->count;
|
}
|
}
|
}
|
|
return $stats;
|
}
|
|
public function dismiss(string $id): bool
|
{
|
return $this->table->update(['user_dismissed' => 1], ['id' => $id]) !== false;
|
}
|
|
/**
|
* Delete an operation from the queue
|
*/
|
public function delete(string $id): bool
|
{
|
$op = $this->find($id);
|
$result = $this->table->delete(['id' => $id]);
|
if ($result && $op) $this->invalidateUser($op->userId);
|
return $result !== false;
|
}
|
|
public function invalidateQueueCache(): void
|
{
|
$this->cache->forget(self::CACHE_QUEUE_INFO);
|
}
|
|
private function invalidateUser(int $userId): void
|
{
|
Cache::for($userId.'_queue')->flush();
|
}
|
|
public function withTransaction(callable $callback): mixed
|
{
|
$this->table->startTransaction();
|
try {
|
$result = $callback();
|
$this->table->commit();
|
return $result;
|
} catch (\Throwable $e) {
|
$this->table->rollback();
|
error_log('[Storage] Transaction rolled back: ' . $e->getMessage());
|
throw $e;
|
}
|
}
|
|
public function resetStuckOperations(int $stuckMinutes = 30): int
|
{
|
return $this->withTransaction(function () use ($stuckMinutes) {
|
$cutoff = date('Y-m-d H:i:s', strtotime("-{$stuckMinutes} minutes"));
|
|
$stuckIds = $this->table->queryResults(
|
"SELECT id FROM {table} WHERE state = 'processing' AND started_at < %s FOR UPDATE",
|
[$cutoff]
|
);
|
$stuckIds = array_column($stuckIds, 'id');
|
|
if (empty($stuckIds)) return 0;
|
|
$placeholders = implode(',', array_fill(0, count($stuckIds), '%s'));
|
return (int) $this->table->query(
|
"UPDATE {table} SET state = 'scheduled', scheduled_at = %s, retries = retries + 1, updated_at = %s WHERE id IN ({$placeholders})",
|
array_merge([date('Y-m-d H:i:s', time() + 60), current_time('mysql')], $stuckIds)
|
);
|
});
|
}
|
|
/**
|
* @throws \Throwable
|
*/
|
public function replaceDependency(string $fromId, string $toId): int
|
{
|
return $this->withTransaction(function () use ($fromId, $toId) {
|
return (int) $this->table->query(
|
"UPDATE {table} SET dependencies = REPLACE(dependencies, %s, %s), updated_at = %s WHERE state IN ('pending', 'scheduled') AND dependencies LIKE %s",
|
['"'.$fromId.'"', '"'.$toId.'"', current_time('mysql'), '%"'.$fromId.'"%']
|
);
|
});
|
}
|
|
public function defineTables():void
|
{
|
$queue = CustomTable::for('_operation_queue');
|
$queue->setColumns([
|
'id' => 'VARCHAR(64) NOT NULL',
|
'type' => 'VARCHAR(50) NOT NULL',
|
'user_id' => $queue->getUserIDType().' NOT NULL',
|
|
'request_data' => 'JSON NOT NULL CHECK (JSON_VALID(request_data))',
|
|
'total_items' => 'INT(11) NOT NULL DEFAULT 1',
|
'processed_items' => 'INT(11) DEFAULT 0',
|
'failed_items' => 'JSON',
|
|
'priority' => 'ENUM(\'high\',\'normal\',\'low\') DEFAULT \'normal\'',
|
'state' => 'ENUM(\'pending\', \'scheduled\', \'processing\', \'completed\') DEFAULT \'pending\'',
|
'outcome' => 'ENUM(\'pending\', \'success\',\'partial\',\'merged\',\'failed\',\'failed_permanent\') DEFAULT \'pending\'',
|
|
'retries' => 'INT(11) DEFAULT 0',
|
'last_error_hash'=> 'CHAR(32) DEFAULT NULL',
|
'error_message' => 'TEXT',
|
|
'scheduled_at' => 'DATETIME DEFAULT NULL',
|
'started_at' => 'DATETIME DEFAULT CURRENT_TIMESTAMP',
|
'completed_at' => 'DATETIME DEFAULT NULL',
|
|
'metadata' => 'JSON DEFAULT NULL',
|
'result' => 'JSON',
|
'dependencies' => 'JSON',
|
'merged_into' => 'VARCHAR(64) DEFAULT NULL',
|
|
'user_dismissed'=> 'tinyint(1) DEFAULT 0',
|
'created_at' => 'DATETIME DEFAULT CURRENT_TIMESTAMP',
|
'updated_at' => 'DATETIME DEFAULT CURRENT_TIMESTAMP',
|
]);
|
|
$queue->setKeys([
|
['key' => 'PRIMARY', 'value' => '(`id`)'],
|
'`idx_run_queue` (`state`, `priority`, `scheduled_at`)',
|
'`idx_user_ops` (`user_id`, `state`)',
|
'`idx_user_type_pending` (`user_id`, `type`, `state`)',
|
'`idx_completed_at` (`completed_at`)',
|
'`idx_processing_stuck` (`state`, `started_at`)'
|
]);
|
|
$queue->defineTable();
|
$this->table = $queue;
|
|
$stats = CustomTable::for('stats__operation_queue');
|
$stats->setColumns([
|
'id' => 'BIGINT unsigned AUTO_INCREMENT',
|
'date' => 'DATE NOT NULL',
|
'type' => 'VARCHAR(50) NOT NULL',
|
|
// Only store what can't be queried from the main table later
|
'peak_queue_size' => 'INT NOT NULL DEFAULT 0',
|
'peak_memory_bytes' => 'BIGINT DEFAULT NULL',
|
|
// Snapshot totals for post-purge historical view
|
'total_operations' => 'INT NOT NULL DEFAULT 0',
|
'successful_operations' => 'INT NOT NULL DEFAULT 0',
|
'failed_permanent_operations' => 'INT NOT NULL DEFAULT 0',
|
'total_items_processed' => 'INT NOT NULL DEFAULT 0',
|
|
'created_at' => 'TIMESTAMP DEFAULT CURRENT_TIMESTAMP',
|
'updated_at' => 'TIMESTAMP DEFAULT CURRENT_TIMESTAMP',
|
]);
|
|
$stats->setKeys([
|
['key' => 'PRIMARY', 'value' => '(`id`)'],
|
['key' => 'UNIQUE', 'value' => '(`date`, `type`)'],
|
'`date_idx` (`date`)',
|
'`type_idx` (`type`)'
|
]);
|
$stats->defineTable();
|
|
$this->stats = $stats;
|
}
|
|
public function snapshotDaily(): void
|
{
|
$today = current_time('Y-m-d');
|
$types = $this->table->queryResults(
|
"SELECT DISTINCT type FROM {table} WHERE DATE(completed_at) = %s",
|
[$today]
|
);
|
|
foreach ($types as $row) {
|
$stats = $this->table->queryResults("
|
SELECT
|
COUNT(*) as total,
|
SUM(outcome = 'success') as successful,
|
SUM(outcome = 'failed_permanent') as failed_permanent,
|
SUM(processed_items) as items_processed
|
FROM {table}
|
WHERE type = %s AND DATE(completed_at) = %s
|
", [$row->type, $today]);
|
|
$s = $stats[0] ?? null;
|
if (!$s) continue;
|
|
$this->stats->table->query("
|
INSERT INTO {table} (date, type, total_operations, successful_operations, failed_permanent_operations, total_items_processed)
|
VALUES (%s, %s, %d, %d, %d, %d)
|
ON DUPLICATE KEY UPDATE
|
total_operations = VALUES(total_operations),
|
successful_operations = VALUES(successful_operations),
|
failed_permanent_operations = VALUES(failed_permanent_operations),
|
total_items_processed = VALUES(total_items_processed),
|
updated_at = NOW()
|
", [$today, $row->type, $s->total, $s->successful, $s->failed_permanent, $s->items_processed]);
|
}
|
}
|
}
|