<?php
|
namespace JVBase\managers\queue;
|
// Stop immediately unless the ABSPATH constant is defined, i.e. when this
// file is requested directly instead of being loaded through WordPress.
defined('ABSPATH') || exit;
|
|
use WP_Error;
|
|
/**
 * Facade over the operation queue.
 *
 * Owns the storage, type registry, locker and processor instances, registers
 * the cron hooks that drive processing and maintenance, and exposes the public
 * API for adding, inspecting, cancelling, retrying and updating operations.
 */
class Queue
{
    /**
     * Threshold handed to Storage::resetStuckOperations() by maintenance().
     * NOTE(review): presumably minutes — confirm against Storage.
     */
    private const STUCK_OPERATION_THRESHOLD = 30;

    /**
     * Priority used for the `shutdown` hook. remove_action() only removes a
     * callback when the priority matches the one used by add_action().
     */
    private const SHUTDOWN_PRIORITY = 100;

    private Storage $storage;

    private Processor $processor;

    private TypeRegistry $registry;

    private Locker $locker;

    /**
     * Build the collaborators, hook the cron callbacks and make sure both
     * recurring events are scheduled.
     */
    public function __construct()
    {
        $this->storage = new Storage();
        $this->registry = new TypeRegistry();
        $this->locker = new Locker();

        $executor = new FilteredExecutor();
        $this->processor = new Processor($this->storage, $executor, $this->registry);

        add_action('jvb_process_queue', [$this, 'checkQueue']);
        add_action('jvb_queue_maintenance', [$this, 'maintenance']);

        // NOTE(review): 'every-minute' is not one of WordPress's built-in
        // recurrences; presumably it is registered elsewhere through the
        // `cron_schedules` filter — confirm, otherwise scheduling fails.
        if (!wp_next_scheduled('jvb_process_queue')) {
            wp_schedule_event(time(), 'every-minute', 'jvb_process_queue');
        }

        if (!wp_next_scheduled('jvb_queue_maintenance')) {
            wp_schedule_event(time(), 'hourly', 'jvb_queue_maintenance');
        }
    }

    /**
     * Access type registry for registering operation configs
     */
    public function registry(): TypeRegistry
    {
        return $this->registry;
    }

    /**
     * Access the underlying storage instance.
     */
    public function storage(): Storage
    {
        return $this->storage;
    }

    /**
     * Queue a new operation.
     *
     * Builds an Operation from the arguments, inserts it through storage and
     * hooks queue processing onto the `shutdown` action of the current
     * request. This method always inserts; it never merges into an existing
     * operation, so `updated_existing` in the result is always false.
     *
     * @param string $type    Operation type
     * @param int    $userId  User ID
     * @param array  $data    Request data
     * @param array  $options {
     *     @type string       $operation_id Explicit operation ID (generated when empty)
     *     @type string       $priority     'low', 'normal', 'high' (defaults to 'normal')
     *     @type int          $delay        Seconds to delay processing
     *     @type string       $scheduled    Specific datetime to process (stored verbatim)
     *     @type array|string $depends_on   Operation IDs this depends on (array or comma-separated string)
     *     @type string|array $chunk_key    Key(s) to chunk (overrides registry)
     *     @type int          $chunk_size   Chunk size (overrides registry, defaults to 10)
     * }
     * @return array|WP_Error Array with `success`, `operation_id` and `updated_existing`
     *                        on success; WP_Error with code `queue_failed` when an
     *                        \Exception is thrown while building or inserting.
     */
    public function add(string $type, int $userId, array $data, array $options = []): array|WP_Error
    {
        try {
            $incoming = $this->buildOperation($type, $userId, $data, $options);

            $this->storage->insert($incoming);
            $this->runQueueOnShutdown();

            return [
                'success' => true,
                'operation_id' => $incoming->id,
                'updated_existing' => false,
            ];
        } catch (\Exception $e) {
            JVB()->error()->log('queue', $e->getMessage(), $data, 'high');

            return new WP_Error('queue_failed', $e->getMessage());
        }
    }

    /**
     * Alias for add() - backwards compatibility
     */
    public function queueOperation(string $type, int $userId, array $data, array $options = []): array|WP_Error
    {
        return $this->add($type, $userId, $data, $options);
    }

    /**
     * Cron callback for `jvb_process_queue`: hands a processor run to
     * Locker::withLock().
     */
    public function checkQueue(): void
    {
        $this->locker->withLock(function () {
            $this->processor->run();
        });
    }

    /**
     * Cron callback for `jvb_queue_maintenance`: asks storage to reset stuck
     * operations, via Locker::withLock().
     */
    public function maintenance(): void
    {
        $this->locker->withLock(function () {
            $this->storage->resetStuckOperations(self::STUCK_OPERATION_THRESHOLD);
        });
    }

