From 7a9054bb3f033c98067b3196378311dae54c5fbf Mon Sep 17 00:00:00 2001
From: Jake Vanderwerf <get@jakevanderwerf.ca>
Date: Tue, 20 Jan 2026 01:31:53 +0000
Subject: [PATCH] =OperationQueue refactor to the JVBase/managers/queue namespace
---
inc/rest/routes/QueueRoutes.php | 296 ++++++++++++++++++++++-------------------------------------
1 files changed, 111 insertions(+), 185 deletions(-)
diff --git a/inc/rest/routes/QueueRoutes.php b/inc/rest/routes/QueueRoutes.php
index 21bce8f..824ec55 100644
--- a/inc/rest/routes/QueueRoutes.php
+++ b/inc/rest/routes/QueueRoutes.php
@@ -16,23 +16,21 @@
class QueueRoutes extends RestRouteManager
{
-
- protected string $table = BASE.'_operation_queue';
- protected string $metricsTable = BASE.'stats__operation_queue';
-
public function __construct()
{
$this->cache_name = 'queue';
$this->cache_ttl = 300;
parent::__construct();
+
+ $this->cache->clear();
}
/**
* Registers queue routes
* @return void
*/
- public function registerRoutes():void
- {
+ public function registerRoutes():void
+ {
register_rest_route($this->namespace, '/queue', [
[
'methods' => 'GET',
@@ -79,7 +77,7 @@
]
]
]);
- }
+ }
/**
* Get queue operations with optional filtering
@@ -89,62 +87,45 @@
*/
public function getQueue(WP_REST_Request $request): WP_REST_Response
{
- $user_id = get_current_user_id();
- $status = $request->get_param('status');
+ $user_id = $request->get_param('user');
+ $status = sanitize_text_field($request->get_param('status'));
$ids = $request->get_param('ids');
$limit = intval($request->get_param('limit'));
-
// Use base class user-specific header checking
- // This checks both 'queue' and 'user_{$user_id}' timestamps
$cache_check = $this->checkUserHeaders($request, $user_id, 'queue');
if ($cache_check) {
- return $cache_check; // Returns 304 Not Modified
+ return $cache_check;
}
- global $wpdb;
- $table = $wpdb->prefix . $this->table;
+ // Build filters for getUserOperations
+ $filters = [
+ 'not_dismissed' => true,
+ 'limit' => $limit ?: 50,
+ ];
- $sql = "SELECT * FROM $table WHERE user_id = %d AND COALESCE(user_dismissed, 0) = 0";
- $params = [$user_id];
-
- if ($status !== 'all') {
- $sql .= " AND status = %s";
- $params[] = $status;
+ if ($status && $status !== 'all') {
+ $filters['state'] = $status;
}
if (!empty($ids)) {
- $id_array = array_map('trim', explode(',', $ids));
- if (!empty($id_array)) {
- $placeholders = implode(',', array_fill(0, count($id_array), '%s'));
- $sql .= " AND id IN ($placeholders)";
- $params = array_merge($params, $id_array);
- }
+ $filters['ids'] = array_map('trim', explode(',', $ids));
}
- $sql .= " ORDER BY FIELD(status, 'processing', 'pending', 'failed', 'completed'), created_at DESC";
+ // Get operations via Queue
+ $operations = JVB()->queue()->getUserOperations($user_id, $filters);
- if ($limit > 0) {
- $sql .= " LIMIT %d";
- $params[] = $limit;
- }
-
- $operations = $wpdb->get_results($wpdb->prepare($sql, $params), ARRAY_A);
-
- // Format operations
- foreach ($operations as &$op) {
- $op = $this->formatOperation($op);
- }
+ // Format operations for API response
+ $formatted = array_map([$this, 'formatOperationFromObject'], $operations);
$response = new WP_REST_Response([
- 'items' => $operations,
- 'total' => count($operations),
+ 'items' => $formatted,
+ 'total' => count($formatted),
'timestamp' => date('c'),
- 'has_more' => count($operations) === $limit,
+ 'has_more' => count($formatted) === ($limit ?: 50),
'queue_stats' => $this->getQueueStats($user_id),
'server_time' => date('c')
]);
- // Add cache headers (ETag, Last-Modified)
return $this->addCacheHeaders($response);
}
@@ -154,76 +135,75 @@
*/
protected function getQueueStats(int $user_id): array
{
- global $wpdb;
- $table = $wpdb->prefix . $this->table;
+ $stats = JVB()->queue()->getUserStats($user_id);
- $stats = $wpdb->get_results($wpdb->prepare(
- "SELECT status, COUNT(*) as count FROM $table WHERE user_id = %d GROUP BY status",
- $user_id
- ), OBJECT_K);
-
- $formatted_stats = [
+ // Add frontend-only statuses that don't exist in backend
+ return array_merge([
'queued' => 0,
'localProcessing' => 0,
'uploading' => 0,
- 'pending' => 0,
- 'processing' => 0,
- 'completed' => 0,
- 'failed' => 0,
- 'failed_permanent' => 0
- ];
+ ], $stats);
+ }
- foreach ($stats as $status => $data) {
- if (isset($formatted_stats[$status])) {
- $formatted_stats[$status] = intval($data->count);
- }
+ /**
+ * Map backend state/outcome to frontend status
+ * Backend uses: state (pending, scheduled, processing, completed) + outcome (pending, success, partial, failed, failed_permanent)
+ * Frontend expects: queued, pending, processing, completed, failed, failed_permanent
+ */
+ protected function mapStateToStatus(string $state, ?string $outcome): string
+ {
+ // If completed, check outcome for failure states
+ if ($state === 'completed') {
+ return match($outcome) {
+ 'failed' => 'failed',
+ 'failed_permanent' => 'failed_permanent',
+ 'partial' => 'completed', // or could be 'partial' if JS supports it
+ default => 'completed'
+ };
}
- return $formatted_stats;
+ // Map other states directly
+ return match($state) {
+ 'scheduled' => 'pending',
+ default => $state
+ };
}
+
/**
- * Format operation for API response
+ * Format Operation object for API response
*/
- protected function formatOperation(array $operation): array
+ protected function formatOperationFromObject(\JVBase\managers\queue\Operation $op): array
{
$formatted = [
- 'id' => $operation['id'],
- 'type' => $operation['type'],
- 'status' => $operation['status'],
- 'progress_count' => intval($operation['progress_count'] ?? 0),
- 'count' => intval($operation['count'] ?? 1),
- 'retries' => intval($operation['retries'] ?? 0),
- 'data' => json_decode($operation['request_data'] ?? '{}', true),
- 'result' => json_decode($operation['result']??'{}', true)
+ 'id' => $op->id,
+ 'type' => $op->type,
+ 'status' => $this->mapStateToStatus($op->state, $op->outcome),
+ 'progress_count' => $op->processedItems,
+ 'count' => $op->totalItems,
+ 'retries' => $op->retries,
+ 'data' => $op->requestData,
+ 'result' => $op->result ?? [],
];
- // Convert timestamps to ISO 8601 format with proper timezone
- $formatted['created_at'] = $this->formatTimestamp($operation['created_at']);
- $formatted['updated_at'] = $this->formatTimestamp($operation['updated_at']);
+ $formatted['created_at'] = $this->formatTimestamp($op->scheduledAt);
+ $formatted['updated_at'] = $this->formatTimestamp($op->completedAt ?? $op->startedAt ?? $op->scheduledAt);
- // Add completed_at if status is completed
- if ($operation['status'] === 'completed' && !empty($operation['completed_at'])) {
- $formatted['completed_at'] = $this->formatTimestamp($operation['completed_at']);
+ if ($op->state === 'completed' && $op->completedAt) {
+ $formatted['completed_at'] = $this->formatTimestamp($op->completedAt);
}
- // Add error message if failed
- if (!empty($operation['error_message'])) {
- $formatted['error_message'] = $operation['error_message'];
+ if ($op->errorMessage) {
+ $formatted['error_message'] = $op->errorMessage;
}
- // Simple progress percentage calculation
if ($formatted['count'] > 0) {
$formatted['progress_percentage'] = round(
($formatted['progress_count'] / $formatted['count']) * 100
);
}
- // Add human-readable title for easier frontend display
- $formatted['title'] = $this->getOperationTitle($operation['type'], $formatted['data']);
-
-
- // Add user dismissal status
- $formatted['user_dismissed'] = !empty($operation['user_dismissed']);
+ $formatted['title'] = $this->getOperationTitle($op->type, $op->requestData);
+ $formatted['user_dismissed'] = $op->userDismissed;
return $formatted;
}
@@ -270,10 +250,8 @@
$data = $request->get_json_params();
$ids = $data['ids'] ?? [];
$action = $data['action'] ?? '';
-
$user_id = (int)$data['user'];
-
// Validate input
if (empty($ids) || !is_array($ids)) {
return new WP_REST_Response([
@@ -289,31 +267,22 @@
], 400);
}
- global $wpdb;
- $table = $wpdb->prefix . $this->table;
+ // Get operations via Queue - verifies ownership
+ $operations = JVB()->queue()->getUserOperations($user_id, [
+ 'ids' => $ids,
+ 'limit' => count($ids),
+ ]);
- // Verify operations exist and belong to user
- $placeholders = implode(',', array_fill(0, count($ids), '%s'));
- $valid_operations = $wpdb->get_results($wpdb->prepare(
- "SELECT id, status FROM $table WHERE id IN ($placeholders) AND user_id = %d",
- array_merge($ids, [$user_id])
- ));
-
- if (empty($valid_operations)) {
+ if (empty($operations)) {
return new WP_REST_Response([
'success' => false,
'message' => 'No valid operations found'
], 404);
}
- $valid_ids = array_column($valid_operations, 'id');
- $placeholders = implode(',', array_fill(0, count($valid_ids), '%s'));
-
- // Process action using foreach approach as suggested
- $result = $this->processQueueAction($action, $valid_operations, $user_id);
+ $result = $this->processQueueAction($action, $operations, $user_id);
if ($result['success']) {
- // Update timestamp for this user's queue
CacheManager::updateTimestamp("user_{$user_id}");
}
@@ -322,82 +291,36 @@
protected function processQueueAction(string $action, array $operations, int $user_id): array
{
- global $wpdb;
- $table = $wpdb->prefix . $this->table;
-
+ $queue = JVB()->queue();
$processed_count = 0;
$errors = [];
$valid_ids = [];
- // Process each operation individually
- foreach ($operations as $operation) {
- $operation_id = $operation->id;
- $operation_status = $operation->status;
-
+ foreach ($operations as $op) {
try {
- $result = false;
+ $result = match($action) {
+ 'dismiss' => $queue->dismiss($op->id),
+ 'retry' => $queue->retry($op->id, $user_id),
+ 'cancel' => $queue->cancel($op->id, $user_id),
+ default => false,
+ };
- switch ($action) {
- case 'dismiss':
- // Can dismiss any operation
- $result = $wpdb->update(
- $table,
- ['user_dismissed' => 1],
- ['id' => $operation_id, 'user_id' => $user_id]
- );
- break;
-
- case 'retry':
- // Can only retry failed operations
- if (!in_array($operation_status, ['failed', 'failed_permanent'])) {
- $errors[] = "Operation {$operation_id} cannot be retried (status: {$operation_status})";
- continue 2;
- }
-
- $result = $wpdb->update(
- $table,
- [
- 'status' => 'pending',
- 'error_message' => null,
- 'updated_at' => current_time('mysql'),
- 'retries' => $wpdb->get_var($wpdb->prepare(
- "SELECT retries FROM $table WHERE id = %s",
- $operation_id
- )) + 1
- ],
- ['id' => $operation_id, 'user_id' => $user_id]
- );
- break;
-
- case 'cancel':
- // Can only cancel pending/queued operations
- if (!in_array($operation_status, ['pending', 'queued'])) {
-// $errors[] = "Operation {$operation_id} cannot be cancelled (status: {$operation_status})";
- continue 2;
- }
-
- $result = $wpdb->delete(
- $table,
- ['id' => $operation_id, 'user_id' => $user_id]
- );
- break;
- }
-
- if ($result !== false) {
+ if ($result) {
$processed_count++;
- $valid_ids[] = $operation_id;
+ $valid_ids[] = $op->id;
} else {
- $errors[] = "Failed to {$action} operation {$operation_id}";
+ // Only add errors for meaningful failures
+ if ($action === 'retry' && ($op->state !== 'completed' || !in_array($op->outcome, ['failed', 'failed_permanent']))) {
+ $errors[] = "Operation {$op->id} cannot be retried (state: {$op->state}, outcome: {$op->outcome})";
+ }
+ // Silently skip cancel failures (can't cancel processing/completed)
}
-
} catch (Exception $e) {
- $errors[] = "Error processing operation {$operation_id}: " . $e->getMessage();
+ $errors[] = "Error processing operation {$op->id}: " . $e->getMessage();
}
}
- // Prepare response
$message = $this->getActionMessage($action, $processed_count);
-
if (!empty($errors)) {
$message .= ". Errors: " . implode(', ', $errors);
}
@@ -435,27 +358,30 @@
{
$user_id = get_current_user_id();
- global $wpdb;
- $table = $wpdb->prefix . $this->table;
+ $operations = JVB()->queue()->getUserOperations($user_id, [
+ 'state' => 'completed',
+ 'outcome' => ['failed', 'failed_permanent', 'partial'],
+ 'has_errors' => true,
+ 'order_by' => 'updated_at DESC',
+ 'limit' => 20,
+ ]);
- $failed_operations = $wpdb->get_results($wpdb->prepare("
- SELECT id, type, error_message, failed_items, retries, created_at, updated_at
- FROM $table
- WHERE user_id = %d
- AND status IN ('failed', 'completed_with_errors')
- AND (error_message IS NOT NULL OR failed_items IS NOT NULL)
- ORDER BY updated_at DESC
- LIMIT 20
- ", $user_id), ARRAY_A);
-
- foreach ($failed_operations as &$op) {
- $op['failed_items'] = json_decode($op['failed_items'] ?? '[]', true);
- $op['error_details'] = $this->parseErrorMessage($op['error_message']);
- }
+ $formatted = array_map(function($op) {
+ return [
+ 'id' => $op->id,
+ 'type' => $op->type,
+ 'error_message' => $op->errorMessage,
+ 'failed_items' => $op->failedItems ?? [],
+ 'retries' => $op->retries,
+ 'created_at' => $op->scheduledAt,
+ 'updated_at' => $op->completedAt,
+ 'error_details' => $this->parseErrorMessage($op->errorMessage ?? ''),
+ ];
+ }, $operations);
return new WP_REST_Response([
- 'errors' => $failed_operations,
- 'total' => count($failed_operations)
+ 'errors' => $formatted,
+ 'total' => count($formatted)
]);
}
protected function parseErrorMessage(string $error_message): array
--
Gitblit v1.10.0