Jake Vanderwerf
2026-01-11 474109a5df0a06f5343ab184838fe2d80e3872a8
inc/managers/OperationQueue.php
@@ -3,6 +3,7 @@
use JVBase\managers\CacheManager;
use Exception;
use JVBase\utility\Features;
use WP_Error;
use WP_REST_Response;
use WP_REST_Request;
@@ -11,21 +12,6 @@
    exit; // Exit if accessed directly
}
//TODO: Register a server cron job for jvb_process_queue
//  1) Log in to your Ploi dashboard
//  2) Navigate to your server and select the site where Edmonton Ink is installed
//  3) Go to the "Cron Jobs" tab in the site management interface
//  4) Click "Create Cron Job" to add a new scheduled task
//  5) Configure the cron job with these settings:
//      - Command: Use the WP-CLI to trigger your custom wp cron event:
//          cd /path/to/your/wordpress && php wp-cli.phar cron event run jvb_process_queue
//      - Or if WP-CLI is installed globally:
//          cd /path/to/your/wordpress && wp cron event run ei_process_queue
//      - User: Select the appropriate system user (usually the one associated with your site)
//      - Frequency: Set to run every 5 minutes for queue processing:
//            */5 * * * *
//# Every minute - main queue processing
//* * * * * cd /path/to/wordpress && wp cron event run jvb_process_queue
//
@@ -79,7 +65,7 @@
    {
      global $wpdb;
      $this->wpdb = $wpdb;
      $this->cache = new CacheManager('queue');
      $this->cache = CacheManager::for('queue', DAY_IN_SECONDS);
        add_action('jvb_process_queue', [ $this, 'checkQueue' ]);
      add_action('jvb_queue_maintenance', [$this, 'hourlyMaintenance']);
        add_action('jvbEmailDailyMetricsReport', [$this, 'emailDailyMetricsReport']);
@@ -135,7 +121,6 @@
    {
        switch ($action) {
            case 'unlock-operation-queue':
            error_log('Unlocking Queue from admin action');
                $this->unlockQueue();
                return new WP_REST_Response([
                    'success'   => true,
@@ -202,7 +187,6 @@
         return;
      }
      error_log('[queue] Checking queue...');
      // Peek at what operations we might process
      $batch_size = $this->getAdaptiveBatchSize();
@@ -432,12 +416,7 @@
   {
      try {
         $table = $this->wpdb->prefix . $this->table;
         error_log('QUEUING OPERATION: '.print_r([
            'type' => $type,
               'user_id' => $user_id,
               'data' => $data,
               'options' => $options,
            ],true));
         // Extract options
         $priority = (array_key_exists('priority', $options) && in_array($options['priority'], ['low', 'normal', 'high'])) ? $options['priority'] : 'normal';
         $delay = (array_key_exists('delay', $options)) ? (int) $options['delay'] : 0;
@@ -468,10 +447,8 @@
         // Generate operation ID
         $operation_id = $this->checkOperationId($operation_id, $type, $merge, $user_id);
         error_log('Checked OperationId: '.print_r($operation_id, true));
         // Check for existing operation
         $existing = $this->getOperation($operation_id);
         error_log('Existing: '.print_r($existing, true));
         $dependencies = $options['depends_on'] ?? [];
         if (is_string($dependencies)) {
@@ -490,10 +467,9 @@
         }
         if ($existing) {
            error_log('[queue]:queueOperation - Already Existing operation... updating');
            if ($merge !== 'append' && ($existing->status === 'pending' || $existing->status === 'scheduled')) {
               error_log('Not appending operation, checking for merge');
               if ($merge === 'merge') {
                  $existing_data = json_decode($existing->request_data ?? '{}', true);
                  $data = $this->deepMerge($existing_data, $data);
@@ -541,14 +517,12 @@
               }
            } else if ($merge === 'append') {
               error_log('Append operation, creating a new one');
               $operation_id = $operation_id . '_' . time() . '_' . substr(uniqid(), -4);
               $existing = null;
            }
         }
         if (!$existing) {
            error_log('Inserting new operation into table');
            // Prepare metadata with chunk config
            $metadata = $chunk_config ? ['chunk_config' => $chunk_config] : [];
@@ -579,7 +553,7 @@
         $this->updateLastModified($user_id);
         $this->invalidateQueueCache();
         $this->cache->invalidate(self::CACHE_USER_QUEUE_PREFIX . $user_id);
         $this->cache->delete(self::CACHE_USER_QUEUE_PREFIX . $user_id);
         $this->runQueueOnShutdown();
         return [
@@ -591,14 +565,13 @@
         ];
      } catch (Exception $e) {
         error_log("Error queueing operation: " . $e->getMessage());
         JVB()->error()->log('queue', $e->getMessage(), $data, 'high');
         return new WP_Error('queue_failed', $e->getMessage());
      }
   }
   protected function updateLastModified(int $user_id) {
      JVB()->routes('queue')->updateUserQueueTimestamp($user_id);
      CacheManager::updateTimestamp("user_{$user_id}");
   }
   protected function deepMerge(array $existing, array $new): array
@@ -606,10 +579,8 @@
      $merged = $existing;
      if (!$this->isAssociativeArray($existing) && !$this->isAssociativeArray($new)) {
         error_log('It is an associative array!');
         return array_merge($existing, $new);
      }
      error_log('Not an associative array... moving on!');
      foreach ($new as $key => $newValue) {
         if (!array_key_exists($key, $existing)) {
            $merged[$key] = $newValue;
@@ -721,19 +692,16 @@
               'limit' => 1
            ]);
         if ($existing) {
            error_log('Found existing User operations: '.print_r($existing, true));
            $operation_id = $existing[0]->id;
         }
      }
      if (!$operation_id) {
         $operation_id = uniqid('op_');
         error_log('Generated operation ID: '.print_r($operation_id, true));
      }
      if (!str_starts_with($operation_id, 'u')) {
         $operation_id = 'u' . $user_id . '_' . $operation_id;
         error_log('Added user ID to operation ID: '.print_r($operation_id, true));
      }
      return $operation_id;
@@ -744,7 +712,6 @@
   {
      // Only add if not already scheduled AND if operation is high priority or small
      if (!has_action('shutdown', [$this, 'processQueueOnShutdown'])) {
         error_log('Adding shutdown call');
         add_action('shutdown', [$this, 'processQueueOnShutdown'], 100);
      }
   }
@@ -814,8 +781,8 @@
            $this->processOperation($operation);
            // Invalidate operation cache after processing
            $this->cache->invalidate(self::CACHE_OPERATION_PREFIX . $operation->id);
            $this->cache->invalidate(self::CACHE_USER_QUEUE_PREFIX . $operation->user_id);
            $this->cache->delete(self::CACHE_OPERATION_PREFIX . $operation->id);
            $this->cache->delete(self::CACHE_USER_QUEUE_PREFIX . $operation->user_id);
         }
         // Batch invalidate caches at the end
@@ -909,7 +876,6 @@
   protected function canProcessOperation(object $operation): bool
   {
      if ($operation->retries >= $this->max_attempts) {
         error_log("Operation {$operation->id} exceeded retry limit: {$operation->retries} >= {$this->max_attempts}");
         $this->markAsPermanentlyFailed($operation->id, 'Exceeded maximum retry attempts');
         return false;
      }
@@ -942,7 +908,6 @@
      if ($result) {
         $this->invalidateQueueCache('status');
         error_log("Marked operation {$operation_id} as permanently failed: {$final_error_message}");
      }
   }
@@ -1025,13 +990,12 @@
      $keys = $cacheKeys[$scope] ?? $cacheKeys['all'];
      foreach ($keys as $key) {
         $this->cache->invalidate($key);
         $this->cache->delete($key);
      }
      if ($scope === 'all') {
         // Clear entire group for complete refresh
         $this->cache->invalidateGroup($this->cacheGroup);
         jvbUpdateCacheTimestamp('queue');
         $this->cache->invalidate();
         delete_transient('jvb_queue_status_counts');
      }
   }
@@ -1143,14 +1107,6 @@
         ['%s']
      );
      // Log retry schedule
      error_log(sprintf(
         "Operation %s scheduled for retry #%d at %s (delay: %ds)",
         $operation_id,
         $attempt,
         $scheduled_at,
         $delay
      ));
   }
    /**
@@ -1180,7 +1136,7 @@
        $message .= "Please check the error logs for more details.";
        return jvbMail($admin_email, $subject, $message);
        return JVB()->email()->sendEmail($admin_email, $subject, $message);
    }
    /**
@@ -1277,8 +1233,6 @@
            ];
         }
         error_log('Filtered Result: '.print_r($filterResult, true));
         $newCount = $progress_count + $chunk['progress'];
         if ($filterResult['success']) {
@@ -1292,12 +1246,7 @@
               $filterResult['result'] = [$filterResult['result']];
            }
            // Store the result data
            error_log('Merging Old Result: '. print_r($oldResult, true));
            error_log('With Newer Result: '. print_r($filterResult['result'], true));
            $resultToStore = $this->deepMerge($oldResult, $filterResult['result']);
            error_log('Merged Result: '.print_r($resultToStore, true));
            $resultToStore['processed_at'] = current_time('mysql');
            // Check if operation is complete
@@ -1318,10 +1267,6 @@
                  ['%s']
               );
               error_log('Completion result: '.print_r($result, true));
               // Now do post-completion tasks
               $this->invalidateQueueCache('status');
               $this->updateLastModified($operation->user_id);
@@ -1406,7 +1351,7 @@
            }
         }
         // Clear operation cache after any update
         $this->cache->invalidate(self::CACHE_OPERATION_PREFIX . $operation->id);
         $this->cache->delete(self::CACHE_OPERATION_PREFIX . $operation->id);
         $this->updateLastModified($operation->user_id);
         return $filterResult;
@@ -1557,8 +1502,8 @@
    protected function updateUserQueueTimestamp(int $user_id)
    {
        $key = "{$user_id}_queue_timestamp";
      $this->cache->set($key, time());
      CacheManager::updateTimestamp("user_{$user_id}");
    }
   /**
@@ -1684,91 +1629,101 @@
     * Send daily metrics report to admin
     * @return void
     */
    public function emailDailyMetricsReport():void
    {
   public function emailDailyMetricsReport():void
   {
      $metrics_table = $this->wpdb->prefix . $this->metricsTable;
      $yesterday = date('Y-m-d', strtotime('-1 day'));
        $metrics_table = $this->wpdb->prefix . $this->metricsTable;
        $yesterday     = date('Y-m-d', strtotime('-1 day'));
      $metrics = $this->wpdb->get_results($this->wpdb->prepare(
         "SELECT * FROM $metrics_table WHERE date = %s",
         $yesterday
      ));
        // Get yesterday's metrics
        $metrics = $this->wpdb->get_results($this->wpdb->prepare(
            "SELECT * FROM $metrics_table WHERE date = %s",
            $yesterday
        ));
      if (empty($metrics)) {
         return;
      }
        if (empty($metrics)) {
            return; // No metrics to report
        }
      $admin_email = get_option('admin_email');
      $site_name = get_bloginfo('name');
      $subject = "[$site_name] Daily Queue Performance - " . $yesterday;
        $admin_email = get_option('admin_email');
        $site_name   = get_bloginfo('name');
      // Calculate totals
      $total_ops = 0;
      $total_success = 0;
      $total_failed = 0;
      $total_items = 0;
        $subject = "[$site_name] Daily Queue Performance Report - " . date('Y-m-d', strtotime('-1 day'));
      foreach ($metrics as $metric) {
         $total_ops += $metric->total_operations;
         $total_success += $metric->successful_operations;
         $total_failed += $metric->failed_operations;
         $total_items += $metric->total_items_processed;
      }
        $message = "Daily Queue Performance Report for $yesterday\n\n";
      $success_rate = round(($total_success / max(1, $total_ops)) * 100, 1);
        $message       .= "SUMMARY:\n";
        $total_ops     = 0;
        $total_success = 0;
        $total_failed  = 0;
        $total_items   = 0;
      $message = JVB()->email()->h1('Daily Queue Performance Report');
      $message .= sprintf('<p>Report for <strong>%s</strong></p>', $yesterday);
        foreach ($metrics as $metric) {
            $total_ops     += $metric->total_operations;
            $total_success += $metric->successful_operations;
            $total_failed  += $metric->failed_operations;
            $total_items   += $metric->total_items_processed;
        }
      // Summary stats in grid
      $stats = [
         JVB()->email()->stat($total_ops, 'Operations'),
         JVB()->email()->stat($total_success, 'Successful', '✓'),
         JVB()->email()->stat($total_failed, 'Failed', $total_failed > 0 ? '⚠' : ''),
         JVB()->email()->stat($success_rate . '%', 'Success Rate')
      ];
      $message .= JVB()->email()->grid($stats, 4);
        $message .= "- Total Operations: $total_ops\n";
        $message .= "- Successful: $total_success\n";
        $message .= "- Failed: $total_failed\n";
        $message .= "- Success Rate: " . round(($total_success / max(1, $total_ops)) * 100, 2) . "%\n";
        $message .= "- Total Items Processed: $total_items\n\n";
      $message .= JVB()->email()->spacer(20);
        $message .= "DETAILS BY OPERATION TYPE:\n";
      // Alert if success rate is low
      if ($success_rate < 90) {
         $message .= JVB()->email()->alert(
            sprintf('Success rate of %s%% is below the 90%% threshold', $success_rate),
            'warning'
         );
      }
        foreach ($metrics as $metric) {
            $message .= "• $metric->type:\n";
            $message .= "  - Operations: $metric->total_operations\n";
            $message .= "  - Success: $metric->successful_operations\n";
            $message .= "  - Failed: $metric->failed_operations\n";
      $message .= JVB()->email()->h2('Details by Operation Type');
            if ($metric->average_duration) {
                $message .= "  - Avg. Duration: " . round($metric->average_duration, 2) . " seconds\n";
            }
      // Details for each operation type
      foreach ($metrics as $metric) {
         $details = [];
         $details[] = ['label' => 'Total', 'value' => $metric->total_operations];
         $details[] = ['label' => 'Success', 'value' => JVB()->email()->badge($metric->successful_operations, 'success')];
         $details[] = ['label' => 'Failed', 'value' => $metric->failed_operations > 0 ? JVB()->email()->badge($metric->failed_operations, 'error') : '0'];
            $message .= "  - Items Processed: $metric->total_items_processed\n";
         if ($metric->average_duration) {
            $details[] = ['label' => 'Avg Duration', 'value' => round($metric->average_duration, 2) . 's'];
         }
            if ($metric->peak_queue_size) {
                $message .= "  - Peak Queue Size: $metric->peak_queue_size\n";
            }
         $details[] = ['label' => 'Items Processed', 'value' => number_format($metric->total_items_processed)];
            if ($metric->peak_memory_usage) {
                $memory_mb = round($metric->peak_memory_usage / 1024 / 1024, 2);
                $message   .= "  - Peak Memory Usage: $memory_mb MB\n";
            }
         if ($metric->peak_memory_usage) {
            $memory_mb = round($metric->peak_memory_usage / 1024 / 1024, 2);
            $details[] = ['label' => 'Peak Memory', 'value' => $memory_mb . ' MB'];
         }
            if ($metric->peak_cpu_usage) {
                $cpu_percent = round($metric->peak_cpu_usage * 50, 2); // Assuming 2 cores
                $message     .= "  - Peak CPU Usage: $cpu_percent%\n";
            }
         $message .= JVB()->email()->card(
            JVB()->email()->table($details),
            esc_html($metric->type)
         );
      }
            $message .= "\n";
        }
      // Current queue status
      $pending_count = $this->getCurrentQueueSize();
      if ($pending_count > 0) {
         $message .= JVB()->email()->spacer(20);
         $message .= JVB()->email()->notice(
            sprintf('<strong>Current Queue:</strong> %d operations pending', $pending_count)
         );
      }
        // Add any outstanding queue items
        $pending_count = $this->getCurrentQueueSize();
        if ($pending_count > 0) {
            $message .= "CURRENT QUEUE STATUS:\n";
            $message .= "- $pending_count operations currently pending in the queue\n\n";
        }
      $message .= JVB()->email()->spacer(20);
      $message .= JVB()->email()->button(admin_url('admin.php?page=jvb-queue'), 'View Queue Dashboard');
        $message .= "This is an automated report. Please check the admin dashboard for more details.";
        // Send email
        jvbMail($admin_email, $subject, $message);
    }
      JVB()->email()->sendEmail($admin_email, $subject, $message, 'QUEUE REPORT');
   }
    /**
     * @return int
@@ -2063,8 +2018,6 @@
        ", $this->max_attempts));
      if (!empty($stuck_operations)) {
         error_log("Found " . count($stuck_operations) . " stuck operations to clean up");
         foreach ($stuck_operations as $operation) {
            $this->markAsPermanentlyFailed(
               $operation->id,
@@ -2081,8 +2034,6 @@
        ", date('Y-m-d H:i:s', strtotime('-1 hour'))));
      if (!empty($long_processing)) {
         error_log("Found " . count($long_processing) . " long-running operations to reset");
         foreach ($long_processing as $operation) {
            $this->wpdb->update($table, [
               'status' => 'pending',