<?php
|
namespace JVBase\rest\routes;
|
|
use JVBase\JVB;
|
use JVBase\rest\RestRouteManager;
|
use WP_REST_Request;
|
use WP_REST_Response;
|
use DateTime;
|
use DateTimeZone;
|
|
if (!defined('ABSPATH')) {
|
exit; // Exit if accessed directly
|
}
|
|
class QueueRoutes extends RestRouteManager
|
{
|
|
protected string $table = BASE.'_operation_queue';
|
protected string $metricsTable = BASE.'stats__operation_queue';
|
|
public function __construct()
|
{
|
$this->cache_name = 'queue';
|
$this->cache_ttl = 300;
|
parent::__construct();
|
}
|
|
/**
|
* Registers queue routes
|
* @return void
|
*/
|
public function registerRoutes():void
|
{
|
register_rest_route($this->namespace, '/queue', [
|
[
|
'methods' => 'GET',
|
'callback' => [$this, 'getQueue'],
|
'permission_callback' => [$this, 'checkPermission'],
|
'args' => [
|
'status' => [
|
'type' => 'string',
|
'enum' => ['all', 'queued', 'pending', 'processing', 'completed', 'failed', 'failed_permanent'],
|
'default' => 'all'
|
],
|
'ids' => [
|
'required' => false,
|
'type' => 'string',
|
'description' => 'Comma-separated list of operation IDs'
|
],
|
'limit' => [
|
'type' => 'integer',
|
'default' => 50,
|
'minimum' => 1,
|
'maximum' => 100
|
]
|
]
|
],
|
[
|
'methods' => 'POST',
|
'callback' => [$this, 'handleAction'],
|
'permission_callback' => [$this, 'checkPermission'],
|
'args' => [
|
'ids' => [
|
'required' => true,
|
'type' => 'array',
|
'items' => [
|
'type' => 'string'
|
],
|
'description' => 'Array of operation IDs (single or multiple)'
|
],
|
'action' => [
|
'required' => true,
|
'type' => 'string',
|
'enum' => ['dismiss', 'retry', 'cancel'],
|
'description' => 'Action to perform on the operations'
|
]
|
]
|
]
|
]);
|
}
|
|
/**
|
* Get queue operations with optional filtering
|
*
|
* @param WP_REST_Request $request
|
* @return WP_REST_Response
|
*/
|
public function getQueue(WP_REST_Request $request): WP_REST_Response
|
{
|
$user_id = get_current_user_id();
|
$status = $request->get_param('status');
|
$ids = $request->get_param('ids');
|
$limit = intval($request->get_param('limit'));
|
|
// User-specific caching (keep this)
|
$user_queue_timestamp = $this->getUserQueueTimestamp($user_id);
|
|
$if_modified_since = $request->get_header('If-Modified-Since');
|
if ($if_modified_since) {
|
$if_modified_timestamp = strtotime($if_modified_since);
|
if ($user_queue_timestamp <= $if_modified_timestamp) {
|
return new WP_REST_Response(null, 304);
|
}
|
}
|
|
header('Last-Modified: ' . gmdate('D, d M Y H:i:s', $user_queue_timestamp) . ' GMT');
|
header('Cache-Control: private, max-age=30');
|
|
global $wpdb;
|
$table = $wpdb->prefix . $this->table;
|
|
$sql = "SELECT * FROM $table WHERE user_id = %d AND COALESCE(user_dismissed, 0) = 0";
|
$params = [$user_id];
|
|
if ($status !== 'all') {
|
$sql .= " AND status = %s";
|
$params[] = $status;
|
}
|
|
if (!empty($ids)) {
|
$id_array = array_map('trim', explode(',', $ids));
|
if (!empty($id_array)) {
|
$placeholders = implode(',', array_fill(0, count($id_array), '%s'));
|
$sql .= " AND id IN ($placeholders)";
|
$params = array_merge($params, $id_array);
|
}
|
}
|
|
$sql .= " ORDER BY FIELD(status, 'processing', 'pending', 'failed', 'completed'), created_at DESC";
|
|
if ($limit > 0) {
|
$sql .= " LIMIT %d";
|
$params[] = $limit;
|
}
|
|
$operations = $wpdb->get_results($wpdb->prepare($sql, $params), ARRAY_A);
|
|
// Format operations with improved data structure
|
foreach ($operations as &$op) {
|
$op = $this->formatOperation($op);
|
}
|
|
return new WP_REST_Response([
|
'items' => $operations,
|
'total' => count($operations),
|
'timestamp' => date('c'), // ISO format
|
'has_more' => count($operations) === $limit,
|
'queue_stats' => $this->getQueueStats($user_id),
|
'server_time' => date('c') // Helpful for frontend time sync
|
]);
|
}
|
|
|
/**
|
* Get queue statistics for user
|
*/
|
protected function getQueueStats(int $user_id): array
|
{
|
global $wpdb;
|
$table = $wpdb->prefix . $this->table;
|
|
$stats = $wpdb->get_results($wpdb->prepare(
|
"SELECT status, COUNT(*) as count FROM $table WHERE user_id = %d GROUP BY status",
|
$user_id
|
), OBJECT_K);
|
|
$formatted_stats = [
|
'queued' => 0,
|
'localProcessing' => 0,
|
'uploading' => 0,
|
'pending' => 0,
|
'processing' => 0,
|
'completed' => 0,
|
'failed' => 0,
|
'failed_permanent' => 0
|
];
|
|
foreach ($stats as $status => $data) {
|
if (isset($formatted_stats[$status])) {
|
$formatted_stats[$status] = intval($data->count);
|
}
|
}
|
|
return $formatted_stats;
|
}
|
/**
|
* Format operation for API response
|
*/
|
protected function formatOperation(array $operation): array
|
{
|
$formatted = [
|
'id' => $operation['id'],
|
'type' => $operation['type'],
|
'status' => $operation['status'],
|
'progress_count' => intval($operation['progress_count'] ?? 0),
|
'count' => intval($operation['count'] ?? 1),
|
'retries' => intval($operation['retries'] ?? 0),
|
'data' => json_decode($operation['request_data'] ?? '{}', true)
|
];
|
|
// Convert timestamps to ISO 8601 format with proper timezone
|
$formatted['created_at'] = $this->formatTimestamp($operation['created_at']);
|
$formatted['updated_at'] = $this->formatTimestamp($operation['updated_at']);
|
|
// Add completed_at if status is completed
|
if ($operation['status'] === 'completed' && !empty($operation['completed_at'])) {
|
$formatted['completed_at'] = $this->formatTimestamp($operation['completed_at']);
|
}
|
|
// Add error message if failed
|
if (!empty($operation['error_message'])) {
|
$formatted['error_message'] = $operation['error_message'];
|
}
|
|
// Simple progress percentage calculation
|
if ($formatted['count'] > 0) {
|
$formatted['progress_percentage'] = round(
|
($formatted['progress_count'] / $formatted['count']) * 100
|
);
|
}
|
|
// Add human-readable title for easier frontend display
|
$formatted['title'] = $this->getOperationTitle($operation['type'], $formatted['data']);
|
|
|
// Add user dismissal status
|
$formatted['user_dismissed'] = !empty($operation['user_dismissed']);
|
|
return $formatted;
|
}
|
|
/**
|
* Update user's queue timestamp when any operation changes
|
* This should be called whenever an operation status changes
|
*/
|
public function updateUserQueueTimestamp(int $user_id): void
|
{
|
$key = "{$user_id}_queue_timestamp";
|
$this->cache->set($key, time());
|
}
|
|
/**
|
* Convert MySQL datetime to ISO 8601 timestamp with proper timezone
|
*/
|
protected function formatTimestamp(?string $mysql_datetime): ?string
|
{
|
if (empty($mysql_datetime)) {
|
return null;
|
}
|
|
try {
|
// Create DateTime object from MySQL datetime (assuming UTC storage)
|
$date = new DateTime($mysql_datetime, new DateTimeZone('UTC'));
|
|
// Return ISO 8601 format with UTC timezone indicator
|
return $date->format('c'); // e.g., "2025-07-23T22:57:35+00:00"
|
|
} catch (Exception $e) {
|
// Fallback: return null if datetime is invalid
|
return null;
|
}
|
}
|
|
/**
|
* Get human-readable operation title
|
*/
|
protected function getOperationTitle(string $type, array $data): string
|
{
|
$titles = [
|
'attach_upload_to_content' => 'Attaching Upload',
|
'content_update' => 'Updating Content',
|
'user_settings' => 'Updating Settings',
|
'bulk_operation' => 'Bulk Operation',
|
'image_processing' => 'Processing Images',
|
'notification_send' => 'Sending Notification',
|
'cache_clear' => 'Clearing Cache',
|
'data_export' => 'Exporting Data',
|
'data_import' => 'Importing Data'
|
];
|
|
$base_title = $titles[$type] ?? ucwords(str_replace('_', ' ', $type));
|
|
if ($type === 'attach_upload_to_content' && $data['mode'] === 'selection') {
|
$base_title .= '; Waiting on your Groupings to proceed...';
|
}
|
// Add context if available
|
if (!empty($data['content'])) {
|
$content_type = ucfirst($data['content']);
|
$base_title = str_replace('Content', $content_type, $base_title);
|
}
|
|
return $base_title;
|
}
|
|
/**
|
* Get user's queue last update timestamp from cache
|
*/
|
protected function getUserQueueTimestamp(int $user_id): int
|
{
|
$key = "{$user_id}_queue_timestamp";
|
|
// Use CacheManager for consistency
|
$timestamp = $this->cache->get($key);
|
|
if ($timestamp === false) {
|
$timestamp = time();
|
$this->cache->set($key, $timestamp);
|
}
|
|
return $timestamp;
|
}
|
|
/**
|
* Update operation status (dismiss or retry)
|
*
|
* @param WP_REST_Request $request
|
* @return WP_REST_Response
|
*/
|
public function handleAction(WP_REST_Request $request): WP_REST_Response
|
{
|
$data = $request->get_json_params();
|
$ids = $data['ids'] ?? [];
|
$action = $data['action'] ?? '';
|
$user_id = get_current_user_id();
|
|
// Validate input
|
if (empty($ids) || !is_array($ids)) {
|
return new WP_REST_Response([
|
'success' => false,
|
'message' => 'Missing or invalid operation IDs'
|
], 400);
|
}
|
|
if (!in_array($action, ['dismiss', 'retry', 'cancel'])) {
|
return new WP_REST_Response([
|
'success' => false,
|
'message' => 'Invalid action. Must be: dismiss, retry, or cancel'
|
], 400);
|
}
|
|
global $wpdb;
|
$table = $wpdb->prefix . $this->table;
|
|
// Verify operations exist and belong to user
|
$placeholders = implode(',', array_fill(0, count($ids), '%s'));
|
$valid_operations = $wpdb->get_results($wpdb->prepare(
|
"SELECT id, status FROM $table WHERE id IN ($placeholders) AND user_id = %d",
|
array_merge($ids, [$user_id])
|
));
|
|
if (empty($valid_operations)) {
|
return new WP_REST_Response([
|
'success' => false,
|
'message' => 'No valid operations found'
|
], 404);
|
}
|
|
$valid_ids = array_column($valid_operations, 'id');
|
$placeholders = implode(',', array_fill(0, count($valid_ids), '%s'));
|
|
// Process action using foreach approach as suggested
|
$result = $this->processQueueAction($action, $valid_operations, $user_id);
|
|
if ($result['success']) {
|
$this->invalidateUserCache($user_id);
|
}
|
|
return new WP_REST_Response($result);
|
}
|
|
protected function invalidateUserCache(int $user_id): void
|
{
|
$key = "{$user_id}_queue_timestamp";
|
$this->cache->invalidate($key);
|
$this->cache->set($key, time());
|
}
|
|
protected function processQueueAction(string $action, array $operations, int $user_id): array
|
{
|
global $wpdb;
|
$table = $wpdb->prefix . $this->table;
|
|
$processed_count = 0;
|
$errors = [];
|
$valid_ids = [];
|
|
// Process each operation individually
|
foreach ($operations as $operation) {
|
$operation_id = $operation->id;
|
$operation_status = $operation->status;
|
|
try {
|
$result = false;
|
|
switch ($action) {
|
case 'dismiss':
|
// Can dismiss any operation
|
$result = $wpdb->update(
|
$table,
|
['user_dismissed' => 1],
|
['id' => $operation_id, 'user_id' => $user_id]
|
);
|
break;
|
|
case 'retry':
|
// Can only retry failed operations
|
if (!in_array($operation_status, ['failed', 'failed_permanent'])) {
|
$errors[] = "Operation {$operation_id} cannot be retried (status: {$operation_status})";
|
continue 2;
|
}
|
|
$result = $wpdb->update(
|
$table,
|
[
|
'status' => 'pending',
|
'error_message' => null,
|
'updated_at' => current_time('mysql'),
|
'retries' => $wpdb->get_var($wpdb->prepare(
|
"SELECT retries FROM $table WHERE id = %s",
|
$operation_id
|
)) + 1
|
],
|
['id' => $operation_id, 'user_id' => $user_id]
|
);
|
break;
|
|
case 'cancel':
|
// Can only cancel pending/queued operations
|
if (!in_array($operation_status, ['pending', 'queued'])) {
|
$errors[] = "Operation {$operation_id} cannot be cancelled (status: {$operation_status})";
|
continue 2;
|
}
|
|
$result = $wpdb->delete(
|
$table,
|
['id' => $operation_id, 'user_id' => $user_id]
|
);
|
break;
|
}
|
|
if ($result !== false) {
|
$processed_count++;
|
$valid_ids[] = $operation_id;
|
} else {
|
$errors[] = "Failed to {$action} operation {$operation_id}";
|
}
|
|
} catch (Exception $e) {
|
$errors[] = "Error processing operation {$operation_id}: " . $e->getMessage();
|
}
|
}
|
|
// Prepare response
|
$message = $this->getActionMessage($action, $processed_count);
|
|
if (!empty($errors)) {
|
$message .= ". Errors: " . implode(', ', $errors);
|
}
|
|
return [
|
'success' => $processed_count > 0,
|
'action' => $action,
|
'processed_count' => $processed_count,
|
'total_requested' => count($operations),
|
'processed_ids' => $valid_ids,
|
'errors' => $errors,
|
'message' => $message
|
];
|
}
|
|
/**
|
* Get user-friendly message for action results
|
*/
|
protected function getActionMessage(string $action, int $count): string
|
{
|
if ($count === 0) {
|
return "No operations were {$action}ed";
|
}
|
|
$past_tense = [
|
'dismiss' => 'dismissed',
|
'retry' => 'retried',
|
'cancel' => 'cancelled'
|
];
|
|
return "{$count} operation" . ($count === 1 ? '' : 's') . " {$past_tense[$action]}";
|
}
|
|
public function getOperationErrors(WP_REST_Request $request): WP_REST_Response
|
{
|
$user_id = get_current_user_id();
|
|
global $wpdb;
|
$table = $wpdb->prefix . $this->table;
|
|
$failed_operations = $wpdb->get_results($wpdb->prepare("
|
SELECT id, type, error_message, failed_items, retries, created_at, updated_at
|
FROM $table
|
WHERE user_id = %d
|
AND status IN ('failed', 'completed_with_errors')
|
AND (error_message IS NOT NULL OR failed_items IS NOT NULL)
|
ORDER BY updated_at DESC
|
LIMIT 20
|
", $user_id), ARRAY_A);
|
|
foreach ($failed_operations as &$op) {
|
$op['failed_items'] = json_decode($op['failed_items'] ?? '[]', true);
|
$op['error_details'] = $this->parseErrorMessage($op['error_message']);
|
}
|
|
return new WP_REST_Response([
|
'errors' => $failed_operations,
|
'total' => count($failed_operations)
|
]);
|
}
|
protected function parseErrorMessage(string $error_message): array
|
{
|
if (str_contains($error_message, ' | ')) {
|
$parts = explode(' | ', $error_message);
|
return [
|
'original_error' => $parts[0] ?? '',
|
'cleanup_reason' => $parts[1] ?? ''
|
];
|
}
|
|
return [
|
'original_error' => $error_message,
|
'cleanup_reason' => null
|
];
|
}
|
}
|