cacheName = 'queue'; $this->cacheTtl = 300; parent::__construct(); if (JVB_TESTING) { $this->cache->flush(); } } /** * Registers queue routes * @return void */ public function registerRoutes():void { // Main queue endpoint - GET and POST Route::for('queue') ->get([$this, 'getQueue']) ->args([ 'status' => 'string|enum:all,queued,pending,processing,completed,failed,failed_permanent|default:all', 'ids' => 'string', 'limit' => 'integer|default:50|min:1|max:100', ]) ->auth('user') ->rateLimit(30) ->post([$this, 'handleAction']) ->args([ 'ids' => 'array|required', 'action' => 'string|required|enum:dismiss,retry,cancel', ]) ->auth('user') ->rateLimit(30) ->register(); // Poll endpoint Route::for('queue/poll') ->get([$this, 'pollQueue']) ->args([ 'since' => 'string', 'ids' => 'string', ]) ->auth('user') ->rateLimit(15) ->register(); // Errors endpoint Route::for('queue/errors') ->get([$this, 'getOperationErrors']) ->auth('user') ->rateLimit(15) ->register(); // Single operation with dynamic ID Route::for(Route::pattern('queue/{id}')) ->get([$this, 'getOperation']) ->arg('id', 'string|required') ->auth('user') ->rateLimit(15) ->register(); } /** * Get queue operations with optional filtering * * @param WP_REST_Request $request * @return WP_REST_Response */ public function getQueue(WP_REST_Request $request): WP_REST_Response { $params = $request->get_params(); $user_id = absint($params['user']); $this->cache = Cache::for($user_id.'_queue'); $status = sanitize_text_field($params['status']); $ids = !empty($params['ids']) ? array_map('trim', array_map('sanitize_text_field', explode(',', $params['ids']))) : []; $limit = absint($params['limit']); $cacheKey = $this->cache->generateKey(compact('user_id', 'status', 'ids', 'limit')); if ($cached = $this->checkHeaders($request, $cacheKey)) { return $cached; } $data = $this->cache->tag("user:{$user_id}")->remember($cacheKey, function() use ($user_id, $params) { $filters = $this->buildFilters($params); $operations = JVB()->queue()->getUserOperations($user_id, $filters); return [ 'items' => array_map([$this, 'formatOperation'], $operations), 'total' => count($operations), 'queue_stats' => $this->getQueueStats($user_id), 'server_time' => date('c'), ]; }); $response = Response::success($data); return $this->addCacheHeaders($response); } private function buildFilters(array $params): array { $filters = [ 'not_dismissed' => true, 'limit' => min(absint($params['limit'] ?? 50), 100), ]; if (!empty($params['status']) && $params['status'] !== 'all') { $filters['state'] = $params['status']; } if (!empty($params['ids'])) { $filters['ids'] = array_map('trim', array_map('sanitize_text_field', explode(',', $params['ids']))); } return $filters; } /** * Update operation status (dismiss or retry) * * @param WP_REST_Request $request * @return WP_REST_Response */ public function handleAction(WP_REST_Request $request): WP_REST_Response { $data = $request->get_params(); $ids = is_array($data['ids']) ? array_map('trim', array_map('sanitize_text_field', $data['ids'])) : array_map('trim', array_map('sanitize_text_field', explode(',', $data['ids']))); $action = sanitize_text_field($data['action'] ?? ''); $user_id = absint($data['user']); $this->cache = Cache::for($user_id.'_queue'); // Validate input if (empty($ids)) { return Response::validationError(['ids' => 'Missing or invalid operation IDs']); } if (!in_array($action, ['dismiss', 'retry', 'cancel'])) { return Response::validationError(['action' => 'Invalid action. Must be: dismiss, retry, or cancel']); } // Get operations via Queue - verifies ownership $operations = JVB()->queue()->getUserOperations($user_id, [ 'ids' => $ids, 'limit' => count($ids), ]); if (empty($operations)) { return Response::notFound('No valid operations found'); } $result = $this->processAction($action, $operations, $user_id); if ($result['success']) { Cache::touch($user_id); } return Response::success($result); } public function pollQueue(WP_REST_Request $request): WP_REST_Response { $userId = $request->get_param('user'); $this->cache = Cache::for($userId.'_queue'); $since = $request->get_param('since'); $ids = $request->get_param('ids'); $filters = ['not_dismissed' => true, 'limit' => 50]; if (!empty($ids)) { $filters['ids'] = array_map('trim', explode(',', $ids)); } $operations = JVB()->queue()->getUserOperations($userId, $filters); if ($since) { $sinceTime = strtotime($since); $operations = array_filter($operations, function($op) use ($sinceTime) { return strtotime($op->completedAt ?? $op->startedAt ?? $op->scheduledAt) > $sinceTime; }); } $items = array_map(fn($op) => [ 'id' => $op->id, 'status' => $this->mapStateToStatus($op->state, $op->outcome), 'progress_percentage' => $this->formatPercentage($op), 'progress_count' => $op->processedItems, 'count' => $op->totalItems, 'updated_at' => $this->formatTimestamp($op->completedAt ?? $op->startedAt ?? $op->scheduledAt), 'error_message' => $op->errorMessage, ], $operations); return Response::success([ 'items' => array_values($items), 'server_time' => date('c'), 'has_active' => count(array_filter($items, fn($i) => in_array($i['status'], ['pending', 'processing']))) > 0, ]); } public function getOperationErrors(WP_REST_Request $request): WP_REST_Response { $user_id = absint($request->get_param('user')); $this->cache = Cache::for($user_id.'_queue'); $operations = JVB()->queue()->getUserOperations($user_id, [ 'state' => 'completed', 'outcome' => ['failed', 'failed_permanent', 'partial'], 'has_errors' => true, 'order_by' => 'updated_at DESC', 'limit' => 20, ]); $formatted = array_map(fn($op) => [ 'id' => $op->id, 'type' => $op->type, 'error_message' => $op->errorMessage, 'failed_items' => $op->failedItems ?? [], 'retries' => $op->retries, 'created_at' => $op->scheduledAt, 'updated_at' => $op->completedAt, ], $operations); return Response::collection($formatted); } public function getOperation(WP_REST_Request $request): WP_REST_Response { $id = $request->get_param('id'); $userId = $request->get_param('user'); $this->cache = Cache::for($userId.'_queue'); $op = JVB()->queue()->get($id); if (!$op || $op->userId !== $userId) { return Response::notFound('Operation not found'); } return Response::item($this->formatOperation($op, true), 'operation'); } /** * Get queue statistics for user */ private function getQueueStats(int $userId): array { return array_merge( ['queued' => 0, 'localProcessing' => 0, 'uploading' => 0], JVB()->queue()->getUserStats($userId) ); } private function formatOperation(Operation $op, bool $full = true): array { $formatted = [ 'id' => $op->id, 'type' => $op->type, 'status' => $this->mapStateToStatus($op->state, $op->outcome), 'progress_count' => $op->processedItems, 'progress_percentage' => $this->formatPercentage($op), 'count' => $op->totalItems, 'title' => $this->getOperationTitle($op->type, $op->requestData), 'created_at' => $this->formatTimestamp($op->scheduledAt), 'updated_at' => $this->formatTimestamp($op->completedAt ?? $op->startedAt ?? $op->scheduledAt), ]; if (!empty($op->result['merged_into'])) { $formatted['merged_into'] = $op->result['merged_into']; } if ($op->errorMessage) { $formatted['error_message'] = $op->errorMessage; } if ($full) { $formatted += [ 'data' => $op->requestData, 'result' => $op->result ?? [], 'retries' => $op->retries, 'user_dismissed' => $op->userDismissed, ]; if ($op->state === 'completed' && $op->completedAt) { $formatted['completed_at'] = $this->formatTimestamp($op->completedAt); } } return $formatted; } protected function formatPercentage(Operation $op):int { if ($op->totalItems > 0){ return round(($op->processedItems / $op->totalItems) * 100); } else { return match($op->state) { 'pending','scheduled=' => 0, 'processing' => 45, 'completed' => 100 }; } } /** * Map backend state/outcome to frontend status * Backend uses: state (pending, scheduled, processing, completed) + outcome (pending, success, partial, failed, failed_permanent) * Frontend expects: queued, pending, processing, completed, failed, failed_permanent */ private function mapStateToStatus(string $state, ?string $outcome): string { if ($state === 'completed') { return match ($outcome) { 'failed' => 'failed', 'failed_permanent' => 'failed_permanent', default => 'completed', }; } return $state === 'scheduled' ? 'pending' : $state; } /** * Get human-readable operation title */ protected function getOperationTitle(string $type, array $data): string { $titles = [ 'attach_upload_to_content' => 'Attaching Upload', 'content_update' => 'Updating Content', 'user_settings' => 'Updating Settings', 'bulk_operation' => 'Bulk Operation', 'image_processing' => 'Processing Images', 'notification_send' => 'Sending Notification', 'cache_clear' => 'Clearing Cache', 'data_export' => 'Exporting Data', 'data_import' => 'Importing Data' ]; $base_title = $titles[$type] ?? ucwords(str_replace('_', ' ', $type)); if ($type === 'attach_upload_to_content' && $data['mode'] === 'selection') { $base_title .= '; Waiting on your Groupings to proceed...'; } // Add context if available if (!empty($data['content'])) { $content_type = ucfirst($data['content']); $base_title = str_replace('Content', $content_type, $base_title); } return $base_title; } private function processAction(string $action, array $operations, int $userId): array { $queue = JVB()->queue(); $processed = 0; $processedIds = []; foreach ($operations as $op) { $result = match ($action) { 'dismiss' => $queue->dismiss($op->id), 'retry' => $queue->retry($op->id, $userId), 'cancel' => $queue->cancel($op->id, $userId), default => false, }; if ($result) { $processed++; $processedIds[] = $op->id; } } $pastTense = ['dismiss' => 'dismissed', 'retry' => 'retried', 'cancel' => 'cancelled']; return [ 'success' => $processed > 0, 'action' => $action, 'processed_count' => $processed, 'total_requested' => count($operations), 'processed_ids' => $processedIds, 'message' => $processed > 0 ? "{$processed} operation(s) {$pastTense[$action]}" : "No operations were {$pastTense[$action]}", ]; } }