| | |
| | | |
| | | use JVBase\managers\CacheManager; |
| | | use Exception; |
| | | use JVBase\utility\Features; |
| | | use WP_Error; |
| | | use WP_REST_Response; |
| | | use WP_REST_Request; |
| | |
| | | { |
| | | switch ($action) { |
| | | case 'unlock-operation-queue': |
| | | error_log('Unlocking Queue from admin action'); |
| | | $this->unlockQueue(); |
| | | return new WP_REST_Response([ |
| | | 'success' => true, |
| | |
| | | return; |
| | | } |
| | | |
| | | error_log('[queue] Checking queue...'); |
| | | |
| | | // Peek at what operations we might process |
| | | $batch_size = $this->getAdaptiveBatchSize(); |
| | |
| | | { |
| | | try { |
| | | $table = $this->wpdb->prefix . $this->table; |
| | | error_log('QUEUING OPERATION: '.print_r([ |
| | | 'type' => $type, |
| | | 'user_id' => $user_id, |
| | | 'data' => $data, |
| | | 'options' => $options, |
| | | ],true)); |
| | | |
| | | // Extract options |
| | | $priority = (array_key_exists('priority', $options) && in_array($options['priority'], ['low', 'normal', 'high'])) ? $options['priority'] : 'normal'; |
| | | $delay = (array_key_exists('delay', $options)) ? (int) $options['delay'] : 0; |
| | |
| | | // Generate operation ID |
| | | $operation_id = $this->checkOperationId($operation_id, $type, $merge, $user_id); |
| | | |
| | | error_log('Checked OperationId: '.print_r($operation_id, true)); |
| | | // Check for existing operation |
| | | $existing = $this->getOperation($operation_id); |
| | | error_log('Existing: '.print_r($existing, true)); |
| | | |
| | | $dependencies = $options['depends_on'] ?? []; |
| | | if (is_string($dependencies)) { |
| | |
| | | } |
| | | |
| | | if ($existing) { |
| | | error_log('[queue]:queueOperation - Already Existing operation... updating'); |
| | | |
| | | if ($merge !== 'append' && ($existing->status === 'pending' || $existing->status === 'scheduled')) { |
| | | error_log('Not appending operation, checking for merge'); |
| | | |
| | | if ($merge === 'merge') { |
| | | $existing_data = json_decode($existing->request_data ?? '{}', true); |
| | | $data = $this->deepMerge($existing_data, $data); |
| | |
| | | } |
| | | |
| | | } else if ($merge === 'append') { |
| | | error_log('Append operation, creating a new one'); |
| | | $operation_id = $operation_id . '_' . time() . '_' . substr(uniqid(), -4); |
| | | $existing = null; |
| | | } |
| | | } |
| | | |
| | | if (!$existing) { |
| | | error_log('Inserting new operation into table'); |
| | | // Prepare metadata with chunk config |
| | | $metadata = $chunk_config ? ['chunk_config' => $chunk_config] : []; |
| | | |
| | |
| | | ]; |
| | | |
| | | } catch (Exception $e) { |
| | | error_log("Error queueing operation: " . $e->getMessage()); |
| | | JVB()->error()->log('queue', $e->getMessage(), $data, 'high'); |
| | | return new WP_Error('queue_failed', $e->getMessage()); |
| | | } |
| | | } |
| | | |
/**
 * Marks the given user's queue data as freshly modified.
 *
 * Passes the user ID to updateUserQueueTimestamp() on the 'queue' route and
 * then refreshes the "user_<id>" timestamp through CacheManager.
 *
 * @param int $user_id ID of the user whose timestamps are refreshed.
 */
protected function updateLastModified(int $user_id)
{
    $queueRoute = JVB()->routes('queue');
    $queueRoute->updateUserQueueTimestamp($user_id);

    $cacheKey = 'user_' . $user_id;
    CacheManager::updateTimestamp($cacheKey);
}
| | | |
| | | protected function deepMerge(array $existing, array $new): array |
| | |
| | | $merged = $existing; |
| | | |
| | | if (!$this->isAssociativeArray($existing) && !$this->isAssociativeArray($new)) { |
| | | error_log('It is an associative array!'); |
| | | return array_merge($existing, $new); |
| | | } |
| | | error_log('Not an associative array... moving on!'); |
| | | foreach ($new as $key => $newValue) { |
| | | if (!array_key_exists($key, $existing)) { |
| | | $merged[$key] = $newValue; |
| | |
| | | 'limit' => 1 |
| | | ]); |
| | | if ($existing) { |
| | | error_log('Found existing User operations: '.print_r($existing, true)); |
| | | $operation_id = $existing[0]->id; |
| | | } |
| | | } |
| | | |
| | | if (!$operation_id) { |
| | | $operation_id = uniqid('op_'); |
| | | error_log('Generated operation ID: '.print_r($operation_id, true)); |
| | | } |
| | | |
| | | if (!str_starts_with($operation_id, 'u')) { |
| | | $operation_id = 'u' . $user_id . '_' . $operation_id; |
| | | error_log('Added user ID to operation ID: '.print_r($operation_id, true)); |
| | | } |
| | | |
| | | return $operation_id; |
| | |
| | | { |
| | | // Only add if not already scheduled AND if operation is high priority or small |
| | | if (!has_action('shutdown', [$this, 'processQueueOnShutdown'])) { |
| | | error_log('Adding shutdown call'); |
| | | add_action('shutdown', [$this, 'processQueueOnShutdown'], 100); |
| | | } |
| | | } |
| | |
| | | protected function canProcessOperation(object $operation): bool |
| | | { |
| | | if ($operation->retries >= $this->max_attempts) { |
| | | error_log("Operation {$operation->id} exceeded retry limit: {$operation->retries} >= {$this->max_attempts}"); |
| | | $this->markAsPermanentlyFailed($operation->id, 'Exceeded maximum retry attempts'); |
| | | return false; |
| | | } |
| | |
| | | |
| | | if ($result) { |
| | | $this->invalidateQueueCache('status'); |
| | | error_log("Marked operation {$operation_id} as permanently failed: {$final_error_message}"); |
| | | } |
| | | } |
| | | |
| | |
| | | ['%s'] |
| | | ); |
| | | |
| | | // Log retry schedule |
| | | error_log(sprintf( |
| | | "Operation %s scheduled for retry #%d at %s (delay: %ds)", |
| | | $operation_id, |
| | | $attempt, |
| | | $scheduled_at, |
| | | $delay |
| | | )); |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | $message .= "Please check the error logs for more details."; |
| | | |
| | | return jvbMail($admin_email, $subject, $message); |
| | | return JVB()->email()->sendEmail($admin_email, $subject, $message); |
| | | } |
| | | |
| | | /** |
| | |
| | | ]; |
| | | } |
| | | |
| | | error_log('Filtered Result: '.print_r($filterResult, true)); |
| | | |
| | | $newCount = $progress_count + $chunk['progress']; |
| | | |
| | | if ($filterResult['success']) { |
| | |
| | | $filterResult['result'] = [$filterResult['result']]; |
| | | } |
| | | // Store the result data |
| | | error_log('Merging Old Result: '. print_r($oldResult, true)); |
| | | error_log('With Newer Result: '. print_r($filterResult['result'], true)); |
| | | $resultToStore = $this->deepMerge($oldResult, $filterResult['result']); |
| | | |
| | | error_log('Merged Result: '.print_r($resultToStore, true)); |
| | | |
| | | $resultToStore['processed_at'] = current_time('mysql'); |
| | | |
| | | // Check if operation is complete |
| | |
| | | ['%s'] |
| | | ); |
| | | |
| | | |
| | | |
| | | |
| | | error_log('Completion result: '.print_r($result, true)); |
| | | // Now do post-completion tasks |
| | | $this->invalidateQueueCache('status'); |
| | | $this->updateLastModified($operation->user_id); |
| | |
| | | |
/**
 * Stores the current Unix time under the user's queue-timestamp cache key
 * and refreshes the "user_<id>" timestamp through CacheManager.
 *
 * @param int $user_id ID of the user whose queue timestamp is written.
 */
