Jake Vanderwerf
2 days ago 235ce5716edc2f7cbe80fdccf26eac7269587839
inc/rest/routes/QueueRoutes.php
@@ -1,25 +1,24 @@
<?php
namespace JVBase\rest\routes;
use Exception;
use JVBase\JVB;
use JVBase\managers\Cache;
use JVBase\rest\RestRouteManager;
use JVBase\managers\queue\Operation;
use JVBase\rest\Rest;
use JVBase\rest\Route;
use JVBase\rest\Response;
use WP_REST_Request;
use WP_REST_Response;
use DateTime;
use DateTimeZone;
if (!defined('ABSPATH')) {
   exit; // Exit if accessed directly
}
class QueueRoutes extends RestRouteManager
class QueueRoutes extends Rest
{
   public function __construct()
   {
      $this->cache_name = 'queue';
      $this->cache_ttl = 300;
      $this->cacheName = 'queue';
      $this->cacheTtl = 300;
      parent::__construct();
      if (JVB_TESTING) {
@@ -33,52 +32,50 @@
    */
   public function registerRoutes():void
   {
      register_rest_route($this->namespace, '/queue', [
         [
            'methods'   => 'GET',
            'callback'  => [$this, 'getQueue'],
            'permission_callback'   => [$this, 'checkPermission'],
            'args'  => [
               'status'    => [
                  'type'  => 'string',
                  'enum'  => ['all', 'queued', 'pending', 'processing', 'completed', 'failed', 'failed_permanent'],
                  'default' => 'all'
               ],
               'ids'    => [
                  'required'  => false,
                  'type'  => 'string',
                  'description' => 'Comma-separated list of operation IDs'
               ],
               'limit' => [
                  'type' => 'integer',
                  'default' => 50,
                  'minimum' => 1,
                  'maximum' => 100
               ]
            ]
         ],
         [
            'methods'   => 'POST',
            'callback'  => [$this, 'handleAction'],
            'permission_callback'   => [$this, 'checkPermission'],
            'args' => [
               'ids'    => [
                  'required'  => true,
                  'type'  => 'array',
                  'items' => [
                     'type' => 'string'
                  ],
                  'description' => 'Array of operation IDs (single or multiple)'
               ],
               'action'    => [
                  'required'  => true,
                  'type'  => 'string',
                  'enum'  => ['dismiss', 'retry', 'cancel'],
                  'description' => 'Action to perform on the operations'
               ]
            ]
         ]
      ]);
      // Main queue endpoint - GET and POST
      Route::for('queue')
         ->get([$this, 'getQueue'])
         ->args([
            'status' => 'string|enum:all,queued,pending,processing,completed,failed,failed_permanent|default:all',
            'ids' => 'string',
            'limit' => 'integer|default:50|min:1|max:100',
         ])
         ->auth('user')
         ->rateLimit()
         ->post([$this, 'handleAction'])
         ->args([
            'ids' => 'array|required',
            'action' => 'string|required|enum:dismiss,retry,cancel',
         ])
         ->auth('user')
         ->rateLimit()
         ->register();
      // Poll endpoint
      Route::for('queue/poll')
         ->get([$this, 'pollQueue'])
         ->args([
            'since' => 'string',
            'ids' => 'string',
         ])
         ->auth('user')
         ->rateLimit()
         ->register();
      // Errors endpoint
      Route::for('queue/errors')
         ->get([$this, 'getOperationErrors'])
         ->auth('user')
         ->rateLimit()
         ->register();
      // Single operation with dynamic ID
      Route::for(Route::pattern('queue/{id}'))
         ->get([$this, 'getOperation'])
         ->arg('id', 'string|required')
         ->auth('user')
         ->rateLimit()
         ->register();
   }
   /**
@@ -89,128 +86,260 @@
    */
   public function getQueue(WP_REST_Request $request): WP_REST_Response
   {
      $user_id = $request->get_param('user');
      $status = sanitize_text_field($request->get_param('status'));
      $ids = $request->get_param('ids');
      $limit = intval($request->get_param('limit'));
      // Use base class user-specific header checking
      $key = $this->cache->generateKey(['user'=> $user_id, 'status'=> $status, 'ids'=> $ids, 'limit'=> $limit]);
      $cache_check = $this->checkHeaders($request, $key);
      if ($cache_check) {
         return $cache_check;
      $params = $request->get_params();
      $user_id = absint($params['user']);
      $this->cache = Cache::for('queue')->user();
      $status = sanitize_text_field($params['status']);
      $ids = !empty($params['ids'])
         ? array_map('trim', array_map('sanitize_text_field', explode(',', $params['ids'])))
         : [];
      $limit = absint($params['limit']);
      $cacheKey = $this->cache->generateKey(compact('user_id', 'status', 'ids', 'limit'));
      if ($cached = $this->checkHeaders($request, $cacheKey)) {
         return $cached;
      }
      // Build filters for getUserOperations
      $data = $this->cache->tag("user:{$user_id}")->remember($cacheKey, function() use ($user_id, $params) {
         $filters = $this->buildFilters($params);
         $operations = JVB()->queue()->getUserOperations($user_id, $filters);
         return [
            'items' => array_map([$this, 'formatOperation'], $operations),
            'total' => count($operations),
            'queue_stats' => $this->getQueueStats($user_id),
            'server_time' => date('c'),
         ];
      });
      $response = Response::success($data);
      return $this->addCacheHeaders($response);
   }
   private function buildFilters(array $params): array
   {
      $filters = [
         'not_dismissed' => true,
         'limit' => $limit ?: 50,
         'limit' => min(absint($params['limit'] ?? 50), 100),
      ];
      if ($status && $status !== 'all') {
         $filters['state'] = $status;
      if (!empty($params['status']) && $params['status'] !== 'all') {
         $filters['state'] = $params['status'];
      }
      if (!empty($params['ids'])) {
         $filters['ids'] = array_map('trim', array_map('sanitize_text_field', explode(',', $params['ids'])));
      }
      return $filters;
   }
   /**
    * Update operation status (dismiss or retry)
    *
    * @param WP_REST_Request $request
    * @return WP_REST_Response
    */
   public function handleAction(WP_REST_Request $request): WP_REST_Response
   {
      $data = $request->get_params();
      $ids = is_array($data['ids'])
         ? array_map('trim', array_map('sanitize_text_field', $data['ids']))
         : array_map('trim', array_map('sanitize_text_field', explode(',', $data['ids'])));
      $action = sanitize_text_field($data['action'] ?? '');
      $user_id = absint($data['user']);
      $this->cache = Cache::for($user_id.'_queue');
      // Validate input
      if (empty($ids)) {
         return Response::validationError(['ids' => 'Missing or invalid operation IDs']);
      }
      if (!in_array($action, ['dismiss', 'retry', 'cancel'])) {
         return Response::validationError(['action' => 'Invalid action. Must be: dismiss, retry, or cancel']);
      }
      // Get operations via Queue - verifies ownership
      $operations = JVB()->queue()->getUserOperations($user_id, [
         'ids' => $ids,
         'limit' => count($ids),
      ]);
      if (empty($operations)) {
         return Response::notFound('No valid operations found');
      }
      $result = $this->processAction($action, $operations, $user_id);
      if ($result['success']) {
         Cache::touch($user_id);
      }
      return Response::success($result);
   }
   public function pollQueue(WP_REST_Request $request): WP_REST_Response
   {
      $userId = $request->get_param('user');
      $this->cache = Cache::for($userId.'_queue');
      $since = $request->get_param('since');
      $ids = $request->get_param('ids');
      $filters = ['not_dismissed' => true, 'limit' => 50];
      if (!empty($ids)) {
         $filters['ids'] = array_map('trim', explode(',', $ids));
      }
      // Get operations via Queue
      $operations = JVB()->queue()->getUserOperations($user_id, $filters);
      $operations = JVB()->queue()->getUserOperations($userId, $filters);
      // Format operations for API response
      $formatted = array_map([$this, 'formatOperationFromObject'], $operations);
      if ($since) {
         $sinceTime = strtotime($since);
         $operations = array_filter($operations, function($op) use ($sinceTime) {
            return strtotime($op->completedAt ?? $op->startedAt ?? $op->scheduledAt) > $sinceTime;
         });
      }
      $response = new WP_REST_Response([
         'items' => $formatted,
         'total' => count($formatted),
         'timestamp' => date('c'),
         'has_more' => count($formatted) === ($limit ?: 50),
         'queue_stats' => $this->getQueueStats($user_id),
         'server_time' => date('c')
      $items = array_map(fn($op) => [
         'id' => $op->id,
         'status' => $this->mapStateToStatus($op->state, $op->outcome),
         'progress_percentage'   => $this->formatPercentage($op),
         'progress_count' => $op->processedItems,
         'count' => $op->totalItems,
         'updated_at' => $this->formatTimestamp($op->completedAt ?? $op->startedAt ?? $op->scheduledAt),
         'error_message' => $op->errorMessage,
      ], $operations);
      return Response::success([
         'items' => array_values($items),
         'server_time' => date('c'),
         'has_active' => count(array_filter($items, fn($i) => in_array($i['status'], ['pending', 'processing']))) > 0,
      ]);
   }
   public function getOperationErrors(WP_REST_Request $request): WP_REST_Response
   {
      $user_id = absint($request->get_param('user'));
      $this->cache = Cache::for($user_id.'_queue');
      $operations = JVB()->queue()->getUserOperations($user_id, [
         'state' => 'completed',
         'outcome' => ['failed', 'failed_permanent', 'partial'],
         'has_errors' => true,
         'order_by' => 'updated_at DESC',
         'limit' => 20,
      ]);
      return $this->addCacheHeaders($response);
      $formatted = array_map(fn($op) => [
         'id' => $op->id,
         'type' => $op->type,
         'error_message' => $op->errorMessage,
         'failed_items' => $op->failedItems ?? [],
         'retries' => $op->retries,
         'created_at' => $op->scheduledAt,
         'updated_at' => $op->completedAt,
      ], $operations);
      return Response::collection($formatted);
   }
   public function getOperation(WP_REST_Request $request): WP_REST_Response
   {
      $id = $request->get_param('id');
      $userId = $request->get_param('user');
      $this->cache = Cache::for($userId.'_queue');
      $op = JVB()->queue()->get($id);
      if (!$op || $op->userId !== $userId) {
         return Response::notFound('Operation not found');
      }
      return Response::item($this->formatOperation($op, true), 'operation');
   }
   /**
    * Get queue statistics for user
    */
   protected function getQueueStats(int $user_id): array
   private function getQueueStats(int $userId): array
   {
      $stats = JVB()->queue()->getUserStats($user_id);
      // Add frontend-only statuses that don't exist in backend
      return array_merge([
         'queued' => 0,
         'localProcessing' => 0,
         'uploading' => 0,
      ], $stats);
      return array_merge(
         ['queued' => 0, 'localProcessing' => 0, 'uploading' => 0],
         JVB()->queue()->getUserStats($userId)
      );
   }
   /**
    * Map backend state/outcome to frontend status
    * Backend uses: state (pending, scheduled, processing, completed) + outcome (pending, success, partial, failed, failed_permanent)
    * Frontend expects: queued, pending, processing, completed, failed, failed_permanent
    */
   protected function mapStateToStatus(string $state, ?string $outcome): string
   {
      // If completed, check outcome for failure states
      if ($state === 'completed') {
         return match($outcome) {
            'failed' => 'failed',
            'failed_permanent' => 'failed_permanent',
            'partial' => 'completed', // or could be 'partial' if JS supports it
            default => 'completed'
         };
      }
      // Map other states directly
      return match($state) {
         'scheduled' => 'pending',
         default => $state
      };
   }
   /**
    * Format Operation object for API response
    */
   protected function formatOperationFromObject(\JVBase\managers\queue\Operation $op): array
   private function formatOperation(Operation $op, bool $full = true): array
   {
      $formatted = [
         'id' => $op->id,
         'type' => $op->type,
         'status' => $this->mapStateToStatus($op->state, $op->outcome),
         'progress_count' => $op->processedItems,
         'progress_percentage'   => $this->formatPercentage($op),
         'count' => $op->totalItems,
         'retries' => $op->retries,
         'data' => $op->requestData,
         'result' => $op->result ?? [],
         'title' => $this->getOperationTitle($op->type, $op->requestData),
         'created_at' => $this->formatTimestamp($op->scheduledAt),
         'updated_at' => $this->formatTimestamp($op->completedAt ?? $op->startedAt ?? $op->scheduledAt),
      ];
      $formatted['created_at'] = $this->formatTimestamp($op->scheduledAt);
      $formatted['updated_at'] = $this->formatTimestamp($op->completedAt ?? $op->startedAt ?? $op->scheduledAt);
      if ($op->state === 'completed' && $op->completedAt) {
         $formatted['completed_at'] = $this->formatTimestamp($op->completedAt);
      if (!empty($op->result['merged_into'])) {
         $formatted['merged_into'] = $op->result['merged_into'];
      }
      if ($op->errorMessage) {
         $formatted['error_message'] = $op->errorMessage;
      }
      if ($formatted['count'] > 0) {
         $formatted['progress_percentage'] = round(
            ($formatted['progress_count'] / $formatted['count']) * 100
         );
      }
      if ($full) {
         $formatted += [
            'data' => $op->requestData,
            'result' => $op->result ?? [],
            'retries' => $op->retries,
            'user_dismissed' => $op->userDismissed,
         ];
      $formatted['title'] = $this->getOperationTitle($op->type, $op->requestData);
      $formatted['user_dismissed'] = $op->userDismissed;
         if ($op->state === 'completed' && $op->completedAt) {
            $formatted['completed_at'] = $this->formatTimestamp($op->completedAt);
         }
      }
      return $formatted;
   }
   protected function formatPercentage(Operation $op):int
   {
      if ($op->totalItems > 0){
         return round(($op->processedItems / $op->totalItems) * 100);
      } else {
         return match($op->state) {
            'pending','scheduled=' => 0,
            'processing' => 45,
            'completed' => 100
         };
      }
   }
   /**
    * Map backend state/outcome to frontend status
    * Backend uses: state (pending, scheduled, processing, completed) + outcome (pending, success, partial, failed, failed_permanent)
    * Frontend expects: queued, pending, processing, completed, failed, failed_permanent
    */
   private function mapStateToStatus(string $state, ?string $outcome): string
   {
      if ($state === 'completed') {
         return match ($outcome) {
            'failed' => 'failed',
            'failed_permanent' => 'failed_permanent',
            default => 'completed',
         };
      }
      return $state === 'scheduled' ? 'pending' : $state;
   }
   /**
    * Get human-readable operation title
    */
@@ -242,164 +371,39 @@
      return $base_title;
   }
   /**
    * Update operation status (dismiss or retry)
    *
    * @param WP_REST_Request $request
    * @return WP_REST_Response
    */
   public function handleAction(WP_REST_Request $request): WP_REST_Response
   {
      $data = $request->get_json_params();
      $ids = $data['ids'] ?? [];
      $action = $data['action'] ?? '';
      $user_id = (int)$data['user'];
      // Validate input
      if (empty($ids) || !is_array($ids)) {
         return new WP_REST_Response([
            'success' => false,
            'message' => 'Missing or invalid operation IDs'
         ], 400);
      }
      if (!in_array($action, ['dismiss', 'retry', 'cancel'])) {
         return new WP_REST_Response([
            'success' => false,
            'message' => 'Invalid action. Must be: dismiss, retry, or cancel'
         ], 400);
      }
      // Get operations via Queue - verifies ownership
      $operations = JVB()->queue()->getUserOperations($user_id, [
         'ids' => $ids,
         'limit' => count($ids),
      ]);
      if (empty($operations)) {
         return new WP_REST_Response([
            'success' => false,
            'message' => 'No valid operations found'
         ], 404);
      }
      $result = $this->processQueueAction($action, $operations, $user_id);
      if ($result['success']) {
         Cache::touch($user_id);
      }
      return new WP_REST_Response($result);
   }
   protected function processQueueAction(string $action, array $operations, int $user_id): array
   private function processAction(string $action, array $operations, int $userId): array
   {
      $queue = JVB()->queue();
      $processed_count = 0;
      $errors = [];
      $valid_ids = [];
      $processed = 0;
      $processedIds = [];
      foreach ($operations as $op) {
         try {
            $result = match($action) {
               'dismiss' => $queue->dismiss($op->id),
               'retry'   => $queue->retry($op->id, $user_id),
               'cancel'  => $queue->cancel($op->id, $user_id),
               default   => false,
            };
         $result = match ($action) {
            'dismiss' => $queue->dismiss($op->id),
            'retry' => $queue->retry($op->id, $userId),
            'cancel' => $queue->cancel($op->id, $userId),
            default => false,
         };
            if ($result) {
               $processed_count++;
               $valid_ids[] = $op->id;
            } else {
               // Only add errors for meaningful failures
               if ($action === 'retry' && ($op->state !== 'completed' || !in_array($op->outcome, ['failed', 'failed_permanent']))) {
                  $errors[] = "Operation {$op->id} cannot be retried (state: {$op->state}, outcome: {$op->outcome})";
               }
               // Silently skip cancel failures (can't cancel processing/completed)
            }
         } catch (Exception $e) {
            $errors[] = "Error processing operation {$op->id}: " . $e->getMessage();
         if ($result) {
            $processed++;
            $processedIds[] = $op->id;
         }
      }
      $message = $this->getActionMessage($action, $processed_count);
      if (!empty($errors)) {
         $message .= ". Errors: " . implode(', ', $errors);
      }
      $pastTense = ['dismiss' => 'dismissed', 'retry' => 'retried', 'cancel' => 'cancelled'];
      return [
         'success' => $processed_count > 0,
         'success' => $processed > 0,
         'action' => $action,
         'processed_count' => $processed_count,
         'processed_count' => $processed,
         'total_requested' => count($operations),
         'processed_ids' => $valid_ids,
         'errors' => $errors,
         'message' => $message
      ];
   }
   /**
    * Get user-friendly message for action results
    */
   protected function getActionMessage(string $action, int $count): string
   {
      if ($count === 0) {
         return "No operations were {$action}ed";
      }
      $past_tense = [
         'dismiss' => 'dismissed',
         'retry' => 'retried',
         'cancel' => 'cancelled'
      ];
      return "{$count} operation" . ($count === 1 ? '' : 's') . " {$past_tense[$action]}";
   }
   public function getOperationErrors(WP_REST_Request $request): WP_REST_Response
   {
      $user_id = get_current_user_id();
      $operations = JVB()->queue()->getUserOperations($user_id, [
         'state' => 'completed',
         'outcome' => ['failed', 'failed_permanent', 'partial'],
         'has_errors' => true,
         'order_by' => 'updated_at DESC',
         'limit' => 20,
      ]);
      $formatted = array_map(function($op) {
         return [
            'id' => $op->id,
            'type' => $op->type,
            'error_message' => $op->errorMessage,
            'failed_items' => $op->failedItems ?? [],
            'retries' => $op->retries,
            'created_at' => $op->scheduledAt,
            'updated_at' => $op->completedAt,
            'error_details' => $this->parseErrorMessage($op->errorMessage ?? ''),
         ];
      }, $operations);
      return new WP_REST_Response([
         'errors' => $formatted,
         'total' => count($formatted)
      ]);
   }
   protected function parseErrorMessage(string $error_message): array
   {
      if (str_contains($error_message, ' | ')) {
         $parts = explode(' | ', $error_message);
         return [
            'original_error' => $parts[0] ?? '',
            'cleanup_reason' => $parts[1] ?? ''
         ];
      }
      return [
         'original_error' => $error_message,
         'cleanup_reason' => null
         'processed_ids' => $processedIds,
         'message' => $processed > 0
            ? "{$processed} operation(s) {$pastTense[$action]}"
            : "No operations were {$pastTense[$action]}",
      ];
   }
}