    // === Public Getters ===

    /**
     * Look up a single operation by ID.
     *
     * @return Operation|null Whatever Storage::find() returns for the ID.
     */
    public function get(string $id): ?Operation
    {
        return $this->storage->find($id);
    }

    /**
     * Alias for get() - backwards compatibility
     */
    public function getOperation(string $id): ?Operation
    {
        return $this->get($id);
    }

    /**
     * Get a specific value from an operation - backwards compatibility
     *
     * @param string $id         Operation ID
     * @param string $column     One of: result, request_data, state, outcome,
     *                           type, user_id, metadata, dependencies
     * @param bool   $decodeJson Unused; kept so existing callers keep working
     * @return mixed The matching Operation property, or null when the operation
     *               does not exist or the column name is not recognised.
     */
    public function getOperationValue(string $id, string $column, bool $decodeJson = true): mixed
    {
        $op = $this->get($id);
        if (!$op) {
            return null;
        }

        return match ($column) {
            'result' => $op->result,
            'request_data' => $op->requestData,
            'state' => $op->state,
            'outcome' => $op->outcome,
            'type' => $op->type,
            'user_id' => $op->userId,
            'metadata' => $op->metadata,
            'dependencies' => $op->dependencies,
            default => null,
        };
    }

    /**
     * Fetch a user's operations; delegates to Storage::getUserOperations().
     */
    public function getUserOperations(int $userId, array $filters = []): array
    {
        return $this->storage->getUserOperations($userId, $filters);
    }

    /**
     * Queue status as reported by Storage::getQueueStatus().
     */
    public function getStatus(): array
    {
        return $this->storage->getQueueStatus();
    }

    /**
     * Per-user statistics as reported by Storage::getUserStats().
     */
    public function getUserStats(int $userId): array
    {
        return $this->storage->getUserStats($userId);
    }

    /**
     * Queue information as reported by Storage::getQueueInfo().
     */
    public function getInfo(): array
    {
        return $this->storage->getQueueInfo();
    }

    /**
     * Dismiss an operation; delegates to Storage::dismiss().
     */
    public function dismiss(string $id): bool
    {
        return $this->storage->dismiss($id);
    }

    /**
     * Cancel a pending/scheduled operation (deletes it)
     *
     * @param string $id     Operation ID
     * @param int    $userId User ID (for ownership verification)
     * @return bool True if cancelled
     */
    public function cancel(string $id, int $userId): bool
    {
        $op = $this->get($id);
        if (!$op || $op->userId !== $userId) {
            return false;
        }

        // Can only cancel pending or scheduled operations
        if (!in_array($op->state, ['pending', 'scheduled'], true)) {
            return false;
        }

        return $this->storage->delete($id);
    }

    /**
     * Retry a failed operation
     *
     * Resets state/outcome to 'pending', clears the stored error details,
     * reschedules for the current time and increments the retry counter.
     * On a successful save, queue processing is hooked onto `shutdown`.
     *
     * @param string $id     Operation ID
     * @param int    $userId User ID (for ownership verification)
     * @return bool True if reset for retry
     */
    public function retry(string $id, int $userId): bool
    {
        $op = $this->get($id);
        if (!$op || $op->userId !== $userId) {
            return false;
        }

        // Can only retry completed operations with failed outcomes
        if ($op->state !== 'completed' || !in_array($op->outcome, ['failed', 'failed_permanent'], true)) {
            return false;
        }

        $op->state = 'pending';
        $op->outcome = 'pending';
        $op->errorMessage = null;
        $op->lastErrorHash = null;
        $op->scheduledAt = current_time('mysql');
        $op->retries++;
        error_log('[Queue]Retrying operation '.print_r($op->id, true));
        $saved = $this->storage->save($op);

        if ($saved) {
            $this->runQueueOnShutdown();
        }

        return $saved;
    }

    /**
     * Update an operation's data or metadata
     *
     * Recognised fields: requestData, metadata (merged into the existing
     * metadata), priority, state, outcome, result. Unknown fields are ignored.
     *
     * @param string   $id      Operation ID
     * @param array    $updates Fields to update (requestData, metadata, etc.)
     * @param int|null $userId  Optional user ID for ownership verification
     * @return bool True if updated; false when the operation is missing, the
     *              owner does not match, or `metadata` is not an array.
     */
    public function update(string $id, array $updates, ?int $userId = null): bool
    {
        $op = $this->get($id);
        if (!$op) {
            return false;
        }

        if ($userId !== null && $op->userId !== $userId) {
            return false;
        }

        // Metadata is merged with array_merge(), which throws a TypeError on
        // non-array input; reject such an update before touching the operation.
        if (array_key_exists('metadata', $updates) && !is_array($updates['metadata'])) {
            return false;
        }

        // Apply allowed updates
        foreach ($updates as $field => $value) {
            match ($field) {
                'requestData' => $op->requestData = $value,
                'metadata' => $op->metadata = array_merge($op->metadata, $value),
                'priority' => $op->priority = $value,
                'state' => $op->state = $value,
                'outcome' => $op->outcome = $value,
                'result' => $op->result = $value,
                default => null,
            };
        }
        error_log('[Queue]: updating operation '.print_r($op->id, true));

        return $this->storage->saveProgress($op);
    }