protected function updateUserQueueTimestamp(int $user_id)
{
    $cacheKey = $user_id . '_queue_timestamp';
    $now = time();
    $this->cache->set($cacheKey, $now);

    CacheManager::updateTimestamp(sprintf('user_%d', $user_id));
}
| | | |
| | | /** |
| | |
| | | * Send daily metrics report to admin |
| | | * @return void |
| | | */ |
public function emailDailyMetricsReport():void
{
    // Builds an HTML report from yesterday's rows in the metrics table and
    // mails it to the site's admin address. Nothing is sent when the table
    // holds no rows for that date.
    $metrics_table = $this->wpdb->prefix . $this->metricsTable;
    $yesterday = date('Y-m-d', strtotime('-1 day'));

    // Get yesterday's metrics
    $metrics = $this->wpdb->get_results($this->wpdb->prepare(
        "SELECT * FROM $metrics_table WHERE date = %s",
        $yesterday
    ));
    if (empty($metrics)) {
        return; // No metrics to report
    }

    $admin_email = get_option('admin_email');
    $site_name = get_bloginfo('name');
    $subject = "[$site_name] Daily Queue Performance - " . $yesterday;

    // Calculate totals
    $total_ops = 0;
    $total_success = 0;
    $total_failed = 0;
    $total_items = 0;

    foreach ($metrics as $metric) {
        $total_ops += $metric->total_operations;
        $total_success += $metric->successful_operations;
        $total_failed += $metric->failed_operations;
        $total_items += $metric->total_items_processed;
    }

    // max(1, ...) guards against division by zero when no operations ran.
    $success_rate = round(($total_success / max(1, $total_ops)) * 100, 1);

    $message = JVB()->email()->h1('Daily Queue Performance Report');
    $message .= sprintf('<p>Report for <strong>%s</strong></p>', $yesterday);

    // Summary stats in grid
    $stats = [
        JVB()->email()->stat($total_ops, 'Operations'),
        JVB()->email()->stat($total_success, 'Successful', '✓'),
        JVB()->email()->stat($total_failed, 'Failed', $total_failed > 0 ? '⚠' : ''),
        JVB()->email()->stat($success_rate . '%', 'Success Rate')
    ];
    $message .= JVB()->email()->grid($stats, 4);

    $message .= JVB()->email()->spacer(20);

    // Alert if success rate is low
    if ($success_rate < 90) {
        $message .= JVB()->email()->alert(
            sprintf('Success rate of %s%% is below the 90%% threshold', $success_rate),
            'warning'
        );
    }

    $message .= JVB()->email()->h2('Details by Operation Type');

    // Details for each operation type
    foreach ($metrics as $metric) {
        $details = [];
        $details[] = ['label' => 'Total', 'value' => $metric->total_operations];
        $details[] = ['label' => 'Success', 'value' => JVB()->email()->badge($metric->successful_operations, 'success')];
        $details[] = ['label' => 'Failed', 'value' => $metric->failed_operations > 0 ? JVB()->email()->badge($metric->failed_operations, 'error') : '0'];

        if ($metric->average_duration) {
            $details[] = ['label' => 'Avg Duration', 'value' => round($metric->average_duration, 2) . 's'];
        }

        $details[] = ['label' => 'Items Processed', 'value' => number_format($metric->total_items_processed)];

        if ($metric->peak_memory_usage) {
            // Stored value is divided by 1024 twice, i.e. treated as bytes -> MB.
            $memory_mb = round($metric->peak_memory_usage / 1024 / 1024, 2);
            $details[] = ['label' => 'Peak Memory', 'value' => $memory_mb . ' MB'];
        }

        $message .= JVB()->email()->card(
            JVB()->email()->table($details),
            esc_html($metric->type)
        );
    }

    // Current queue status
    $pending_count = $this->getCurrentQueueSize();
    if ($pending_count > 0) {
        $message .= JVB()->email()->spacer(20);
        $message .= JVB()->email()->notice(
            sprintf('<strong>Current Queue:</strong> %d operations pending', $pending_count)
        );
    }

    $message .= JVB()->email()->spacer(20);
    $message .= JVB()->email()->button(admin_url('admin.php?page=jvb-queue'), 'View Queue Dashboard');

    // Send email
    JVB()->email()->sendEmail($admin_email, $subject, $message, 'QUEUE REPORT');
}
| | | |
| | | /** |
| | | * @return int |
| | |
| | | ", $this->max_attempts)); |
| | | |
| | | if (!empty($stuck_operations)) { |
| | | error_log("Found " . count($stuck_operations) . " stuck operations to clean up"); |
| | | |
| | | foreach ($stuck_operations as $operation) { |
| | | $this->markAsPermanentlyFailed( |
| | | $operation->id, |
| | |
| | | ", date('Y-m-d H:i:s', strtotime('-1 hour')))); |
| | | |
| | | if (!empty($long_processing)) { |
| | | error_log("Found " . count($long_processing) . " long-running operations to reset"); |
| | | |
| | | foreach ($long_processing as $operation) { |
| | | $this->wpdb->update($table, [ |
| | | 'status' => 'pending', |