From d7e7d248cbe41cd7a9ef9c2fb022b6c4831f99a3 Mon Sep 17 00:00:00 2001
From: Jake Vanderwerf <get@jakevanderwerf.ca>
Date: Sun, 31 May 2026 15:22:56 +0000
Subject: [PATCH] =jakevan complete
---
inc/rest/routes/QueueRoutes.php | 701 ++++++++++++++++++++++++++--------------------------------
1 files changed, 317 insertions(+), 384 deletions(-)
diff --git a/inc/rest/routes/QueueRoutes.php b/inc/rest/routes/QueueRoutes.php
index 8ea77cd..954e0a6 100644
--- a/inc/rest/routes/QueueRoutes.php
+++ b/inc/rest/routes/QueueRoutes.php
@@ -1,233 +1,345 @@
<?php
namespace JVBase\rest\routes;
-use Exception;
-use JVBase\JVB;
-use JVBase\managers\CacheManager;
-use JVBase\rest\RestRouteManager;
+use JVBase\managers\Cache;
+use JVBase\managers\queue\Operation;
+use JVBase\rest\Rest;
+use JVBase\rest\Route;
+use JVBase\rest\Response;
use WP_REST_Request;
use WP_REST_Response;
-use DateTime;
-use DateTimeZone;
if (!defined('ABSPATH')) {
- exit; // Exit if accessed directly
+ exit; // Exit if accessed directly
}
-class QueueRoutes extends RestRouteManager
+class QueueRoutes extends Rest
{
+ public function __construct()
+ {
+ $this->cacheName = 'queue';
+ $this->cacheTtl = 300;
+ parent::__construct();
- protected string $table = BASE.'_operation_queue';
- protected string $metricsTable = BASE.'stats__operation_queue';
+ if (JVB_TESTING) {
+ $this->cache->flush();
+ }
+ }
- public function __construct()
- {
- $this->cache_name = 'queue';
- $this->cache_ttl = 300;
- parent::__construct();
- }
+ /**
+ * Registers queue routes
+ * @return void
+ */
+ public function registerRoutes():void
+ {
+ // Main queue endpoint - GET and POST
+ Route::for('queue')
+ ->get([$this, 'getQueue'])
+ ->args([
+ 'status' => 'string|enum:all,queued,pending,processing,completed,failed,failed_permanent|default:all',
+ 'ids' => 'string',
+ 'limit' => 'integer|default:50|min:1|max:100',
+ ])
+ ->auth('user')
+ ->rateLimit()
+ ->post([$this, 'handleAction'])
+ ->args([
+ 'ids' => 'array|required',
+ 'action' => 'string|required|enum:dismiss,retry,cancel',
+ ])
+ ->auth('user')
+ ->rateLimit()
+ ->register();
- /**
- * Registers queue routes
- * @return void
- */
- public function registerRoutes():void
- {
- register_rest_route($this->namespace, '/queue', [
- [
- 'methods' => 'GET',
- 'callback' => [$this, 'getQueue'],
- 'permission_callback' => [$this, 'checkPermission'],
- 'args' => [
- 'status' => [
- 'type' => 'string',
- 'enum' => ['all', 'queued', 'pending', 'processing', 'completed', 'failed', 'failed_permanent'],
- 'default' => 'all'
- ],
- 'ids' => [
- 'required' => false,
- 'type' => 'string',
- 'description' => 'Comma-separated list of operation IDs'
- ],
- 'limit' => [
- 'type' => 'integer',
- 'default' => 50,
- 'minimum' => 1,
- 'maximum' => 100
- ]
- ]
- ],
- [
- 'methods' => 'POST',
- 'callback' => [$this, 'handleAction'],
- 'permission_callback' => [$this, 'checkPermission'],
- 'args' => [
- 'ids' => [
- 'required' => true,
- 'type' => 'array',
- 'items' => [
- 'type' => 'string'
- ],
- 'description' => 'Array of operation IDs (single or multiple)'
- ],
- 'action' => [
- 'required' => true,
- 'type' => 'string',
- 'enum' => ['dismiss', 'retry', 'cancel'],
- 'description' => 'Action to perform on the operations'
- ]
- ]
- ]
- ]);
- }
+ // Poll endpoint
+ Route::for('queue/poll')
+ ->get([$this, 'pollQueue'])
+ ->args([
+ 'since' => 'string',
+ 'ids' => 'string',
+ ])
+ ->auth('user')
+ ->rateLimit()
+ ->register();
- /**
- * Get queue operations with optional filtering
- *
- * @param WP_REST_Request $request
- * @return WP_REST_Response
- */
+ // Errors endpoint
+ Route::for('queue/errors')
+ ->get([$this, 'getOperationErrors'])
+ ->auth('user')
+ ->rateLimit()
+ ->register();
+
+ // Single operation with dynamic ID
+ Route::for(Route::pattern('queue/{id}'))
+ ->get([$this, 'getOperation'])
+ ->arg('id', 'string|required')
+ ->auth('user')
+ ->rateLimit()
+ ->register();
+ }
+
+ /**
+ * Get queue operations with optional filtering
+ *
+ * @param WP_REST_Request $request
+ * @return WP_REST_Response
+ */
public function getQueue(WP_REST_Request $request): WP_REST_Response
{
- $user_id = get_current_user_id();
- $status = $request->get_param('status');
- $ids = $request->get_param('ids');
- $limit = intval($request->get_param('limit'));
+ $params = $request->get_params();
+ $user_id = absint($params['user']);
+ $this->cache = Cache::for($user_id.'_queue');
+ $status = sanitize_text_field($params['status']);
+ $ids = !empty($params['ids'])
+ ? array_map('trim', array_map('sanitize_text_field', explode(',', $params['ids'])))
+ : [];
+ $limit = absint($params['limit']);
- // Use base class user-specific header checking
- // This checks both 'queue' and 'user_{$user_id}' timestamps
- $cache_check = $this->checkUserHeaders($request, $user_id, 'queue');
- if ($cache_check) {
- return $cache_check; // Returns 304 Not Modified
+ $cacheKey = $this->cache->generateKey(compact('user_id', 'status', 'ids', 'limit'));
+ if ($cached = $this->checkHeaders($request, $cacheKey)) {
+ return $cached;
}
- global $wpdb;
- $table = $wpdb->prefix . $this->table;
+ $data = $this->cache->tag("user:{$user_id}")->remember($cacheKey, function() use ($user_id, $params) {
+ $filters = $this->buildFilters($params);
+ $operations = JVB()->queue()->getUserOperations($user_id, $filters);
- $sql = "SELECT * FROM $table WHERE user_id = %d AND COALESCE(user_dismissed, 0) = 0";
- $params = [$user_id];
+ return [
+ 'items' => array_map([$this, 'formatOperation'], $operations),
+ 'total' => count($operations),
+ 'queue_stats' => $this->getQueueStats($user_id),
+ 'server_time' => date('c'),
+ ];
+ });
- if ($status !== 'all') {
- $sql .= " AND status = %s";
- $params[] = $status;
+ $response = Response::success($data);
+ return $this->addCacheHeaders($response);
+ }
+
+ private function buildFilters(array $params): array
+ {
+ $filters = [
+ 'not_dismissed' => true,
+ 'limit' => min(absint($params['limit'] ?? 50), 100),
+ ];
+
+ if (!empty($params['status']) && $params['status'] !== 'all') {
+ $filters['state'] = $params['status'];
}
- if (!empty($ids)) {
- $id_array = array_map('trim', explode(',', $ids));
- if (!empty($id_array)) {
- $placeholders = implode(',', array_fill(0, count($id_array), '%s'));
- $sql .= " AND id IN ($placeholders)";
- $params = array_merge($params, $id_array);
- }
+ if (!empty($params['ids'])) {
+ $filters['ids'] = array_map('trim', array_map('sanitize_text_field', explode(',', $params['ids'])));
}
- $sql .= " ORDER BY FIELD(status, 'processing', 'pending', 'failed', 'completed'), created_at DESC";
+ return $filters;
+ }
- if ($limit > 0) {
- $sql .= " LIMIT %d";
- $params[] = $limit;
+ /**
+ * Update operation status (dismiss or retry)
+ *
+ * @param WP_REST_Request $request
+ * @return WP_REST_Response
+ */
+ public function handleAction(WP_REST_Request $request): WP_REST_Response
+ {
+ $data = $request->get_params();
+ $ids = is_array($data['ids'])
+ ? array_map('trim', array_map('sanitize_text_field', $data['ids']))
+ : array_map('trim', array_map('sanitize_text_field', explode(',', $data['ids'])));
+
+ $action = sanitize_text_field($data['action'] ?? '');
+ $user_id = absint($data['user']);
+
+ $this->cache = Cache::for($user_id.'_queue');
+
+ // Validate input
+ if (empty($ids)) {
+ return Response::validationError(['ids' => 'Missing or invalid operation IDs']);
}
- $operations = $wpdb->get_results($wpdb->prepare($sql, $params), ARRAY_A);
-
- // Format operations
- foreach ($operations as &$op) {
- $op = $this->formatOperation($op);
+ if (!in_array($action, ['dismiss', 'retry', 'cancel'])) {
+ return Response::validationError(['action' => 'Invalid action. Must be: dismiss, retry, or cancel']);
}
- $response = new WP_REST_Response([
- 'items' => $operations,
- 'total' => count($operations),
- 'timestamp' => date('c'),
- 'has_more' => count($operations) === $limit,
- 'queue_stats' => $this->getQueueStats($user_id),
- 'server_time' => date('c')
+ // Get operations via Queue - verifies ownership
+ $operations = JVB()->queue()->getUserOperations($user_id, [
+ 'ids' => $ids,
+ 'limit' => count($ids),
]);
- // Add cache headers (ETag, Last-Modified)
- return $this->addCacheHeaders($response);
+ if (empty($operations)) {
+ return Response::notFound('No valid operations found');
+ }
+
+ $result = $this->processAction($action, $operations, $user_id);
+
+ if ($result['success']) {
+ Cache::touch($user_id);
+ }
+
+ return Response::success($result);
+ }
+
+ public function pollQueue(WP_REST_Request $request): WP_REST_Response
+ {
+ $userId = $request->get_param('user');
+ $this->cache = Cache::for($userId.'_queue');
+ $since = $request->get_param('since');
+ $ids = $request->get_param('ids');
+
+ $filters = ['not_dismissed' => true, 'limit' => 50];
+
+ if (!empty($ids)) {
+ $filters['ids'] = array_map('trim', explode(',', $ids));
+ }
+
+ $operations = JVB()->queue()->getUserOperations($userId, $filters);
+
+ if ($since) {
+ $sinceTime = strtotime($since);
+ $operations = array_filter($operations, function($op) use ($sinceTime) {
+ return strtotime($op->completedAt ?? $op->startedAt ?? $op->scheduledAt) > $sinceTime;
+ });
+ }
+
+ $items = array_map(fn($op) => [
+ 'id' => $op->id,
+ 'status' => $this->mapStateToStatus($op->state, $op->outcome),
+ 'progress_percentage' => $this->formatPercentage($op),
+ 'progress_count' => $op->processedItems,
+ 'count' => $op->totalItems,
+ 'updated_at' => $this->formatTimestamp($op->completedAt ?? $op->startedAt ?? $op->scheduledAt),
+ 'error_message' => $op->errorMessage,
+ ], $operations);
+
+ return Response::success([
+ 'items' => array_values($items),
+ 'server_time' => date('c'),
+ 'has_active' => count(array_filter($items, fn($i) => in_array($i['status'], ['pending', 'processing']))) > 0,
+ ]);
+ }
+
+ public function getOperationErrors(WP_REST_Request $request): WP_REST_Response
+ {
+ $user_id = absint($request->get_param('user'));
+ $this->cache = Cache::for($user_id.'_queue');
+ $operations = JVB()->queue()->getUserOperations($user_id, [
+ 'state' => 'completed',
+ 'outcome' => ['failed', 'failed_permanent', 'partial'],
+ 'has_errors' => true,
+ 'order_by' => 'updated_at DESC',
+ 'limit' => 20,
+ ]);
+
+ $formatted = array_map(fn($op) => [
+ 'id' => $op->id,
+ 'type' => $op->type,
+ 'error_message' => $op->errorMessage,
+ 'failed_items' => $op->failedItems ?? [],
+ 'retries' => $op->retries,
+ 'created_at' => $op->scheduledAt,
+ 'updated_at' => $op->completedAt,
+ ], $operations);
+
+ return Response::collection($formatted);
+ }
+
+ public function getOperation(WP_REST_Request $request): WP_REST_Response
+ {
+ $id = $request->get_param('id');
+ $userId = $request->get_param('user');
+ $this->cache = Cache::for($userId.'_queue');
+
+ $op = JVB()->queue()->get($id);
+
+ if (!$op || $op->userId !== $userId) {
+ return Response::notFound('Operation not found');
+ }
+
+ return Response::item($this->formatOperation($op, true), 'operation');
}
/**
* Get queue statistics for user
*/
- protected function getQueueStats(int $user_id): array
+ private function getQueueStats(int $userId): array
{
- global $wpdb;
- $table = $wpdb->prefix . $this->table;
+ return array_merge(
+ ['queued' => 0, 'localProcessing' => 0, 'uploading' => 0],
+ JVB()->queue()->getUserStats($userId)
+ );
+ }
- $stats = $wpdb->get_results($wpdb->prepare(
- "SELECT status, COUNT(*) as count FROM $table WHERE user_id = %d GROUP BY status",
- $user_id
- ), OBJECT_K);
-
- $formatted_stats = [
- 'queued' => 0,
- 'localProcessing' => 0,
- 'uploading' => 0,
- 'pending' => 0,
- 'processing' => 0,
- 'completed' => 0,
- 'failed' => 0,
- 'failed_permanent' => 0
+ private function formatOperation(Operation $op, bool $full = true): array
+ {
+ $formatted = [
+ 'id' => $op->id,
+ 'type' => $op->type,
+ 'status' => $this->mapStateToStatus($op->state, $op->outcome),
+ 'progress_count' => $op->processedItems,
+ 'progress_percentage' => $this->formatPercentage($op),
+ 'count' => $op->totalItems,
+ 'title' => $this->getOperationTitle($op->type, $op->requestData),
+ 'created_at' => $this->formatTimestamp($op->scheduledAt),
+ 'updated_at' => $this->formatTimestamp($op->completedAt ?? $op->startedAt ?? $op->scheduledAt),
];
- foreach ($stats as $status => $data) {
- if (isset($formatted_stats[$status])) {
- $formatted_stats[$status] = intval($data->count);
+ if (!empty($op->result['merged_into'])) {
+ $formatted['merged_into'] = $op->result['merged_into'];
+ }
+
+ if ($op->errorMessage) {
+ $formatted['error_message'] = $op->errorMessage;
+ }
+
+ if ($full) {
+ $formatted += [
+ 'data' => $op->requestData,
+ 'result' => $op->result ?? [],
+ 'retries' => $op->retries,
+ 'user_dismissed' => $op->userDismissed,
+ ];
+
+ if ($op->state === 'completed' && $op->completedAt) {
+ $formatted['completed_at'] = $this->formatTimestamp($op->completedAt);
}
}
- return $formatted_stats;
- }
- /**
- * Format operation for API response
- */
- protected function formatOperation(array $operation): array
- {
- $formatted = [
- 'id' => $operation['id'],
- 'type' => $operation['type'],
- 'status' => $operation['status'],
- 'progress_count' => intval($operation['progress_count'] ?? 0),
- 'count' => intval($operation['count'] ?? 1),
- 'retries' => intval($operation['retries'] ?? 0),
- 'data' => json_decode($operation['request_data'] ?? '{}', true),
- 'result' => json_decode($operation['result']??'{}', true)
- ];
-
- // Convert timestamps to ISO 8601 format with proper timezone
- $formatted['created_at'] = $this->formatTimestamp($operation['created_at']);
- $formatted['updated_at'] = $this->formatTimestamp($operation['updated_at']);
-
- // Add completed_at if status is completed
- if ($operation['status'] === 'completed' && !empty($operation['completed_at'])) {
- $formatted['completed_at'] = $this->formatTimestamp($operation['completed_at']);
- }
-
- // Add error message if failed
- if (!empty($operation['error_message'])) {
- $formatted['error_message'] = $operation['error_message'];
- }
-
- // Simple progress percentage calculation
- if ($formatted['count'] > 0) {
- $formatted['progress_percentage'] = round(
- ($formatted['progress_count'] / $formatted['count']) * 100
- );
- }
-
- // Add human-readable title for easier frontend display
- $formatted['title'] = $this->getOperationTitle($operation['type'], $formatted['data']);
-
-
- // Add user dismissal status
- $formatted['user_dismissed'] = !empty($operation['user_dismissed']);
-
return $formatted;
}
+ protected function formatPercentage(Operation $op):int
+ {
+ if ($op->totalItems > 0){
+ return round(($op->processedItems / $op->totalItems) * 100);
+ } else {
+ return match($op->state) {
+ 'pending','scheduled=' => 0,
+ 'processing' => 45,
+ 'completed' => 100
+ };
+ }
+ }
+ /**
+ * Map backend state/outcome to frontend status
+ * Backend uses: state (pending, scheduled, processing, completed) + outcome (pending, success, partial, failed, failed_permanent)
+ * Frontend expects: queued, pending, processing, completed, failed, failed_permanent
+ */
+ private function mapStateToStatus(string $state, ?string $outcome): string
+ {
+ if ($state === 'completed') {
+ return match ($outcome) {
+ 'failed' => 'failed',
+ 'failed_permanent' => 'failed_permanent',
+ default => 'completed',
+ };
+ }
+
+ return $state === 'scheduled' ? 'pending' : $state;
+ }
+
+
/**
* Get human-readable operation title
*/
@@ -259,218 +371,39 @@
return $base_title;
}
- /**
- * Update operation status (dismiss or retry)
- *
- * @param WP_REST_Request $request
- * @return WP_REST_Response
- */
- public function handleAction(WP_REST_Request $request): WP_REST_Response
+
+
+ private function processAction(string $action, array $operations, int $userId): array
{
- $data = $request->get_json_params();
- $ids = $data['ids'] ?? [];
- $action = $data['action'] ?? '';
+ $queue = JVB()->queue();
+ $processed = 0;
+ $processedIds = [];
- $user_id = (int)$data['user'];
+ foreach ($operations as $op) {
+ $result = match ($action) {
+ 'dismiss' => $queue->dismiss($op->id),
+ 'retry' => $queue->retry($op->id, $userId),
+ 'cancel' => $queue->cancel($op->id, $userId),
+ default => false,
+ };
-
- // Validate input
- if (empty($ids) || !is_array($ids)) {
- return new WP_REST_Response([
- 'success' => false,
- 'message' => 'Missing or invalid operation IDs'
- ], 400);
- }
-
- if (!in_array($action, ['dismiss', 'retry', 'cancel'])) {
- return new WP_REST_Response([
- 'success' => false,
- 'message' => 'Invalid action. Must be: dismiss, retry, or cancel'
- ], 400);
- }
-
- global $wpdb;
- $table = $wpdb->prefix . $this->table;
-
- // Verify operations exist and belong to user
- $placeholders = implode(',', array_fill(0, count($ids), '%s'));
- $valid_operations = $wpdb->get_results($wpdb->prepare(
- "SELECT id, status FROM $table WHERE id IN ($placeholders) AND user_id = %d",
- array_merge($ids, [$user_id])
- ));
-
- if (empty($valid_operations)) {
- return new WP_REST_Response([
- 'success' => false,
- 'message' => 'No valid operations found'
- ], 404);
- }
-
- $valid_ids = array_column($valid_operations, 'id');
- $placeholders = implode(',', array_fill(0, count($valid_ids), '%s'));
-
- // Process action using foreach approach as suggested
- $result = $this->processQueueAction($action, $valid_operations, $user_id);
-
- if ($result['success']) {
- // Update timestamp for this user's queue
- CacheManager::updateTimestamp("user_{$user_id}");
- }
-
- return new WP_REST_Response($result);
- }
-
- protected function processQueueAction(string $action, array $operations, int $user_id): array
- {
- global $wpdb;
- $table = $wpdb->prefix . $this->table;
-
- $processed_count = 0;
- $errors = [];
- $valid_ids = [];
-
- // Process each operation individually
- foreach ($operations as $operation) {
- $operation_id = $operation->id;
- $operation_status = $operation->status;
-
- try {
- $result = false;
-
- switch ($action) {
- case 'dismiss':
- // Can dismiss any operation
- $result = $wpdb->update(
- $table,
- ['user_dismissed' => 1],
- ['id' => $operation_id, 'user_id' => $user_id]
- );
- break;
-
- case 'retry':
- // Can only retry failed operations
- if (!in_array($operation_status, ['failed', 'failed_permanent'])) {
- $errors[] = "Operation {$operation_id} cannot be retried (status: {$operation_status})";
- continue 2;
- }
-
- $result = $wpdb->update(
- $table,
- [
- 'status' => 'pending',
- 'error_message' => null,
- 'updated_at' => current_time('mysql'),
- 'retries' => $wpdb->get_var($wpdb->prepare(
- "SELECT retries FROM $table WHERE id = %s",
- $operation_id
- )) + 1
- ],
- ['id' => $operation_id, 'user_id' => $user_id]
- );
- break;
-
- case 'cancel':
- // Can only cancel pending/queued operations
- if (!in_array($operation_status, ['pending', 'queued'])) {
- $errors[] = "Operation {$operation_id} cannot be cancelled (status: {$operation_status})";
- continue 2;
- }
-
- $result = $wpdb->delete(
- $table,
- ['id' => $operation_id, 'user_id' => $user_id]
- );
- break;
- }
-
- if ($result !== false) {
- $processed_count++;
- $valid_ids[] = $operation_id;
- } else {
- $errors[] = "Failed to {$action} operation {$operation_id}";
- }
-
- } catch (Exception $e) {
- $errors[] = "Error processing operation {$operation_id}: " . $e->getMessage();
+ if ($result) {
+ $processed++;
+ $processedIds[] = $op->id;
}
}
- // Prepare response
- $message = $this->getActionMessage($action, $processed_count);
-
- if (!empty($errors)) {
- $message .= ". Errors: " . implode(', ', $errors);
- }
+ $pastTense = ['dismiss' => 'dismissed', 'retry' => 'retried', 'cancel' => 'cancelled'];
return [
- 'success' => $processed_count > 0,
+ 'success' => $processed > 0,
'action' => $action,
- 'processed_count' => $processed_count,
+ 'processed_count' => $processed,
'total_requested' => count($operations),
- 'processed_ids' => $valid_ids,
- 'errors' => $errors,
- 'message' => $message
- ];
- }
-
- /**
- * Get user-friendly message for action results
- */
- protected function getActionMessage(string $action, int $count): string
- {
- if ($count === 0) {
- return "No operations were {$action}ed";
- }
-
- $past_tense = [
- 'dismiss' => 'dismissed',
- 'retry' => 'retried',
- 'cancel' => 'cancelled'
- ];
-
- return "{$count} operation" . ($count === 1 ? '' : 's') . " {$past_tense[$action]}";
- }
-
- public function getOperationErrors(WP_REST_Request $request): WP_REST_Response
- {
- $user_id = get_current_user_id();
-
- global $wpdb;
- $table = $wpdb->prefix . $this->table;
-
- $failed_operations = $wpdb->get_results($wpdb->prepare("
- SELECT id, type, error_message, failed_items, retries, created_at, updated_at
- FROM $table
- WHERE user_id = %d
- AND status IN ('failed', 'completed_with_errors')
- AND (error_message IS NOT NULL OR failed_items IS NOT NULL)
- ORDER BY updated_at DESC
- LIMIT 20
- ", $user_id), ARRAY_A);
-
- foreach ($failed_operations as &$op) {
- $op['failed_items'] = json_decode($op['failed_items'] ?? '[]', true);
- $op['error_details'] = $this->parseErrorMessage($op['error_message']);
- }
-
- return new WP_REST_Response([
- 'errors' => $failed_operations,
- 'total' => count($failed_operations)
- ]);
- }
- protected function parseErrorMessage(string $error_message): array
- {
- if (str_contains($error_message, ' | ')) {
- $parts = explode(' | ', $error_message);
- return [
- 'original_error' => $parts[0] ?? '',
- 'cleanup_reason' => $parts[1] ?? ''
- ];
- }
-
- return [
- 'original_error' => $error_message,
- 'cleanup_reason' => null
+ 'processed_ids' => $processedIds,
+ 'message' => $processed > 0
+ ? "{$processed} operation(s) {$pastTense[$action]}"
+ : "No operations were {$pastTense[$action]}",
];
}
}
--
Gitblit v1.10.0