| | |
| | | <?php |
| | | namespace JVBase\managers; |
| | | |
| | | use JVBase\managers\CacheManager; |
| | | use Exception; |
| | | use WP_Error; |
| | | use WP_REST_Response; |
| | |
| | | exit; // Exit if accessed directly |
| | | } |
| | | |
| | | //TODO: Register a server cron job for jvb_process_queue |
| | | // 1) Log in to your Ploi dashboard |
| | | // 2) Navigate to your server and select the site where Edmonton Ink is installed |
| | | // 3) Go to the "Cron Jobs" tab in the site management interface |
| | | // 4) Click "Create Cron Job" to add a new scheduled task |
| | | // 5) Configure the cron job with these settings: |
| | | // - Command: Use the WP-CLI to trigger your custom wp cron event: |
| | | // cd /path/to/your/wordpress && php wp-cli.phar cron event run jvb_process_queue |
| | | // - Or if WP-CLI is installed globally: |
| | | // cd /path/to/your/wordpress && wp cron event run jvb_process_queue |
| | | // - User: Select the appropriate system user (usually the one associated with your site) |
| | | // - Frequency: Set to run every 5 minutes for queue processing: |
| | | // */5 * * * * |
| | | |
| | | |
| | | //# Every minute - main queue processing |
| | | //* * * * * cd /path/to/wordpress && wp cron event run jvb_process_queue |
| | | // |
| | |
| | | |
| | | ]; |
| | | |
| | | protected ?CacheManager $cache = null; |
| | | protected ?Cache $cache = null; |
| | | protected int $ttl = 300; |
| | | protected string $cacheGroup = 'queue'; |
| | | // Cache keys for different data types |
| | | private const CACHE_QUEUE_STATUS = 'status'; |
| | | private const CACHE_HAS_ITEMS = 'has_items'; |
| | | private const CACHE_OPERATION_PREFIX = 'op_'; |
| | | private const CACHE_USER_QUEUE_PREFIX = 'user_queue_'; |
| | | private const CACHE_METRICS_PREFIX = 'metrics_'; |
| | | private const CACHE_QUEUE_SIZE = 'queue_size'; |
| | | |
| | | // Prepared statement cache |
| | | protected array $preparedStatements = []; |
| | | |
| | | public function __construct() |
| | | { |
| | | global $wpdb; |
| | | $this->wpdb = $wpdb; |
| | | $this->cache = new CacheManager('queue'); |
| | | $this->cache = Cache::for('queue', DAY_IN_SECONDS)->connect('user')->user(); |
| | | add_action('jvb_process_queue', [ $this, 'checkQueue' ]); |
| | | add_action('jvb_queue_maintenance', [$this, 'hourlyMaintenance']); |
| | | add_action('jvbEmailDailyMetricsReport', [$this, 'emailDailyMetricsReport']); |
| | |
| | | * |
| | | * @return bool|WP_REST_Response |
| | | */ |
| | | public function adminActionFilter(WP_REST_Response $response, WP_REST_Request $request, string $action) |
| | | public function adminActionFilter(WP_REST_Response $response, WP_REST_Request $request, string $action):WP_REST_Response|bool |
| | | { |
| | | switch ($action) { |
| | | case 'unlock-operation-queue': |
| | | error_log('Unlocking Queue from admin action'); |
| | | $this->unlockQueue(); |
| | | return new WP_REST_Response([ |
| | | 'success' => true, |
| | |
| | | $result = $this->wpdb->get_row($this->wpdb->prepare(" |
| | | SELECT |
| | | COUNT(*) as total, |
| | | SUM(CASE WHEN status IN ('pending', 'processing') THEN 1 ELSE 0 END) as active, |
| | | SUM(CASE WHEN status = 'scheduled' AND scheduled_at <= %s THEN 1 ELSE 0 END) as ready_scheduled |
| | | SUM(IF(status IN ('pending', 'processing'), 1, 0)) as active, |
| | | SUM(IF(status = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready_scheduled |
| | | FROM $table |
| | | WHERE status IN ('pending', 'processing', 'scheduled') |
| | | ", $current_time)); |
| | |
| | | return; |
| | | } |
| | | |
| | | error_log('[queue] Checking queue...'); |
| | | |
| | | // Peek at what operations we might process |
| | | $batch_size = $this->getAdaptiveBatchSize(); |
| | |
| | | // $this->pruneOldMetrics(); // Optional: clean old metrics |
| | | } |
| | | |
| | | /** |
| | | * Delete rows from the metrics table whose `date` column is more than 90 days old. |
| | | * |
| | | * @return void |
| | | */ |
| | | protected function pruneOldMetrics(): void |
| | | { |
| | | // Cutoff in Y-m-d form; rows dated strictly before it are removed. |
| | | $cutoffDate = date('Y-m-d', strtotime('-90 days')); |
| | | $metricsTable = $this->wpdb->prefix . $this->metricsTable; |
| | | |
| | | $deleteSql = $this->wpdb->prepare( |
| | | "DELETE FROM $metricsTable WHERE date < %s", |
| | | $cutoffDate |
| | | ); |
| | | $this->wpdb->query($deleteSql); |
| | | } |
| | | |
| | | protected function isHeavyOperation(string $type): bool |
| | | { |
| | | $heavyOperations = [ |
| | |
| | | * Check if the queue is currently locked for processing |
| | | * @return bool |
| | | */ |
| | | protected function isQueueLocked():bool |
| | | { |
| | | $lock = get_transient(BASE . 'queue_lock'); |
| | | protected function isQueueLocked(): bool |
| | | { |
| | | $lock_name = BASE . '_queue_lock'; |
| | | |
| | | if ($lock) { |
| | | // Check if the lock is stale (process might have crashed) |
| | | $lock_time = (int)$lock; |
| | | $current_time = time(); |
| | | // Check if lock exists and is recent |
| | | $result = $this->wpdb->get_var($this->wpdb->prepare( |
| | | "SELECT IS_USED_LOCK(%s)", |
| | | $lock_name |
| | | )); |
| | | |
| | | // If lock is older than 5 minutes, it's probably stale |
| | | if (($current_time - $lock_time) > 300) { |
| | | $this->unlockQueue(); // Clear stale lock |
| | | return false; |
| | | } |
| | | return true; |
| | | } |
| | | return false; |
| | | } |
| | | return $result !== null; |
| | | } |
| | | |
| | | /** |
| | | * Check if server is relatively idle (very low load) |
| | |
| | | * Acquire a lock on the queue |
| | | * @return bool |
| | | */ |
| | | protected function lockQueue():bool |
| | | { |
| | | // Try to acquire the lock |
| | | $result = set_transient(BASE . 'queue_lock', time()); |
| | | protected function lockQueue(): bool |
| | | { |
| | | $lock_name = BASE . '_queue_lock'; |
| | | |
| | | if ($result) { |
| | | return true; |
| | | } |
| | | return false; |
| | | } |
| | | // Try to acquire lock with 0 timeout (non-blocking) |
| | | $result = $this->wpdb->get_var($this->wpdb->prepare( |
| | | "SELECT GET_LOCK(%s, 0)", |
| | | $lock_name |
| | | )); |
| | | |
| | | return $result === '1'; |
| | | } |
| | | |
| | | /** |
| | | * Release the queue lock |
| | | * @return void |
| | | */ |
| | | protected function unlockQueue():void |
| | | { |
| | | delete_transient(BASE . 'queue_lock'); |
| | | } |
| | | protected function unlockQueue(): void |
| | | { |
| | | $lock_name = BASE . '_queue_lock'; |
| | | |
| | | $this->wpdb->query($this->wpdb->prepare( |
| | | "SELECT RELEASE_LOCK(%s)", |
| | | $lock_name |
| | | )); |
| | | } |
| | | |
| | | |
| | | /** |
| | |
| | | { |
| | | try { |
| | | $table = $this->wpdb->prefix . $this->table; |
| | | error_log('QUEUING OPERATION: '.print_r([ |
| | | 'type' => $type, |
| | | 'user_id' => $user_id, |
| | | 'data' => $data, |
| | | 'options' => $options, |
| | | ],true)); |
| | | |
| | | // Extract options |
| | | $priority = (array_key_exists('priority', $options) && in_array($options['priority'], ['low', 'normal', 'high'])) ? $options['priority'] : 'normal'; |
| | | $delay = (array_key_exists('delay', $options)) ? (int) $options['delay'] : 0; |
| | |
| | | // Generate operation ID |
| | | $operation_id = $this->checkOperationId($operation_id, $type, $merge, $user_id); |
| | | |
| | | error_log('Checked OperationId: '.print_r($operation_id, true)); |
| | | // Check for existing operation |
| | | $existing = $this->getOperation($operation_id); |
| | | error_log('Existing: '.print_r($existing, true)); |
| | | |
| | | $dependencies = $options['depends_on'] ?? []; |
| | | if (is_string($dependencies)) { |
| | |
| | | } |
| | | |
| | | if ($existing) { |
| | | error_log('[queue]:queueOperation - Already Existing operation... updating'); |
| | | |
| | | if ($merge !== 'append' && ($existing->status === 'pending' || $existing->status === 'scheduled')) { |
| | | error_log('Not appending operation, checking for merge'); |
| | | |
| | | if ($merge === 'merge') { |
| | | $existing_data = json_decode($existing->request_data ?? '{}', true); |
| | | $data = $this->deepMerge($existing_data, $data); |
| | |
| | | } |
| | | |
| | | } else if ($merge === 'append') { |
| | | error_log('Append operation, creating a new one'); |
| | | $operation_id = $operation_id . '_' . time() . '_' . substr(uniqid(), -4); |
| | | $existing = null; |
| | | } |
| | | } |
| | | |
| | | if (!$existing) { |
| | | error_log('Inserting new operation into table'); |
| | | // Prepare metadata with chunk config |
| | | $metadata = $chunk_config ? ['chunk_config' => $chunk_config] : []; |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | $this->updateLastModified($user_id); |
| | | $this->invalidateQueueCache(); |
| | | $this->cache->invalidate(self::CACHE_USER_QUEUE_PREFIX . $user_id); |
| | | $this->invalidateUserQueue($user_id); |
| | | $this->runQueueOnShutdown(); |
| | | |
| | | return [ |
| | |
| | | ]; |
| | | |
| | | } catch (Exception $e) { |
| | | error_log("Error queueing operation: " . $e->getMessage()); |
| | | JVB()->error()->log('queue', $e->getMessage(), $data, 'high'); |
| | | return new WP_Error('queue_failed', $e->getMessage()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Ask the 'queue' routes object to update the queue timestamp of one user. |
| | | * |
| | | * @param int $user_id User whose queue timestamp should be updated. |
| | | */ |
| | | protected function updateLastModified(int $user_id) { |
| | | $queueRoutes = JVB()->routes('queue'); |
| | | $queueRoutes->updateUserQueueTimestamp($user_id); |
| | | } |
| | | |
| | | protected function deepMerge(array $existing, array $new): array |
| | | { |
| | | $merged = $existing; |
| | | |
| | | if (!$this->isAssociativeArray($existing) && !$this->isAssociativeArray($new)) { |
| | | return array_merge($existing, $new); |
| | | } |
| | | foreach ($new as $key => $newValue) { |
| | | if (!array_key_exists($key, $existing)) { |
| | | $merged[$key] = $newValue; |
| | |
| | | // Recursive merge going deeper, if any of them are associative arrays |
| | | $merged[$key] = $this->deepMerge($existingValue, $newValue); |
| | | } else { |
| | | // Unique merge if indexed arrays |
| | | $merged[$key] = array_unique(array_merge($existingValue, $newValue), SORT_REGULAR); |
| | | $containsComplex = $this->containsComplexData($existingValue) || $this->containsComplexData($newValue); |
| | | |
| | | if ($containsComplex) { |
| | | // Just merge and re-index - preserves all items from chunks |
| | | $merged[$key] = array_values(array_merge($existingValue, $newValue)); |
| | | } else { |
| | | // Simple scalar arrays - use unique merge |
| | | $merged[$key] = array_unique(array_merge($existingValue, $newValue), SORT_REGULAR); |
| | | } |
| | | } |
| | | } elseif (is_array($existingValue) && !is_array($newValue)) { |
| | | // The existing value is an array, but the new one isn't |
| | |
| | | return array_keys($arr) !== range(0, count($arr) - 1); |
| | | } |
| | | |
| | | /** |
| | | * Report whether any top-level element of the array is itself an array or an object. |
| | | * |
| | | * @param array $arr Values to inspect; nested levels are not examined. |
| | | * @return bool True as soon as one array/object element is found, false otherwise (including for an empty array). |
| | | */ |
| | | protected function containsComplexData(array $arr): bool |
| | | { |
| | | foreach ($arr as $element) { |
| | | $isSimple = !is_array($element) && !is_object($element); |
| | | if ($isSimple) { |
| | | continue; |
| | | } |
| | | |
| | | return true; |
| | | } |
| | | |
| | | return false; |
| | | } |
| | | |
| | | protected function getEarliestScheduledTime(string $existing, string $new): string |
| | | { |
| | | $existing_time = strtotime($existing); |
| | |
| | | 'limit' => 1 |
| | | ]); |
| | | if ($existing) { |
| | | error_log('Found existing User operations: '.print_r($existing, true)); |
| | | $operation_id = $existing[0]->id; |
| | | } |
| | | } |
| | | |
| | | if (!$operation_id) { |
| | | $operation_id = uniqid('op_'); |
| | | error_log('Generated operation ID: '.print_r($operation_id, true)); |
| | | } |
| | | |
| | | if (!str_starts_with($operation_id, 'u')) { |
| | | $operation_id = 'u' . $user_id . '_' . $operation_id; |
| | | error_log('Added user ID to operation ID: '.print_r($operation_id, true)); |
| | | } |
| | | |
| | | return $operation_id; |
| | |
| | | { |
| | | // Only add if not already scheduled AND if operation is high priority or small |
| | | if (!has_action('shutdown', [$this, 'processQueueOnShutdown'])) { |
| | | error_log('Adding shutdown call'); |
| | | add_action('shutdown', [$this, 'processQueueOnShutdown'], 100); |
| | | } |
| | | } |
| | |
| | | $this->processOperation($operation); |
| | | |
| | | // Invalidate operation cache after processing |
| | | $this->cache->invalidate(self::CACHE_OPERATION_PREFIX . $operation->id); |
| | | $this->cache->invalidate(self::CACHE_USER_QUEUE_PREFIX . $operation->user_id); |
| | | $this->cache->forget(self::CACHE_OPERATION_PREFIX . $operation->id); |
| | | $this->cache->forget(self::CACHE_USER_QUEUE_PREFIX . $operation->user_id); |
| | | } |
| | | |
| | | // Batch invalidate caches at the end |
| | |
| | | AND retries < %d |
| | | ORDER BY |
| | | FIELD(priority, 'high', 'normal', 'low'), |
| | | COALESCE(scheduled_at, created_at) ASC |
| | | COALESCE(scheduled_at, created_at) |
| | | LIMIT %d |
| | | ", $current_time, $this->max_attempts, $batch_size)); |
| | | |
| | |
| | | protected function canProcessOperation(object $operation): bool |
| | | { |
| | | if ($operation->retries >= $this->max_attempts) { |
| | | error_log("Operation {$operation->id} exceeded retry limit: {$operation->retries} >= {$this->max_attempts}"); |
| | | $this->markAsPermanentlyFailed($operation->id, 'Exceeded maximum retry attempts'); |
| | | return false; |
| | | } |
| | |
| | | |
| | | if ($result) { |
| | | $this->invalidateQueueCache('status'); |
| | | error_log("Marked operation {$operation_id} as permanently failed: {$final_error_message}"); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Tell whether a user's queue changed after a given point in time. |
| | | * |
| | | * Compares the cache's last-modified value for "user_{id}" with the supplied timestamp. |
| | | * A false result means nothing changed, so the caller can answer with 304. |
| | | * |
| | | * @param int $user_id User whose queue is being checked. |
| | | * @param int $since_timestamp Timestamp the caller last saw. |
| | | * @return bool True if the last-modified value is greater than $since_timestamp. |
| | | */ |
| | | public function isUserQueueModified(int $user_id, int $since_timestamp): bool |
| | | { |
| | | $cacheKey = "user_{$user_id}"; |
| | | $lastModified = $this->cache::lastModified($cacheKey); |
| | | |
| | | return $lastModified > $since_timestamp; |
| | | } |
| | | /** |
| | | * Flush the cache scope returned by Cache::for() for the given user. |
| | | * |
| | | * NOTE(review): the flush is expected to also refresh the HTTP timestamp for |
| | | * "user_{id}", clear user-specific caches and cascade to connected caches. That |
| | | * behaviour lives in the Cache class and is not visible here - confirm. |
| | | * |
| | | * @param int $user_id User whose cached queue data should be dropped. |
| | | * @return void |
| | | */ |
| | | protected function invalidateUserQueue(int $user_id): void |
| | | { |
| | | $userCache = Cache::for($user_id); |
| | | $userCache->flush(); |
| | | } |
| | | |
| | | /** |
| | | * Invalidate all queue-related caches |
| | | */ |
| | | protected function invalidateQueueCache(string $scope = 'all'): void |
| | |
| | | $keys = $cacheKeys[$scope] ?? $cacheKeys['all']; |
| | | |
| | | foreach ($keys as $key) { |
| | | $this->cache->invalidate($key); |
| | | $this->cache->forget($key); |
| | | } |
| | | |
| | | $this->cache->touch(); |
| | | |
| | | if ($scope === 'all') { |
| | | // Clear entire group for complete refresh |
| | | $this->cache->invalidateGroup($this->cacheGroup); |
| | | jvbUpdateCacheTimestamp('queue'); |
| | | delete_transient('jvb_queue_status_counts'); |
| | | } |
| | | } |
| | |
| | | ['%s'] |
| | | ); |
| | | |
| | | // Log retry schedule |
| | | error_log(sprintf( |
| | | "Operation %s scheduled for retry #%d at %s (delay: %ds)", |
| | | $operation_id, |
| | | $attempt, |
| | | $scheduled_at, |
| | | $delay |
| | | )); |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | $message .= "Please check the error logs for more details."; |
| | | |
| | | return jvbMail($admin_email, $subject, $message); |
| | | return JVB()->email()->sendEmail($admin_email, $subject, $message); |
| | | } |
| | | |
| | | /** |
| | |
| | | if (!$updated) { |
| | | throw new Exception('Operation no longer available for processing'); |
| | | } |
| | | $this->updateLastModified($operation->user_id); |
| | | $this->invalidateUserQueue($operation->user_id); |
| | | |
| | | $data = json_decode($operation->request_data, true); |
| | | $progress_count = (int) $operation->progress_count; |
| | |
| | | ]; |
| | | } |
| | | |
| | | error_log('Filtered Result: '.print_r($filterResult, true)); |
| | | |
| | | $newCount = $progress_count + $chunk['progress']; |
| | | |
| | | if ($filterResult['success']) { |
| | |
| | | } |
| | | // Store the result data |
| | | $resultToStore = $this->deepMerge($oldResult, $filterResult['result']); |
| | | |
| | | |
| | | $resultToStore['processed_at'] = current_time('mysql'); |
| | | |
| | | // Check if operation is complete |
| | |
| | | ['%s'] |
| | | ); |
| | | |
| | | |
| | | |
| | | |
| | | error_log('Completion result: '.print_r($result, true)); |
| | | // Now do post-completion tasks |
| | | $this->invalidateQueueCache('status'); |
| | | $this->updateLastModified($operation->user_id); |
| | | $this->updateUserQueueTimestamp($operation->user_id); |
| | | $this->invalidateUserQueue($operation->user_id); |
| | | |
| | | $this->trackOperationMetrics($operation->id); |
| | | |
| | |
| | | ['%s'] |
| | | ); |
| | | |
| | | $this->updateLastModified($operation->user_id); |
| | | $this->invalidateUserQueue($operation->user_id); |
| | | } |
| | | |
| | | } else { |
| | |
| | | ['%s'] |
| | | ); |
| | | |
| | | $this->invalidateQueueCache('status'); |
| | | $this->updateLastModified($operation->user_id); |
| | | $this->invalidateUserQueue($operation->user_id); |
| | | } else { |
| | | // Failed but more to process - continue with next chunk |
| | | $this->wpdb->update( |
| | |
| | | } |
| | | } |
| | | // Clear operation cache after any update |
| | | $this->cache->invalidate(self::CACHE_OPERATION_PREFIX . $operation->id); |
| | | $this->updateLastModified($operation->user_id); |
| | | $this->invalidateUserQueue($operation->user_id); |
| | | return $filterResult; |
| | | |
| | | } catch (Exception $e) { |
| | |
| | | $batch = $this->extractBatch($data, $keys, $current_progress, $chunk_size); |
| | | |
| | | // Merge non-chunked data with chunked batch |
| | | $result = []; |
| | | foreach ($data as $k => $v) { |
| | | if (!in_array($k, $keys)) { |
| | | $result[$k] = $v; // Keep non-batch data |
| | | } |
| | | } |
| | | $result = array_filter($data, function ($k) use ($keys) { |
| | | return !in_array($k, $keys); |
| | | }, ARRAY_FILTER_USE_KEY); |
| | | |
| | | // Add the batched data |
| | | foreach ($batch as $k => $v) { |
| | |
| | | |
| | | /** |
| | | * Store the current time as a user's queue timestamp and touch the user's cache key. |
| | | * |
| | | * @param int $user_id User whose queue timestamp is being refreshed. |
| | | */ |
| | | protected function updateUserQueueTimestamp(int $user_id) |
| | | { |
| | | // Per-user key in this manager's cache instance; the stored value is time(). |
| | | $key = "{$user_id}_queue_timestamp"; |
| | | $this->cache->set($key, time()); |
| | | |
| | | // Static touch on "user_{id}" - presumably bumps its last-modified marker; confirm in Cache. |
| | | Cache::touch("user_{$user_id}"); |
| | | } |
| | | |
| | | /** |
| | |
| | | * Send daily metrics report to admin |
| | | * @return void |
| | | */ |
| | | public function emailDailyMetricsReport():void |
| | | { |
| | | public function emailDailyMetricsReport():void |
| | | { |
| | | $metrics_table = $this->wpdb->prefix . $this->metricsTable; |
| | | $yesterday = date('Y-m-d', strtotime('-1 day')); |
| | | |
| | | $metrics_table = $this->wpdb->prefix . $this->metricsTable; |
| | | $yesterday = date('Y-m-d', strtotime('-1 day')); |
| | | $metrics = $this->wpdb->get_results($this->wpdb->prepare( |
| | | "SELECT * FROM $metrics_table WHERE date = %s", |
| | | $yesterday |
| | | )); |
| | | |
| | | // Get yesterday's metrics |
| | | $metrics = $this->wpdb->get_results($this->wpdb->prepare( |
| | | "SELECT * FROM $metrics_table WHERE date = %s", |
| | | $yesterday |
| | | )); |
| | | if (empty($metrics)) { |
| | | return; |
| | | } |
| | | |
| | | if (empty($metrics)) { |
| | | return; // No metrics to report |
| | | } |
| | | $admin_email = get_option('admin_email'); |
| | | $site_name = get_bloginfo('name'); |
| | | $subject = "[$site_name] Daily Queue Performance - " . $yesterday; |
| | | |
| | | $admin_email = get_option('admin_email'); |
| | | $site_name = get_bloginfo('name'); |
| | | // Calculate totals |
| | | $total_ops = 0; |
| | | $total_success = 0; |
| | | $total_failed = 0; |
| | | $total_items = 0; |
| | | |
| | | $subject = "[$site_name] Daily Queue Performance Report - " . date('Y-m-d', strtotime('-1 day')); |
| | | foreach ($metrics as $metric) { |
| | | $total_ops += $metric->total_operations; |
| | | $total_success += $metric->successful_operations; |
| | | $total_failed += $metric->failed_operations; |
| | | $total_items += $metric->total_items_processed; |
| | | } |
| | | |
| | | $message = "Daily Queue Performance Report for $yesterday\n\n"; |
| | | $success_rate = round(($total_success / max(1, $total_ops)) * 100, 1); |
| | | |
| | | $message .= "SUMMARY:\n"; |
| | | $total_ops = 0; |
| | | $total_success = 0; |
| | | $total_failed = 0; |
| | | $total_items = 0; |
| | | $message = JVB()->email()->h1('Daily Queue Performance Report'); |
| | | $message .= sprintf('<p>Report for <strong>%s</strong></p>', $yesterday); |
| | | |
| | | foreach ($metrics as $metric) { |
| | | $total_ops += $metric->total_operations; |
| | | $total_success += $metric->successful_operations; |
| | | $total_failed += $metric->failed_operations; |
| | | $total_items += $metric->total_items_processed; |
| | | } |
| | | // Summary stats in grid |
| | | $stats = [ |
| | | JVB()->email()->stat($total_ops, 'Operations'), |
| | | JVB()->email()->stat($total_success, 'Successful', '✓'), |
| | | JVB()->email()->stat($total_failed, 'Failed', $total_failed > 0 ? '⚠' : ''), |
| | | JVB()->email()->stat($success_rate . '%', 'Success Rate') |
| | | ]; |
| | | $message .= JVB()->email()->grid($stats, 4); |
| | | |
| | | $message .= "- Total Operations: $total_ops\n"; |
| | | $message .= "- Successful: $total_success\n"; |
| | | $message .= "- Failed: $total_failed\n"; |
| | | $message .= "- Success Rate: " . round(($total_success / max(1, $total_ops)) * 100, 2) . "%\n"; |
| | | $message .= "- Total Items Processed: $total_items\n\n"; |
| | | $message .= JVB()->email()->spacer(20); |
| | | |
| | | $message .= "DETAILS BY OPERATION TYPE:\n"; |
| | | // Alert if success rate is low |
| | | if ($success_rate < 90) { |
| | | $message .= JVB()->email()->alert( |
| | | sprintf('Success rate of %s%% is below the 90%% threshold', $success_rate), |
| | | 'warning' |
| | | ); |
| | | } |
| | | |
| | | foreach ($metrics as $metric) { |
| | | $message .= "• $metric->type:\n"; |
| | | $message .= " - Operations: $metric->total_operations\n"; |
| | | $message .= " - Success: $metric->successful_operations\n"; |
| | | $message .= " - Failed: $metric->failed_operations\n"; |
| | | $message .= JVB()->email()->h2('Details by Operation Type'); |
| | | |
| | | if ($metric->average_duration) { |
| | | $message .= " - Avg. Duration: " . round($metric->average_duration, 2) . " seconds\n"; |
| | | } |
| | | // Details for each operation type |
| | | foreach ($metrics as $metric) { |
| | | $details = []; |
| | | $details[] = ['label' => 'Total', 'value' => $metric->total_operations]; |
| | | $details[] = ['label' => 'Success', 'value' => JVB()->email()->badge($metric->successful_operations, 'success')]; |
| | | $details[] = ['label' => 'Failed', 'value' => $metric->failed_operations > 0 ? JVB()->email()->badge($metric->failed_operations, 'error') : '0']; |
| | | |
| | | $message .= " - Items Processed: $metric->total_items_processed\n"; |
| | | if ($metric->average_duration) { |
| | | $details[] = ['label' => 'Avg Duration', 'value' => round($metric->average_duration, 2) . 's']; |
| | | } |
| | | |
| | | if ($metric->peak_queue_size) { |
| | | $message .= " - Peak Queue Size: $metric->peak_queue_size\n"; |
| | | } |
| | | $details[] = ['label' => 'Items Processed', 'value' => number_format($metric->total_items_processed)]; |
| | | |
| | | if ($metric->peak_memory_usage) { |
| | | $memory_mb = round($metric->peak_memory_usage / 1024 / 1024, 2); |
| | | $message .= " - Peak Memory Usage: $memory_mb MB\n"; |
| | | } |
| | | if ($metric->peak_memory_usage) { |
| | | $memory_mb = round($metric->peak_memory_usage / 1024 / 1024, 2); |
| | | $details[] = ['label' => 'Peak Memory', 'value' => $memory_mb . ' MB']; |
| | | } |
| | | |
| | | if ($metric->peak_cpu_usage) { |
| | | $cpu_percent = round($metric->peak_cpu_usage * 50, 2); // Assuming 2 cores |
| | | $message .= " - Peak CPU Usage: $cpu_percent%\n"; |
| | | } |
| | | $message .= JVB()->email()->card( |
| | | JVB()->email()->table($details), |
| | | esc_html($metric->type) |
| | | ); |
| | | } |
| | | |
| | | $message .= "\n"; |
| | | } |
| | | // Current queue status |
| | | $pending_count = $this->getCurrentQueueSize(); |
| | | if ($pending_count > 0) { |
| | | $message .= JVB()->email()->spacer(20); |
| | | $message .= JVB()->email()->notice( |
| | | sprintf('<strong>Current Queue:</strong> %d operations pending', $pending_count) |
| | | ); |
| | | } |
| | | |
| | | // Add any outstanding queue items |
| | | $pending_count = $this->getCurrentQueueSize(); |
| | | if ($pending_count > 0) { |
| | | $message .= "CURRENT QUEUE STATUS:\n"; |
| | | $message .= "- $pending_count operations currently pending in the queue\n\n"; |
| | | } |
| | | $message .= JVB()->email()->spacer(20); |
| | | $message .= JVB()->email()->button(admin_url('admin.php?page=jvb-queue'), 'View Queue Dashboard'); |
| | | |
| | | $message .= "This is an automated report. Please check the admin dashboard for more details."; |
| | | |
| | | // Send email |
| | | jvbMail($admin_email, $subject, $message); |
| | | } |
| | | JVB()->email()->sendEmail($admin_email, $subject, $message, 'QUEUE REPORT'); |
| | | } |
| | | |
| | | /** |
| | | * @return int |
| | |
| | | SELECT |
| | | status, |
| | | COUNT(*) as count, |
| | | SUM(CASE WHEN status = 'scheduled' AND scheduled_at <= '$current_time' THEN 1 ELSE 0 END) as scheduled_ready |
| | | SUM(IF(status = 'scheduled' AND scheduled_at <= '$current_time', 1, 0)) as scheduled_ready |
| | | FROM $table |
| | | GROUP BY status |
| | | ", OBJECT_K); |
| | |
| | | ", $this->max_attempts)); |
| | | |
| | | if (!empty($stuck_operations)) { |
| | | error_log("Found " . count($stuck_operations) . " stuck operations to clean up"); |
| | | |
| | | foreach ($stuck_operations as $operation) { |
| | | $this->markAsPermanentlyFailed( |
| | | $operation->id, |
| | |
| | | ", date('Y-m-d H:i:s', strtotime('-1 hour')))); |
| | | |
| | | if (!empty($long_processing)) { |
| | | error_log("Found " . count($long_processing) . " long-running operations to reset"); |
| | | |
| | | foreach ($long_processing as $operation) { |
| | | $this->wpdb->update($table, [ |
| | | 'status' => 'pending', |
| | |
| | | DATE(completed_at) as date, |
| | | type, |
| | | COUNT(*) as total_operations, |
| | | SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as successful, |
| | | SUM(CASE WHEN status LIKE 'failed%' THEN 1 ELSE 0 END) as failed, |
| | | SUM(IF(status = 'completed', 1, 0)) as successful, |
| | | SUM(IF(status LIKE 'failed%', 1, 0)) as failed, |
| | | AVG(TIMESTAMPDIFF(SECOND, created_at, completed_at)) as avg_duration, |
| | | SUM(count) as items_processed |
| | | FROM $table |