From 2127b1bdd73ecd2423e443992da4b442f5a3c1a3 Mon Sep 17 00:00:00 2001
From: Jake Vanderwerf <get@jakevanderwerf.ca>
Date: Wed, 04 Feb 2026 21:19:25 +0000
Subject: [PATCH] =Major overhaul of MetaManager.php -> Meta.php and RestRouteManager.php -> Rest.php. Seems to work for JakeVan
---
inc/managers/queue/Queue.php | 106 ++++++++--------------------------------------------
1 files changed, 17 insertions(+), 89 deletions(-)
diff --git a/inc/managers/queue/Queue.php b/inc/managers/queue/Queue.php
index ce213f2..31c3823 100644
--- a/inc/managers/queue/Queue.php
+++ b/inc/managers/queue/Queue.php
@@ -17,10 +17,10 @@
{
$this->storage = new Storage();
$this->registry = new TypeRegistry();
- $this->locker = new Locker('queue_processor', 300);
+ $this->locker = new Locker();
$executor = new FilteredExecutor($this->storage);
- $this->processor = new Processor($this->storage, $executor, $this->registry, $this->locker);
+ $this->processor = new Processor($this->storage, $executor, $this->registry);
add_action('jvb_process_queue', [$this, 'checkQueue']);
add_action('jvb_queue_maintenance', [$this, 'maintenance']);
@@ -41,6 +41,11 @@
return $this->registry;
}
+ public function storage(): Storage
+ {
+ return $this->storage;
+ }
+
/**
* Queue a new operation or merge into existing
*
@@ -60,53 +65,6 @@
{
try {
$incoming = $this->buildOperation($type, $userId, $data, $options);
- $mergeable = $this->registry->getMergeable($type);
- $existingById = $this->storage->find($incoming->id);
-
- if ($existingById) {
- // Operation with this ID already exists
- if (in_array($existingById->state, ['pending', 'scheduled']) && $mergeable) {
- // Still pending and mergeable, merge into it
- $merged = $mergeable->merge($existingById, $incoming);
- $this->storage->save($merged);
- $this->runQueueOnShutdown();
-
- return [
- 'success' => true,
- 'operation_id' => $merged->id,
- 'updated_existing' => true,
- ];
- } else {
- // Already processing/completed, or not mergeable - generate new ID
- $incoming->id = 'u' . $userId . '_' . time() . '_' . uniqid();
-
- JVB()->error()->log(
- '[Queue]:add',
- 'Duplicate ID for non-mergeable operation, generated new ID',
- [
- 'type' => $type,
- 'existing_state' => $existingById->state,
- ],
- 'warning'
- );
- }
- }
-
- if ($mergeable) {
- $existing = $this->storage->findMergeable($type, $userId);
-
- if ($existing && $mergeable->canMerge($existing, $incoming)) {
- $merged = $mergeable->merge($existing, $incoming);
- $this->storage->save($merged);
- $this->runQueueOnShutdown();
-
- return [
- 'success' => true,
- 'operation_id' => $merged->id,
- 'updated_existing' => true,
- ];
- }
- }
$this->storage->insert($incoming);
$this->runQueueOnShutdown();
@@ -133,43 +91,18 @@
public function checkQueue(): void
{
- if (!$this->storage->getQueueInfo()['has_items']) {
- return;
- }
+ $this->locker->withLock(function () {
+ $this->processor->run();
+ });
- $this->processor->run();
}
public function maintenance(): void
{
- if ($this->locker->isLocked()) {
- return;
- }
+ $this->locker->withLock(function () {
+ $this->storage->resetStuckOperations(30);
+ });
- $this->cleanupStuck();
- }
-
- private function cleanupStuck(): void
- {
- // Operations stuck in processing > 30 min → reschedule
- global $wpdb;
- $table = $wpdb->prefix . BASE . '_operation_queue';
-
- $stuck = $wpdb->get_results($wpdb->prepare("
- SELECT id FROM {$table}
- WHERE state = 'processing'
- AND started_at < %s
- ", date('Y-m-d H:i:s', strtotime('-30 minutes'))));
-
- foreach ($stuck as $row) {
- $op = $this->storage->find($row->id);
- if ($op) {
- $op->state = 'scheduled';
- $op->scheduledAt = date('Y-m-d H:i:s', time() + 60);
- $op->retries++;
- $this->storage->save($op);
- }
- }
}
// === Public Getters ===
@@ -266,6 +199,7 @@
*/
public function retry(string $id, int $userId): bool
{
+
$op = $this->get($id);
if (!$op || $op->userId !== $userId) {
return false;
@@ -282,7 +216,7 @@
$op->lastErrorHash = null;
$op->scheduledAt = current_time('mysql');
$op->retries++;
-
+ error_log('[Queue]Retrying operation '.print_r($op->id, true));
$saved = $this->storage->save($op);
if ($saved) {
@@ -323,15 +257,9 @@
default => null,
};
}
-
- return $this->storage->save($op);
+ error_log('[Queue]: updating operation '.print_r($op->id, true));
+ return $this->storage->saveProgress($op);
}
-
- public function isLocked(): bool
- {
- return $this->locker->isLocked();
- }
-
// === Private Helpers ===
private function buildOperation(string $type, int $userId, array $data, array $options): Operation
--
Gitblit v1.10.0