From d7e7d248cbe41cd7a9ef9c2fb022b6c4831f99a3 Mon Sep 17 00:00:00 2001
From: Jake Vanderwerf <get@jakevanderwerf.ca>
Date: Sun, 31 May 2026 15:22:56 +0000
Subject: [PATCH] =jakevan complete
---
inc/managers/queue/Queue.php | 224 +++++++++++++++++++++++++++++++++----------------------
1 files changed, 135 insertions(+), 89 deletions(-)
diff --git a/inc/managers/queue/Queue.php b/inc/managers/queue/Queue.php
index ce213f2..89a8ceb 100644
--- a/inc/managers/queue/Queue.php
+++ b/inc/managers/queue/Queue.php
@@ -4,26 +4,31 @@
exit;
}
+use JVBase\managers\CustomTable;
use WP_Error;
+use WP_REST_Request;
+use WP_REST_Response;
class Queue
{
private Storage $storage;
private Processor $processor;
private TypeRegistry $registry;
+ public Executor $executor;
private Locker $locker;
public function __construct()
{
$this->storage = new Storage();
$this->registry = new TypeRegistry();
- $this->locker = new Locker('queue_processor', 300);
+ $this->locker = new Locker();
- $executor = new FilteredExecutor($this->storage);
- $this->processor = new Processor($this->storage, $executor, $this->registry, $this->locker);
+ $this->executor = new FilteredExecutor();
+ $this->processor = new Processor($this->storage, $this->executor, $this->registry);
add_action('jvb_process_queue', [$this, 'checkQueue']);
add_action('jvb_queue_maintenance', [$this, 'maintenance']);
+ add_action('jvb_daily_snapshot', [$this->storage, 'snapshotDaily']);
if (!wp_next_scheduled('jvb_process_queue')) {
wp_schedule_event(time(), 'every-minute', 'jvb_process_queue');
@@ -31,6 +36,14 @@
if (!wp_next_scheduled('jvb_queue_maintenance')) {
wp_schedule_event(time(), 'hourly', 'jvb_queue_maintenance');
}
+ if (!wp_next_scheduled('jvb_daily_snapshot')) {
+ // Schedule for next 3am
+ $next3am = strtotime('tomorrow 3am', current_time('timestamp'));
+ wp_schedule_event($next3am, 'daily', 'jvb_daily_snapshot');
+ }
+
+ jvb_register_do_once('queue_admin_action_registered', [$this, 'registerAdminAction']);
+ add_filter(BASE.'admin_action_filter', [$this, 'adminActionFilter'], 10, 3);
}
/**
@@ -41,6 +54,11 @@
return $this->registry;
}
+ public function storage(): Storage
+ {
+ return $this->storage;
+ }
+
/**
* Queue a new operation or merge into existing
*
@@ -60,52 +78,12 @@
{
try {
$incoming = $this->buildOperation($type, $userId, $data, $options);
- $mergeable = $this->registry->getMergeable($type);
- $existingById = $this->storage->find($incoming->id);
- if ($existingById) {
- // Operation with this ID already exists
- if (in_array($existingById->state, ['pending', 'scheduled']) && $mergeable) {
- // Still pending and mergeable, merge into it
- $merged = $mergeable->merge($existingById, $incoming);
- $this->storage->save($merged);
- $this->runQueueOnShutdown();
-
- return [
- 'success' => true,
- 'operation_id' => $merged->id,
- 'updated_existing' => true,
- ];
- } else {
- // Already processing/completed, or not mergeable - generate new ID
- $incoming->id = 'u' . $userId . '_' . time() . '_' . uniqid();
-
- JVB()->error()->log(
- '[Queue]:add',
- 'Duplicate ID for non-mergeable operation, generated new ID',
- [
- 'type' => $type,
- 'existing_state' => $existingById->state,
- ],
- 'warning'
- );
- }
- }
-
- if ($mergeable) {
- $existing = $this->storage->findMergeable($type, $userId);
-
- if ($existing && $mergeable->canMerge($existing, $incoming)) {
- $merged = $mergeable->merge($existing, $incoming);
- $this->storage->save($merged);
- $this->runQueueOnShutdown();
-
- return [
- 'success' => true,
- 'operation_id' => $merged->id,
- 'updated_existing' => true,
- ];
- }
+ // Attempt pre-insert merge
+ $merged = $this->tryMerge($incoming);
+ if ($merged) {
+ $this->runQueueOnShutdown();
+ return $merged;
}
$this->storage->insert($incoming);
@@ -124,6 +102,59 @@
}
/**
+ * Try to merge incoming operation into an existing pending/scheduled one.
+ * Returns result array if merged, null if not.
+ * @throws \Throwable
+ */
+ private function tryMerge(Operation $incoming): ?array
+ {
+ $mergeable = $this->registry->getMergeable($incoming->type);
+ if (!$mergeable) {
+ return null;
+ }
+
+ $criteria = $mergeable->matchCriteria($incoming);
+
+ $existing = $this->storage->findMergeable($incoming->type, $incoming->userId, $criteria);
+ if (!$existing || !$mergeable->canMerge($existing, $incoming)) {
+ return null;
+ }
+
+ $this->storage->withTransaction(function () use ($incoming, $existing, $mergeable) {
+ $mergeable->merge($existing, $incoming);
+
+ $this->storage->replaceDependency(
+ $incoming->id,
+ $existing->id
+ );
+
+ // Merge dependency arrays safely
+ $existing->dependencies = array_values(array_unique(
+ array_merge($existing->dependencies, $incoming->dependencies)
+ ));
+
+ // Prevent self dependency
+ $existing->dependencies = array_diff(
+ $existing->dependencies,
+ [$existing->id]
+ );
+
+ $this->storage->save($existing);
+
+ $incoming->state = 'completed';
+ $incoming->outcome = 'merged';
+ $incoming->merged_into = $existing->id;
+ $this->storage->saveFinal($incoming);
+ });
+
+ return [
+ 'success' => true,
+ 'operation_id' => $existing->id,
+ 'updated_existing' => true,
+ ];
+ }
+
+ /**
* Alias for add() - backwards compatibility
*/
public function queueOperation(string $type, int $userId, array $data, array $options = []): array|WP_Error
@@ -133,43 +164,18 @@
public function checkQueue(): void
{
- if (!$this->storage->getQueueInfo()['has_items']) {
- return;
- }
+ $this->locker->withLock(function () {
+ $this->processor->run();
+ });
- $this->processor->run();
}
public function maintenance(): void
{
- if ($this->locker->isLocked()) {
- return;
- }
-
- $this->cleanupStuck();
- }
-
- private function cleanupStuck(): void
- {
- // Operations stuck in processing > 30 min → reschedule
- global $wpdb;
- $table = $wpdb->prefix . BASE . '_operation_queue';
-
- $stuck = $wpdb->get_results($wpdb->prepare("
- SELECT id FROM {$table}
- WHERE state = 'processing'
- AND started_at < %s
- ", date('Y-m-d H:i:s', strtotime('-30 minutes'))));
-
- foreach ($stuck as $row) {
- $op = $this->storage->find($row->id);
- if ($op) {
- $op->state = 'scheduled';
- $op->scheduledAt = date('Y-m-d H:i:s', time() + 60);
- $op->retries++;
- $this->storage->save($op);
- }
- }
+ $this->locker->withLock(function () {
+ $this->storage->resetStuckOperations(30);
+ });
+ $this->runQueueOnShutdown();
}
// === Public Getters ===
@@ -206,6 +212,7 @@
'user_id' => $op->userId,
'metadata' => $op->metadata,
'dependencies' => $op->dependencies,
+ 'merged_into' => $op->merged_into,
default => null,
};
}
@@ -266,6 +273,7 @@
*/
public function retry(string $id, int $userId): bool
{
+
$op = $this->get($id);
if (!$op || $op->userId !== $userId) {
return false;
@@ -282,7 +290,7 @@
$op->lastErrorHash = null;
$op->scheduledAt = current_time('mysql');
$op->retries++;
-
+ error_log('[Queue]Retrying operation '.print_r($op->id, true));
$saved = $this->storage->save($op);
if ($saved) {
@@ -323,15 +331,9 @@
default => null,
};
}
-
- return $this->storage->save($op);
+ error_log('[Queue]: updating operation '.print_r($op->id, true));
+ return $this->storage->saveProgress($op);
}
-
- public function isLocked(): bool
- {
- return $this->locker->isLocked();
- }
-
// === Private Helpers ===
private function buildOperation(string $type, int $userId, array $data, array $options): Operation
@@ -417,4 +419,48 @@
$this->checkQueue();
}
+
+ public function registerAdminAction():void
+ {
+ $admin = JVB()->admin();
+ $admin->registerAction(
+ 'Restart Stuck Operations',
+ 'restart-stuck-operations',
+ 'manage_options',
+ 'arrows-clockwise'
+ );
+ $admin->registerAction(
+ 'Unlock Queue',
+ 'unlock-operation-queue',
+ 'manage_options',
+ 'infinity'
+ );
+ }
+ /**
+ * @param WP_REST_Response $response
+ * @param string $action
+ *
+ * @return bool|WP_REST_Response
+ */
+ public function adminActionFilter(WP_REST_Response $response, WP_REST_Request $request, string $action):WP_REST_Response|bool
+ {
+ switch ($action) {
+ case 'unlock-operation-queue':
+ error_log('Unlocking Queue');
+ $this->locker->unlock();
+ return new WP_REST_Response([
+ 'success' => true,
+ 'message' => 'Unlocked Queue'
+ ]);
+ case 'restart-stuck-operations':
+ error_log('Restarting stuck operations');
+ $this->maintenance();
+ return new WP_REST_Response([
+ 'success' => true,
+ 'message' => 'Restarted Stuck Operations'
+ ]);
+ default:
+ return $response;
+ }
+ }
}
--
Gitblit v1.10.0