| | |
| | | |
| | | class QueueRoutes extends RestRouteManager |
| | | { |
| | | |
| | | protected string $table = BASE.'_operation_queue'; |
| | | protected string $metricsTable = BASE.'stats__operation_queue'; |
| | | |
| | | public function __construct() |
| | | { |
| | | $this->cache_name = 'queue'; |
| | | $this->cache_ttl = 300; |
| | | parent::__construct(); |
| | | |
| | | $this->cache->clear(); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public function getQueue(WP_REST_Request $request): WP_REST_Response |
| | | { |
| | | $user_id = get_current_user_id(); |
| | | $status = $request->get_param('status'); |
| | | $user_id = $request->get_param('user'); |
| | | $status = sanitize_text_field($request->get_param('status')); |
| | | $ids = $request->get_param('ids'); |
| | | $limit = intval($request->get_param('limit')); |
| | | |
| | | // Use base class user-specific header checking |
| | | // This checks both 'queue' and 'user_{$user_id}' timestamps |
| | | $cache_check = $this->checkUserHeaders($request, $user_id, 'queue'); |
| | | if ($cache_check) { |
| | | return $cache_check; // Returns 304 Not Modified |
| | | return $cache_check; |
| | | } |
| | | |
| | | global $wpdb; |
| | | $table = $wpdb->prefix . $this->table; |
| | | // Build filters for getUserOperations |
| | | $filters = [ |
| | | 'not_dismissed' => true, |
| | | 'limit' => $limit ?: 50, |
| | | ]; |
| | | |
| | | $sql = "SELECT * FROM $table WHERE user_id = %d AND COALESCE(user_dismissed, 0) = 0"; |
| | | $params = [$user_id]; |
| | | |
| | | if ($status !== 'all') { |
| | | $sql .= " AND status = %s"; |
| | | $params[] = $status; |
| | | if ($status && $status !== 'all') { |
| | | $filters['state'] = $status; |
| | | } |
| | | |
| | | if (!empty($ids)) { |
| | | $id_array = array_map('trim', explode(',', $ids)); |
| | | if (!empty($id_array)) { |
| | | $placeholders = implode(',', array_fill(0, count($id_array), '%s')); |
| | | $sql .= " AND id IN ($placeholders)"; |
| | | $params = array_merge($params, $id_array); |
| | | } |
| | | $filters['ids'] = array_map('trim', explode(',', $ids)); |
| | | } |
| | | |
| | | $sql .= " ORDER BY FIELD(status, 'processing', 'pending', 'failed', 'completed'), created_at DESC"; |
| | | // Get operations via Queue |
| | | $operations = JVB()->queue()->getUserOperations($user_id, $filters); |
| | | |
| | | if ($limit > 0) { |
| | | $sql .= " LIMIT %d"; |
| | | $params[] = $limit; |
| | | } |
| | | |
| | | $operations = $wpdb->get_results($wpdb->prepare($sql, $params), ARRAY_A); |
| | | |
| | | // Format operations |
| | | foreach ($operations as &$op) { |
| | | $op = $this->formatOperation($op); |
| | | } |
| | | // Format operations for API response |
| | | $formatted = array_map([$this, 'formatOperationFromObject'], $operations); |
| | | |
| | | $response = new WP_REST_Response([ |
| | | 'items' => $operations, |
| | | 'total' => count($operations), |
| | | 'items' => $formatted, |
| | | 'total' => count($formatted), |
| | | 'timestamp' => date('c'), |
| | | 'has_more' => count($operations) === $limit, |
| | | 'has_more' => count($formatted) === ($limit ?: 50), |
| | | 'queue_stats' => $this->getQueueStats($user_id), |
| | | 'server_time' => date('c') |
| | | ]); |
| | | |
| | | // Add cache headers (ETag, Last-Modified) |
| | | return $this->addCacheHeaders($response); |
| | | } |
| | | |
| | |
| | | */ |
| | | protected function getQueueStats(int $user_id): array |
| | | { |
| | | global $wpdb; |
| | | $table = $wpdb->prefix . $this->table; |
| | | $stats = JVB()->queue()->getUserStats($user_id); |
| | | |
| | | $stats = $wpdb->get_results($wpdb->prepare( |
| | | "SELECT status, COUNT(*) as count FROM $table WHERE user_id = %d GROUP BY status", |
| | | $user_id |
| | | ), OBJECT_K); |
| | | |
| | | $formatted_stats = [ |
| | | // Add frontend-only statuses that don't exist in backend |
| | | return array_merge([ |
| | | 'queued' => 0, |
| | | 'localProcessing' => 0, |
| | | 'uploading' => 0, |
| | | 'pending' => 0, |
| | | 'processing' => 0, |
| | | 'completed' => 0, |
| | | 'failed' => 0, |
| | | 'failed_permanent' => 0 |
| | | ]; |
| | | |
| | | foreach ($stats as $status => $data) { |
| | | if (isset($formatted_stats[$status])) { |
| | | $formatted_stats[$status] = intval($data->count); |
| | | } |
| | | ], $stats); |
| | | } |
| | | |
| | | return $formatted_stats; |
| | | } |
| | | /** |
| | | * Format operation for API response |
| | | * Map backend state/outcome to frontend status |
| | | * Backend uses: state (pending, scheduled, processing, completed) + outcome (pending, success, partial, failed, failed_permanent) |
| | | * Frontend expects: queued, pending, processing, completed, failed, failed_permanent |
| | | */ |
| | | protected function formatOperation(array $operation): array |
| | | protected function mapStateToStatus(string $state, ?string $outcome): string |
| | | { |
| | | // If completed, check outcome for failure states |
| | | if ($state === 'completed') { |
| | | return match($outcome) { |
| | | 'failed' => 'failed', |
| | | 'failed_permanent' => 'failed_permanent', |
| | | 'partial' => 'completed', // or could be 'partial' if JS supports it |
| | | default => 'completed' |
| | | }; |
| | | } |
| | | |
| | | // Map other states directly |
| | | return match($state) { |
| | | 'scheduled' => 'pending', |
| | | default => $state |
| | | }; |
| | | } |
| | | |
| | | /** |
| | | * Format Operation object for API response |
| | | */ |
| | | protected function formatOperationFromObject(\JVBase\managers\queue\Operation $op): array |
| | | { |
| | | $formatted = [ |
| | | 'id' => $operation['id'], |
| | | 'type' => $operation['type'], |
| | | 'status' => $operation['status'], |
| | | 'progress_count' => intval($operation['progress_count'] ?? 0), |
| | | 'count' => intval($operation['count'] ?? 1), |
| | | 'retries' => intval($operation['retries'] ?? 0), |
| | | 'data' => json_decode($operation['request_data'] ?? '{}', true), |
| | | 'result' => json_decode($operation['result']??'{}', true) |
| | | 'id' => $op->id, |
| | | 'type' => $op->type, |
| | | 'status' => $this->mapStateToStatus($op->state, $op->outcome), |
| | | 'progress_count' => $op->processedItems, |
| | | 'count' => $op->totalItems, |
| | | 'retries' => $op->retries, |
| | | 'data' => $op->requestData, |
| | | 'result' => $op->result ?? [], |
| | | ]; |
| | | |
| | | // Convert timestamps to ISO 8601 format with proper timezone |
| | | $formatted['created_at'] = $this->formatTimestamp($operation['created_at']); |
| | | $formatted['updated_at'] = $this->formatTimestamp($operation['updated_at']); |
| | | $formatted['created_at'] = $this->formatTimestamp($op->scheduledAt); |
| | | $formatted['updated_at'] = $this->formatTimestamp($op->completedAt ?? $op->startedAt ?? $op->scheduledAt); |
| | | |
| | | // Add completed_at if status is completed |
| | | if ($operation['status'] === 'completed' && !empty($operation['completed_at'])) { |
| | | $formatted['completed_at'] = $this->formatTimestamp($operation['completed_at']); |
| | | if ($op->state === 'completed' && $op->completedAt) { |
| | | $formatted['completed_at'] = $this->formatTimestamp($op->completedAt); |
| | | } |
| | | |
| | | // Add error message if failed |
| | | if (!empty($operation['error_message'])) { |
| | | $formatted['error_message'] = $operation['error_message']; |
| | | if ($op->errorMessage) { |
| | | $formatted['error_message'] = $op->errorMessage; |
| | | } |
| | | |
| | | // Simple progress percentage calculation |
| | | if ($formatted['count'] > 0) { |
| | | $formatted['progress_percentage'] = round( |
| | | ($formatted['progress_count'] / $formatted['count']) * 100 |
| | | ); |
| | | } |
| | | |
| | | // Add human-readable title for easier frontend display |
| | | $formatted['title'] = $this->getOperationTitle($operation['type'], $formatted['data']); |
| | | |
| | | |
| | | // Add user dismissal status |
| | | $formatted['user_dismissed'] = !empty($operation['user_dismissed']); |
| | | $formatted['title'] = $this->getOperationTitle($op->type, $op->requestData); |
| | | $formatted['user_dismissed'] = $op->userDismissed; |
| | | |
| | | return $formatted; |
| | | } |
| | |
| | | $data = $request->get_json_params(); |
| | | $ids = $data['ids'] ?? []; |
| | | $action = $data['action'] ?? ''; |
| | | |
| | | $user_id = (int)$data['user']; |
| | | |
| | | |
| | | // Validate input |
| | | if (empty($ids) || !is_array($ids)) { |
| | | return new WP_REST_Response([ |
| | |
| | | ], 400); |
| | | } |
| | | |
| | | global $wpdb; |
| | | $table = $wpdb->prefix . $this->table; |
| | | // Get operations via Queue - verifies ownership |
| | | $operations = JVB()->queue()->getUserOperations($user_id, [ |
| | | 'ids' => $ids, |
| | | 'limit' => count($ids), |
| | | ]); |
| | | |
| | | // Verify operations exist and belong to user |
| | | $placeholders = implode(',', array_fill(0, count($ids), '%s')); |
| | | $valid_operations = $wpdb->get_results($wpdb->prepare( |
| | | "SELECT id, status FROM $table WHERE id IN ($placeholders) AND user_id = %d", |
| | | array_merge($ids, [$user_id]) |
| | | )); |
| | | |
| | | if (empty($valid_operations)) { |
| | | if (empty($operations)) { |
| | | return new WP_REST_Response([ |
| | | 'success' => false, |
| | | 'message' => 'No valid operations found' |
| | | ], 404); |
| | | } |
| | | |
| | | $valid_ids = array_column($valid_operations, 'id'); |
| | | $placeholders = implode(',', array_fill(0, count($valid_ids), '%s')); |
| | | |
| | | // Process action using foreach approach as suggested |
| | | $result = $this->processQueueAction($action, $valid_operations, $user_id); |
| | | $result = $this->processQueueAction($action, $operations, $user_id); |
| | | |
| | | if ($result['success']) { |
| | | // Update timestamp for this user's queue |
| | | CacheManager::updateTimestamp("user_{$user_id}"); |
| | | } |
| | | |
| | |
| | | |
| | | protected function processQueueAction(string $action, array $operations, int $user_id): array |
| | | { |
| | | global $wpdb; |
| | | $table = $wpdb->prefix . $this->table; |
| | | |
| | | $queue = JVB()->queue(); |
| | | $processed_count = 0; |
| | | $errors = []; |
| | | $valid_ids = []; |
| | | |
| | | // Process each operation individually |
| | | foreach ($operations as $operation) { |
| | | $operation_id = $operation->id; |
| | | $operation_status = $operation->status; |
| | | |
| | | foreach ($operations as $op) { |
| | | try { |
| | | $result = false; |
| | | $result = match($action) { |
| | | 'dismiss' => $queue->dismiss($op->id), |
| | | 'retry' => $queue->retry($op->id, $user_id), |
| | | 'cancel' => $queue->cancel($op->id, $user_id), |
| | | default => false, |
| | | }; |
| | | |
| | | switch ($action) { |
| | | case 'dismiss': |
| | | // Can dismiss any operation |
| | | $result = $wpdb->update( |
| | | $table, |
| | | ['user_dismissed' => 1], |
| | | ['id' => $operation_id, 'user_id' => $user_id] |
| | | ); |
| | | break; |
| | | |
| | | case 'retry': |
| | | // Can only retry failed operations |
| | | if (!in_array($operation_status, ['failed', 'failed_permanent'])) { |
| | | $errors[] = "Operation {$operation_id} cannot be retried (status: {$operation_status})"; |
| | | continue 2; |
| | | } |
| | | |
| | | $result = $wpdb->update( |
| | | $table, |
| | | [ |
| | | 'status' => 'pending', |
| | | 'error_message' => null, |
| | | 'updated_at' => current_time('mysql'), |
| | | 'retries' => $wpdb->get_var($wpdb->prepare( |
| | | "SELECT retries FROM $table WHERE id = %s", |
| | | $operation_id |
| | | )) + 1 |
| | | ], |
| | | ['id' => $operation_id, 'user_id' => $user_id] |
| | | ); |
| | | break; |
| | | |
| | | case 'cancel': |
| | | // Can only cancel pending/queued operations |
| | | if (!in_array($operation_status, ['pending', 'queued'])) { |
| | | // $errors[] = "Operation {$operation_id} cannot be cancelled (status: {$operation_status})"; |
| | | continue 2; |
| | | } |
| | | |
| | | $result = $wpdb->delete( |
| | | $table, |
| | | ['id' => $operation_id, 'user_id' => $user_id] |
| | | ); |
| | | break; |
| | | } |
| | | |
| | | if ($result !== false) { |
| | | if ($result) { |
| | | $processed_count++; |
| | | $valid_ids[] = $operation_id; |
| | | $valid_ids[] = $op->id; |
| | | } else { |
| | | $errors[] = "Failed to {$action} operation {$operation_id}"; |
| | | // Only add errors for meaningful failures |
| | | if ($action === 'retry' && ($op->state !== 'completed' || !in_array($op->outcome, ['failed', 'failed_permanent']))) { |
| | | $errors[] = "Operation {$op->id} cannot be retried (state: {$op->state}, outcome: {$op->outcome})"; |
| | | } |
| | | |
| | | // Silently skip cancel failures (can't cancel processing/completed) |
| | | } |
| | | } catch (Exception $e) { |
| | | $errors[] = "Error processing operation {$operation_id}: " . $e->getMessage(); |
| | | $errors[] = "Error processing operation {$op->id}: " . $e->getMessage(); |
| | | } |
| | | } |
| | | |
| | | // Prepare response |
| | | $message = $this->getActionMessage($action, $processed_count); |
| | | |
| | | if (!empty($errors)) { |
| | | $message .= ". Errors: " . implode(', ', $errors); |
| | | } |
| | |
| | | { |
| | | $user_id = get_current_user_id(); |
| | | |
| | | global $wpdb; |
| | | $table = $wpdb->prefix . $this->table; |
| | | $operations = JVB()->queue()->getUserOperations($user_id, [ |
| | | 'state' => 'completed', |
| | | 'outcome' => ['failed', 'failed_permanent', 'partial'], |
| | | 'has_errors' => true, |
| | | 'order_by' => 'updated_at DESC', |
| | | 'limit' => 20, |
| | | ]); |
| | | |
| | | $failed_operations = $wpdb->get_results($wpdb->prepare(" |
| | | SELECT id, type, error_message, failed_items, retries, created_at, updated_at |
| | | FROM $table |
| | | WHERE user_id = %d |
| | | AND status IN ('failed', 'completed_with_errors') |
| | | AND (error_message IS NOT NULL OR failed_items IS NOT NULL) |
| | | ORDER BY updated_at DESC |
| | | LIMIT 20 |
| | | ", $user_id), ARRAY_A); |
| | | |
| | | foreach ($failed_operations as &$op) { |
| | | $op['failed_items'] = json_decode($op['failed_items'] ?? '[]', true); |
| | | $op['error_details'] = $this->parseErrorMessage($op['error_message']); |
| | | } |
| | | $formatted = array_map(function($op) { |
| | | return [ |
| | | 'id' => $op->id, |
| | | 'type' => $op->type, |
| | | 'error_message' => $op->errorMessage, |
| | | 'failed_items' => $op->failedItems ?? [], |
| | | 'retries' => $op->retries, |
| | | 'created_at' => $op->scheduledAt, |
| | | 'updated_at' => $op->completedAt, |
| | | 'error_details' => $this->parseErrorMessage($op->errorMessage ?? ''), |
| | | ]; |
| | | }, $operations); |
| | | |
| | | return new WP_REST_Response([ |
| | | 'errors' => $failed_operations, |
| | | 'total' => count($failed_operations) |
| | | 'errors' => $formatted, |
| | | 'total' => count($formatted) |
| | | ]); |
| | | } |
| | | protected function parseErrorMessage(string $error_message): array |