Jake Vanderwerf
5 days ago a9b3b28d001941921aa70d37fdc87c758a163a44
inc/managers/OperationQueue.php
@@ -1,9 +1,7 @@
<?php
namespace JVBase\managers;
use JVBase\managers\CacheManager;
use Exception;
use JVBase\utility\Features;
use WP_Error;
use WP_REST_Response;
use WP_REST_Request;
@@ -47,25 +45,21 @@
    ];
   protected ?CacheManager $cache = null;
   protected ?Cache $cache = null;
   protected int $ttl = 300;
   protected string $cacheGroup = 'queue';
   // Cache keys for different data types
   private const CACHE_QUEUE_STATUS = 'status';
   private const CACHE_HAS_ITEMS = 'has_items';
   private const CACHE_OPERATION_PREFIX = 'op_';
   private const CACHE_USER_QUEUE_PREFIX = 'user_queue_';
   private const CACHE_METRICS_PREFIX = 'metrics_';
   private const CACHE_QUEUE_SIZE = 'queue_size';
   // Prepared statement cache
   protected array $preparedStatements = [];
    public function __construct()
    {
      global $wpdb;
      $this->wpdb = $wpdb;
      $this->cache = CacheManager::for('queue', DAY_IN_SECONDS);
      $this->cache = Cache::for('queue', DAY_IN_SECONDS)->connect('user')->user();
        add_action('jvb_process_queue', [ $this, 'checkQueue' ]);
      add_action('jvb_queue_maintenance', [$this, 'hourlyMaintenance']);
        add_action('jvbEmailDailyMetricsReport', [$this, 'emailDailyMetricsReport']);
@@ -117,7 +111,7 @@
     *
     * @return bool|WP_REST_Response
     */
    public function adminActionFilter(WP_REST_Response $response, WP_REST_Request $request, string $action)
    public function adminActionFilter(WP_REST_Response $response, WP_REST_Request $request, string $action):WP_REST_Response|bool
    {
        switch ($action) {
            case 'unlock-operation-queue':
@@ -152,8 +146,8 @@
      $result = $this->wpdb->get_row($this->wpdb->prepare("
        SELECT
            COUNT(*) as total,
            SUM(CASE WHEN status IN ('pending', 'processing') THEN 1 ELSE 0 END) as active,
            SUM(CASE WHEN status = 'scheduled' AND scheduled_at <= %s THEN 1 ELSE 0 END) as ready_scheduled
            SUM(IF(status IN ('pending', 'processing'), 1, 0)) as active,
            SUM(IF(status = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready_scheduled
        FROM $table
        WHERE status IN ('pending', 'processing', 'scheduled')
    ", $current_time));
@@ -215,17 +209,6 @@
//    $this->pruneOldMetrics(); // Optional: clean old metrics
   }
   protected function pruneOldMetrics(): void
   {
      $metricsTable = $this->wpdb->prefix . $this->metricsTable;
      // Keep only last 90 days of metrics
      $this->wpdb->query($this->wpdb->prepare(
         "DELETE FROM $metricsTable WHERE date < %s",
         date('Y-m-d', strtotime('-90 days'))
      ));
   }
   protected function isHeavyOperation(string $type): bool
   {
      $heavyOperations = [
@@ -255,24 +238,18 @@
     * Check if the queue is currently locked for processing
     * @return bool
     */
    protected function isQueueLocked():bool
    {
        $lock = get_transient(BASE . 'queue_lock');
   protected function isQueueLocked(): bool
   {
      $lock_name = BASE . '_queue_lock';
        if ($lock) {
            // Check if the lock is stale (process might have crashed)
            $lock_time    = (int)$lock;
            $current_time = time();
      // Check if lock exists and is recent
      $result = $this->wpdb->get_var($this->wpdb->prepare(
         "SELECT IS_USED_LOCK(%s)",
         $lock_name
      ));
            // If lock is older than 5 minutes, it's probably stale
            if (($current_time - $lock_time) > 300) {
                $this->unlockQueue(); // Clear stale lock
                return false;
            }
            return true;
        }
        return false;
    }
      return $result !== null;
   }
    /**
     * Check if server is relatively idle (very low load)
@@ -300,25 +277,32 @@
     * Acquire a lock on the queue
     * @return bool
     */
    protected function lockQueue():bool
    {
        // Try to acquire the lock
        $result = set_transient(BASE . 'queue_lock', time());
   protected function lockQueue(): bool
   {
      $lock_name = BASE . '_queue_lock';
        if ($result) {
            return true;
        }
        return false;
    }
      // Try to acquire lock with 0 timeout (non-blocking)
      $result = $this->wpdb->get_var($this->wpdb->prepare(
         "SELECT GET_LOCK(%s, 0)",
         $lock_name
      ));
      return $result === '1';
   }
    /**
     * Release the queue lock
     * @return void
     */
    protected function unlockQueue():void
    {
        delete_transient(BASE . 'queue_lock');
    }
   protected function unlockQueue(): void
   {
      $lock_name = BASE . '_queue_lock';
      $this->wpdb->query($this->wpdb->prepare(
         "SELECT RELEASE_LOCK(%s)",
         $lock_name
      ));
   }
    /**
@@ -551,9 +535,7 @@
            }
         }
         $this->updateLastModified($user_id);
         $this->invalidateQueueCache();
         $this->cache->delete(self::CACHE_USER_QUEUE_PREFIX . $user_id);
         $this->invalidateUserQueue($user_id);
         $this->runQueueOnShutdown();
         return [
@@ -570,10 +552,6 @@
      }
   }
   protected function updateLastModified(int $user_id) {
      CacheManager::updateTimestamp("user_{$user_id}");
   }
   protected function deepMerge(array $existing, array $new): array
   {
      $merged = $existing;
@@ -781,8 +759,8 @@
            $this->processOperation($operation);
            // Invalidate operation cache after processing
            $this->cache->delete(self::CACHE_OPERATION_PREFIX . $operation->id);
            $this->cache->delete(self::CACHE_USER_QUEUE_PREFIX . $operation->user_id);
            $this->cache->forget(self::CACHE_OPERATION_PREFIX . $operation->id);
            $this->cache->forget(self::CACHE_USER_QUEUE_PREFIX . $operation->user_id);
         }
         // Batch invalidate caches at the end
@@ -866,7 +844,7 @@
        AND retries < %d
        ORDER BY
            FIELD(priority, 'high', 'normal', 'low'),
            COALESCE(scheduled_at, created_at) ASC
            COALESCE(scheduled_at, created_at)
        LIMIT %d
    ", $current_time, $this->max_attempts, $batch_size));
@@ -972,6 +950,23 @@
   }
   /**
    * Check if user's queue has been modified since given timestamp
    * Returns true if modified, false if not (can send 304)
    */
   public function isUserQueueModified(int $user_id, int $since_timestamp): bool
   {
      return $this->cache::lastModified("user_{$user_id}") > $since_timestamp;
   }
   protected function invalidateUserQueue(int $user_id): void
   {
      // This automatically:
      // 1. Updates HTTP timestamp for user_{$user_id}
      // 2. Flushes user-specific caches
      // 3. Triggers connected cache invalidation
      Cache::for($user_id)->flush();
   }
   /**
    * Invalidate all queue-related caches
    */
   protected function invalidateQueueCache(string $scope = 'all'): void
@@ -990,12 +985,12 @@
      $keys = $cacheKeys[$scope] ?? $cacheKeys['all'];
      foreach ($keys as $key) {
         $this->cache->delete($key);
         $this->cache->forget($key);
      }
      $this->cache->touch();
      if ($scope === 'all') {
         // Clear entire group for complete refresh
         $this->cache->invalidate();
         delete_transient('jvb_queue_status_counts');
      }
   }
@@ -1167,7 +1162,7 @@
         if (!$updated) {
            throw new Exception('Operation no longer available for processing');
         }
         $this->updateLastModified($operation->user_id);
         $this->invalidateUserQueue($operation->user_id);
         $data = json_decode($operation->request_data, true);
         $progress_count = (int) $operation->progress_count;
@@ -1268,9 +1263,7 @@
               );
               // Now do post-completion tasks
               $this->invalidateQueueCache('status');
               $this->updateLastModified($operation->user_id);
               $this->updateUserQueueTimestamp($operation->user_id);
               $this->invalidateUserQueue($operation->user_id);
               $this->trackOperationMetrics($operation->id);
@@ -1289,7 +1282,7 @@
                  ['%s']
               );
               $this->updateLastModified($operation->user_id);
               $this->invalidateUserQueue($operation->user_id);
            }
         } else {
@@ -1330,8 +1323,7 @@
                  ['%s']
               );
               $this->invalidateQueueCache('status');
               $this->updateLastModified($operation->user_id);
               $this->invalidateUserQueue($operation->user_id);
            } else {
               // Failed but more to process - continue with next chunk
               $this->wpdb->update(
@@ -1351,8 +1343,7 @@
            }
         }
         // Clear operation cache after any update
         $this->cache->delete(self::CACHE_OPERATION_PREFIX . $operation->id);
         $this->updateLastModified($operation->user_id);
         $this->invalidateUserQueue($operation->user_id);
         return $filterResult;
      } catch (Exception $e) {
@@ -1427,12 +1418,9 @@
      $batch = $this->extractBatch($data, $keys, $current_progress, $chunk_size);
      // Merge non-chunked data with chunked batch
      $result = [];
      foreach ($data as $k => $v) {
         if (!in_array($k, $keys)) {
            $result[$k] = $v; // Keep non-batch data
         }
      }
      $result = array_filter($data, function ($k) use ($keys) {
         return !in_array($k, $keys);
      }, ARRAY_FILTER_USE_KEY);
      // Add the batched data
      foreach ($batch as $k => $v) {
@@ -1503,7 +1491,7 @@
    protected function updateUserQueueTimestamp(int $user_id)
    {
      CacheManager::updateTimestamp("user_{$user_id}");
      Cache::touch("user_{$user_id}");
    }
   /**
@@ -1796,7 +1784,7 @@
            SELECT
                status,
                COUNT(*) as count,
                SUM(CASE WHEN status = 'scheduled' AND scheduled_at <= '$current_time' THEN 1 ELSE 0 END) as scheduled_ready
                SUM(IF(status = 'scheduled' AND scheduled_at <= '$current_time', 1, 0)) as scheduled_ready
            FROM $table
            GROUP BY status
        ", OBJECT_K);
@@ -2175,8 +2163,8 @@
                DATE(completed_at) as date,
                type,
                COUNT(*) as total_operations,
                SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as successful,
                SUM(CASE WHEN status LIKE 'failed%' THEN 1 ELSE 0 END) as failed,
                SUM(IF(status = 'completed', 1, 0)) as successful,
                SUM(IF(status LIKE 'failed%', 1, 0)) as failed,
                AVG(TIMESTAMPDIFF(SECOND, created_at, completed_at)) as avg_duration,
                SUM(count) as items_processed
            FROM $table