Jake Vanderwerf
2026-02-04 2127b1bdd73ecd2423e443992da4b442f5a3c1a3
inc/rest/routes/QueueRoutes.php
@@ -1,259 +1,316 @@
<?php
namespace JVBase\rest\routes;
use Exception;
use JVBase\JVB;
use JVBase\managers\CacheManager;
use JVBase\rest\RestRouteManager;
use JVBase\managers\Cache;
use JVBase\managers\queue\Operation;
use JVBase\rest\Rest;
use JVBase\rest\Route;
use JVBase\rest\Response;
use WP_REST_Request;
use WP_REST_Response;
use DateTime;
use DateTimeZone;
if (!defined('ABSPATH')) {
    exit; // Exit if accessed directly
   exit; // Exit if accessed directly
}
class QueueRoutes extends RestRouteManager
class QueueRoutes extends Rest
{
   public function __construct()
   {
      $this->cacheName = 'queue';
      $this->cacheTtl = 300;
      parent::__construct();
    protected string $table = BASE.'_operation_queue';
    protected string $metricsTable = BASE.'stats__operation_queue';
      if (JVB_TESTING) {
         $this->cache->flush();
      }
   }
    public function __construct()
    {
        $this->cache_name = 'queue';
        $this->cache_ttl = 300;
        parent::__construct();
    }
   /**
    * Registers queue routes
    * @return void
    */
   public function registerRoutes():void
   {
      // Main queue endpoint - GET and POST
      Route::for('queue')
         ->get([$this, 'getQueue'])
         ->args([
            'status' => 'string|enum:all,queued,pending,processing,completed,failed,failed_permanent|default:all',
            'ids' => 'string',
            'limit' => 'integer|default:50|min:1|max:100',
         ])
         ->auth('user')
         ->rateLimit(30)
         ->post([$this, 'handleAction'])
         ->args([
            'ids' => 'array|required',
            'action' => 'string|required|enum:dismiss,retry,cancel',
         ])
         ->auth('user')
         ->rateLimit(30);
    /**
     * Registers queue routes
     * @return void
     */
    public function registerRoutes():void
    {
      register_rest_route($this->namespace, '/queue', [
         [
            'methods'   => 'GET',
            'callback'  => [$this, 'getQueue'],
            'permission_callback'   => [$this, 'checkPermission'],
            'args'  => [
               'status'    => [
                  'type'  => 'string',
                  'enum'  => ['all', 'queued', 'pending', 'processing', 'completed', 'failed', 'failed_permanent'],
                  'default' => 'all'
               ],
               'ids'    => [
                  'required'  => false,
                  'type'  => 'string',
                  'description' => 'Comma-separated list of operation IDs'
               ],
               'limit' => [
                  'type' => 'integer',
                  'default' => 50,
                  'minimum' => 1,
                  'maximum' => 100
               ]
            ]
         ],
         [
            'methods'   => 'POST',
            'callback'  => [$this, 'handleAction'],
            'permission_callback'   => [$this, 'checkPermission'],
            'args' => [
               'ids'    => [
                  'required'  => true,
                  'type'  => 'array',
                  'items' => [
                     'type' => 'string'
                  ],
                  'description' => 'Array of operation IDs (single or multiple)'
               ],
               'action'    => [
                  'required'  => true,
                  'type'  => 'string',
                  'enum'  => ['dismiss', 'retry', 'cancel'],
                  'description' => 'Action to perform on the operations'
               ]
            ]
         ]
      ]);
    }
      // Poll endpoint
      Route::for('queue/poll')
         ->get([$this, 'pollQueue'])
         ->args([
            'since' => 'string',
            'ids' => 'string',
         ])
         ->auth('user')
         ->rateLimit(15);
    /**
     * Get queue operations with optional filtering
     *
     * @param WP_REST_Request $request
     * @return WP_REST_Response
     */
      // Errors endpoint
      Route::for('queue/errors')
         ->get([$this, 'getOperationErrors'])
         ->auth('user')
         ->rateLimit(15);
      // Single operation with dynamic ID
      Route::for(Route::pattern('queue/{id}'))
         ->get([$this, 'getOperation'])
         ->arg('id', 'string|required')
         ->auth('user')
         ->rateLimit(15);
   }
   /**
    * Get queue operations with optional filtering
    *
    * @param WP_REST_Request $request
    * @return WP_REST_Response
    */
   public function getQueue(WP_REST_Request $request): WP_REST_Response
   {
      $user_id = get_current_user_id();
      $status = $request->get_param('status');
      $ids = $request->get_param('ids');
      $limit = intval($request->get_param('limit'));
      $params = $request->get_params();
      $user_id = absint($params['user']);
      $status = sanitize_text_field($params['status']);
      $ids = array_map('trim', array_map('sanitize_text_field', explode(',', $params['ids'])));
      $limit = absint($params['limit']);
      // Use base class user-specific header checking
      // This checks both 'queue' and 'user_{$user_id}' timestamps
      $cache_check = $this->checkUserHeaders($request, $user_id, 'queue');
      if ($cache_check) {
         return $cache_check; // Returns 304 Not Modified
      $cacheKey = $this->cache->generateKey(compact('user_id', 'status', 'ids', 'limit'));
      if ($cached = $this->checkHeaders($request, $cacheKey)) {
         return $cached;
      }
      global $wpdb;
      $table = $wpdb->prefix . $this->table;
      $data = $this->cache->remember($cacheKey, function() use ($user_id, $params) {
         $filters = $this->buildFilters($params);
         $operations = JVB()->queue()->getUserOperations($user_id, $filters);
      $sql = "SELECT * FROM $table WHERE user_id = %d AND COALESCE(user_dismissed, 0) = 0";
      $params = [$user_id];
         return [
            'items' => array_map([$this, 'formatOperation'], $operations),
            'total' => count($operations),
            'queue_stats' => $this->getQueueStats($user_id),
            'server_time' => date('c'),
         ];
      });
      if ($status !== 'all') {
         $sql .= " AND status = %s";
         $params[] = $status;
      $response = Response::success($data);
      return $this->addCacheHeaders($response);
   }
   private function buildFilters(array $params): array
   {
      $filters = [
         'not_dismissed' => true,
         'limit' => min(absint($params['limit'] ?? 50), 100),
      ];
      if (!empty($params['status']) && $params['status'] !== 'all') {
         $filters['state'] = $params['status'];
      }
      if (!empty($ids)) {
         $id_array = array_map('trim', explode(',', $ids));
         if (!empty($id_array)) {
            $placeholders = implode(',', array_fill(0, count($id_array), '%s'));
            $sql .= " AND id IN ($placeholders)";
            $params = array_merge($params, $id_array);
         }
      if (!empty($params['ids'])) {
         $filters['ids'] = array_map('trim', array_map('sanitize_text_field', explode(',', $params['ids'])));
      }
      $sql .= " ORDER BY FIELD(status, 'processing', 'pending', 'failed', 'completed'), created_at DESC";
      return $filters;
   }
      if ($limit > 0) {
         $sql .= " LIMIT %d";
         $params[] = $limit;
   /**
    * Update operation status (dismiss or retry)
    *
    * @param WP_REST_Request $request
    * @return WP_REST_Response
    */
   public function handleAction(WP_REST_Request $request): WP_REST_Response
   {
      $data = $request->get_params();
      $ids = array_map('trim', array_map('sanitize_text_field', explode(',', $data['ids'])));
      $action = sanitize_text_field($data['action'] ?? '');
      $user_id = absint($data['user']);
      // Validate input
      if (empty($ids)) {
         return Response::validationError(['ids' => 'Missing or invalid operation IDs']);
      }
      $operations = $wpdb->get_results($wpdb->prepare($sql, $params), ARRAY_A);
      // Format operations
      foreach ($operations as &$op) {
         $op = $this->formatOperation($op);
      if (!in_array($action, ['dismiss', 'retry', 'cancel'])) {
         return Response::validationError(['action' => 'Invalid action. Must be: dismiss, retry, or cancel']);
      }
      $response = new WP_REST_Response([
         'items' => $operations,
         'total' => count($operations),
         'timestamp' => date('c'),
         'has_more' => count($operations) === $limit,
         'queue_stats' => $this->getQueueStats($user_id),
         'server_time' => date('c')
      // Get operations via Queue - verifies ownership
      $operations = JVB()->queue()->getUserOperations($user_id, [
         'ids' => $ids,
         'limit' => count($ids),
      ]);
      // Add cache headers (ETag, Last-Modified)
      return $this->addCacheHeaders($response);
      if (empty($operations)) {
         return Response::notFound('No valid operations found');
      }
      $result = $this->processAction($action, $operations, $user_id);
      if ($result['success']) {
         Cache::touch($user_id);
      }
      return Response::success($result);
   }
   public function pollQueue(WP_REST_Request $request): WP_REST_Response
   {
      $userId = $request->get_param('user');
      $since = $request->get_param('since');
      $ids = $request->get_param('ids');
      $filters = ['not_dismissed' => true, 'limit' => 50];
      if (!empty($ids)) {
         $filters['ids'] = array_map('trim', explode(',', $ids));
      }
      $operations = JVB()->queue()->getUserOperations($userId, $filters);
      if ($since) {
         $sinceTime = strtotime($since);
         $operations = array_filter($operations, function($op) use ($sinceTime) {
            return strtotime($op->completedAt ?? $op->startedAt ?? $op->scheduledAt) > $sinceTime;
         });
      }
      $items = array_map(fn($op) => [
         'id' => $op->id,
         'status' => $this->mapStateToStatus($op->state, $op->outcome),
         'progress_count' => $op->processedItems,
         'count' => $op->totalItems,
         'updated_at' => $this->formatTimestamp($op->completedAt ?? $op->startedAt ?? $op->scheduledAt),
         'error_message' => $op->errorMessage,
      ], $operations);
      return Response::success([
         'items' => array_values($items),
         'server_time' => date('c'),
         'has_active' => count(array_filter($items, fn($i) => in_array($i['status'], ['pending', 'processing']))) > 0,
      ]);
   }
   public function getOperationErrors(WP_REST_Request $request): WP_REST_Response
   {
      $user_id = absint($request->get_param('user'));
      $operations = JVB()->queue()->getUserOperations($user_id, [
         'state' => 'completed',
         'outcome' => ['failed', 'failed_permanent', 'partial'],
         'has_errors' => true,
         'order_by' => 'updated_at DESC',
         'limit' => 20,
      ]);
      $formatted = array_map(fn($op) => [
         'id' => $op->id,
         'type' => $op->type,
         'error_message' => $op->errorMessage,
         'failed_items' => $op->failedItems ?? [],
         'retries' => $op->retries,
         'created_at' => $op->scheduledAt,
         'updated_at' => $op->completedAt,
      ], $operations);
      return Response::collection($formatted);
   }
   public function getOperation(WP_REST_Request $request): WP_REST_Response
   {
      $id = $request->get_param('id');
      $userId = $request->get_param('user');
      $op = JVB()->queue()->get($id);
      if (!$op || $op->userId !== $userId) {
         return Response::notFound('Operation not found');
      }
      return Response::item($this->formatOperation($op, true), 'operation');
   }
   /**
    * Get queue statistics for user
    */
   protected function getQueueStats(int $user_id): array
   private function getQueueStats(int $userId): array
   {
      global $wpdb;
      $table = $wpdb->prefix . $this->table;
      $stats = $wpdb->get_results($wpdb->prepare(
         "SELECT status, COUNT(*) as count FROM $table WHERE user_id = %d GROUP BY status",
         $user_id
      ), OBJECT_K);
      $formatted_stats = [
         'queued' => 0,
         'localProcessing' => 0,
         'uploading' => 0,
         'pending' => 0,
         'processing' => 0,
         'completed' => 0,
         'failed' => 0,
         'failed_permanent' => 0
      ];
      foreach ($stats as $status => $data) {
         if (isset($formatted_stats[$status])) {
            $formatted_stats[$status] = intval($data->count);
         }
      }
      return $formatted_stats;
      return array_merge(
         ['queued' => 0, 'localProcessing' => 0, 'uploading' => 0],
         JVB()->queue()->getUserStats($userId)
      );
   }
   /**
    * Format operation for API response
    */
   protected function formatOperation(array $operation): array
   private function formatOperation(Operation $op, bool $full = false): array
   {
      $formatted = [
         'id' => $operation['id'],
         'type' => $operation['type'],
         'status' => $operation['status'],
         'progress_count' => intval($operation['progress_count'] ?? 0),
         'count' => intval($operation['count'] ?? 1),
         'retries' => intval($operation['retries'] ?? 0),
         'data' => json_decode($operation['request_data'] ?? '{}', true)
         'id' => $op->id,
         'type' => $op->type,
         'status' => $this->mapStateToStatus($op->state, $op->outcome),
         'progress_count' => $op->processedItems,
         'count' => $op->totalItems,
         'title' => $this->getOperationTitle($op->type, $op->requestData),
         'created_at' => $this->formatTimestamp($op->scheduledAt),
         'updated_at' => $this->formatTimestamp($op->completedAt ?? $op->startedAt ?? $op->scheduledAt),
      ];
      // Convert timestamps to ISO 8601 format with proper timezone
      $formatted['created_at'] = $this->formatTimestamp($operation['created_at']);
      $formatted['updated_at'] = $this->formatTimestamp($operation['updated_at']);
      // Add completed_at if status is completed
      if ($operation['status'] === 'completed' && !empty($operation['completed_at'])) {
         $formatted['completed_at'] = $this->formatTimestamp($operation['completed_at']);
      if ($op->processedItems > 0 && $op->totalItems > 0) {
         $formatted['progress_percentage'] = round(($op->processedItems / $op->totalItems) * 100);
      }
      // Add error message if failed
      if (!empty($operation['error_message'])) {
         $formatted['error_message'] = $operation['error_message'];
      if ($op->errorMessage) {
         $formatted['error_message'] = $op->errorMessage;
      }
      // Simple progress percentage calculation
      if ($formatted['count'] > 0) {
         $formatted['progress_percentage'] = round(
            ($formatted['progress_count'] / $formatted['count']) * 100
         );
      if ($full) {
         $formatted += [
            'data' => $op->requestData,
            'result' => $op->result ?? [],
            'retries' => $op->retries,
            'user_dismissed' => $op->userDismissed,
         ];
         if ($op->state === 'completed' && $op->completedAt) {
            $formatted['completed_at'] = $this->formatTimestamp($op->completedAt);
         }
      }
      // Add human-readable title for easier frontend display
      $formatted['title'] = $this->getOperationTitle($operation['type'], $formatted['data']);
      // Add user dismissal status
      $formatted['user_dismissed'] = !empty($operation['user_dismissed']);
      return $formatted;
   }
   /**
    * Convert MySQL datetime to ISO 8601 timestamp with proper timezone
    * Map backend state/outcome to frontend status
    * Backend uses: state (pending, scheduled, processing, completed) + outcome (pending, success, partial, failed, failed_permanent)
    * Frontend expects: queued, pending, processing, completed, failed, failed_permanent
    */
   protected function formatTimestamp(?string $mysql_datetime): ?string
   private function mapStateToStatus(string $state, ?string $outcome): string
   {
      if (empty($mysql_datetime)) {
         return null;
      if ($state === 'completed') {
         return match ($outcome) {
            'failed' => 'failed',
            'failed_permanent' => 'failed_permanent',
            default => 'completed',
         };
      }
      try {
         // Get WordPress timezone - dates are stored in this timezone
         $wp_timezone = wp_timezone();
         // Parse the datetime in WordPress timezone
         $date = new DateTime($mysql_datetime, $wp_timezone);
         // Convert to UTC for API consistency
         $date->setTimezone(new DateTimeZone('UTC'));
         // Return ISO 8601 format
         return $date->format('c');
      } catch (Exception $e) {
         return null;
      }
      return $state === 'scheduled' ? 'pending' : $state;
   }
   /**
    * Get human-readable operation title
    */
@@ -285,218 +342,39 @@
      return $base_title;
   }
    /**
     * Update operation status (dismiss or retry)
     *
     * @param WP_REST_Request $request
     * @return WP_REST_Response
     */
   public function handleAction(WP_REST_Request $request): WP_REST_Response
   private function processAction(string $action, array $operations, int $userId): array
   {
      $data = $request->get_json_params();
      $ids = $data['ids'] ?? [];
      $action = $data['action'] ?? '';
      $queue = JVB()->queue();
      $processed = 0;
      $processedIds = [];
      $user_id = (int)$data['user'];
      foreach ($operations as $op) {
         $result = match ($action) {
            'dismiss' => $queue->dismiss($op->id),
            'retry' => $queue->retry($op->id, $userId),
            'cancel' => $queue->cancel($op->id, $userId),
            default => false,
         };
      // Validate input
      if (empty($ids) || !is_array($ids)) {
         return new WP_REST_Response([
            'success' => false,
            'message' => 'Missing or invalid operation IDs'
         ], 400);
      }
      if (!in_array($action, ['dismiss', 'retry', 'cancel'])) {
         return new WP_REST_Response([
            'success' => false,
            'message' => 'Invalid action. Must be: dismiss, retry, or cancel'
         ], 400);
      }
      global $wpdb;
      $table = $wpdb->prefix . $this->table;
      // Verify operations exist and belong to user
      $placeholders = implode(',', array_fill(0, count($ids), '%s'));
      $valid_operations = $wpdb->get_results($wpdb->prepare(
         "SELECT id, status FROM $table WHERE id IN ($placeholders) AND user_id = %d",
         array_merge($ids, [$user_id])
      ));
      if (empty($valid_operations)) {
         return new WP_REST_Response([
            'success' => false,
            'message' => 'No valid operations found'
         ], 404);
      }
      $valid_ids = array_column($valid_operations, 'id');
      $placeholders = implode(',', array_fill(0, count($valid_ids), '%s'));
      // Process action using foreach approach as suggested
      $result = $this->processQueueAction($action, $valid_operations, $user_id);
      if ($result['success']) {
         // Update timestamp for this user's queue
         CacheManager::updateTimestamp("user_{$user_id}");
      }
      return new WP_REST_Response($result);
   }
   protected function processQueueAction(string $action, array $operations, int $user_id): array
   {
      global $wpdb;
      $table = $wpdb->prefix . $this->table;
      $processed_count = 0;
      $errors = [];
      $valid_ids = [];
      // Process each operation individually
      foreach ($operations as $operation) {
         $operation_id = $operation->id;
         $operation_status = $operation->status;
         try {
            $result = false;
            switch ($action) {
               case 'dismiss':
                  // Can dismiss any operation
                  $result = $wpdb->update(
                     $table,
                     ['user_dismissed' => 1],
                     ['id' => $operation_id, 'user_id' => $user_id]
                  );
                  break;
               case 'retry':
                  // Can only retry failed operations
                  if (!in_array($operation_status, ['failed', 'failed_permanent'])) {
                     $errors[] = "Operation {$operation_id} cannot be retried (status: {$operation_status})";
                     continue 2;
                  }
                  $result = $wpdb->update(
                     $table,
                     [
                        'status' => 'pending',
                        'error_message' => null,
                        'updated_at' => current_time('mysql'),
                        'retries' => $wpdb->get_var($wpdb->prepare(
                              "SELECT retries FROM $table WHERE id = %s",
                              $operation_id
                           )) + 1
                     ],
                     ['id' => $operation_id, 'user_id' => $user_id]
                  );
                  break;
               case 'cancel':
                  // Can only cancel pending/queued operations
                  if (!in_array($operation_status, ['pending', 'queued'])) {
                     $errors[] = "Operation {$operation_id} cannot be cancelled (status: {$operation_status})";
                     continue 2;
                  }
                  $result = $wpdb->delete(
                     $table,
                     ['id' => $operation_id, 'user_id' => $user_id]
                  );
                  break;
            }
            if ($result !== false) {
               $processed_count++;
               $valid_ids[] = $operation_id;
            } else {
               $errors[] = "Failed to {$action} operation {$operation_id}";
            }
         } catch (Exception $e) {
            $errors[] = "Error processing operation {$operation_id}: " . $e->getMessage();
         if ($result) {
            $processed++;
            $processedIds[] = $op->id;
         }
      }
      // Prepare response
      $message = $this->getActionMessage($action, $processed_count);
      if (!empty($errors)) {
         $message .= ". Errors: " . implode(', ', $errors);
      }
      $pastTense = ['dismiss' => 'dismissed', 'retry' => 'retried', 'cancel' => 'cancelled'];
      return [
         'success' => $processed_count > 0,
         'success' => $processed > 0,
         'action' => $action,
         'processed_count' => $processed_count,
         'processed_count' => $processed,
         'total_requested' => count($operations),
         'processed_ids' => $valid_ids,
         'errors' => $errors,
         'message' => $message
      ];
   }
   /**
    * Get user-friendly message for action results
    */
   protected function getActionMessage(string $action, int $count): string
   {
      if ($count === 0) {
         return "No operations were {$action}ed";
      }
      $past_tense = [
         'dismiss' => 'dismissed',
         'retry' => 'retried',
         'cancel' => 'cancelled'
      ];
      return "{$count} operation" . ($count === 1 ? '' : 's') . " {$past_tense[$action]}";
   }
   public function getOperationErrors(WP_REST_Request $request): WP_REST_Response
   {
      $user_id = get_current_user_id();
      global $wpdb;
      $table = $wpdb->prefix . $this->table;
      $failed_operations = $wpdb->get_results($wpdb->prepare("
            SELECT id, type, error_message, failed_items, retries, created_at, updated_at
            FROM $table
            WHERE user_id = %d
            AND status IN ('failed', 'completed_with_errors')
            AND (error_message IS NOT NULL OR failed_items IS NOT NULL)
            ORDER BY updated_at DESC
            LIMIT 20
        ", $user_id), ARRAY_A);
      foreach ($failed_operations as &$op) {
         $op['failed_items'] = json_decode($op['failed_items'] ?? '[]', true);
         $op['error_details'] = $this->parseErrorMessage($op['error_message']);
      }
      return new WP_REST_Response([
         'errors' => $failed_operations,
         'total' => count($failed_operations)
      ]);
   }
   protected function parseErrorMessage(string $error_message): array
   {
      if (str_contains($error_message, ' | ')) {
         $parts = explode(' | ', $error_message);
         return [
            'original_error' => $parts[0] ?? '',
            'cleanup_reason' => $parts[1] ?? ''
         ];
      }
      return [
         'original_error' => $error_message,
         'cleanup_reason' => null
         'processed_ids' => $processedIds,
         'message' => $processed > 0
            ? "{$processed} operation(s) {$pastTense[$action]}"
            : "No operations were {$pastTense[$action]}",
      ];
   }
}