    // === Private Helpers ===

    /**
     * Assemble a new Operation from the add() arguments.
     *
     * Fills in ID, type, owner, payload, priority, state and scheduled time,
     * then chunk metadata (explicit options win over the registry config) and
     * dependencies.
     */
    private function buildOperation(string $type, int $userId, array $data, array $options): Operation
    {
        $op = new Operation();

        // Use provided operation_id or generate one
        $op->id = !empty($options['operation_id'])
            ? $options['operation_id']
            : 'u' . $userId . '_' . uniqid('op_');

        $op->type = $type;
        $op->userId = $userId;
        $op->requestData = $data;
        $op->priority = $options['priority'] ?? 'normal';
        $op->state = !empty($options['delay']) || !empty($options['scheduled']) ? 'scheduled' : 'pending';
        $op->scheduledAt = $this->calculateScheduledTime($options);

        // Chunk config: explicit options override registry
        if (!empty($options['chunk_key'])) {
            $chunkConfig = [
                'key' => $options['chunk_key'],
                'size' => $options['chunk_size'] ?? 10,
            ];
        } else {
            $chunkConfig = $this->registry->getChunkConfig($type);
        }

        if ($chunkConfig) {
            $op->metadata['chunk_key'] = $chunkConfig['key'];
            $op->metadata['chunk_size'] = $chunkConfig['size'];
            $op->totalItems = $this->countItems($data, $chunkConfig['key']);
        }

        // Dependencies
        if (!empty($options['depends_on'])) {
            $dependencies = $this->normalizeDependencies($options['depends_on']);
            if ($dependencies !== []) {
                $op->dependencies = $dependencies;
            }
        }

        return $op;
    }

    /**
     * Turn the `depends_on` option into a list of operation IDs.
     *
     * A comma-separated string is split, each ID is trimmed and empty entries
     * are dropped, so "a, b,,c" yields ['a', 'b', 'c']. An array is returned
     * unchanged.
     */
    private function normalizeDependencies(string|array $dependsOn): array
    {
        if (is_array($dependsOn)) {
            return $dependsOn;
        }

        $ids = array_map('trim', explode(',', $dependsOn));

        return array_values(array_filter(
            $ids,
            static fn (string $depId): bool => $depId !== '',
        ));
    }

    /**
     * Count the items found under the chunk key(s) of the request data.
     *
     * Only keys that exist and hold arrays contribute; the result is never
     * lower than 1.
     */
    private function countItems(array $data, string|array $keys): int
    {
        $keys = (array) $keys;
        $total = 0;
        foreach ($keys as $key) {
            if (isset($data[$key]) && is_array($data[$key])) {
                $total += count($data[$key]);
            }
        }

        return max(1, $total);
    }

    /**
     * Work out the datetime string at which the operation becomes due.
     *
     * Precedence: `delay` (seconds from now), then `scheduled` (returned
     * verbatim), otherwise the current time.
     */
    private function calculateScheduledTime(array $options): string
    {
        if (!empty($options['delay'])) {
            // current_time('timestamp') already includes the site's UTC offset,
            // so format with gmdate() to avoid applying a timezone a second time.
            return gmdate('Y-m-d H:i:s', current_time('timestamp') + (int) $options['delay']);
        }

        if (!empty($options['scheduled'])) {
            return $options['scheduled'];
        }

        return current_time('mysql');
    }

    /**
     * Hook processQueueOnShutdown() onto `shutdown`, at most once.
     */
    private function runQueueOnShutdown(): void
    {
        if (!has_action('shutdown', [$this, 'processQueueOnShutdown'])) {
            add_action('shutdown', [$this, 'processQueueOnShutdown'], self::SHUTDOWN_PRIORITY);
        }
    }

    /**
     * `shutdown` callback: unhooks itself, finishes the FastCGI request when
     * that function is available, then processes the queue.
     */
    public function processQueueOnShutdown(): void
    {
        // The priority must match the one used in add_action(), otherwise
        // remove_action() leaves the callback registered.
        remove_action('shutdown', [$this, 'processQueueOnShutdown'], self::SHUTDOWN_PRIORITY);

        if (function_exists('fastcgi_finish_request')) {
            fastcgi_finish_request();
        }

        $this->checkQueue();
    }
}
|