From 47e77f9fac1155c536b2b87fec552c7fcce66fa6 Mon Sep 17 00:00:00 2001
From: Jake Vanderwerf <get@jakevanderwerf.ca>
Date: Mon, 01 Jun 2026 18:06:34 +0000
Subject: [PATCH] =Timeline block fixes. Next up: adding article schema classes
---
inc/managers/OperationQueue.php | 169 +++++++++++++++++++++++--------------------------------
1 files changed, 71 insertions(+), 98 deletions(-)
diff --git a/inc/managers/OperationQueue.php b/inc/managers/OperationQueue.php
index 040447d..3b93ccf 100644
--- a/inc/managers/OperationQueue.php
+++ b/inc/managers/OperationQueue.php
@@ -1,9 +1,7 @@
<?php
namespace JVBase\managers;
-use JVBase\managers\CacheManager;
use Exception;
-use JVBase\utility\Features;
use WP_Error;
use WP_REST_Response;
use WP_REST_Request;
@@ -12,21 +10,6 @@
exit; // Exit if accessed directly
}
-//TODO: Register a server cron job for jvb_process_queue
-// 1) Log in to your Ploi dashboard
-// 2) Navigate to your server and select the site where Edmonton Ink is installed
-// 3) Go to the "Cron Jobs" tab in the site management interface
-// 4) Click "Create Cron Job" to add a new scheduled task
-// 5) Configure the cron job with these settings:
-// - Command: Use the WP-CLI to trigger your custom wp cron event:
-// cd /path/to/your/wordpress && php wp-cli.phar cron event run jvb_process_queue
-// - Or if WP-CLI is installed globally:
-// cd /path/to/your/wordpress && wp cron event run ei_process_queue
-// - User: Select the appropriate system user (usually the one associated with your site)
-// - Frequency: Set to run every 5 minutes for queue processing:
-// */5 * * * *
-
-
//# Every minute - main queue processing
//* * * * * cd /path/to/wordpress && wp cron event run jvb_process_queue
//
@@ -62,25 +45,21 @@
];
- protected ?CacheManager $cache = null;
+ protected ?Cache $cache = null;
protected int $ttl = 300;
- protected string $cacheGroup = 'queue';
// Cache keys for different data types
private const CACHE_QUEUE_STATUS = 'status';
private const CACHE_HAS_ITEMS = 'has_items';
private const CACHE_OPERATION_PREFIX = 'op_';
private const CACHE_USER_QUEUE_PREFIX = 'user_queue_';
- private const CACHE_METRICS_PREFIX = 'metrics_';
private const CACHE_QUEUE_SIZE = 'queue_size';
- // Prepared statement cache
- protected array $preparedStatements = [];
public function __construct()
{
global $wpdb;
$this->wpdb = $wpdb;
- $this->cache = CacheManager::for('queue', DAY_IN_SECONDS);
+ $this->cache = Cache::for('queue', DAY_IN_SECONDS)->connect('user');
add_action('jvb_process_queue', [ $this, 'checkQueue' ]);
add_action('jvb_queue_maintenance', [$this, 'hourlyMaintenance']);
add_action('jvbEmailDailyMetricsReport', [$this, 'emailDailyMetricsReport']);
@@ -132,7 +111,7 @@
*
* @return bool|WP_REST_Response
*/
- public function adminActionFilter(WP_REST_Response $response, WP_REST_Request $request, string $action)
+ public function adminActionFilter(WP_REST_Response $response, WP_REST_Request $request, string $action):WP_REST_Response|bool
{
switch ($action) {
case 'unlock-operation-queue':
@@ -167,8 +146,8 @@
$result = $this->wpdb->get_row($this->wpdb->prepare("
SELECT
COUNT(*) as total,
- SUM(CASE WHEN status IN ('pending', 'processing') THEN 1 ELSE 0 END) as active,
- SUM(CASE WHEN status = 'scheduled' AND scheduled_at <= %s THEN 1 ELSE 0 END) as ready_scheduled
+ SUM(IF(status IN ('pending', 'processing'), 1, 0)) as active,
+ SUM(IF(status = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready_scheduled
FROM $table
WHERE status IN ('pending', 'processing', 'scheduled')
", $current_time));
@@ -230,17 +209,6 @@
// $this->pruneOldMetrics(); // Optional: clean old metrics
}
- protected function pruneOldMetrics(): void
- {
- $metricsTable = $this->wpdb->prefix . $this->metricsTable;
-
- // Keep only last 90 days of metrics
- $this->wpdb->query($this->wpdb->prepare(
- "DELETE FROM $metricsTable WHERE date < %s",
- date('Y-m-d', strtotime('-90 days'))
- ));
- }
-
protected function isHeavyOperation(string $type): bool
{
$heavyOperations = [
@@ -270,24 +238,18 @@
* Check if the queue is currently locked for processing
* @return bool
*/
- protected function isQueueLocked():bool
- {
- $lock = get_transient(BASE . 'queue_lock');
+ protected function isQueueLocked(): bool
+ {
+ $lock_name = BASE . '_queue_lock';
- if ($lock) {
- // Check if the lock is stale (process might have crashed)
- $lock_time = (int)$lock;
- $current_time = time();
+ // Check if lock exists and is recent
+ $result = $this->wpdb->get_var($this->wpdb->prepare(
+ "SELECT IS_USED_LOCK(%s)",
+ $lock_name
+ ));
- // If lock is older than 5 minutes, it's probably stale
- if (($current_time - $lock_time) > 300) {
- $this->unlockQueue(); // Clear stale lock
- return false;
- }
- return true;
- }
- return false;
- }
+ return $result !== null;
+ }
/**
* Check if server is relatively idle (very low load)
@@ -315,25 +277,32 @@
* Acquire a lock on the queue
* @return bool
*/
- protected function lockQueue():bool
- {
- // Try to acquire the lock
- $result = set_transient(BASE . 'queue_lock', time());
+ protected function lockQueue(): bool
+ {
+ $lock_name = BASE . '_queue_lock';
- if ($result) {
- return true;
- }
- return false;
- }
+ // Try to acquire lock with 0 timeout (non-blocking)
+ $result = $this->wpdb->get_var($this->wpdb->prepare(
+ "SELECT GET_LOCK(%s, 0)",
+ $lock_name
+ ));
+
+ return $result === '1';
+ }
/**
* Release the queue lock
* @return void
*/
- protected function unlockQueue():void
- {
- delete_transient(BASE . 'queue_lock');
- }
+ protected function unlockQueue(): void
+ {
+ $lock_name = BASE . '_queue_lock';
+
+ $this->wpdb->query($this->wpdb->prepare(
+ "SELECT RELEASE_LOCK(%s)",
+ $lock_name
+ ));
+ }
/**
@@ -566,9 +535,7 @@
}
}
- $this->updateLastModified($user_id);
- $this->invalidateQueueCache();
- $this->cache->delete(self::CACHE_USER_QUEUE_PREFIX . $user_id);
+ $this->invalidateUserQueue($user_id);
$this->runQueueOnShutdown();
return [
@@ -585,10 +552,6 @@
}
}
- protected function updateLastModified(int $user_id) {
- CacheManager::updateTimestamp("user_{$user_id}");
- }
-
protected function deepMerge(array $existing, array $new): array
{
$merged = $existing;
@@ -796,8 +759,8 @@
$this->processOperation($operation);
// Invalidate operation cache after processing
- $this->cache->delete(self::CACHE_OPERATION_PREFIX . $operation->id);
- $this->cache->delete(self::CACHE_USER_QUEUE_PREFIX . $operation->user_id);
+ $this->cache->forget(self::CACHE_OPERATION_PREFIX . $operation->id);
+ $this->cache->forget(self::CACHE_USER_QUEUE_PREFIX . $operation->user_id);
}
// Batch invalidate caches at the end
@@ -881,7 +844,7 @@
AND retries < %d
ORDER BY
FIELD(priority, 'high', 'normal', 'low'),
- COALESCE(scheduled_at, created_at) ASC
+ COALESCE(scheduled_at, created_at)
LIMIT %d
", $current_time, $this->max_attempts, $batch_size));
@@ -987,6 +950,23 @@
}
/**
+ * Check if user's queue has been modified since given timestamp
+ * Returns true if modified, false if not (can send 304)
+ */
+ public function isUserQueueModified(int $user_id, int $since_timestamp): bool
+ {
+ return $this->cache::lastModified("user_{$user_id}") > $since_timestamp;
+ }
+ protected function invalidateUserQueue(int $user_id): void
+ {
+ // This automatically:
+ // 1. Updates HTTP timestamp for user_{$user_id}
+ // 2. Flushes user-specific caches
+ // 3. Triggers connected cache invalidation
+ Cache::for($user_id)->flush();
+ }
+
+ /**
* Invalidate all queue-related caches
*/
protected function invalidateQueueCache(string $scope = 'all'): void
@@ -1005,12 +985,12 @@
$keys = $cacheKeys[$scope] ?? $cacheKeys['all'];
foreach ($keys as $key) {
- $this->cache->delete($key);
+ $this->cache->forget($key);
}
+ $this->cache->touch();
+
if ($scope === 'all') {
- // Clear entire group for complete refresh
- $this->cache->invalidate();
delete_transient('jvb_queue_status_counts');
}
}
@@ -1182,7 +1162,7 @@
if (!$updated) {
throw new Exception('Operation no longer available for processing');
}
- $this->updateLastModified($operation->user_id);
+ $this->invalidateUserQueue($operation->user_id);
$data = json_decode($operation->request_data, true);
$progress_count = (int) $operation->progress_count;
@@ -1283,9 +1263,7 @@
);
// Now do post-completion tasks
- $this->invalidateQueueCache('status');
- $this->updateLastModified($operation->user_id);
- $this->updateUserQueueTimestamp($operation->user_id);
+ $this->invalidateUserQueue($operation->user_id);
$this->trackOperationMetrics($operation->id);
@@ -1304,7 +1282,7 @@
['%s']
);
- $this->updateLastModified($operation->user_id);
+ $this->invalidateUserQueue($operation->user_id);
}
} else {
@@ -1345,8 +1323,7 @@
['%s']
);
- $this->invalidateQueueCache('status');
- $this->updateLastModified($operation->user_id);
+ $this->invalidateUserQueue($operation->user_id);
} else {
// Failed but more to process - continue with next chunk
$this->wpdb->update(
@@ -1366,8 +1343,7 @@
}
}
// Clear operation cache after any update
- $this->cache->delete(self::CACHE_OPERATION_PREFIX . $operation->id);
- $this->updateLastModified($operation->user_id);
+ $this->invalidateUserQueue($operation->user_id);
return $filterResult;
} catch (Exception $e) {
@@ -1442,12 +1418,9 @@
$batch = $this->extractBatch($data, $keys, $current_progress, $chunk_size);
// Merge non-chunked data with chunked batch
- $result = [];
- foreach ($data as $k => $v) {
- if (!in_array($k, $keys)) {
- $result[$k] = $v; // Keep non-batch data
- }
- }
+ $result = array_filter($data, function ($k) use ($keys) {
+ return !in_array($k, $keys);
+ }, ARRAY_FILTER_USE_KEY);
// Add the batched data
foreach ($batch as $k => $v) {
@@ -1518,7 +1491,7 @@
protected function updateUserQueueTimestamp(int $user_id)
{
- CacheManager::updateTimestamp("user_{$user_id}");
+ Cache::touch("user_{$user_id}");
}
/**
@@ -1811,7 +1784,7 @@
SELECT
status,
COUNT(*) as count,
- SUM(CASE WHEN status = 'scheduled' AND scheduled_at <= '$current_time' THEN 1 ELSE 0 END) as scheduled_ready
+ SUM(IF(status = 'scheduled' AND scheduled_at <= '$current_time', 1, 0)) as scheduled_ready
FROM $table
GROUP BY status
", OBJECT_K);
@@ -2190,8 +2163,8 @@
DATE(completed_at) as date,
type,
COUNT(*) as total_operations,
- SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as successful,
- SUM(CASE WHEN status LIKE 'failed%' THEN 1 ELSE 0 END) as failed,
+ SUM(IF(status = 'completed', 1, 0)) as successful,
+ SUM(IF(status LIKE 'failed%', 1, 0)) as failed,
AVG(TIMESTAMPDIFF(SECOND, created_at, completed_at)) as avg_duration,
SUM(count) as items_processed
FROM $table
--
Gitblit v1.10.0