<?php
|
namespace JVBase\managers\queue;
|
if (!defined('ABSPATH')) {
|
exit;
|
}
|
|
use JVBase\managers\Cache;
|
use LogicException;
|
|
class Storage
|
{
|
/** WordPress database handle; the constructor copies it from the global $wpdb. */
private \wpdb $wpdb;
|
/** Queue table name; the constructor builds it as $wpdb->prefix . BASE . '_operation_queue'. */
private string $table;
|
/** Cache instance; the constructor obtains it via Cache::for('queue', DAY_IN_SECONDS). */
private Cache $cache;
|
|
/** Cache key under which getQueueInfo() stores its aggregate counts. */
private const CACHE_QUEUE_INFO = 'queue_info';
|
|
/**
 * Bind the storage to the WordPress database handle, the queue table and
 * the 'queue' cache.
 *
 * The table name is the WordPress table prefix, followed by the BASE
 * constant, followed by '_operation_queue'.
 */
public function __construct()
{
    global $wpdb;

    $this->wpdb  = $wpdb;
    $this->table = sprintf('%s%s_operation_queue', $this->wpdb->prefix, BASE);
    $this->cache = Cache::for('queue', DAY_IN_SECONDS);
}
|
|
/**
 * Tell whether at least one row of the queue table is in the 'processing' state.
 *
 * @return bool True when such a row exists.
 */
public function hasProcessingOperations(): bool
{
    $sql   = "SELECT 1 FROM {$this->table} WHERE state = 'processing' LIMIT 1";
    $found = $this->wpdb->get_var($sql);

    return (bool) $found;
}
|
|
/**
 * Load the operations that are ready to run right now.
 *
 * A row qualifies when it is 'pending' or 'scheduled', its scheduled_at is
 * not later than current_time('mysql'), and none of the rows referenced by
 * its JSON `dependencies` array is either not 'completed' or completed
 * with an outcome other than 'success'/'partial'. Dependency ids that match
 * no row are dropped by the inner JOIN and therefore do not block.
 *
 * Results are ordered by priority (high, normal, low) and then scheduled_at.
 * The statement ends with FOR UPDATE SKIP LOCKED.
 * NOTE(review): the row locks presumably only persist when the caller runs
 * this inside a transaction (e.g. withTransaction()) - confirm at call sites.
 *
 * @param int $limit Maximum number of rows to return.
 * @return Operation[] Hydrated operations; empty array when nothing is runnable.
 */
public function fetchRunnable(int $limit = 10): array
{
    $sql = "
        SELECT oq.* FROM {$this->table} oq
        WHERE oq.state IN ('pending', 'scheduled')
          AND oq.scheduled_at <= %s
          AND NOT EXISTS (
              SELECT 1
              FROM JSON_TABLE(
                  COALESCE(NULLIF(oq.dependencies, 'null'), '[]'),
                  '\$[*]' COLUMNS (dep_id VARCHAR(64) PATH '\$')
              ) AS deps
              JOIN {$this->table} dep ON dep.id = deps.dep_id
              WHERE dep.state != 'completed'
                 OR dep.outcome NOT IN ('success', 'partial')
          )
        ORDER BY FIELD(oq.priority, 'high', 'normal', 'low'), oq.scheduled_at
        LIMIT %d
        FOR UPDATE SKIP LOCKED
    ";

    $query   = $this->wpdb->prepare($sql, current_time('mysql'), $limit);
    $records = $this->wpdb->get_results($query);

    if (!$records) {
        return [];
    }

    return array_map(fn ($record) => $this->rowToOperation($record), $records);
}
|
|
|
|
|
|
/**
 * Move an operation from 'pending'/'scheduled' to 'processing'.
 *
 * The UPDATE only matches rows still in 'pending' or 'scheduled', so a row
 * that is already processing or completed is left untouched and the method
 * reports false. On success started_at and updated_at are stamped with
 * current_time('mysql') and the owning user's cache entry is forgotten.
 *
 * @param string $id Operation id.
 * @return bool True when a row was actually transitioned.
 */
public function markProcessing(string $id): bool
{
    $timestamp = current_time('mysql');

    $sql = "
        UPDATE {$this->table}
        SET state = 'processing', started_at = %s, updated_at = %s
        WHERE id = %s AND state IN ('pending', 'scheduled')
    ";

    $affected = $this->wpdb->query(
        $this->wpdb->prepare($sql, $timestamp, $timestamp, $id)
    );

    if (!($affected > 0)) {
        return false;
    }

    // Re-read the row to learn which user's cache entry to drop.
    $claimed = $this->find($id);
    if ($claimed) {
        $this->invalidateUser($claimed->userId);
    }

    return true;
}
|
|
/**
 * Persist every mutable column of an operation, matched by id.
 *
 * Array-valued fields are stored as JSON; failed_items and result are
 * stored as NULL when they are empty/falsy. updated_at is set to
 * current_time('mysql'). When the update does not fail, the owning user's
 * cache entry is forgotten.
 *
 * @param Operation $op Operation carrying the values to write.
 * @return bool False only when the database reports an error.
 */
public function save(Operation $op): bool
{
    $failedJson = $op->failedItems ? json_encode($op->failedItems) : null;
    $resultJson = $op->result ? json_encode($op->result) : null;

    $columns = [
        'request_data'    => json_encode($op->requestData),
        'total_items'     => $op->totalItems,
        'processed_items' => $op->processedItems,
        'failed_items'    => $failedJson,
        'priority'        => $op->priority,
        'state'           => $op->state,
        'outcome'         => $op->outcome,
        'retries'         => $op->retries,
        'last_error_hash' => $op->lastErrorHash,
        'error_message'   => $op->errorMessage,
        'scheduled_at'    => $op->scheduledAt,
        'started_at'      => $op->startedAt,
        'completed_at'    => $op->completedAt,
        'metadata'        => json_encode($op->metadata),
        'result'          => $resultJson,
        'dependencies'    => json_encode($op->dependencies),
        'user_dismissed'  => $op->userDismissed ? 1 : 0,
        'updated_at'      => current_time('mysql'),
    ];

    $outcome = $this->wpdb->update($this->table, $columns, ['id' => $op->id]);

    if ($outcome === false) {
        return false;
    }

    $this->invalidateUser($op->userId);

    return true;
}
|
|
/**
 * Persist an intermediate step of a running operation (typically after each chunk).
 *
 * Only the progress columns (processed_items, failed_items, metadata,
 * result, updated_at) are written, and only while the row is still in the
 * 'processing' state, so a row that already reached another state is never
 * modified. Zero matched rows is not treated as an error.
 *
 * @param Operation $op Operation whose progress should be stored.
 * @return bool False only when the database reports an error.
 */
public function saveProgress(Operation $op): bool
{
    $data = [
        'processed_items' => $op->processedItems ?? 0,
        'failed_items'    => $op->failedItems ? json_encode($op->failedItems) : null,
        'metadata'        => $op->metadata ? json_encode($op->metadata) : null,
        'result'          => $op->result ? json_encode($op->result) : null,
        'updated_at'      => current_time('mysql'),
    ];

    // IMPORTANT: never touch a row that has left the 'processing' state.
    $where = [
        'id'    => $op->id,
        'state' => 'processing',
    ];

    // Use the handle captured by the constructor, like every other method here.
    $updated = $this->wpdb->update($this->table, $data, $where);

    if ($updated === false) {
        error_log('[Storage::saveProgress] DB error: ' . $this->wpdb->last_error);
        return false;
    }

    return true;
}
|
|
/**
 * Persist the terminal 'completed' state of an operation.
 *
 * The update only matches a row that is still 'processing', so an already
 * completed row is never overwritten; in that case zero rows match and the
 * call still reports success. failed_items and result are stored as JSON
 * (NULL when empty/unset) so that rowToOperation() can decode them again.
 * The queue-info cache is always dropped; the owning user's cache entry is
 * dropped when a row was actually updated.
 *
 * @param Operation $op Operation whose state is already 'completed'.
 * @return bool False only when the database reports an error.
 * @throws LogicException When $op->state is not 'completed'.
 */
public function saveFinal(Operation $op): bool
{
    if (($op->state ?? null) !== 'completed') {
        throw new LogicException('saveFinal called without completed state');
    }

    // Encode failed items like save()/saveProgress() do; a value that is
    // already a string is passed through unchanged to avoid double encoding.
    $failedItems = $op->failedItems ?? null;
    if (empty($failedItems)) {
        $failedItems = null;
    } elseif (!is_string($failedItems)) {
        $failedItems = wp_json_encode($failedItems);
    }

    $data = [
        'state'           => 'completed',
        'outcome'         => $op->outcome ?? 'success',
        'processed_items' => $op->processedItems ?? 0,
        'failed_items'    => $failedItems,
        'result'          => isset($op->result) ? wp_json_encode($op->result) : null,
        'completed_at'    => $op->completedAt ?? current_time('mysql'),
        'updated_at'      => current_time('mysql'),
    ];

    // HARD GUARD: cannot overwrite a row that is no longer 'processing'.
    $where = [
        'id'    => $op->id,
        'state' => 'processing',
    ];

    $updated = $this->wpdb->update($this->table, $data, $where);
    $this->invalidateQueueCache();

    if ($updated === false) {
        error_log('[Storage::saveFinal] DB error: ' . $this->wpdb->last_error);
        return false;
    }

    // A real transition changes what the owning user sees.
    if ($updated > 0 && isset($op->userId)) {
        $this->invalidateUser($op->userId);
    }

    return true;
}
|
|
/**
 * Insert a brand-new operation row.
 *
 * Progress-related columns start from fixed values (no failed items, zero
 * retries, no error, no start/completion time, no result, not dismissed)
 * regardless of what the Operation object carries. scheduled_at defaults to
 * current_time('mysql') when the operation has none. On success the owning
 * user's cache entry is forgotten.
 *
 * @param Operation $op Operation to store.
 * @return bool False only when the database reports an error.
 */
public function insert(Operation $op): bool
{
    $row = [
        'id'              => $op->id,
        'type'            => $op->type,
        'user_id'         => $op->userId,
        'request_data'    => json_encode($op->requestData),
        'total_items'     => $op->totalItems,
        'processed_items' => $op->processedItems,
        'failed_items'    => null,
        'priority'        => $op->priority,
        'state'           => $op->state,
        'outcome'         => $op->outcome,
        'retries'         => 0,
        'last_error_hash' => null,
        'error_message'   => null,
        'scheduled_at'    => $op->scheduledAt ?? current_time('mysql'),
        'started_at'      => null,
        'completed_at'    => null,
        'metadata'        => json_encode($op->metadata),
        'result'          => null,
        'dependencies'    => json_encode($op->dependencies),
        'user_dismissed'  => 0,
        'created_at'      => current_time('mysql'),
        'updated_at'      => current_time('mysql'),
    ];

    $inserted = $this->wpdb->insert($this->table, $row);

    if ($inserted) {
        $this->invalidateUser($op->userId);
    }

    return $inserted !== false;
}
|
|
/**
 * Look up a single operation by its id.
 *
 * @param string $id Operation id.
 * @return Operation|null The hydrated operation, or null when no row matches.
 */
public function find(string $id): ?Operation
{
    $query  = $this->wpdb->prepare("SELECT * FROM {$this->table} WHERE id = %s", $id);
    $record = $this->wpdb->get_row($query);

    if (!$record) {
        return null;
    }

    return $this->rowToOperation($record);
}
|
|
/**
 * Find the most recently created operation of a given type and user that
 * has not started yet (state 'pending' or 'scheduled').
 *
 * @param string $type   Operation type.
 * @param int    $userId Owning user id.
 * @return Operation|null The newest matching operation, or null when none exists.
 */
public function findMergeable(string $type, int $userId): ?Operation
{
    $sql = "SELECT * FROM {$this->table}
             WHERE type = %s AND user_id = %d AND state IN ('pending', 'scheduled')
             ORDER BY created_at DESC LIMIT 1";

    $record = $this->wpdb->get_row($this->wpdb->prepare($sql, $type, $userId));

    if (!$record) {
        return null;
    }

    return $this->rowToOperation($record);
}
|
|
/**
 * List a user's operations, optionally narrowed by filters.
 *
 * Supported filter keys:
 *  - state, type:    exact match.
 *  - outcome, ids:   a single value or a list, matched with IN (...).
 *  - not_dismissed:  when truthy, only rows with user_dismissed = 0.
 *  - active:         when truthy, only 'pending', 'scheduled' or 'processing'.
 *  - has_errors:     when truthy, only rows with an error message or failed items.
 *  - order_by:       comma-separated "column [ASC|DESC]" terms using known
 *                    queue columns. Anything else is logged and replaced by
 *                    the default ordering, because ORDER BY cannot be bound
 *                    as a prepared parameter and must never carry raw input.
 *  - limit:          maximum number of rows (default 50, negatives become 0).
 *
 * Default ordering: processing, pending, scheduled, completed; newest first
 * within each state.
 *
 * @param int   $userId  Owning user id.
 * @param array $filters See above.
 * @return Operation[] Hydrated operations; empty array when nothing matches.
 */
public function getUserOperations(int $userId, array $filters = []): array
{
    $where  = ['user_id = %d'];
    $params = [$userId];

    if (!empty($filters['state'])) {
        $where[]  = 'state = %s';
        $params[] = $filters['state'];
    }
    if (!empty($filters['type'])) {
        $where[]  = 'type = %s';
        $params[] = $filters['type'];
    }
    if (!empty($filters['outcome'])) {
        // array_values(): string keys would be spread as named arguments below.
        $outcomes     = array_values((array) $filters['outcome']);
        $placeholders = implode(',', array_fill(0, count($outcomes), '%s'));
        $where[]      = "outcome IN ($placeholders)";
        $params       = array_merge($params, $outcomes);
    }
    if (!empty($filters['not_dismissed'])) {
        $where[] = 'user_dismissed = 0';
    }
    if (!empty($filters['active'])) {
        $where[] = "state IN ('pending', 'scheduled', 'processing')";
    }
    if (!empty($filters['has_errors'])) {
        $where[] = "(error_message IS NOT NULL OR failed_items IS NOT NULL)";
    }
    if (!empty($filters['ids'])) {
        $ids          = array_values((array) $filters['ids']);
        $placeholders = implode(',', array_fill(0, count($ids), '%s'));
        $where[]      = "id IN ($placeholders)";
        $params       = array_merge($params, $ids);
    }

    // Order by state priority, then created_at, unless a safe override is given.
    $orderBy   = "FIELD(state, 'processing', 'pending', 'scheduled', 'completed'), created_at DESC";
    $requested = $filters['order_by'] ?? null;

    if ($requested !== null) {
        $sortable = [
            'id', 'type', 'user_id', 'total_items', 'processed_items', 'priority',
            'state', 'outcome', 'retries', 'scheduled_at', 'started_at',
            'completed_at', 'user_dismissed', 'created_at', 'updated_at',
        ];

        $terms = [];
        foreach (is_string($requested) ? explode(',', $requested) : [] as $term) {
            $isValid = preg_match('/^\s*([a-z_]+)(?:\s+(asc|desc))?\s*$/i', $term, $m) === 1
                && in_array(strtolower($m[1]), $sortable, true);

            if (!$isValid) {
                $terms = [];
                break;
            }

            $terms[] = strtolower($m[1]) . (isset($m[2]) ? ' ' . strtoupper($m[2]) : '');
        }

        if ($terms !== []) {
            $orderBy = implode(', ', $terms);
        } else {
            error_log('[Storage::getUserOperations] Ignoring unsupported order_by filter');
        }
    }

    $params[] = max(0, (int) ($filters['limit'] ?? 50));

    $sql = "SELECT * FROM {$this->table} WHERE " . implode(' AND ', $where)
        . " ORDER BY {$orderBy} LIMIT %d";

    $rows = $this->wpdb->get_results($this->wpdb->prepare($sql, ...$params));

    return array_map([$this, 'rowToOperation'], $rows ?: []);
}
|
|
/**
 * Return aggregate counts for the not-yet-finished part of the queue.
 *
 * The result is read from the cache when the cache returns anything other
 * than false; otherwise it is computed with one query and stored under
 * CACHE_QUEUE_INFO with 30 as the third argument to Cache::set()
 * (presumably a TTL in seconds - confirm against Cache).
 *
 * Keys of the returned array:
 *  - total:     rows in 'pending', 'processing' or 'scheduled'.
 *  - active:    rows in 'pending' or 'processing'.
 *  - ready:     active plus 'scheduled' rows whose scheduled_at has passed.
 *  - has_items: whether ready is greater than zero.
 *
 * @return array
 */
public function getQueueInfo(): array
{
    $hit = $this->cache->get(self::CACHE_QUEUE_INFO);
    if ($hit !== false) {
        return $hit;
    }

    $sql = "
        SELECT
            COUNT(*) as total,
            SUM(IF(state IN ('pending', 'processing'), 1, 0)) as active,
            SUM(IF(state = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready_scheduled
        FROM {$this->table}
        WHERE state IN ('pending', 'processing', 'scheduled')
    ";

    $totals = $this->wpdb->get_row($this->wpdb->prepare($sql, current_time('mysql')));

    $active         = (int) ($totals->active ?? 0);
    $readyScheduled = (int) ($totals->ready_scheduled ?? 0);
    $ready          = $active + $readyScheduled;

    $info = [
        'total'     => (int) ($totals->total ?? 0),
        'active'    => $active,
        'ready'     => $ready,
        'has_items' => $ready > 0,
    ];

    $this->cache->set(self::CACHE_QUEUE_INFO, $info, 30);

    return $info;
}
|
|
/**
 * Hydrate an Operation from a database row object.
 *
 * request_data, metadata and dependencies are JSON-decoded to arrays and
 * fall back to an empty array when NULL, undecodable or empty. failed_items
 * and result are decoded when present and are null otherwise. Numeric
 * columns are cast to int and user_dismissed to bool.
 *
 * @param object $row Row as returned by wpdb.
 * @return Operation
 */
private function rowToOperation(object $row): Operation
{
    // Decode a JSON column, using $fallback for NULL and [] for empty results.
    $decodeOrEmpty = static fn ($json, $fallback) => json_decode($json ?? $fallback, true) ?: [];
    // Decode a JSON column that may legitimately be absent.
    $decodeOrNull = static fn ($json) => $json ? json_decode($json, true) : null;

    $operation = new Operation();

    $operation->id             = $row->id;
    $operation->type           = $row->type;
    $operation->userId         = (int) $row->user_id;
    $operation->requestData    = $decodeOrEmpty($row->request_data, '{}');
    $operation->metadata       = $decodeOrEmpty($row->metadata, '{}');
    $operation->dependencies   = $decodeOrEmpty($row->dependencies, '[]');
    $operation->totalItems     = (int) $row->total_items;
    $operation->processedItems = (int) $row->processed_items;
    $operation->failedItems    = $decodeOrNull($row->failed_items);
    $operation->priority       = $row->priority;
    $operation->state          = $row->state;
    $operation->outcome        = $row->outcome;
    $operation->retries        = (int) $row->retries;
    $operation->lastErrorHash  = $row->last_error_hash;
    $operation->errorMessage   = $row->error_message;
    $operation->scheduledAt    = $row->scheduled_at;
    $operation->startedAt      = $row->started_at;
    $operation->completedAt    = $row->completed_at;
    $operation->result         = $decodeOrNull($row->result);
    $operation->userDismissed  = (bool) $row->user_dismissed;

    return $operation;
}
|
|
/**
 * Count queue rows per state across all users.
 *
 * Keys of the returned array: pending, scheduled, scheduled_ready
 * ('scheduled' rows whose scheduled_at has passed), processing, completed.
 * States without rows are reported as 0.
 *
 * @return array<string, int>
 */
public function getQueueStatus(): array
{
    $sql = "
        SELECT
            state,
            COUNT(*) as count,
            SUM(IF(state = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready
        FROM {$this->table}
        GROUP BY state
    ";

    // OBJECT_K keys the result by the first selected column, i.e. the state.
    $byState = $this->wpdb->get_results(
        $this->wpdb->prepare($sql, current_time('mysql')),
        OBJECT_K
    );

    $pick = static fn (string $state, string $field): int => (int) ($byState[$state]->{$field} ?? 0);

    return [
        'pending'         => $pick('pending', 'count'),
        'scheduled'       => $pick('scheduled', 'count'),
        'scheduled_ready' => $pick('scheduled', 'ready'),
        'processing'      => $pick('processing', 'count'),
        'completed'       => $pick('completed', 'count'),
    ];
}
|
|
/**
 * Count a user's operations per frontend status.
 *
 * Completed rows are split by outcome into 'failed', 'failed_permanent' and
 * 'completed' (every other outcome). 'scheduled' rows are counted under
 * 'pending', so the 'scheduled' entry of the result always stays 0. States
 * that have no entry in the result are ignored.
 *
 * @param int $userId Owning user id.
 * @return array<string, int> Keys: pending, scheduled, processing, completed,
 *                            failed, failed_permanent.
 */
public function getUserStats(int $userId): array
{
    $rows = $this->wpdb->get_results($this->wpdb->prepare("
        SELECT state, outcome, COUNT(*) as count
        FROM {$this->table}
        WHERE user_id = %d
        GROUP BY state, outcome
    ", $userId));

    $stats = [
        'pending'          => 0,
        'scheduled'        => 0,
        'processing'       => 0,
        'completed'        => 0,
        'failed'           => 0,
        'failed_permanent' => 0,
    ];

    // get_results() may return null; treat that as "no rows" like the
    // other readers in this class do.
    foreach ($rows ?: [] as $row) {
        if ($row->state === 'completed') {
            // Map outcome to frontend status.
            $key = match ($row->outcome) {
                'failed'           => 'failed',
                'failed_permanent' => 'failed_permanent',
                default            => 'completed',
            };
        } else {
            // Combine pending/scheduled for frontend.
            $key = $row->state === 'scheduled' ? 'pending' : $row->state;
        }

        if (isset($stats[$key])) {
            $stats[$key] += (int) $row->count;
        }
    }

    return $stats;
}
|
|
/**
 * Flag an operation as dismissed by the user and refresh its updated_at.
 *
 * @param string $id Operation id.
 * @return bool False only when the database reports an error.
 */
public function dismiss(string $id): bool
{
    $changes = [
        'user_dismissed' => 1,
        'updated_at'     => current_time('mysql'),
    ];

    $outcome = $this->wpdb->update($this->table, $changes, ['id' => $id]);

    return $outcome !== false;
}
|
|
/**
 * Remove an operation from the queue.
 *
 * The owning user is looked up before the row is deleted so that the
 * user's cache entry can be forgotten once the delete removed a row.
 *
 * @param string $id Operation id.
 * @return bool False only when the database reports an error.
 */
public function delete(string $id): bool
{
    $ownerId = $this->find($id)?->userId;

    $deleted = $this->wpdb->delete($this->table, ['id' => $id]);

    if ($deleted && $ownerId) {
        $this->invalidateUser($ownerId);
    }

    return $deleted !== false;
}
|
|
/**
 * Drop the cached aggregate produced by getQueueInfo().
 */
public function invalidateQueueCache(): void
{
    $cacheKey = self::CACHE_QUEUE_INFO;

    $this->cache->forget($cacheKey);
}
|
|
/**
 * Drop the cache entry stored under the given user id.
 *
 * @param int $userId User whose cache entry is forgotten.
 */
private function invalidateUser(int $userId): void
{
    $cacheKey = $userId;

    $this->cache->forget($cacheKey);
}
|
/**
 * Expose the last error message recorded by the database handle.
 *
 * @return string The value of wpdb's last_error property.
 */
public function getLastError(): string
{
    $message = $this->wpdb->last_error;

    return $message;
}
|
|
/**
 * Run a callback between START TRANSACTION and COMMIT.
 *
 * Any Throwable raised by the callback (or while committing) triggers a
 * ROLLBACK, is written to the error log and is then re-thrown unchanged.
 *
 * @param callable $callback Work to perform; called without arguments.
 * @return mixed Whatever the callback returns.
 * @throws \Throwable Re-throws whatever the callback threw.
 */
public function withTransaction(callable $callback): mixed
{
    $db = $this->wpdb;

    $db->query('START TRANSACTION');

    try {
        $returned = $callback();
        $db->query('COMMIT');

        return $returned;
    } catch (\Throwable $failure) {
        $db->query('ROLLBACK');
        error_log(sprintf('[Storage] Transaction rolled back: %s', $failure->getMessage()));

        throw $failure;
    }
}
|
|
/**
 * Put operations that have been 'processing' for too long back in the queue.
 *
 * Inside one transaction, rows in state 'processing' whose started_at is
 * older than $stuckMinutes are locked, switched to 'scheduled', given a
 * scheduled_at one minute in the future and have their retry counter
 * incremented.
 *
 * Every other timestamp in this table is written and compared with
 * current_time('mysql'), i.e. in the site's timezone. The cutoff and the
 * new scheduled_at are therefore formatted with wp_date(), which uses the
 * same timezone; plain date() would use PHP's default timezone and shift
 * both values by the site's GMT offset.
 *
 * @param int $stuckMinutes Age in minutes after which a processing row
 *                          counts as stuck; negative values are treated as 0.
 * @return int Number of rows that were reset.
 */
public function resetStuckOperations(int $stuckMinutes = 30): int
{
    return $this->withTransaction(function () use ($stuckMinutes) {
        $format = 'Y-m-d H:i:s';
        $now    = time();

        $cutoff  = wp_date($format, $now - max(0, $stuckMinutes) * 60);
        $retryAt = wp_date($format, $now + 60);

        // Get IDs first (and lock the rows) so the UPDATE targets exactly them.
        $stuckIds = $this->wpdb->get_col($this->wpdb->prepare("
            SELECT id FROM {$this->table}
            WHERE state = 'processing'
              AND started_at < %s
            FOR UPDATE
        ", $cutoff));

        if (empty($stuckIds)) {
            return 0;
        }

        $placeholders = implode(',', array_fill(0, count($stuckIds), '%s'));

        // Reset them.
        $affected = $this->wpdb->query($this->wpdb->prepare(
            "
            UPDATE {$this->table}
            SET state = 'scheduled',
                scheduled_at = %s,
                retries = retries + 1,
                updated_at = %s
            WHERE id IN ({$placeholders})
            ",
            $retryAt,
            current_time('mysql'),
            ...$stuckIds
        ));

        return (int) $affected;
    });
}
|
}
|