80, // Max CPU usage percentage
        'memory' => 85, // Max memory usage percentage
        'load_avg' => 1.6 // Max load average (80% of 2 cores)
    ];

    protected int $maxPerBatch = 25;

    // Queue configuration
    protected array $config = [
        'max_operations_per_batch' => 25, // Reduced from 100 for better control
        'max_concurrent_operations' => 100, // Adjusted for 2 CPU cores
    ];

    protected ?Cache $cache = null;

    protected int $ttl = 300;

    // Cache keys for different data types
    private const CACHE_QUEUE_STATUS = 'status';
    private const CACHE_HAS_ITEMS = 'has_items';
    private const CACHE_OPERATION_PREFIX = 'op_';
    private const CACHE_USER_QUEUE_PREFIX = 'user_queue_';
    private const CACHE_QUEUE_SIZE = 'queue_size';

    /**
     * Binds the database handle and cache, registers the cron/admin hooks and
     * makes sure both recurring cron events are scheduled.
     */
    public function __construct() {
        global $wpdb;

        $this->wpdb = $wpdb;
        $this->cache = Cache::for('queue', DAY_IN_SECONDS)->connect('user');

        // Action hook => method on this class that handles it.
        $actionCallbacks = [
            'jvb_process_queue' => 'checkQueue',
            'jvb_queue_maintenance' => 'hourlyMaintenance',
            'jvbEmailDailyMetricsReport' => 'emailDailyMetricsReport',
        ];
        foreach ($actionCallbacks as $hook => $method) {
            add_action($hook, [$this, $method]);
        }

        jvb_register_do_once('queue_admin_action_registered', [$this, 'registerAdminAction']);
        add_filter(BASE.'admin_action_filter', [$this, 'adminActionFilter'], 10, 3);
        // add_filter('jvbAdminSubpages', [$this, 'addSubpage'], 10, 1);

        // Cron hook => recurrence; only scheduled when no event is pending yet.
        $recurringEvents = [
            'jvb_process_queue' => 'every-minute',
            'jvb_queue_maintenance' => 'hourly',
        ];
        foreach ($recurringEvents as $hook => $recurrence) {
            if (!wp_next_scheduled($hook)) {
                wp_schedule_event(time(), $recurrence, $hook);
            }
        }
    }

    /**
     * Appends the "Operation Queue" admin subpage definition to the given list.
     *
     * @param array $subpages Existing subpage definitions
     *
     * @return array The list with the queue subpage appended
     */
    public function addSubpage(array $subpages): array {
        $queuePage = [
            'page_title' => 'Operation Queue',
            'menu_title' => 'Queue',
            'capability' => 'manage_options',
            'menu_slug' => 'operation-queue', // will become BASE.'integrations'
            'callback' => [$this, 'renderAdminPage'],
        ];
        $subpages[] = $queuePage;

        return $subpages;
    }

    /**
     * Registers the two queue maintenance actions with the admin component.
     *
     * @return void
     */
    public function registerAdminAction(): void {
        $admin = JVB()->admin();

        // Label => action slug, registered in this order.
        $actions = [
            'Restart Stuck Operations' => 'restart-stuck-operations',
            'Unlock Queue' => 'unlock-operation-queue',
        ];
        foreach ($actions as $label => $slug) {
            $admin->registerAction($label, $slug, 'manage_options', 'all');
        }
    }

    /**
     * Handles the queue admin actions; any other action passes through untouched.
     *
     * @param WP_REST_Response $response Response produced so far by the filter chain
     * @param WP_REST_Request  $request  Incoming request (not inspected here)
     * @param string           $action   Slug of the admin action being executed
     *
     * @return bool|WP_REST_Response
     */
    public function adminActionFilter(WP_REST_Response $response, WP_REST_Request $request, string $action): WP_REST_Response|bool {
        if ($action === 'unlock-operation-queue') {
            $this->unlockQueue();

            return new WP_REST_Response([
                'success' => true,
                'message' => 'Unlocked Queue'
            ]);
        }

        if ($action === 'restart-stuck-operations') {
            $this->restartStuckOperations();

            return new WP_REST_Response([
                'success' => true,
                'message' => 'Restarted Stuck Operations'
            ]);
        }

        return $response;
    }

    /**
     * Returns aggregate queue counters (total, active, ready, has_items).
     * A cached copy is returned when present; otherwise the values are queried
     * and cached for 30 seconds.
     *
     * @return array
     */
    protected function getQueueInfo(): array {
        $cacheKey = 'queue_info';

        $cached = $this->cache->get($cacheKey);
        if ($cached !== false) {
            return $cached;
        }

        $table = $this->wpdb->prefix .
$this->table;
        $current_time = current_time('mysql');

        // Get both count and existence in one query
        $result = $this->wpdb->get_row($this->wpdb->prepare("
            SELECT
                COUNT(*) as total,
                SUM(IF(status IN ('pending', 'processing'), 1, 0)) as active,
                SUM(IF(status = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready_scheduled
            FROM $table
            WHERE status IN ('pending', 'processing', 'scheduled')
        ", $current_time));

        if (!$result) {
            // Query failed: report an empty queue, but do not cache the failure.
            return [
                'total' => 0,
                'active' => 0,
                'ready' => 0,
                'has_items' => false
            ];
        }

        // SUM() yields NULL when no row matches; the int casts turn that into 0.
        $active = (int) $result->active;
        $ready = $active + (int) $result->ready_scheduled;

        $info = [
            'total' => (int) $result->total,
            'active' => $active,
            'ready' => $ready,
            'has_items' => $ready > 0
        ];

        $this->cache->set($cacheKey, $info, 30);

        return $info;
    }

    /**
     * Whether at least one operation is active or due for processing.
     *
     * @return bool
     */
    protected function hasItemsInQueue(): bool {
        return $this->getQueueInfo()['has_items'];
    }

    /**
     * Cron job that checks if the queue needs to be processed.
     *
     * Processing only starts when this request actually acquires the queue
     * lock, and the lock is released even if processing throws.
     *
     * @return void
     */
    public function checkQueue(): void {
        if (!$this->hasItemsInQueue() || $this->isQueueLocked()) {
            return;
        }

        // Peek at what operations we might process
        $batch_size = $this->getAdaptiveBatchSize();
        $operations = $this->getOperations($batch_size);

        // Only check server load for heavy operations
        if ($this->shouldCheckServerLoad($operations) && !$this->isServerIdle()) {
            error_log('Server not idle for heavy operations');
            return;
        }

        // Another worker may have taken the lock since the isQueueLocked() check.
        if (!$this->lockQueue()) {
            return;
        }

        try {
            $this->applyServerSettings();
            $this->processQueue($operations);
        } finally {
            $this->unlockQueue();
        }
    }

    /**
     * Hourly cron job: cleans up and restarts stuck operations.
     * Skipped entirely while the queue is locked for processing.
     *
     * @return void
     */
    public function hourlyMaintenance(): void {
        if ($this->isQueueLocked()) {
            return; // Don't run maintenance while processing
        }

        $this->cleanupStuckOperations();
        $this->restartStuckOperations();
        // $this->pruneOldMetrics(); // Optional: clean old metrics
    }

    /**
     * Whether the given operation type is on the list of heavy operations.
     *
     * @param string $type Operation type
     *
     * @return bool
     */
    protected function isHeavyOperation(string $type): bool {
        $heavyOperations = [
            'file_upload' => true,
            'image_processing' => true,
            'rebuild_user_term_index' => true,
            'sync_content_taxonomy_tables' => true,
            'email_notification_digest' => true,
            'taxonomy_relationships' => true,
        ];

        return isset($heavyOperations[$type]);
    }

    protected function
shouldCheckServerLoad(array $operations): bool {
        // The load check is only needed when at least one operation is heavy
        foreach ($operations as $op) {
            if ($this->isHeavyOperation($op->type)) {
                return true;
            }
        }

        return false;
    }

    /**
     * Check if the queue is currently locked for processing
     *
     * @return bool
     */
    protected function isQueueLocked(): bool {
        $lock_name = BASE . '_queue_lock';

        // IS_USED_LOCK() returns NULL when nobody holds the named lock
        $result = $this->wpdb->get_var($this->wpdb->prepare(
            "SELECT IS_USED_LOCK(%s)",
            $lock_name
        ));

        return $result !== null;
    }

    /**
     * Check if server is relatively idle (very low load)
     *
     * @return bool
     */
    private function isServerIdle(): bool {
        // Consider server idle if load is below 50% of our threshold
        if (function_exists('sys_getloadavg')) {
            $load = sys_getloadavg();

            // sys_getloadavg() returns false on failure; skip the load check then
            if (is_array($load) && isset($load[0]) && $load[0] > ($this->load_thresholds['load_avg'] * 0.5)) {
                return false;
            }
        }

        // Consider memory idle if usage is below 60% of our threshold
        $memory_usage = $this->getMemoryUsage();
        if ($memory_usage > ($this->load_thresholds['memory'] * 0.6)) {
            return false;
        }

        return true;
    }

    /**
     * Acquire a lock on the queue
     *
     * @return bool True only when this connection obtained the lock
     */
    protected function lockQueue(): bool {
        $lock_name = BASE . '_queue_lock';

        // Try to acquire lock with 0 timeout (non-blocking)
        $result = $this->wpdb->get_var($this->wpdb->prepare(
            "SELECT GET_LOCK(%s, 0)",
            $lock_name
        ));

        // GET_LOCK() yields 1 on success, 0 on timeout and NULL on error.
        // Casting accepts the value whether it arrives as a string or an int.
        return (int) $result === 1;
    }

    /**
     * Release the queue lock
     *
     * @return void
     */
    protected function unlockQueue(): void {
        $lock_name = BASE .
'_queue_lock';

        $this->wpdb->query($this->wpdb->prepare(
            "SELECT RELEASE_LOCK(%s)",
            $lock_name
        ));
    }

    /**
     * Prepare the PHP process for background work: longer execution time,
     * garbage collection enabled and, where available, a lower process priority.
     *
     * @return void
     */
    protected function applyServerSettings(): void {
        // Set max execution time for background processes
        set_time_limit(300); // 5 minutes

        // Optimize garbage collection
        gc_enable();

        // Set process priority if possible
        if (function_exists('proc_nice')) {
            proc_nice(10); // Lower priority for background tasks
        }
    }

    /**
     * Whether every operation this one depends on has status "completed".
     *
     * Dependencies may be a JSON array, a JSON string or raw comma-separated
     * string of ids, or an already-decoded array.
     *
     * @param object $operation
     *
     * @return bool
     */
    protected function areDependenciesMet(object $operation): bool {
        if (empty($operation->dependencies)) {
            return true;
        }

        // Add timeout check for stuck dependencies
        // $created_time = strtotime($operation->created_at);
        // $max_wait = 3600; // 1 hour max wait
        //
        // if (time() - $created_time > $max_wait) {
        //     error_log("Operation {$operation->id} timed out waiting for dependencies");
        //     $this->markAsPermanentlyFailed($operation->id, 'Dependencies timeout after 1 hour');
        //     return false;
        // }

        $dependencies = $operation->dependencies;
        if (is_string($dependencies)) {
            $decoded = json_decode($dependencies, true);

            if ($decoded === null) {
                if (json_last_error() === JSON_ERROR_NONE) {
                    // A literal JSON null means "no dependencies"
                    return true;
                }
                // Not JSON at all: keep the raw string, it is split on commas below
            } else {
                $dependencies = $decoded;
            }
        }

        if (is_string($dependencies)) {
            $dependencies = explode(',', $dependencies);
        }

        // A single scalar id becomes a one-element list; getOperationStatuses() requires an array
        $dependencies = (array) $dependencies;
        if (empty($dependencies)) {
            return true;
        }

        // Batch check all dependencies at once
        $statuses = $this->getOperationStatuses($dependencies);

        foreach ($dependencies as $dep_id) {
            if (!isset($statuses[$dep_id]) || $statuses[$dep_id]->status !== 'completed') {
                return false;
            }
        }

        return true;
    }

    /**
     * Fetch id and status for the given operation ids in a single query.
     *
     * @param array $operation_ids
     *
     * @return array Rows keyed by operation id; empty when nothing matches
     */
    public function getOperationStatuses(array $operation_ids): array {
        if (empty($operation_ids)) {
            return [];
        }

        $placeholders = implode(',', array_fill(0, count($operation_ids), '%s'));
        $table = $this->wpdb->prefix .
$this->table;

        $results = $this->wpdb->get_results($this->wpdb->prepare(
            "SELECT id, status FROM $table WHERE id IN ($placeholders)",
            ...$operation_ids
        ), OBJECT_K);

        return $results ?: [];
    }

    /**
     * Count the items an operation will process, based on its chunk key(s).
     *
     * @param array $data      Operation data
     * @param mixed $chunk_key A single key, a list of keys, or null when the operation is not chunked
     *
     * @return int Number of items under the chunk key(s); never less than 1
     */
    private function countChunkItems(array $data, mixed $chunk_key): int {
        if (!$chunk_key) {
            return 1;
        }

        // Single key and multiple keys (like favourites_batch with 'adds' and 'removes') are handled alike
        $total = 0;
        foreach ((array) $chunk_key as $key) {
            if (isset($data[$key]) && is_array($data[$key])) {
                $total += count($data[$key]);
            }
        }

        return max(1, $total);
    }

    /**
     * Queue or update an operation with intelligent batching
     *
     * @param string $type Operation type
     * @param int $user_id User ID
     * @param array $data Operation data
     * @param array $options Operation options
     *  - operation_id: string,
     *  - priority: string, (high, normal, low) Operation priority
     *  - delay: int Time window in seconds before processing
     *  - scheduled: string, as formatted by sanitizeDateTime in MetaSanitizer.php
     *  - merge: string, //replace, append, merge
     *  - depends_on: array|string Operation ids (array or comma-separated) that must complete first
     *  - chunk_key: string|array The array key(s) to split large operations by
     *  - chunk_size: int The batch size to process at once
     * @return array|WP_Error
     */
    public function queueOperation(string $type, int $user_id, array $data, array $options = []): array|WP_Error {
        try {
            $table = $this->wpdb->prefix . $this->table;

            // Extract options
            $priority = (array_key_exists('priority', $options) && in_array($options['priority'], ['low', 'normal', 'high'], true))
                ? $options['priority']
                : 'normal';
            $delay = (array_key_exists('delay', $options)) ? (int) $options['delay'] : 0;
            $scheduledFor = (array_key_exists('scheduled', $options)) ? $options['scheduled'] : '';
            $merge = (array_key_exists('merge', $options) && in_array($options['merge'], ['merge', 'append', 'replace'], true))
                ? $options['merge']
                : 'merge';
            $operation_id = $options['operation_id'] ?? null;
            $chunk_key = $options['chunk_key'] ?? null;
            $chunk_size = $options['chunk_size'] ?? null;

            // Calculate operation count based on chunk_key if provided.
            // isset($data[$chunk_key]) is avoided: it throws for array keys.
            $count = $this->countChunkItems($data, $chunk_key);

            // Generate operation ID
            $operation_id = $this->checkOperationId($operation_id, $type, $merge, $user_id);

            // Check for existing operation
            $existing = $this->getOperation($operation_id);

            $dependencies = $options['depends_on'] ?? [];
            if (is_string($dependencies)) {
                $dependencies = explode(',', $dependencies);
            }

            $scheduled_at = $this->calculateScheduledTime($delay === 0 ? $scheduledFor : $delay);

            // Prepare chunk config to store
            $chunk_config = null;
            if ($chunk_key && $chunk_size) {
                $chunk_config = [
                    'key' => $chunk_key,
                    'size' => $chunk_size
                ];
            }

            // NOTE(review): an existing operation that is neither pending nor scheduled
            // is left untouched unless merge is "append"; confirm this is intended.
            if ($existing) {
                if ($merge !== 'append' && ($existing->status === 'pending' || $existing->status === 'scheduled')) {
                    if ($merge === 'merge') {
                        $existing_data = json_decode($existing->request_data ?? '{}', true);
                        if (!is_array($existing_data)) {
                            // Undecodable payload: nothing to merge with
                            $existing_data = [];
                        }
                        $data = $this->deepMerge($existing_data, $data);

                        // The stored count must describe the merged payload, not just the new part
                        $count = $this->countChunkItems($data, $chunk_key);

                        $existing_dependencies = json_decode($existing->dependencies ?? '[]', true);
                        if (!is_array($existing_dependencies)) {
                            $existing_dependencies = [];
                        }
                        $dependencies = array_unique(array_merge($existing_dependencies, $dependencies));
                    }

                    // Keep earliest scheduled time
                    if ($delay > 0 || $scheduledFor !== '') {
                        $scheduled_at = $this->getEarliestScheduledTime(
                            $existing->scheduled_at ?? $existing->created_at,
                            $scheduled_at
                        );
                    }

                    // Store chunking config in metadata
                    $metadata = json_decode($existing->metadata ?? '{}', true);
                    if (!is_array($metadata)) {
                        $metadata = [];
                    }
                    if ($chunk_config) {
                        $metadata['chunk_config'] = $chunk_config;
                    }

                    $result = $this->wpdb->update(
                        $table,
                        [
                            'request_data' => json_encode($data),
                            'dependencies' => json_encode($dependencies),
                            'merge' => $merge,
                            'count' => $count,
                            'priority' => $priority,
                            'scheduled_at' => $scheduled_at,
                            'metadata' => json_encode($metadata),
                            'updated_at' => current_time('mysql'),
                        ],
                        ['id' => $operation_id],
                        ['%s', '%s', '%s', '%d', '%s', '%s', '%s', '%s'],
                        ['%s']
                    );

                    if ($result === false) {
                        throw new Exception("Failed to update operation: " . $this->wpdb->last_error);
                    }
                } else if ($merge === 'append') {
                    // Append always creates a fresh row under a unique id
                    $operation_id = $operation_id . '_' . time() . '_' . substr(uniqid(), -4);
                    $existing = null;
                }
            }

            if (!$existing) {
                // Prepare metadata with chunk config
                $metadata = $chunk_config ? ['chunk_config' => $chunk_config] : [];

                // An explicit "scheduled" time must also produce a scheduled row:
                // a pending row with a future scheduled_at cannot be claimed.
                $status = ($delay > 0 || $scheduledFor !== '') ? 'scheduled' : 'pending';

                $result = $this->wpdb->insert(
                    $table,
                    [
                        'id' => $operation_id,
                        'type' => $type,
                        'user_id' => $user_id,
                        'request_data' => json_encode($data),
                        'count' => $count,
                        'priority' => $priority,
                        'status' => $status,
                        'dependencies' => json_encode($dependencies),
                        'metadata' => json_encode($metadata),
                        'merge' => $merge,
                        'scheduled_at' => $scheduled_at,
                        'created_at' => current_time('mysql'),
                        'updated_at' => current_time('mysql'),
                    ],
                    // One format per column (13)
                    ['%s', '%s', '%d', '%s', '%d', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s']
                );

                if ($result === false) {
                    throw new Exception("Failed to insert operation: " .
$this->wpdb->last_error);
                }
            }

            $this->invalidateUserQueue($user_id);
            $this->runQueueOnShutdown();

            return [
                'success' => true,
                'operation_id' => $operation_id,
                'updated_existing' => !empty($existing),
                'scheduled_at' => $scheduled_at,
                'queue_status' => $this->getQueueStatus()
            ];
        } catch (Exception $e) {
            JVB()->error()->log('queue', $e->getMessage(), $data, 'high');

            return new WP_Error('queue_failed', $e->getMessage());
        }
    }

    /**
     * Recursively merge new operation data into existing data.
     *
     * Two lists are concatenated. Otherwise values are merged key by key:
     * associative arrays recurse, lists of scalars are merged without
     * duplicates, lists holding arrays/objects are concatenated, and scalars
     * are overwritten by the new value. Merged lists are always re-indexed so
     * they still encode as JSON arrays.
     *
     * @param array $existing Data already stored
     * @param array $new      Data being added
     *
     * @return array
     */
    protected function deepMerge(array $existing, array $new): array {
        $merged = $existing;

        if (!$this->isAssociativeArray($existing) && !$this->isAssociativeArray($new)) {
            return array_merge($existing, $new);
        }

        foreach ($new as $key => $newValue) {
            if (!array_key_exists($key, $existing)) {
                $merged[$key] = $newValue;
                continue;
            }

            $existingValue = $existing[$key];

            if (is_array($existingValue) && is_array($newValue)) {
                if ($this->isAssociativeArray($existingValue) || $this->isAssociativeArray($newValue)) {
                    // Recursive merge going deeper, if any of them are associative arrays
                    $merged[$key] = $this->deepMerge($existingValue, $newValue);
                } elseif ($this->containsComplexData($existingValue) || $this->containsComplexData($newValue)) {
                    // Just merge and re-index - preserves all items from chunks
                    $merged[$key] = array_values(array_merge($existingValue, $newValue));
                } else {
                    // Simple scalar arrays - use unique merge. array_unique() keeps the
                    // original keys, so re-index to avoid gaps.
                    $merged[$key] = array_values(array_unique(array_merge($existingValue, $newValue), SORT_REGULAR));
                }
            } elseif (is_array($existingValue)) {
                // The existing value is an array, but the new one isn't. Lists holding
                // arrays/objects always get the value appended; scalar lists only when
                // it is not already present (strict comparison).
                if ($this->containsComplexData($existingValue) || !in_array($newValue, $existingValue, true)) {
                    $merged[$key][] = $newValue;
                }
            } elseif (is_array($newValue)) {
                // The opposite case: scalar existing value, array new value
                $merged[$key] = array_values(array_unique(array_merge([$existingValue], $newValue), SORT_REGULAR));
            } else {
                // Override the existing with the new, both are scalars
                $merged[$key] = $newValue;
            }
        }

        return $merged;
    }

    /**
     * Check if an array is associative
     * (keys are anything other than 0..n-1 in order; an empty array is not).
     *
     * @param array $arr
     * @return bool
     */
    protected function isAssociativeArray(array $arr): bool {
        if (empty($arr)) {
            return false;
        }

        return array_keys($arr) !== range(0, count($arr) - 1);
    }

    /**
     * Check if an array contains complex data (arrays or objects)
     *
     * @param array $arr
     * @return bool
     */
    protected function containsComplexData(array $arr): bool {
        foreach ($arr as $item) {
            if (is_array($item) || is_object($item)) {
                return true;
            }
        }

        return false;
    }

    /**
     * Return whichever of the two datetime strings is earlier
     * (the new one when both are equal).
     *
     * @param string $existing
     * @param string $new
     * @return string
     */
    protected function getEarliestScheduledTime(string $existing, string $new): string {
        $existing_time = strtotime($existing);
        $new_time = strtotime($new);

        return $existing_time < $new_time ? $existing : $new;
    }

    /**
     * Turn a delay or a date string into a "Y-m-d H:i:s" datetime.
     *
     * @param int|string $delay Seconds from now (int), or a date/time string
     *
     * @return string The scheduled datetime; the current site time when the input is empty,
     *                a non-positive int or unparsable
     */
    protected function calculateScheduledTime(int|string $delay): string {
        if (is_int($delay)) {
            return $delay > 0
                ? date('Y-m-d H:i:s', current_time('timestamp') + $delay)
                : current_time('mysql');
        }

        if ($delay !== '') {
            // strtotime() returns an int (or false); it must be formatted before storing
            $timestamp = strtotime($delay);
            if ($timestamp !== false) {
                return date('Y-m-d H:i:s', $timestamp);
            }
        }

        return current_time('mysql');
    }

    /**
     * Resolve the id to queue under: reuse the user's pending operation of the
     * same type when merging, otherwise the given id or a freshly generated one.
     *
     * NOTE(review): the prefix test only looks for a leading "u", so a caller-supplied
     * id such as "update_x" is never given the user prefix - confirm this is intended.
     *
     * @param string|null $operation_id
     * @param string $type
     * @param string $merge
     * @param int $user_id
     * @return string
     */
    protected function checkOperationId(?string $operation_id, string $type, string $merge, int $user_id): string {
        if ($merge === 'merge') {
            $existing = $this->getUserOperations(
                $user_id,
                [
                    'status' => 'pending',
                    'type' => $type,
                    'merge' => $merge,
                    'limit' => 1
                ]
            );

            if ($existing) {
                $operation_id = $existing[0]->id;
            }
        }

        if (!$operation_id) {
            $operation_id = uniqid('op_');
        }

        if (!str_starts_with($operation_id, 'u')) {
            $operation_id = 'u' . $user_id . '_' .
$operation_id;
        }

        return $operation_id;
    }

    /**
     * Registers processQueueOnShutdown() on the "shutdown" hook (priority 100)
     * unless it is already registered.
     *
     * @return void
     */
    protected function runQueueOnShutdown(): void {
        $callback = [$this, 'processQueueOnShutdown'];

        if (has_action('shutdown', $callback)) {
            return;
        }

        add_action('shutdown', $callback, 100);
    }

    /**
     * Process queue after request is complete
     *
     * NOTE(review): the callback is added with priority 100 but removed with the
     * default priority - confirm the removal takes effect.
     *
     * @return void
     */
    public function processQueueOnShutdown(): void {
        // Meant to make this run only once per request
        remove_action('shutdown', [$this, 'processQueueOnShutdown']);

        // Under PHP-FPM, flush the response to the client before doing the work
        if (function_exists('fastcgi_finish_request')) {
            fastcgi_finish_request();
        }

        $this->checkQueue();
    }

    /**
     * Load a single operation row by id.
     *
     * @param string $operation_id
     * @param bool $skipCache Not used by this implementation
     * @return object|false The row, or false when it is missing or an Exception was thrown
     */
    public function getOperation(string $operation_id, bool $skipCache = false): object|false {
        try {
            $table = $this->wpdb->prefix . $this->table;
            $sql = $this->wpdb->prepare("SELECT * FROM $table WHERE id = %s", $operation_id);
            $operation = $this->wpdb->get_row($sql);

            if (!$operation) {
                return false;
            }

            return $operation;
        } catch (Exception) {
            return false;
        }
    }

    /**
     * Process queue with adaptive batch sizing
     *
     * @param array $operations Operations to process; fetched here when empty
     * @return void
     */
    public function processQueue(array $operations = []): void {
        try {
            // Nothing handed in: fetch a batch ourselves
            if (empty($operations)) {
                $operations = $this->getOperations($this->getAdaptiveBatchSize());
            }

            // Pre-warm cache for dependencies check
            $this->prewarmDependencyCache($operations);

            foreach ($operations as $operation) {
                // Skip operations that exhausted their retries or still wait on dependencies
                if (!$this->canProcessOperation($operation) || !$this->areDependenciesMet($operation)) {
                    continue;
                }

                $this->processOperation($operation);

                // Drop the cached entries for this operation and its user
                $this->cache->forget(self::CACHE_OPERATION_PREFIX . $operation->id);
                $this->cache->forget(self::CACHE_USER_QUEUE_PREFIX . $operation->user_id);
            }

            // Batch invalidate caches at the end
            $this->invalidateQueueCache();
            $this->runQueueOnShutdown();
        } catch (Exception $e) {
            error_log('Error processing queue: ' .
$e->getMessage());
        }
    }

    /**
     * Cancel an operation that has not started yet.
     *
     * @param string $operation_id
     *
     * @return bool True when a pending or scheduled operation was cancelled
     */
    public function cancelOperation(string $operation_id): bool {
        $table = $this->wpdb->prefix . $this->table;
        $now = current_time('mysql');

        // Only cancel if pending or scheduled. wpdb::update() cannot express an
        // IN (...) condition (array WHERE values are not supported), so the
        // statement is written out explicitly.
        $result = $this->wpdb->query($this->wpdb->prepare(
            "UPDATE $table
                SET status = 'cancelled', completed_at = %s, updated_at = %s, error_message = %s
              WHERE id = %s AND status IN ('pending', 'scheduled')",
            $now,
            $now,
            'Cancelled by user',
            $operation_id
        ));

        if ($result) {
            $this->invalidateQueueCache('status');
            return true;
        }

        return false;
    }

    /**
     * Pre-warm cache for dependency checks
     *
     * @param array $operations Operations about to be processed
     * @return void
     */
    protected function prewarmDependencyCache(array $operations): void {
        $allDependencyIds = [];

        foreach ($operations as $op) {
            $deps = json_decode($op->dependencies ?? '[]', true);

            if (is_string($deps)) {
                // A JSON string holds a comma-separated id list
                $deps = explode(',', $deps);
            } elseif (!is_array($deps)) {
                // null (nothing stored / undecodable) means no ids; a lone scalar is a single id
                $deps = $deps === null ? [] : [$deps];
            }

            foreach ($deps as $dep_id) {
                if (is_scalar($dep_id) && $dep_id !== '') {
                    $allDependencyIds[] = $dep_id;
                }
            }
        }

        if (empty($allDependencyIds)) {
            return;
        }

        // Get all statuses at once
        $statuses = $this->getOperationStatuses(array_unique($allDependencyIds));

        // Cache them individually
        foreach ($statuses as $id => $status) {
            $this->cache->set(self::CACHE_OPERATION_PREFIX . $id, $status, 300);
        }
    }

    /**
     * Get the next batch of operations that are ready to be processed
     *
     * @param int $batch_size Maximum number of operations to return
     * @return array
     */
    protected function getOperations(int $batch_size): array {
        $table = $this->wpdb->prefix .
$this->table;
        $current_time = current_time('mysql');

        // Pending rows plus scheduled rows that are due, below the retry limit,
        // ordered by priority and then by scheduled/created time.
        $sql = $this->wpdb->prepare(" SELECT * FROM $table WHERE ( status = 'pending' OR (status = 'scheduled' AND scheduled_at <= %s) ) AND retries < %d ORDER BY FIELD(priority, 'high', 'normal', 'low'), COALESCE(scheduled_at, created_at) LIMIT %d ", $current_time, $this->max_attempts, $batch_size);
        $operations = $this->wpdb->get_results($sql);

        if (!$operations) {
            return [];
        }

        return $operations;
    }

    /**
     * Whether the operation may still be attempted. An operation that has used
     * up its retries is marked as failed as a side effect.
     *
     * @param object $operation
     * @return bool
     */
    protected function canProcessOperation(object $operation): bool {
        if ($operation->retries < $this->max_attempts) {
            return true;
        }

        $this->markAsPermanentlyFailed($operation->id, 'Exceeded maximum retry attempts');

        return false;
    }

    /**
     * Set an operation to "failed", appending the reason to any error message
     * already stored on the row.
     *
     * @param string $operation_id
     * @param string $reason
     * @return void
     */
    protected function markAsPermanentlyFailed(string $operation_id, string $reason): void {
        $table = $this->wpdb->prefix . $this->table;

        // Read the current error so it is not lost
        $operation = $this->wpdb->get_row($this->wpdb->prepare(
            "SELECT error_message FROM $table WHERE id = %s",
            $operation_id
        ));

        $final_error_message = empty($operation->error_message)
            ? $reason
            : $operation->error_message . " | " . $reason;

        $columns = [
            'status' => 'failed',
            'error_message' => $final_error_message,
            'completed_at' => current_time('mysql'),
            'updated_at' => current_time('mysql')
        ];
        $result = $this->wpdb->update($table, $columns, ['id' => $operation_id]);

        if ($result) {
            $this->invalidateQueueCache('status');
        }
    }

    /**
     * Get operations for a specific user, newest first.
     *
     * @param int $user_id
     * @param array $filters Optional "status", "type", "merge" and "limit" (default 50)
     * @return array
     */
    public function getUserOperations(int $user_id, array $filters = []): array {
        $cacheKey = self::CACHE_USER_QUEUE_PREFIX . $user_id . '_' . md5(serialize($filters));
        //
        //        // Check cache first
        //        $cached = $this->cache->get($cacheKey);
        //        if ($cached !== false) {
        //            return $cached;
        //        }

        $table = $this->wpdb->prefix .
$this->table;

        $where = ['user_id = %d'];
        $params = [$user_id];

        // Each non-empty filter adds an equality condition on the column of the same name
        foreach (['status', 'type', 'merge'] as $column) {
            if (empty($filters[$column])) {
                continue;
            }
            $where[] = $column . ' = %s';
            $params[] = $filters[$column];
        }

        $limit = $filters['limit'] ?? 50;
        $params[] = $limit;

        $query = "SELECT * FROM $table WHERE " . implode(' AND ', $where) . " ORDER BY created_at DESC LIMIT %d";

        $operations = $this->wpdb->get_results($this->wpdb->prepare($query, ...$params));

        // Cache user operations
        $this->cache->set($cacheKey, $operations);

        if (!$operations) {
            return [];
        }

        return $operations;
    }

    /**
     * Status rows for the user's 20 most recently created operations.
     *
     * @param int $user_id
     * @return array
     */
    public function getUserOperationStatuses(int $user_id): array {
        $table = $this->wpdb->prefix . $this->table;

        // Get user's operation IDs
        $sql = $this->wpdb->prepare(
            "SELECT id FROM $table WHERE user_id = %d ORDER BY created_at DESC LIMIT 20",
            $user_id
        );
        $operation_ids = $this->wpdb->get_col($sql);

        return $this->getOperationStatuses($operation_ids);
    }

    /**
     * Check if user's queue has been modified since given timestamp
     * Returns true if modified, false if not (can send 304)
     *
     * @param int $user_id
     * @param int $since_timestamp
     * @return bool
     */
    public function isUserQueueModified(int $user_id, int $since_timestamp): bool {
        $lastModified = $this->cache::lastModified("user_{$user_id}");

        return $lastModified > $since_timestamp;
    }

    /**
     * Flush the cache scope that belongs to the given user.
     *
     * @param int $user_id
     * @return void
     */
    protected function invalidateUserQueue(int $user_id): void {
        // Per the original author's note, flushing the user cache also updates the
        // HTTP timestamp for user_{$user_id} and invalidates connected caches.
        Cache::for($user_id)->flush();
    }

    /**
     * Invalidate all queue-related caches
     *
     * @param string $scope "status", "items" or "all"; unknown scopes fall back to "all"
     * @return void
     */
    protected function invalidateQueueCache(string $scope = 'all'): void {
        $cacheKeys = [
            'status' => [self::CACHE_QUEUE_STATUS, self::CACHE_QUEUE_SIZE],
            'items' => [self::CACHE_HAS_ITEMS, 'queue_info'],
            'all' => [
                self::CACHE_QUEUE_STATUS,
                self::CACHE_HAS_ITEMS,
                self::CACHE_QUEUE_SIZE,
                'queue_info'
            ]
        ];

        $keys = $cacheKeys[$scope] ??
$cacheKeys['all'];

        foreach ($keys as $key) {
            $this->cache->forget($key);
        }

        $this->cache->touch();

        if ($scope === 'all') {
            delete_transient('jvb_queue_status_counts');
        }
    }

    /**
     * Re-queue, or permanently fail, operations that have been "processing"
     * for more than 30 minutes. At most five operations are handled per call.
     *
     * @return void
     */
    protected function restartStuckOperations(): void {
        $table = $this->wpdb->prefix . $this->table;

        // started_at is written with current_time('mysql') (site time), so the
        // threshold must be derived from the same clock rather than from
        // date()/strtotime().
        $stuck_before = date('Y-m-d H:i:s', current_time('timestamp') - 30 * MINUTE_IN_SECONDS);

        // Find operations that have been processing for too long
        $stuck_operations = $this->wpdb->get_results($this->wpdb->prepare("
            SELECT * FROM $table
            WHERE status = 'processing'
            AND started_at < %s
            LIMIT 5",
            $stuck_before
        ));

        // get_results() may return null on failure; treat that as "nothing stuck"
        foreach ($stuck_operations ?: [] as $operation) {
            // A NULL retries column casts to 0, so the first restart counts as retry 1
            $retries = (int) $operation->retries + 1;

            if ($retries >= $this->max_attempts) {
                // Too many retries, mark as permanently failed
                $this->wpdb->update(
                    $table,
                    [
                        'status' => 'failed_permanent',
                        'error_message' => 'Operation exceeded maximum retry attempts',
                        'updated_at' => current_time('mysql'),
                        'completed_at' => current_time('mysql')
                    ],
                    [
                        'id' => $operation->id
                    ]
                );

                // Log this as a critical error
                JVB()->error()->log(
                    '[OperationQueue]:restartStuckOperations',
                    "Operation failed after maximum retries",
                    [
                        'operation_id' => $operation->id,
                        'type' => $operation->type,
                        'user_id' => $operation->user_id,
                        'retries' => $retries,
                        'data' => $operation->request_data
                    ],
                    'critical'
                );

                $this->notifyAdmin($operation);
            } else {
                // Reset to pending for another attempt
                $this->wpdb->update(
                    $table,
                    [
                        'status' => 'pending',
                        'updated_at' => current_time('mysql'),
                        'started_at' => null,
                        'retries' => $retries,
                    ],
                    [
                        'id' => $operation->id
                    ]
                );

                // Log the requeue
                JVB()->error()->log(
                    '[OperationQueue]:restartStuckOperations',
                    "Operation requeued with exponential backoff",
                    [
                        'operation_id' => $operation->id,
                        'retry_number' => $retries,
                    ],
                    'warning'
                );
            }
        }
    }

    /**
     * Implement intelligent retry delays with exponential backoff
     *
     * @param string $operation_id
     * @param int $attempt Retry number; also stored as the operation's retries value
     * @return void
     */
    public function scheduleRetry(string $operation_id, int $attempt): void {
        // Calculate delay with exponential backoff and jitter
        $base_delay = 5; // 5 seconds
        $max_delay =
3600; // 1 hour max

        // Cap first, then cast: very large attempts make pow() return a float
        $delay = (int) min($base_delay * pow(2, max(0, $attempt)), $max_delay);

        // Add jitter (up to 10%) to prevent thundering herd; rand() needs integer bounds
        $jitter = rand(0, (int) floor($delay * 0.1));
        $delay += $jitter;

        // Use the site clock: scheduled_at is compared against current_time('mysql')
        // when operations are fetched.
        $scheduled_at = date('Y-m-d H:i:s', current_time('timestamp') + $delay);

        $table = $this->wpdb->prefix . $this->table;
        $this->wpdb->update(
            $table,
            [
                'status' => 'scheduled',
                'scheduled_at' => $scheduled_at,
                'retries' => $attempt
            ],
            ['id' => $operation_id],
            ['%s', '%s', '%d'],
            ['%s']
        );
    }

    /**
     * Notify admin about repeatedly failed operation
     *
     * @param object $operation The failed operation
     *
     * @return bool Result of the email component's sendEmail() call
     */
    protected function notifyAdmin(object $operation): bool {
        $admin_email = get_option('admin_email');
        $site_name = get_bloginfo('name');

        $subject = "[$site_name] Queue Operation Failed After Multiple Attempts";

        $message = "A queue operation has failed after multiple retry attempts:\n\n";
        $message .= "Operation ID: $operation->id\n";
        $message .= "Operation Type: $operation->type\n";
        $message .= "User ID: $operation->user_id\n";
        $message .= "Created At: $operation->created_at\n";
        $message .= "Retries: $operation->retries\n\n";

        if (!empty($operation->error_message)) {
            $message .= "Error Message: $operation->error_message\n\n";
        }

        $message .= "Please check the error logs for more details.";

        return JVB()->email()->sendEmail($admin_email, $subject, $message);
    }

    /**
     * Claim one operation, run its next chunk through the
     * BASE.'handle_bulk_operation' filter and persist the outcome.
     *
     * @param object $operation
     *
     * @return array ['success' => bool, 'result' => mixed]
     */
    protected function processOperation(object $operation): array {
        try {
            $table = $this->wpdb->prefix . $this->table;

            // Mark as processing...
$updated = $this->wpdb->query($this->wpdb->prepare(
                "UPDATE $table
                SET status = 'processing', started_at = %s, updated_at = %s
                WHERE id = %s AND status IN ('pending', 'scheduled')
                AND (scheduled_at IS NULL OR scheduled_at <= %s)",
                current_time('mysql'),
                current_time('mysql'),
                $operation->id,
                current_time('mysql')
            ));

            if ($updated === false) {
                // A database error is a real failure of this attempt
                throw new Exception('Failed to claim operation for processing: ' . $this->wpdb->last_error);
            }

            if (!$updated) {
                // No row matched: the operation was claimed elsewhere, already finished
                // or is not due yet. Leave the row untouched instead of failing it.
                return [
                    'success' => false,
                    'result' => 'Operation no longer available for processing'
                ];
            }

            $this->invalidateUserQueue($operation->user_id);

            $progress_count = (int) $operation->progress_count;
            $count = (int) $operation->count;
            $failed_items = json_decode($operation->failed_items ?: '[]', true);
            if (!is_array($failed_items)) {
                $failed_items = [];
            }

            // Process chunk
            $chunk = $this->getNextChunk($operation);

            // Apply filter - will return ['success' => bool, 'result' => mixed]
            $filterResult = apply_filters(
                BASE.'handle_bulk_operation',
                ['success' => false, 'result' => 'No handler for operation item'], // Default if no filter
                $operation,
                $chunk
            );

            // A WP_Error from a handler is an ordinary failure. It must be converted
            // before the shape check below, which would otherwise reject it as invalid.
            if (is_wp_error($filterResult)) {
                $filterResult = [
                    'success' => false,
                    'result' => $filterResult->get_error_message()
                ];
            }

            // Ensure we have the expected format
            if (!is_array($filterResult) || !isset($filterResult['success'])) {
                // Log invalid filter response and quit early
                $invalid_response = is_object($filterResult) ? get_class($filterResult) : gettype($filterResult);

                JVB()->error()->log(
                    '[OperationQueue]:processOperation',
                    "Invalid filter response format - expected array with 'success' key",
                    [
                        'operation_id' => $operation->id,
                        'operation_type' => $operation->type,
                        'filter_response_type' => $invalid_response,
                        'filter_response' => $filterResult
                    ],
                    'error'
                );

                // Mark operation as failed
                $this->wpdb->update(
                    $table,
                    [
                        'status' => 'failed',
                        'error_message' => "Invalid filter response: expected array with 'success' key, got {$invalid_response}",
                        'result' => json_encode([
                            'success' => false,
                            'error' => "Invalid filter response format"
                        ]),
                        'updated_at' => current_time('mysql'),
                        'completed_at' => current_time('mysql')
                    ],
                    ['id' => $operation->id]
                );

                $this->invalidateQueueCache('status');

                return [
                    'success' => false,
                    'result' => "Invalid filter response format"
                ];
            }

            // Progress after this chunk; a chunk without a progress value adds nothing
            $newCount = $progress_count + (int) ($chunk['progress'] ?? 0);

            if ($filterResult['success']) {
                // Success path
                $oldResult = json_decode($operation->result ?: '{}', true);
                if (!is_array($oldResult)) {
                    $oldResult = [];
                }
                if (!is_array($filterResult['result'])) {
                    $filterResult['result'] = [$filterResult['result']];
                }

                // Store the result data
                $resultToStore = $this->deepMerge($oldResult, $filterResult['result']);
                $resultToStore['processed_at'] = current_time('mysql');

                // Check if operation is complete
                if ($newCount >= $count) {
                    $status = empty($failed_items) ?
'completed' : 'completed_with_errors';

                    $completedColumns = [
                        'progress_count' => $newCount,
                        'result' => json_encode($resultToStore),
                        'status' => $status,
                        'completed_at' => current_time('mysql'),
                        'updated_at' => current_time('mysql')
                    ];
                    $this->wpdb->update(
                        $table,
                        $completedColumns,
                        ['id' => $operation->id],
                        ['%d', '%s', '%s', '%s', '%s'],
                        ['%s']
                    );

                    // Post-completion tasks
                    $this->invalidateUserQueue($operation->user_id);
                    $this->trackOperationMetrics($operation->id);
                } else {
                    // Chunks remain: store progress and hand the row back as pending
                    $progressColumns = [
                        'progress_count' => $newCount,
                        'result' => json_encode($resultToStore),
                        'status' => 'pending',
                        'updated_at' => current_time('mysql')
                    ];
                    $this->wpdb->update(
                        $table,
                        $progressColumns,
                        ['id' => $operation->id],
                        ['%d', '%s', '%s', '%s'],
                        ['%s']
                    );

                    $this->invalidateUserQueue($operation->user_id);
                }
            } else {
                // Error path: non-string results are JSON-encoded for the message
                $error_message = $filterResult['result'];
                if (!is_string($error_message)) {
                    $error_message = json_encode($filterResult['result']);
                }

                $failed_items[] = [
                    'index' => $progress_count,
                    'error' => $error_message,
                    'timestamp' => current_time('mysql'),
                    'retry_count' => $operation->retries,
                    'type' => $operation->type
                ];

                if ($newCount >= $count) {
                    // This was the last chunk, and it failed
                    $failedColumns = [
                        'failed_items' => json_encode($failed_items),
                        'progress_count' => $newCount,
                        'result' => json_encode([
                            'success' => false,
                            'error' => $error_message,
                            'failed_items' => $failed_items
                        ]),
                        'status' => 'failed',
                        'error_message' => $error_message,
                        'completed_at' => current_time('mysql'),
                        'updated_at' => current_time('mysql')
                    ];
                    $this->wpdb->update(
                        $table,
                        $failedColumns,
                        ['id' => $operation->id],
                        ['%s', '%d', '%s', '%s', '%s', '%s', '%s'],
                        ['%s']
                    );

                    $this->invalidateUserQueue($operation->user_id);
                } else {
                    // The chunk failed but more remain: record it and continue later
                    $retryColumns = [
                        'failed_items' => json_encode($failed_items),
                        'progress_count' => $newCount,
                        'status' => 'pending',
                        'error_message' => $error_message,
                        'updated_at' => current_time('mysql')
                    ];
                    $this->wpdb->update(
                        $table,
                        $retryColumns,
                        ['id' => $operation->id],
                        ['%s', '%d', '%s', '%s', '%s'],
                        ['%s']
                    );
                }
            }

            // Clear operation cache after any update
            $this->invalidateUserQueue($operation->user_id);

            return $filterResult;
        } catch (Exception $e) {
            $failureMessage = $e->getMessage();

            JVB()->error()->log(
                '[OperationQueue]:processOperation',
                "Exception during operation processing: " . $failureMessage,
                [
                    'operation_id' => $operation->id,
                    'type' => $operation->type,
                    'progress_count' => $progress_count ?? 0,
                    'user_id' => $operation->user_id,
                    'file' => $e->getFile(),
                    'line' => $e->getLine(),
                    'trace' => $e->getTraceAsString()
                ],
                'error'
            );

            // Mark operation as failed
            $this->wpdb->update(
                $table,
                [
                    'status' => 'failed',
                    'error_message' => $failureMessage,
                    'result' => json_encode([
                        'success' => false,
                        'error' => $failureMessage
                    ]),
                    'updated_at' => current_time('mysql'),
                    'completed_at' => current_time('mysql')
                ],
                ['id' => $operation->id]
            );

            $this->invalidateQueueCache('status');

            return [
                'success' => false,
                'result' => $failureMessage
            ];
        }
    }

    /**
     * Build the data for the next processing step of an operation.
     *
     * Without a chunk config the whole request data is returned with a
     * progress of 1; otherwise only the next batch of the chunked key(s).
     *
     * @param object $operation The current operation
     *
     * @return array Data to process, including a 'progress' entry
     */
    protected function getNextChunk(object $operation): array {
        $data = json_decode($operation->request_data, true);

        // Get chunk config from metadata
        $metadata = json_decode($operation->metadata ?? '{}', true);
        $chunk_config = $metadata['chunk_config'] ??
null; // If no chunk config, return all data with progress of 1 if (!$chunk_config) { return array_merge($data, ['progress' => 1]); } $chunk_key = $chunk_config['key']; $chunk_size = $chunk_config['size']; $current_progress = (int)$operation->progress_count; // Handle single or multiple keys uniformly $keys = (array)$chunk_key; $batch = $this->extractBatch($data, $keys, $current_progress, $chunk_size); // Merge non-chunked data with chunked batch $result = array_filter($data, function ($k) use ($keys) { return !in_array($k, $keys); }, ARRAY_FILTER_USE_KEY); // Add the batched data foreach ($batch as $k => $v) { $result[$k] = $v; } return $result; } protected function extractBatch(array $data, array $keys, int $currentProgress, int $batchSize): array { $result = ['progress' => 0]; $itemsExtracted = 0; // Calculate total items across all keys $totalItems = 0; $keyData = []; foreach ($keys as $key) { if (isset($data[$key]) && is_array($data[$key])) { $keyData[$key] = $data[$key]; $totalItems += count($data[$key]); } } // If current progress >= total, we're done if ($currentProgress >= $totalItems) { return ['progress' => 0]; } // Extract items starting from currentProgress $globalIndex = 0; foreach ($keyData as $key => $items) { $keyItemCount = count($items); // Skip this key if we haven't reached it yet if ($globalIndex + $keyItemCount <= $currentProgress) { $globalIndex += $keyItemCount; continue; } // Calculate start position within this key $startInKey = max(0, $currentProgress - $globalIndex); $remainingInBatch = $batchSize - $itemsExtracted; // Extract what we can from this key $extracted = array_slice($items, $startInKey, $remainingInBatch, true); if (!empty($extracted)) { $result[$key] = $extracted; $itemsExtracted += count($extracted); $result['progress'] += count($extracted); } // Stop if we've filled the batch if ($itemsExtracted >= $batchSize) { break; } $globalIndex += $keyItemCount; } return $result; } protected function updateUserQueueTimestamp(int 
$user_id) { Cache::touch("user_{$user_id}"); }

    /**
     * Record a finished operation in today's per-type metrics row.
     *
     * Loads the operation, derives its duration (completed_at - started_at) and
     * its success flag, then updates the metrics row for (today, type) or
     * inserts one when none exists yet.
     *
     * @param string $operationId ID of the operation to record
     * @return void
     */
    protected function trackOperationMetrics(string $operationId): void
    {
        $metricsTable = $this->wpdb->prefix . $this->metricsTable;
        $operation = $this->getOperation($operationId, true);

        // The row may have been removed in the meantime; nothing to record then.
        if (!is_object($operation)) {
            return;
        }

        $today = date('Y-m-d');

        // Duration in seconds, only when both timestamps are present and parseable.
        $duration = null;
        if (!empty($operation->started_at) && !empty($operation->completed_at)) {
            $startedAt = strtotime($operation->started_at);
            $completedAt = strtotime($operation->completed_at);
            if ($startedAt !== false && $completedAt !== false) {
                $duration = $completedAt - $startedAt;
            }
        }

        // Same rule as the public helper: the result JSON decides, the status
        // column is the fallback (also when no result was stored at all).
        $succeeded = $this->isOperationSuccessful($operation);

        $itemCount = (int) ($operation->count ?? 0);
        $queueSize = $this->getCurrentQueueSize();
        $peakMemory = memory_get_peak_usage(true);

        // sys_getloadavg() is missing on some platforms and may return false.
        $loadAverage = function_exists('sys_getloadavg') ? sys_getloadavg() : false;
        $currentLoad = is_array($loadAverage) ? $loadAverage[0] : null;

        $existing = $this->wpdb->get_row($this->wpdb->prepare(
            "SELECT * FROM $metricsTable WHERE date = %s AND type = %s",
            $today,
            $operation->type
        ));

        if (!$existing) {
            $this->wpdb->insert(
                $metricsTable,
                [
                    'date' => $today,
                    'type' => $operation->type,
                    'total_operations' => 1,
                    'successful_operations' => $succeeded ? 1 : 0,
                    'failed_operations' => $succeeded ? 0 : 1,
                    'average_duration' => $duration,
                    'total_items_processed' => $itemCount,
                    'peak_queue_size' => $queueSize,
                    'peak_memory_usage' => $peakMemory,
                    'peak_cpu_usage' => $currentLoad,
                ]
            );

            return;
        }

        $previousTotal = (int) $existing->total_operations;

        // Running average; `!== null` so a legitimate 0-second duration still counts.
        $averageDuration = $duration !== null
            ? (((float) $existing->average_duration * $previousTotal) + $duration) / ($previousTotal + 1)
            : $existing->average_duration;

        $this->wpdb->update(
            $metricsTable,
            [
                'total_operations' => $previousTotal + 1,
                'successful_operations' => $existing->successful_operations + ($succeeded ? 1 : 0),
                'failed_operations' => $existing->failed_operations + ($succeeded ? 0 : 1),
                'average_duration' => $averageDuration,
                'total_items_processed' => $existing->total_items_processed + $itemCount,
                'peak_queue_size' => max($existing->peak_queue_size, $queueSize),
                'peak_memory_usage' => max($existing->peak_memory_usage, $peakMemory),
                'peak_cpu_usage' => $currentLoad !== null
                    ? max($existing->peak_cpu_usage, $currentLoad)
                    : $existing->peak_cpu_usage,
            ],
            ['id' => $existing->id]
        );
    }

    /**
     * Decide whether an operation succeeded.
     *
     * For an operation object the `success` flag inside its JSON `result` is
     * authoritative; when it is absent the status column decides
     * ('completed' and 'completed_with_errors' count as success). For an array
     * the `success` entry is used directly.
     *
     * @param object|array $operation Operation object or decoded result array
     * @return bool
     */
    public function isOperationSuccessful($operation): bool
    {
        if (is_array($operation)) {
            return !empty($operation['success']);
        }

        if (!is_object($operation)) {
            return false;
        }

        if (!empty($operation->result)) {
            $decoded = json_decode($operation->result, true);
            if (is_array($decoded) && isset($decoded['success'])) {
                return (bool) $decoded['success'];
            }
        }

        // Fallback to the status column.
        return in_array($operation->status ?? null, ['completed', 'completed_with_errors'], true);
    }

    /**
     * Get the current queue size, taken from the `total` entry of getQueueInfo().
     *
     * @return int
     */
    protected function getCurrentQueueSize(): int
    {
        return (int) ($this->getQueueInfo()['total'] ?? 0);
    }

/** * Send daily metrics report to admin * @return void */ public function emailDailyMetricsReport():void { $metrics_table = $this->wpdb->prefix . $this->metricsTable; $yesterday = date('Y-m-d', strtotime('-1 day')); $metrics = $this->wpdb->get_results($this->wpdb->prepare( "SELECT * FROM $metrics_table WHERE date = %s", $yesterday )); if (empty($metrics)) { return; } $admin_email = get_option('admin_email'); $site_name = get_bloginfo('name'); $subject = "[$site_name] Daily Queue Performance - " .
$yesterday; // Calculate totals $total_ops = 0; $total_success = 0; $total_failed = 0; $total_items = 0; foreach ($metrics as $metric) { $total_ops += $metric->total_operations; $total_success += $metric->successful_operations; $total_failed += $metric->failed_operations; $total_items += $metric->total_items_processed; } $success_rate = round(($total_success / max(1, $total_ops)) * 100, 1); $message = JVB()->email()->h1('Daily Queue Performance Report'); $message .= sprintf('

Report for %s

', $yesterday); // Summary stats in grid $stats = [ JVB()->email()->stat($total_ops, 'Operations'), JVB()->email()->stat($total_success, 'Successful', '✓'), JVB()->email()->stat($total_failed, 'Failed', $total_failed > 0 ? '⚠' : ''), JVB()->email()->stat($success_rate . '%', 'Success Rate') ]; $message .= JVB()->email()->grid($stats, 4); $message .= JVB()->email()->spacer(20); // Alert if success rate is low if ($success_rate < 90) { $message .= JVB()->email()->alert( sprintf('Success rate of %s%% is below the 90%% threshold', $success_rate), 'warning' ); } $message .= JVB()->email()->h2('Details by Operation Type'); // Details for each operation type foreach ($metrics as $metric) { $details = []; $details[] = ['label' => 'Total', 'value' => $metric->total_operations]; $details[] = ['label' => 'Success', 'value' => JVB()->email()->badge($metric->successful_operations, 'success')]; $details[] = ['label' => 'Failed', 'value' => $metric->failed_operations > 0 ? JVB()->email()->badge($metric->failed_operations, 'error') : '0']; if ($metric->average_duration) { $details[] = ['label' => 'Avg Duration', 'value' => round($metric->average_duration, 2) . 's']; } $details[] = ['label' => 'Items Processed', 'value' => number_format($metric->total_items_processed)]; if ($metric->peak_memory_usage) { $memory_mb = round($metric->peak_memory_usage / 1024 / 1024, 2); $details[] = ['label' => 'Peak Memory', 'value' => $memory_mb . 
' MB']; } $message .= JVB()->email()->card( JVB()->email()->table($details), esc_html($metric->type) ); } // Current queue status $pending_count = $this->getCurrentQueueSize(); if ($pending_count > 0) { $message .= JVB()->email()->spacer(20); $message .= JVB()->email()->notice( sprintf('Current Queue: %d operations pending', $pending_count) ); } $message .= JVB()->email()->spacer(20); $message .= JVB()->email()->button(admin_url('admin.php?page=jvb-queue'), 'View Queue Dashboard'); JVB()->email()->sendEmail($admin_email, $subject, $message, 'QUEUE REPORT'); }

    /**
     * 1-minute load average expressed as a percentage of 2 CPU cores.
     *
     * @return int Truncated percentage; 0 when the load average is unavailable
     */
    protected function getCPUUsage(): int
    {
        // sys_getloadavg() does not exist on every platform; calling it
        // unguarded is a fatal error that `@` cannot suppress.
        $loadAverage = function_exists('sys_getloadavg') ? sys_getloadavg() : false;
        if (!is_array($loadAverage)) {
            return 0;
        }

        // Explicit cast: the expression is a float but the method returns int.
        return (int) ($loadAverage[0] * 100 / 2); // Normalize for 2 cores
    }

    /**
     * Memory currently allocated to this process as a percentage of memory_limit.
     *
     * @return int
     */
    protected function getMemoryUsage(): int
    {
        $limit = $this->getMemoryLimitBytes();
        if ($limit <= 0) {
            return 0;
        }

        // Multiply before casting: casting the bare ratio first truncates it
        // to 0 for any usage below the limit.
        return (int) round(memory_get_usage(true) / $limit * 100);
    }

    /**
     * Resolve the `memory_limit` ini setting to bytes.
     *
     * Accepts plain byte counts and the K/M/G shorthand. `-1` (no limit) is
     * reported as PHP_INT_MAX so usage ratios stay near zero; anything
     * unparseable falls back to 128MB.
     *
     * @return int
     */
    protected function getMemoryLimitBytes(): int
    {
        $fallback = 134217728; // 128MB default
        $setting = trim((string) ini_get('memory_limit'));

        if ($setting === '-1') {
            return PHP_INT_MAX;
        }

        // The unit is optional: a bare number is already a byte count.
        if (preg_match('/^(\d+)\s*([kmg]?)$/i', $setting, $parts) !== 1) {
            return $fallback;
        }

        $multiplier = match (strtolower($parts[2])) {
            'g' => 1024 ** 3,
            'm' => 1024 ** 2,
            'k' => 1024,
            default => 1,
        };

        $bytes = (int) $parts[1] * $multiplier;

        return $bytes > 0 ? (int) $bytes : $fallback;
    }

    /**
     * Get per-status operation counts, served from cache when available.
     *
     * @return array Integer counts keyed by pending, scheduled, scheduled_ready (scheduled
     *               operations whose scheduled_at has passed), processing, completed,
     *               failed and total (all statuses)
     */
    public function getQueueStatus(): array
    {
        // Try cache first; anything that is not an array is treated as a miss.
        $cached = $this->cache->get(self::CACHE_QUEUE_STATUS);
        if (is_array($cached)) {
            return $cached;
        }

        $table = $this->wpdb->prefix . $this->table;

        // One grouped query yields all counts; the timestamp is bound, not interpolated.
        $rows = $this->wpdb->get_results(
            $this->wpdb->prepare(
                "SELECT
                    status,
                    COUNT(*) as count,
                    SUM(IF(status = 'scheduled' AND scheduled_at <= %s, 1, 0)) as scheduled_ready
                FROM $table
                GROUP BY status",
                current_time('mysql')
            ),
            OBJECT_K
        );

        $queryFailed = !is_array($rows);
        if ($queryFailed) {
            $rows = [];
        }

        $countOf = static fn (string $status, string $column = 'count'): int
            => (int) ($rows[$status]->{$column} ?? 0);

        $result = [
            'pending' => $countOf('pending'),
            'scheduled' => $countOf('scheduled'),
            'scheduled_ready' => $countOf('scheduled', 'scheduled_ready'),
            'processing' => $countOf('processing'),
            'completed' => $countOf('completed'),
            'failed' => $countOf('failed'),
            'total' => array_sum(array_map(static fn ($row): int => (int) $row->count, $rows)),
        ];

        // Do not pin an all-zero answer in the cache when the query itself failed.
        if (!$queryFailed) {
            $this->cache->set(self::CACHE_QUEUE_STATUS, $result);
        }

        return $result;
    }

    /**
     * Calculate server load factor
     *
     * Weighted blend of CPU load (1-minute load average per core, weight 0.7)
     * and memory usage (fraction of memory_limit, weight 0.3).
     *
     * @return float Load factor (1.0 = normal, >1.0 = high load)
     */
    private function calculateServerLoadFactor(): float
    {
        $memoryLimit = $this->getMemoryLimitBytes();
        $memoryRatio = $memoryLimit > 0 ? memory_get_usage(true) / $memoryLimit : 0.0;

        $cpuCores = 2; // Adjust for your server
        $loadAverage = function_exists('sys_getloadavg') ? sys_getloadavg() : false;

        // Default to 1.0 when the load average is not available.
        $cpuFactor = is_array($loadAverage) ? $loadAverage[0] / $cpuCores : 1.0;

        return ($cpuFactor * 0.7) + ($memoryRatio * 0.3);
    }

    /**
     * Dynamically adjust batch size based on server load
     *
     * Heavy load (> 1.5) uses a fifth of maxPerBatch (at least 5), moderate
     * load (> 1.0) half of it (at least 10), otherwise the full maxPerBatch.
     * The result never exceeds maxPerBatch.
     *
     * @return float
     */
    protected function getAdaptiveBatchSize(): float
    {
        $loadFactor = $this->calculateServerLoadFactor();
        $ceiling = $this->maxPerBatch;

        $size = match (true) {
            $loadFactor > 1.5 => max(5, floor($ceiling / 5)),
            $loadFactor > 1.0 => max(10, floor($ceiling / 2)),
            default => $ceiling,
        };

        // The minimums above must not push a small maxPerBatch upwards under load.
        return (float) min($ceiling, $size);
    }

/** * Get a specific column value from an operation by ID * * @param string $operation_id The ID of the operation * @param string $column The
column name to retrieve (default: 'result') * @param bool $decode_json Whether to JSON decode the retrieved value (default: true) * @param string|null $status Filter by operation status (default: null - any status) * * @return mixed|false The requested value or false if not found */
    public function getOperationValue(string $operation_id, string $column, bool $decode_json = true, string|null $status = null): mixed
    {
        // Identifiers cannot be bound as placeholders and string escaping does
        // not make them safe, so only plain column names are accepted.
        if (preg_match('/^[A-Za-z_][A-Za-z0-9_]*$/', $column) !== 1) {
            return false;
        }

        try {
            $table = $this->wpdb->prefix . $this->table;

            $sql = "SELECT `{$column}` FROM $table WHERE id = %s";
            $params = [$operation_id];

            // Apply the status filter only when one was requested.
            if ($status !== null) {
                $sql .= ' AND status = %s';
                $params[] = $status;
            }

            $value = $this->wpdb->get_var($this->wpdb->prepare($sql, $params));

            if ($value === null) {
                return false;
            }

            // Decode only values that look like a JSON object or array, and
            // return the decoded form only if decoding succeeded.
            if ($decode_json && is_string($value)) {
                $trimmed = ltrim($value);
                if (str_starts_with($trimmed, '{') || str_starts_with($trimmed, '[')) {
                    $decoded = json_decode($value, true);
                    if (json_last_error() === JSON_ERROR_NONE) {
                        return $decoded;
                    }
                }
            }

            return $value;
        } catch (Exception $e) {
            JVB()->error()->log(
                '[OperationQueue]:getOperationValue',
                'Error retrieving operation value: ' . $e->getMessage(),
                [
                    'operation_id' => $operation_id,
                    'column' => $column
                ],
                'error'
            );

            return false;
        }
    }

    /**
     * Get average processing time for an operation type
     *
     * Returns the stored statistic when one exists, otherwise builds (and
     * stores) it via buildAverageProcessingTime().
     *
     * @param string $type The type of operation
     *
     * @return float Average seconds per operation
     */
    private function getAverageProcessingTime(string $type): float
    {
        $stats = get_option(BASE . 'average_operation_time', []);

        if (is_array($stats) && array_key_exists($type, $stats)) {
            return (float) $stats[$type];
        }

        return $this->buildAverageProcessingTime($type);
    }

    /**
     * Get all stored average processing times.
     *
     * @return array Stored statistics keyed by operation type; empty when nothing usable is stored
     */
    private function getAllAverageProcessingTimes(): array
    {
        $stats = get_option(BASE . 'average_operation_time', []);

        return is_array($stats) ? $stats : [];
    }

    /**
     * Compute, store and return the average processing time for an operation type.
     *
     * A numeric stored value is returned as-is. Otherwise the average seconds
     * per item of completed operations is queried; when that yields nothing
     * positive, a per-type default (3.0 for unknown types) is used. The
     * outcome is saved to the options table.
     *
     * @param string $type
     *
     * @return float
     */
    private function buildAverageProcessingTime(string $type): float
    {
        $stats = get_option(BASE . 'average_operation_time', []);
        if (!is_array($stats)) {
            $stats = [];
        }

        if (isset($stats[$type]) && is_numeric($stats[$type])) {
            return (float) $stats[$type];
        }

        $table = $this->wpdb->prefix . $this->table;

        $measured = $this->wpdb->get_var(
            $this->wpdb->prepare(
                "SELECT AVG(TIMESTAMPDIFF(SECOND, started_at, completed_at) / count)
                 FROM {$table}
                 WHERE type = %s AND status = 'completed'",
                $type
            )
        );

        if ($measured !== null && $measured !== false && (float) $measured > 0) {
            $average = (float) $measured;
        } else {
            // No usable history yet: fall back to a default for this type.
            $defaultTimes = [
                'email_notification_digest' => 20.0,
                'image_upload' => 10.0,
                'image_processing' => 5.0,
                'batch_upload' => 10.0,
                'artist_bulk_approval' => 15.0,
                'taxonomy_relationships' => 60.0,
                'rebuild_user_term_index' => 10.0,
                'bio_update' => 12.0,
                'featured_image' => 15.0,
                'content_update' => 8.0,
                'batch_creation' => 20.0,
                'favourites_batch' => 10.0,
                'favourite_list_add' => 5.0,
                'favourite_list_remove' => 5.0,
                'notification_mark_all_as_read' => 15.0,
                'sync_content_taxonomy_tables' => 20.0,
                'temporary_cleanup' => 10.0,
            ];

            $average = $defaultTimes[$type] ?? 3.0;
        }

        $stats[$type] = $average;
        update_option(BASE . 'average_operation_time', $stats);

        return $average;
    }

    /**
     * Repair operations that are stuck.
     *
     * 1. Operations that reached max_attempts without being in a terminal
     *    status are marked as permanently failed.
     * 2. Operations sitting in 'processing' for over an hour are put back to
     *    'pending' with their retry counter incremented.
     *
     * @return void
     */
    public function cleanupStuckOperations(): void
    {
        $table = $this->wpdb->prefix . $this->table;

        // 'completed_with_errors' is a terminal status too (it is written
        // together with completed_at), so finished rows must not be re-failed.
        $exhausted = $this->wpdb->get_results($this->wpdb->prepare(
            "SELECT id, type, retries
             FROM $table
             WHERE retries >= %d
               AND status NOT IN ('failed', 'completed', 'completed_with_errors')",
            $this->max_attempts
        ));

        foreach ($exhausted ?: [] as $operation) {
            $this->markAsPermanentlyFailed(
                $operation->id,
                "Cleanup: Operation exceeded {$this->max_attempts} attempts (had {$operation->retries})"
            );
        }

        // NOTE(review): the cutoff uses date() while other timestamps in this class
        // are written with current_time('mysql'); confirm started_at shares a timezone.
        $cutoff = date('Y-m-d H:i:s', strtotime('-1 hour'));

        // `retries` is selected here so the loop needs no extra query per row.
        $longProcessing = $this->wpdb->get_results($this->wpdb->prepare(
            "SELECT id, type, started_at, retries
             FROM $table
             WHERE status = 'processing'
               AND started_at < %s",
            $cutoff
        ));

        foreach ($longProcessing ?: [] as $operation) {
            $this->wpdb->update(
                $table,
                [
                    'status' => 'pending',
                    'started_at' => null,
                    'retries' => (int) $operation->retries + 1,
                    'updated_at' => current_time('mysql')
                ],
                ['id' => $operation->id]
            );
        }
    }

public function renderAdminPage(): void { $status = $this->getQueueStatus(); $health = $this->getQueueHealthReport(); ?>

Operation Queue Status

Current Status

⚠️ Stuck Operations

ID Type Status Age (minutes) Error
wpdb->prefix . BASE . '_operation_queue'; $report = [ 'status_counts' => [], 'stuck_operations' => [], 'error_patterns' => [], 'performance_stats' => [] ]; // Status counts $status_counts = $this->wpdb->get_results(" SELECT status, COUNT(*) as count, AVG(retries) as avg_retries FROM $table GROUP BY status ", OBJECT_K); foreach ($status_counts as $status => $data) { $report['status_counts'][$status] = [ 'count' => intval($data->count), 'avg_retries' => round(floatval($data->avg_retries), 2) ]; } // Stuck operations $report['stuck_operations'] = $this->wpdb->get_results(" SELECT id, type, status, retries, TIMESTAMPDIFF(MINUTE, created_at, NOW()) as age_minutes, error_message FROM $table WHERE ( retries >= 3 OR (status = 'processing' AND started_at < DATE_SUB(NOW(), INTERVAL 30 MINUTE)) OR (status = 'pending' AND created_at < DATE_SUB(NOW(), INTERVAL 2 HOUR)) ) ORDER BY created_at DESC LIMIT 10 ", ARRAY_A); // Error patterns $report['error_patterns'] = $this->wpdb->get_results(" SELECT SUBSTRING(error_message, 1, 100) as error_pattern, COUNT(*) as frequency, type FROM $table WHERE error_message IS NOT NULL AND updated_at > DATE_SUB(NOW(), INTERVAL 24 HOUR) GROUP BY type, SUBSTRING(error_message, 1, 100) HAVING frequency > 1 ORDER BY frequency DESC LIMIT 5 ", ARRAY_A); return $report; }

    /**
     * Aggregate operations about to leave the retention window into the metrics table.
     *
     * Operations completed between ($days_to_keep + 1) and $days_to_keep days
     * ago are grouped by completion date and type and upserted into the
     * metrics table; an existing row for the same key has its totals replaced.
     *
     * NOTE(review): the window is relative to NOW(), so it can straddle two
     * calendar dates and overwrite a date's row with a partial count. Confirm
     * how often this runs before aligning it to whole days.
     *
     * @param int $days_to_keep Retention in days (negative values are treated as 0)
     * @return void
     */
    protected function archiveToMetrics(int $days_to_keep): void
    {
        $table = $this->wpdb->prefix . $this->table;
        $metricsTable = $this->wpdb->prefix . $this->metricsTable;
        $days = max(0, $days_to_keep);

        // 'completed_with_errors' is counted as successful, matching the status
        // fallback in isOperationSuccessful(); before, such rows were in the total
        // but in neither bucket. A literal % inside prepare() is written as %%.
        $this->wpdb->query($this->wpdb->prepare("
            INSERT INTO $metricsTable
                (date, type, total_operations, successful_operations, failed_operations,
                 average_duration, total_items_processed)
            SELECT
                DATE(completed_at) as date,
                type,
                COUNT(*) as total_operations,
                SUM(IF(status IN ('completed', 'completed_with_errors'), 1, 0)) as successful,
                SUM(IF(status LIKE 'failed%%', 1, 0)) as failed,
                AVG(TIMESTAMPDIFF(SECOND, created_at, completed_at)) as avg_duration,
                SUM(count) as items_processed
            FROM $table
            WHERE completed_at < DATE_SUB(NOW(), INTERVAL %d DAY)
              AND completed_at >= DATE_SUB(NOW(), INTERVAL %d DAY)
            GROUP BY DATE(completed_at), type
            ON DUPLICATE KEY UPDATE
                total_operations = VALUES(total_operations),
                successful_operations = VALUES(successful_operations),
                failed_operations = VALUES(failed_operations),
                average_duration = VALUES(average_duration),
                total_items_processed = VALUES(total_items_processed)
        ", $days, $days + 1));
    }
}