<?php
|
namespace JVBase\managers\queue;
|
if (!defined('ABSPATH')) {
|
exit;
|
}
|
|
use JVBase\managers\CustomTable;
|
use WP_Error;
|
use WP_REST_Request;
|
use WP_REST_Response;
|
|
class Queue
|
{
|
/** Persistence layer: operations are inserted, looked up and saved through it. */
private Storage $storage;

/** Runs the queue; invoked from checkQueue() while the lock is held. */
private Processor $processor;

/** Per-type configuration (mergeable handlers, chunk configs). */
private TypeRegistry $registry;

/** Executor handed to the processor; the only publicly exposed collaborator. */
public Executor $executor;

/** Lock wrapper around queue processing and maintenance. */
private Locker $locker;
|
|
/**
 * Build the queue's collaborators and hook it into WordPress.
 *
 * Side effects performed on every construction:
 *  - registers the callbacks for the three cron hooks,
 *  - schedules each recurring event that is not scheduled yet,
 *  - registers the admin actions (via jvb_register_do_once) and the admin
 *    action filter.
 */
public function __construct()
{
    $this->storage = new Storage();
    $this->registry = new TypeRegistry();
    $this->locker = new Locker();

    $this->executor = new FilteredExecutor();
    $this->processor = new Processor($this->storage, $this->executor, $this->registry);

    add_action('jvb_process_queue', [$this, 'checkQueue']);
    add_action('jvb_queue_maintenance', [$this, 'maintenance']);
    add_action('jvb_daily_snapshot', [$this->storage, 'snapshotDaily']);

    // NOTE(review): 'every-minute' is not one of WordPress' built-in
    // recurrences; presumably it is registered elsewhere through the
    // 'cron_schedules' filter - confirm.
    if (!wp_next_scheduled('jvb_process_queue')) {
        wp_schedule_event(time(), 'every-minute', 'jvb_process_queue');
    }
    if (!wp_next_scheduled('jvb_queue_maintenance')) {
        wp_schedule_event(time(), 'hourly', 'jvb_queue_maintenance');
    }
    if (!wp_next_scheduled('jvb_daily_snapshot')) {
        // wp_schedule_event() expects a real (UTC-based) Unix timestamp.
        // current_time('timestamp') is "now + site offset", so feeding it to
        // strtotime() produced 03:00 UTC instead of 03:00 site time. Resolve
        // "tomorrow 3am" in the site's timezone and convert to a timestamp.
        $next3am = (new \DateTimeImmutable('tomorrow 3am', wp_timezone()))->getTimestamp();
        wp_schedule_event($next3am, 'daily', 'jvb_daily_snapshot');
    }

    jvb_register_do_once('queue_admin_action_registered', [$this, 'registerAdminAction']);
    add_filter(BASE . 'admin_action_filter', [$this, 'adminActionFilter'], 10, 3);
}
|
|
/**
 * Expose the type registry so callers can register operation configs.
 *
 * @return TypeRegistry The registry instance created in the constructor.
 */
public function registry(): TypeRegistry
{
    $typeRegistry = $this->registry;

    return $typeRegistry;
}
|
|
/**
 * Expose the storage layer backing this queue.
 *
 * @return Storage The storage instance created in the constructor.
 */
public function storage(): Storage
{
    $backingStore = $this->storage;

    return $backingStore;
}
|
|
/**
 * Queue a new operation, or merge it into an existing one.
 *
 * The operation is built from the arguments, then a merge into an already
 * stored operation is attempted. When no merge happens the operation is
 * inserted. In both cases a queue run is requested for the end of the
 * current request.
 *
 * @param string $type    Operation type
 * @param int    $userId  User ID
 * @param array  $data    Request data
 * @param array  $options {
 *     @type string       $operation_id Explicit operation ID (one is generated when empty)
 *     @type string       $priority     'low', 'normal', 'high'
 *     @type int          $delay        Seconds to delay processing
 *     @type string       $scheduled    Specific datetime to process
 *     @type array|string $depends_on   Operation IDs this depends on
 *     @type string|array $chunk_key    Key(s) to chunk (overrides registry)
 *     @type int          $chunk_size   Chunk size (overrides registry)
 * }
 *
 * @return array|WP_Error Array with 'success', 'operation_id' and
 *                        'updated_existing' keys, or a WP_Error with code
 *                        'queue_failed' when anything throws.
 */
public function add(string $type, int $userId, array $data, array $options = []): array|WP_Error
{
    try {
        $incoming = $this->buildOperation($type, $userId, $data, $options);

        // Attempt pre-insert merge
        $merged = $this->tryMerge($incoming);
        if ($merged) {
            $this->runQueueOnShutdown();
            return $merged;
        }

        $this->storage->insert($incoming);
        $this->runQueueOnShutdown();

        return [
            'success'          => true,
            'operation_id'     => $incoming->id,
            'updated_existing' => false,
        ];
    } catch (\Throwable $e) {
        // tryMerge() is documented as throwing \Throwable, so catching only
        // \Exception let \Error subclasses escape the array|WP_Error contract
        // without being logged. The failure is still logged, not silenced.
        JVB()->error()->log('queue', $e->getMessage(), $data, 'high');
        return new WP_Error('queue_failed', $e->getMessage());
    }
}
|
|
/**
 * Try to merge the incoming operation into an existing stored one.
 *
 * A merge only happens when the registry has a mergeable handler for the
 * operation type, storage finds a candidate matching the handler's criteria,
 * and the handler's canMerge() accepts the pair. The merge itself runs inside
 * a storage transaction:
 *  1. the handler merges $incoming into $existing,
 *  2. dependencies pointing at $incoming are redirected to $existing,
 *  3. both dependency lists are combined (deduplicated, without a
 *     self-reference) and $existing is saved,
 *  4. $incoming is recorded as completed/merged, pointing at $existing.
 *
 * @param Operation $incoming Freshly built operation.
 *
 * @return array|null Result array ('success', 'operation_id',
 *                    'updated_existing' => true) if merged, null if not.
 * @throws \Throwable
 */
private function tryMerge(Operation $incoming): ?array
{
    $mergeable = $this->registry->getMergeable($incoming->type);
    if (!$mergeable) {
        return null;
    }

    $criteria = $mergeable->matchCriteria($incoming);

    $existing = $this->storage->findMergeable($incoming->type, $incoming->userId, $criteria);
    if (!$existing || !$mergeable->canMerge($existing, $incoming)) {
        return null;
    }

    $this->storage->withTransaction(function () use ($incoming, $existing, $mergeable) {
        $mergeable->merge($existing, $incoming);

        $this->storage->replaceDependency($incoming->id, $existing->id);

        // Combine both dependency lists without duplicates.
        $combined = array_unique(
            array_merge($existing->dependencies, $incoming->dependencies)
        );

        // Prevent self dependency. array_unique() and array_diff() both
        // preserve keys, so re-index last: otherwise removing an entry leaves
        // a gap and the list stops being a plain 0..n-1 array.
        $existing->dependencies = array_values(array_diff($combined, [$existing->id]));

        $this->storage->save($existing);

        $incoming->state = 'completed';
        $incoming->outcome = 'merged';
        $incoming->merged_into = $existing->id;
        $this->storage->saveFinal($incoming);
    });

    return [
        'success'          => true,
        'operation_id'     => $existing->id,
        'updated_existing' => true,
    ];
}
|
|
/**
 * Backwards-compatible alias of add(); forwards every argument unchanged.
 *
 * @param string $type    Operation type
 * @param int    $userId  User ID
 * @param array  $data    Request data
 * @param array  $options Same options as add()
 *
 * @return array|WP_Error Whatever add() returns.
 */
public function queueOperation(string $type, int $userId, array $data, array $options = []): array|WP_Error
{
    $outcome = $this->add($type, $userId, $data, $options);

    return $outcome;
}
|
|
/**
 * Run the processor through the locker.
 *
 * Hooked to the 'jvb_process_queue' cron action and also called at the end
 * of a request by processQueueOnShutdown().
 */
public function checkQueue(): void
{
    $runProcessor = function (): void {
        $this->processor->run();
    };

    $this->locker->withLock($runProcessor);
}
|
|
/**
 * Reset stuck operations through the locker, then request a queue run.
 *
 * Hooked to the 'jvb_queue_maintenance' cron action and triggered by the
 * 'restart-stuck-operations' admin action.
 */
public function maintenance(): void
{
    $resetStuck = function (): void {
        // NOTE(review): 30 is presumably a threshold in minutes - confirm
        // against Storage::resetStuckOperations().
        $this->storage->resetStuckOperations(30);
    };

    $this->locker->withLock($resetStuck);

    // Requested regardless of what happened inside the lock.
    $this->runQueueOnShutdown();
}
|
|
// === Public Getters ===
|
|
/**
 * Look up a single operation by its ID.
 *
 * @param string $id Operation ID
 *
 * @return Operation|null Whatever storage finds for that ID.
 */
public function get(string $id): ?Operation
{
    $operation = $this->storage->find($id);

    return $operation;
}
|
|
/**
 * Backwards-compatible alias of get().
 *
 * @param string $id Operation ID
 *
 * @return Operation|null Whatever get() returns.
 */
public function getOperation(string $id): ?Operation
{
    $operation = $this->get($id);

    return $operation;
}
|
|
/**
 * Read one named value from an operation - kept for backwards compatibility.
 *
 * Supported column names: result, request_data, state, outcome, type,
 * user_id, metadata, dependencies, merged_into.
 *
 * @param string $id         Operation ID
 * @param string $column     Column name from the list above
 * @param bool   $decodeJson Accepted for compatibility; not read by this method
 *
 * @return mixed The value, or null when the operation does not exist or the
 *               column name is not supported.
 */
public function getOperationValue(string $id, string $column, bool $decodeJson = true): mixed
{
    $op = $this->get($id);
    if (!$op) {
        return null;
    }

    // Only the requested property is read; the others are never touched.
    switch ($column) {
        case 'result':
            return $op->result;
        case 'request_data':
            return $op->requestData;
        case 'state':
            return $op->state;
        case 'outcome':
            return $op->outcome;
        case 'type':
            return $op->type;
        case 'user_id':
            return $op->userId;
        case 'metadata':
            return $op->metadata;
        case 'dependencies':
            return $op->dependencies;
        case 'merged_into':
            return $op->merged_into;
        default:
            return null;
    }
}
|
|
/**
 * Fetch a user's operations from storage.
 *
 * @param int   $userId  User ID
 * @param array $filters Passed through to storage unchanged
 *
 * @return array Whatever storage returns for that user and filter set.
 */
public function getUserOperations(int $userId, array $filters = []): array
{
    $operations = $this->storage->getUserOperations($userId, $filters);

    return $operations;
}
|
|
/**
 * Fetch the queue status as reported by storage.
 *
 * @return array Result of Storage::getQueueStatus().
 */
public function getStatus(): array
{
    $status = $this->storage->getQueueStatus();

    return $status;
}
|
|
/**
 * Fetch per-user statistics from storage.
 *
 * @param int $userId User ID
 *
 * @return array Result of Storage::getUserStats() for that user.
 */
public function getUserStats(int $userId): array
{
    $stats = $this->storage->getUserStats($userId);

    return $stats;
}
|
|
/**
 * Fetch general queue information from storage.
 *
 * @return array Result of Storage::getQueueInfo().
 */
public function getInfo(): array
{
    $info = $this->storage->getQueueInfo();

    return $info;
}
|
|
/**
 * Dismiss an operation; delegates directly to storage.
 *
 * @param string $id Operation ID
 *
 * @return bool Result reported by Storage::dismiss().
 */
public function dismiss(string $id): bool
{
    $dismissed = $this->storage->dismiss($id);

    return $dismissed;
}
|
|
/**
 * Cancel a pending/scheduled operation (deletes it).
 *
 * Nothing is deleted when the operation does not exist, belongs to another
 * user, or is in any state other than 'pending' or 'scheduled'.
 *
 * @param string $id     Operation ID
 * @param int    $userId User ID (for ownership verification)
 *
 * @return bool True if cancelled (as reported by the storage delete)
 */
public function cancel(string $id, int $userId): bool
{
    $op = $this->get($id);
    if (!$op || $op->userId !== $userId) {
        return false;
    }

    // Can only cancel pending or scheduled operations. Strict comparison so
    // a non-string state can never match through type juggling.
    if (!in_array($op->state, ['pending', 'scheduled'], true)) {
        return false;
    }

    return $this->storage->delete($id);
}
|
|
/**
 * Retry a failed operation.
 *
 * Only operations owned by $userId that are in state 'completed' with an
 * outcome of 'failed' or 'failed_permanent' are eligible. An eligible
 * operation is put back to pending with its error fields cleared, its
 * scheduled time set to now and its retry counter incremented. When the save
 * succeeds a queue run is requested for the end of the request.
 *
 * @param string $id     Operation ID
 * @param int    $userId User ID (for ownership verification)
 *
 * @return bool True if reset for retry
 */
public function retry(string $id, int $userId): bool
{
    $op = $this->get($id);
    if (!$op || $op->userId !== $userId) {
        return false;
    }

    // Can only retry completed operations with failed outcomes. Strict
    // comparison so a non-string outcome can never match via type juggling.
    if ($op->state !== 'completed' || !in_array($op->outcome, ['failed', 'failed_permanent'], true)) {
        return false;
    }

    $op->state = 'pending';
    $op->outcome = 'pending';
    $op->errorMessage = null;
    $op->lastErrorHash = null;
    $op->scheduledAt = current_time('mysql');
    $op->retries++;
    error_log('[Queue]Retrying operation ' . print_r($op->id, true));
    $saved = $this->storage->save($op);

    if ($saved) {
        $this->runQueueOnShutdown();
    }

    return $saved;
}
|
|
/**
 * Apply a set of field updates to an operation and persist them.
 *
 * Recognised keys in $updates: requestData, metadata, priority, state,
 * outcome, result. 'metadata' is merged into the existing metadata with
 * array_merge(); every other recognised key replaces the current value.
 * Unrecognised keys are ignored.
 *
 * @param string   $id      Operation ID
 * @param array    $updates Field name => new value
 * @param int|null $userId  When given, the operation must belong to this user
 *
 * @return bool False when the operation is missing or owned by someone else;
 *              otherwise the result of Storage::saveProgress().
 */
public function update(string $id, array $updates, ?int $userId = null): bool
{
    $op = $this->get($id);

    $ownerMismatch = $op && $userId !== null && $op->userId !== $userId;
    if (!$op || $ownerMismatch) {
        return false;
    }

    // Apply allowed updates
    foreach ($updates as $field => $value) {
        if ($field === 'requestData') {
            $op->requestData = $value;
        } elseif ($field === 'metadata') {
            $op->metadata = array_merge($op->metadata, $value);
        } elseif ($field === 'priority') {
            $op->priority = $value;
        } elseif ($field === 'state') {
            $op->state = $value;
        } elseif ($field === 'outcome') {
            $op->outcome = $value;
        } elseif ($field === 'result') {
            $op->result = $value;
        }
    }

    error_log('[Queue]: updating operation '.print_r($op->id, true));

    return $this->storage->saveProgress($op);
}
|
// === Private Helpers ===
|
|
/**
 * Build an Operation from the arguments given to add(); nothing is stored.
 *
 * - ID: $options['operation_id'] when non-empty, otherwise a generated
 *   'u{userId}_op_…' value based on uniqid().
 * - State: 'scheduled' when a delay or an explicit schedule is given,
 *   otherwise 'pending'.
 * - Chunking: an explicit $options['chunk_key'] (with 'chunk_size',
 *   default 10) overrides the registry's chunk config for the type. When a
 *   config applies, it is copied into metadata and totalItems is counted.
 * - Dependencies: $options['depends_on'] as an array, or as a
 *   comma-separated string.
 *
 * @param string $type    Operation type
 * @param int    $userId  User ID
 * @param array  $data    Request data
 * @param array  $options Options as documented on add()
 *
 * @return Operation The populated operation
 */
private function buildOperation(string $type, int $userId, array $data, array $options): Operation
{
    $op = new Operation();

    // Use provided operation_id or generate one
    $op->id = !empty($options['operation_id'])
        ? $options['operation_id']
        : 'u' . $userId . '_' . uniqid('op_');

    $op->type = $type;
    $op->userId = $userId;
    $op->requestData = $data;
    $op->priority = $options['priority'] ?? 'normal';
    $op->state = (!empty($options['delay']) || !empty($options['scheduled'])) ? 'scheduled' : 'pending';
    $op->scheduledAt = $this->calculateScheduledTime($options);

    // Chunk config: explicit options override registry
    if (!empty($options['chunk_key'])) {
        $chunkConfig = [
            'key'  => $options['chunk_key'],
            'size' => $options['chunk_size'] ?? 10,
        ];
    } else {
        $chunkConfig = $this->registry->getChunkConfig($type);
    }

    if ($chunkConfig) {
        $op->metadata['chunk_key'] = $chunkConfig['key'];
        $op->metadata['chunk_size'] = $chunkConfig['size'];
        $op->totalItems = $this->countItems($data, $chunkConfig['key']);
    }

    // Dependencies
    if (!empty($options['depends_on'])) {
        $dependsOn = $options['depends_on'];

        if (is_string($dependsOn)) {
            // A plain explode() turns "a, b" into ['a', ' b'] and "a," into
            // ['a', '']; such IDs can never match a stored operation. Trim
            // each segment and drop the empty ones.
            $dependsOn = array_values(array_filter(
                array_map('trim', explode(',', $dependsOn)),
                static fn (string $dependencyId): bool => $dependencyId !== ''
            ));
        }

        $op->dependencies = $dependsOn;
    }

    return $op;
}
|
|
/**
 * Count the items found under one or more keys of the request data.
 *
 * Keys that are missing, null, or do not hold an array contribute nothing.
 *
 * @param array        $data Request data
 * @param string|array $keys A single key or a list of keys
 *
 * @return int Sum of the element counts, never less than 1.
 */
private function countItems(array $data, string|array $keys): int
{
    $sizes = array_map(
        static fn ($key): int => is_array($data[$key] ?? null) ? count($data[$key]) : 0,
        (array) $keys
    );

    return max(1, array_sum($sizes));
}
|
|
/**
 * Work out the scheduled time for a new operation.
 *
 * Precedence: a non-empty 'delay' (seconds added to the current time), then
 * a non-empty 'scheduled' value (returned as given), then the current time.
 *
 * @param array $options Options as passed to add()
 *
 * @return string The scheduled time; 'Y-m-d H:i:s' for the delay branch,
 *                current_time('mysql') for the fallback.
 */
private function calculateScheduledTime(array $options): string
{
    return match (true) {
        !empty($options['delay'])     => date('Y-m-d H:i:s', current_time('timestamp') + (int)$options['delay']),
        !empty($options['scheduled']) => $options['scheduled'],
        default                       => current_time('mysql'),
    };
}
|
|
/**
 * Request a queue run on the 'shutdown' hook (priority 100).
 *
 * Does nothing when the callback is already hooked, so repeated calls
 * within one request register it only once.
 */
private function runQueueOnShutdown(): void
{
    $callback = [$this, 'processQueueOnShutdown'];

    if (has_action('shutdown', $callback)) {
        return;
    }

    add_action('shutdown', $callback, 100);
}
|
|
/**
 * 'shutdown' callback: process the queue at the end of the request.
 *
 * Unhooks itself, flushes the response to the client when
 * fastcgi_finish_request() is available, then runs checkQueue().
 */
public function processQueueOnShutdown(): void
{
    // The callback is added with priority 100 in runQueueOnShutdown();
    // remove_action() only matches when the same priority is passed, so the
    // default of 10 left the hook registered.
    remove_action('shutdown', [$this, 'processQueueOnShutdown'], 100);

    if (function_exists('fastcgi_finish_request')) {
        fastcgi_finish_request();
    }

    $this->checkQueue();
}
|
|
/**
 * Register the queue's two admin actions with the admin manager.
 *
 * The slugs registered here are the ones handled by adminActionFilter().
 */
public function registerAdminAction(): void
{
    $admin = JVB()->admin();

    // Positional arguments for registerAction(); presumably label, slug,
    // required capability and icon name - confirm against its signature.
    $actions = [
        ['Restart Stuck Operations', 'restart-stuck-operations', 'manage_options', 'arrows-clockwise'],
        ['Unlock Queue', 'unlock-operation-queue', 'manage_options', 'infinity'],
    ];

    foreach ($actions as $actionArgs) {
        $admin->registerAction(...$actionArgs);
    }
}
|
/**
 * Handle the queue's admin actions on the admin action filter.
 *
 * 'unlock-operation-queue' releases the queue lock and
 * 'restart-stuck-operations' runs maintenance(); each returns a fresh
 * success response. Any other action passes $response through untouched.
 *
 * @param WP_REST_Response $response Response passed in by the filter
 * @param WP_REST_Request  $request  Request passed in by the filter (not read here)
 * @param string           $action   Action slug
 *
 * @return bool|WP_REST_Response Always a WP_REST_Response in this
 *                               implementation; bool is allowed by the
 *                               declared type only.
 */
public function adminActionFilter(WP_REST_Response $response, WP_REST_Request $request, string $action): WP_REST_Response|bool
{
    if ($action === 'unlock-operation-queue') {
        error_log('Unlocking Queue');
        $this->locker->unlock();

        return new WP_REST_Response([
            'success' => true,
            'message' => 'Unlocked Queue',
        ]);
    }

    if ($action === 'restart-stuck-operations') {
        error_log('Restarting stuck operations');
        $this->maintenance();

        return new WP_REST_Response([
            'success' => true,
            'message' => 'Restarted Stuck Operations',
        ]);
    }

    return $response;
}
|
}
|