<?php
|
namespace JVBase\managers;
|
|
use Exception;
|
use WP_Error;
|
use WP_REST_Response;
|
use WP_REST_Request;
|
|
if (!defined('ABSPATH')) {
|
exit; // Exit if accessed directly
|
}
|
|
// Example crontab entries for triggering the queue hooks via WP-CLI:
//
// # Every minute - main queue processing
// * * * * * cd /path/to/wordpress && wp cron event run jvb_process_queue
//
// # Every hour - maintenance tasks
// 0 * * * * cd /path/to/wordpress && wp cron event run jvb_queue_maintenance
//
// # Daily at 2 AM - metrics report
// 0 2 * * * cd /path/to/wordpress && wp cron event run jvbEmailDailyMetricsReport
//
// When adding these through a hosting control panel, click "Create" to save each cron job.
|
class OperationQueue
|
{
|
protected string $table = BASE.'_operation_queue';
|
protected string $metricsTable = BASE.'stats__operation_queue';
|
protected int $max_attempts = 3;
|
protected \wpdb $wpdb;
|
|
// System load thresholds
|
protected array $load_thresholds = [
|
'cpu' => 80, // Max CPU usage percentage
|
'memory' => 85, // Max memory usage percentage
|
'load_avg' => 1.6 // Max load average (80% of 2 cores)
|
];
|
|
protected int $maxPerBatch = 25;
|
|
// Queue configuration
|
protected array $config = [
|
'max_operations_per_batch' => 25, // Reduced from 100 for better control
|
'max_concurrent_operations' => 100, // Adjusted for 2 CPU cores
|
|
];
|
|
protected ?Cache $cache = null;
|
protected int $ttl = 300;
|
// Cache keys for different data types
|
private const CACHE_QUEUE_STATUS = 'status';
|
private const CACHE_HAS_ITEMS = 'has_items';
|
private const CACHE_OPERATION_PREFIX = 'op_';
|
private const CACHE_USER_QUEUE_PREFIX = 'user_queue_';
|
private const CACHE_QUEUE_SIZE = 'queue_size';
|
|
|
/**
 * Wires the queue into WordPress.
 *
 * Stores the global database handle, connects the queue cache, registers the
 * cron and admin hooks, and schedules the recurring cron events when no run
 * is pending for them yet.
 */
public function __construct()
{
    global $wpdb;

    $this->wpdb  = $wpdb;
    $this->cache = Cache::for('queue', DAY_IN_SECONDS)->connect('user');

    // Cron hook => handler method on this instance.
    $cronHandlers = [
        'jvb_process_queue'          => 'checkQueue',
        'jvb_queue_maintenance'      => 'hourlyMaintenance',
        'jvbEmailDailyMetricsReport' => 'emailDailyMetricsReport',
    ];
    foreach ($cronHandlers as $hook => $handler) {
        add_action($hook, [$this, $handler]);
    }

    jvb_register_do_once('queue_admin_action_registered', [$this, 'registerAdminAction']);
    add_filter(BASE . 'admin_action_filter', [$this, 'adminActionFilter'], 10, 3);
    // add_filter('jvbAdminSubpages', [$this, 'addSubpage'], 10, 1);

    // Cron hook => recurrence name.
    // NOTE(review): 'every-minute' is not a core WordPress schedule; presumably it
    // is registered elsewhere via the cron_schedules filter - confirm.
    $recurrences = [
        'jvb_process_queue'     => 'every-minute',
        'jvb_queue_maintenance' => 'hourly',
    ];
    foreach ($recurrences as $hook => $recurrence) {
        if (wp_next_scheduled($hook)) {
            continue;
        }
        wp_schedule_event(time(), $recurrence, $hook);
    }
}
|
|
/**
 * Appends the "Operation Queue" admin sub-page definition to a list of sub-pages.
 *
 * @param array $subpages Sub-page definitions collected so far.
 *
 * @return array The same list with the queue page added at the end.
 */
public function addSubpage(array $subpages): array
{
    $queuePage = [
        'page_title' => 'Operation Queue',
        'menu_title' => 'Queue',
        'capability' => 'manage_options',
        'menu_slug'  => 'operation-queue',
        'callback'   => [$this, 'renderAdminPage'],
    ];

    array_push($subpages, $queuePage);

    return $subpages;
}
|
/**
 * Registers the two queue maintenance actions with the plugin's admin component.
 *
 * Both actions are registered with the 'manage_options' capability and the
 * 'all' scope argument.
 */
public function registerAdminAction(): void
{
    $admin = JVB()->admin();

    // Action slug => human readable label, registered in this order.
    $actions = [
        'restart-stuck-operations' => 'Restart Stuck Operations',
        'unlock-operation-queue'   => 'Unlock Queue',
    ];

    foreach ($actions as $slug => $label) {
        $admin->registerAction($label, $slug, 'manage_options', 'all');
    }
}
|
|
/**
 * Handles the queue's admin actions on the admin action filter.
 *
 * For the two slugs this class registers, the matching maintenance routine is
 * run and a fresh success response is returned. Any other action passes the
 * incoming response through untouched.
 *
 * @param WP_REST_Response $response Response produced so far by the filter chain.
 * @param WP_REST_Request  $request  The originating request (not inspected here).
 * @param string           $action   Slug of the admin action being executed.
 *
 * @return bool|WP_REST_Response
 */
public function adminActionFilter(WP_REST_Response $response, WP_REST_Request $request, string $action): WP_REST_Response|bool
{
    if ($action === 'unlock-operation-queue') {
        $this->unlockQueue();
        $message = 'Unlocked Queue';
    } elseif ($action === 'restart-stuck-operations') {
        $this->restartStuckOperations();
        $message = 'Restarted Stuck Operations';
    } else {
        // Not one of ours: leave the response as it is.
        return $response;
    }

    return new WP_REST_Response([
        'success' => true,
        'message' => $message
    ]);
}
|
|
/**
 * Returns summary counters for the queue, cached for 30 seconds.
 *
 * Keys of the returned array:
 *  - total:     rows whose status is pending, processing or scheduled
 *  - active:    rows that are pending or processing
 *  - ready:     active rows plus scheduled rows whose scheduled_at has passed
 *  - has_items: whether "ready" is greater than zero
 *
 * When the query fails, zeroed counters are returned for this call only and
 * are NOT cached, so a transient database error cannot make the queue look
 * empty for the next 30 seconds.
 *
 * @return array
 */
protected function getQueueInfo(): array
{
    $cacheKey = 'queue_info';
    $cached = $this->cache->get($cacheKey);
    if ($cached !== false) {
        return $cached;
    }

    $table = $this->wpdb->prefix . $this->table;
    $current_time = current_time('mysql');

    // Count and readiness in a single query.
    $result = $this->wpdb->get_row($this->wpdb->prepare("
        SELECT
            COUNT(*) as total,
            SUM(IF(status IN ('pending', 'processing'), 1, 0)) as active,
            SUM(IF(status = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready_scheduled
        FROM $table
        WHERE status IN ('pending', 'processing', 'scheduled')
    ", $current_time));

    if (!is_object($result)) {
        // get_row() yields null when the query fails or returns nothing.
        return [
            'total' => 0,
            'active' => 0,
            'ready' => 0,
            'has_items' => false
        ];
    }

    // SUM() is NULL when no row matches; the int casts turn that into 0.
    $active = (int) $result->active;
    $ready = $active + (int) $result->ready_scheduled;

    $info = [
        'total' => (int) $result->total,
        'active' => $active,
        'ready' => $ready,
        'has_items' => $ready > 0
    ];

    $this->cache->set($cacheKey, $info, 30);

    return $info;
}
|
/**
 * Tells whether the queue summary reports at least one item ready to process.
 *
 * @return bool
 */
protected function hasItemsInQueue(): bool
{
    $info = $this->getQueueInfo();

    return $info['has_items'];
}
|
|
/**
 * Cron entry point: processes one batch of the queue when there is work to do.
 *
 * The run is skipped when the queue has no ready items, when the queue lock is
 * already held, when the batch contains heavy operations and the server is not
 * idle, or when the lock cannot be acquired. The last case matters because
 * another worker may take the lock between the isQueueLocked() check and
 * lockQueue(); previously the result of lockQueue() was ignored and both
 * workers would process the same batch.
 *
 * The lock is released in a finally block so an error thrown while processing
 * cannot leave it held by this request.
 *
 * @return void
 */
public function checkQueue(): void
{
    if (!$this->hasItemsInQueue() || $this->isQueueLocked()) {
        return;
    }

    // Peek at what operations we might process
    $batch_size = $this->getAdaptiveBatchSize();
    $operations = $this->getOperations($batch_size);

    // Only check server load for heavy operations
    if ($this->shouldCheckServerLoad($operations) && !$this->isServerIdle()) {
        error_log('Server not idle for heavy operations');
        return;
    }

    if (!$this->lockQueue()) {
        // Someone else acquired the lock first; let them do the work.
        return;
    }

    try {
        $this->applyServerSettings();
        $this->processQueue($operations);
    } finally {
        $this->unlockQueue();
    }
}
|
|
/**
 * Hourly cron handler: cleans up and restarts stuck operations.
 *
 * Nothing is done while the queue lock is held, so maintenance never runs
 * alongside batch processing.
 *
 * @return void
 */
public function hourlyMaintenance(): void
{
    if (!$this->isQueueLocked()) {
        $this->cleanupStuckOperations();
        $this->restartStuckOperations();
        // $this->pruneOldMetrics(); // Optional: clean old metrics
    }
}
|
|
/**
 * Tells whether an operation type is on the list of heavy operation types.
 *
 * @param string $type Operation type identifier.
 *
 * @return bool
 */
protected function isHeavyOperation(string $type): bool
{
    return in_array(
        $type,
        [
            'file_upload',
            'image_processing',
            'rebuild_user_term_index',
            'sync_content_taxonomy_tables',
            'email_notification_digest',
            'taxonomy_relationships',
        ],
        true
    );
}
|
|
/**
 * Tells whether a batch contains at least one heavy operation, in which case
 * the caller checks server load before processing.
 *
 * @param array $operations Operation rows; each is read through its `type` property.
 *
 * @return bool
 */
protected function shouldCheckServerLoad(array $operations): bool
{
    $containsHeavy = false;

    foreach ($operations as $candidate) {
        if ($this->isHeavyOperation($candidate->type)) {
            // One heavy operation is enough; stop scanning.
            $containsHeavy = true;
            break;
        }
    }

    return $containsHeavy;
}
|
|
/**
 * Check whether the named queue lock is currently held.
 *
 * Runs IS_USED_LOCK() for the queue lock name and treats any non-null answer
 * as "locked". A null answer (which is also what get_var() gives when the
 * query itself fails) is reported as "not locked".
 *
 * @return bool
 */
protected function isQueueLocked(): bool
{
    $sql = $this->wpdb->prepare(
        "SELECT IS_USED_LOCK(%s)",
        BASE . '_queue_lock'
    );

    $holder = $this->wpdb->get_var($sql);

    return null !== $holder;
}
|
|
/**
 * Check if server is relatively idle (very low load).
 *
 * The server counts as busy when the 1-minute load average exceeds 50% of the
 * configured load threshold, or when memory usage exceeds 60% of the
 * configured memory threshold.
 *
 * The load check is skipped when sys_getloadavg() is unavailable or reports a
 * failure (it returns false in that case), instead of indexing into a
 * non-array value.
 *
 * @return bool
 */
private function isServerIdle(): bool
{
    if (function_exists('sys_getloadavg')) {
        $load = sys_getloadavg();
        $load_limit = $this->load_thresholds['load_avg'] * 0.5;

        if (is_array($load) && isset($load[0]) && $load[0] > $load_limit) {
            return false;
        }
    }

    $memory_usage = $this->getMemoryUsage();
    $memory_limit = $this->load_thresholds['memory'] * 0.6;

    return !($memory_usage > $memory_limit);
}
|
|
/**
 * Try to acquire the named queue lock without waiting.
 *
 * GET_LOCK() is called with a timeout of 0, so the call returns immediately.
 *
 * @return bool True only when the database answered '1' (lock obtained).
 */
protected function lockQueue(): bool
{
    $sql = $this->wpdb->prepare(
        "SELECT GET_LOCK(%s, 0)",
        BASE . '_queue_lock'
    );

    $acquired = $this->wpdb->get_var($sql);

    return '1' === $acquired;
}
|
|
/**
 * Release the named queue lock.
 *
 * The result of RELEASE_LOCK() is not inspected.
 *
 * @return void
 */
protected function unlockQueue(): void
{
    $sql = $this->wpdb->prepare(
        "SELECT RELEASE_LOCK(%s)",
        BASE . '_queue_lock'
    );

    $this->wpdb->query($sql);
}
|
|
|
/**
 * Prepares the PHP process for a background batch run.
 *
 * Raises the execution time limit to five minutes, enables the garbage
 * collector and, where proc_nice() exists, raises the process niceness by 10.
 *
 * NOTE(review): proc_nice() adjusts the niceness relative to its current value,
 * so on long-lived workers repeated calls may accumulate - confirm this is
 * acceptable for the deployment.
 *
 * @return void
 */
protected function applyServerSettings(): void
{
    $maxSeconds = 300; // 5 minutes
    set_time_limit($maxSeconds);

    gc_enable();

    if (!function_exists('proc_nice')) {
        return;
    }

    // Lower priority for background tasks
    proc_nice(10);
}
|
|
/**
 * Tells whether every operation this one depends on has completed.
 *
 * `$operation->dependencies` may be empty, an array of ids, a JSON encoded
 * list of ids, or a plain comma separated string of ids. A plain comma
 * separated string is not valid JSON; previously that made json_decode()
 * return null, the explode() fallback was never reached, and null was handed
 * to getOperationStatuses(), raising a TypeError.
 *
 * An entry that is not a scalar cannot be looked up and is treated as unmet.
 * (A dependency time-out was prototyped here earlier but is not active.)
 *
 * @param object $operation Operation row; only `dependencies` is read.
 *
 * @return bool True when there are no dependencies or all have status 'completed'.
 */
protected function areDependenciesMet(object $operation): bool
{
    if (empty($operation->dependencies)) {
        return true;
    }

    $dependencies = $operation->dependencies;

    if (is_string($dependencies)) {
        $decoded = json_decode($dependencies, true);

        if (json_last_error() !== JSON_ERROR_NONE) {
            // Not JSON at all: treat the raw value as a comma separated list.
            $dependencies = explode(',', $dependencies);
        } elseif (is_string($decoded)) {
            // JSON string literal holding a comma separated list.
            $dependencies = explode(',', $decoded);
        } else {
            // Array, scalar, or JSON null (which means "no dependencies").
            $dependencies = $decoded ?? [];
        }
    }

    $dependency_ids = [];
    foreach ((array) $dependencies as $dependency) {
        if (!is_scalar($dependency)) {
            return false;
        }

        $dependency = trim((string) $dependency);
        if ($dependency !== '') {
            $dependency_ids[] = $dependency;
        }
    }

    if ($dependency_ids === []) {
        return true;
    }

    // Batch check all dependencies at once
    $statuses = $this->getOperationStatuses($dependency_ids);

    foreach ($dependency_ids as $dep_id) {
        if (!isset($statuses[$dep_id]) || $statuses[$dep_id]->status !== 'completed') {
            return false;
        }
    }

    return true;
}
|
|
/**
 * Fetches the status of several operations in one query.
 *
 * The ids are normalised before use: non-scalar entries are dropped,
 * duplicates are removed and the list is re-indexed. Re-indexing matters
 * because the ids are spread into wpdb::prepare(); an array with string keys
 * would otherwise be unpacked as named arguments and fail.
 *
 * @param array $operation_ids Operation ids to look up.
 *
 * @return array Rows holding `id` and `status`, keyed by operation id; empty
 *               when no usable id was given or nothing matched.
 */
public function getOperationStatuses(array $operation_ids): array
{
    $ids = array_values(array_unique(array_map(
        'strval',
        array_filter($operation_ids, 'is_scalar')
    )));

    if ($ids === []) {
        return [];
    }

    $placeholders = implode(',', array_fill(0, count($ids), '%s'));
    $table = $this->wpdb->prefix . $this->table;

    $results = $this->wpdb->get_results($this->wpdb->prepare(
        "SELECT id, status FROM $table WHERE id IN ($placeholders)",
        ...$ids
    ), OBJECT_K);

    return $results ?: [];
}
|
|
/**
 * Queue or update an operation with intelligent batching
 *
 * When an operation with the resolved id already exists and is still pending
 * or scheduled, it is updated in place (its data is merged or replaced
 * according to `merge`). With `merge` = append a new row with a unique id
 * suffix is inserted instead. Otherwise a new row is inserted.
 *
 * Fixes over the previous version:
 *  - `chunk_key` given as an array of keys is now supported; before, the array
 *    was used as an array offset ahead of the is_array() check and raised a
 *    TypeError.
 *  - Chunk values that are not arrays no longer reach count().
 *  - Stored request data / metadata that does not decode to an array is
 *    treated as empty instead of being passed on as null.
 *  - Merged dependencies are re-indexed so they are stored as a JSON list.
 *
 * NOTE(review): when the resolved id belongs to an operation that is neither
 * pending nor scheduled and `merge` is not append, nothing is written yet a
 * success result is returned - confirm whether that is intended.
 * NOTE(review): rows queued with `scheduled` but without `delay` are stored
 * with status 'pending' - confirm whether they should be 'scheduled'.
 *
 * @param string $type Operation type
 * @param int $user_id User ID
 * @param array $data Operation data
 * @param array $options Operation options
 *   - operation_id: string,
 *   - priority: string, (high, normal, low) Operation priority
 *   - delay: int Time window in seconds before processing
 *   - scheduled: string, as formatted by sanitizeDateTime in MetaSanitizer.php
 *   - merge: string, //replace, append, merge
 *   - depends_on: array|string Operation ids (array or comma separated) that must complete first
 *   - chunk_key: string|array The array key (or keys) to split large operations by
 *   - chunk_size: int The batch size to process at once
 * @return array|WP_Error
 */
public function queueOperation(string $type, int $user_id, array $data, array $options = []): array|WP_Error
{
    try {
        $table = $this->wpdb->prefix . $this->table;

        // Extract options, falling back to defaults for missing or invalid values
        $priority = in_array($options['priority'] ?? null, ['low', 'normal', 'high'], true)
            ? $options['priority']
            : 'normal';
        $merge = in_array($options['merge'] ?? null, ['merge', 'append', 'replace'], true)
            ? $options['merge']
            : 'merge';
        $delay = (int) ($options['delay'] ?? 0);
        $scheduledFor = $options['scheduled'] ?? '';
        $chunk_key = $options['chunk_key'] ?? null;
        $chunk_size = $options['chunk_size'] ?? null;
        $requested_id = empty($options['operation_id']) ? null : (string) $options['operation_id'];

        // Calculate operation count from the chunked data, if a chunk key was given.
        // A single key and a list of keys (like favourites_batch with 'adds' and
        // 'removes') are handled the same way.
        $count = 1;
        if ($chunk_key) {
            $total = 0;
            foreach ((array) $chunk_key as $key) {
                if ((is_string($key) || is_int($key)) && isset($data[$key]) && is_array($data[$key])) {
                    $total += count($data[$key]);
                }
            }
            $count = max(1, $total);
        }

        // Generate operation ID
        $operation_id = $this->checkOperationId($requested_id, $type, $merge, $user_id);

        // Check for existing operation
        $existing = $this->getOperation($operation_id);

        $dependencies = $options['depends_on'] ?? [];
        $dependencies = is_string($dependencies)
            ? explode(',', $dependencies)
            : (array) $dependencies;

        $scheduled_at = $this->calculateScheduledTime($delay === 0 ? $scheduledFor : $delay);

        // Prepare chunk config to store
        $chunk_config = null;
        if ($chunk_key && $chunk_size) {
            $chunk_config = [
                'key' => $chunk_key,
                'size' => $chunk_size
            ];
        }

        if ($existing) {
            $is_waiting = $existing->status === 'pending' || $existing->status === 'scheduled';

            if ($merge !== 'append' && $is_waiting) {
                if ($merge === 'merge') {
                    $existing_data = json_decode($existing->request_data ?? '{}', true);
                    if (!is_array($existing_data)) {
                        $existing_data = [];
                    }
                    $data = $this->deepMerge($existing_data, $data);

                    $existing_dependencies = json_decode($existing->dependencies ?? '[]', true);
                    if (!is_array($existing_dependencies)) {
                        $existing_dependencies = [];
                    }
                    // array_unique() keeps the original keys; re-index so the
                    // value is encoded as a JSON list rather than an object.
                    $dependencies = array_values(array_unique(array_merge($existing_dependencies, $dependencies)));
                }

                // Keep earliest scheduled time
                if ($delay > 0 || $scheduledFor !== '') {
                    $scheduled_at = $this->getEarliestScheduledTime(
                        $existing->scheduled_at ?? $existing->created_at,
                        $scheduled_at
                    );
                }

                // Store chunking config in metadata
                $metadata = json_decode($existing->metadata ?? '{}', true);
                if (!is_array($metadata)) {
                    $metadata = [];
                }
                if ($chunk_config) {
                    $metadata['chunk_config'] = $chunk_config;
                }

                $result = $this->wpdb->update(
                    $table,
                    [
                        'request_data' => json_encode($data),
                        'dependencies' => json_encode($dependencies),
                        'merge' => $merge,
                        'count' => $count,
                        'priority' => $priority,
                        'scheduled_at' => $scheduled_at,
                        'metadata' => json_encode($metadata),
                        'updated_at' => current_time('mysql'),
                    ],
                    ['id' => $operation_id],
                    ['%s', '%s', '%s', '%d', '%s', '%s', '%s', '%s'],
                    ['%s']
                );

                if ($result === false) {
                    throw new Exception("Failed to update operation: " . $this->wpdb->last_error);
                }
            } elseif ($merge === 'append') {
                // Append never touches the existing row: insert under a unique id.
                $operation_id = $operation_id . '_' . time() . '_' . substr(uniqid(), -4);
                $existing = null;
            }
        }

        if (!$existing) {
            // Prepare metadata with chunk config
            $metadata = $chunk_config ? ['chunk_config' => $chunk_config] : [];

            $result = $this->wpdb->insert(
                $table,
                [
                    'id' => $operation_id,
                    'type' => $type,
                    'user_id' => $user_id,
                    'request_data' => json_encode($data),
                    'count' => $count,
                    'priority' => $priority,
                    'status' => $delay > 0 ? 'scheduled' : 'pending',
                    'dependencies' => json_encode($dependencies),
                    'metadata' => json_encode($metadata),
                    'merge' => $merge,
                    'scheduled_at' => $scheduled_at,
                    'created_at' => current_time('mysql'),
                    'updated_at' => current_time('mysql'),
                ],
                // One format per column above (13 entries).
                ['%s', '%s', '%d', '%s', '%d', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s']
            );

            if ($result === false) {
                throw new Exception("Failed to insert operation: " . $this->wpdb->last_error);
            }
        }

        $this->invalidateUserQueue($user_id);
        $this->runQueueOnShutdown();

        return [
            'success' => true,
            'operation_id' => $operation_id,
            'updated_existing' => !empty($existing),
            'scheduled_at' => $scheduled_at,
            'queue_status' => $this->getQueueStatus()
        ];
    } catch (Exception $e) {
        JVB()->error()->log('queue', $e->getMessage(), $data, 'high');
        return new WP_Error('queue_failed', $e->getMessage());
    }
}
|
|
/**
 * Recursively merges `$new` into `$existing` and returns the result.
 *
 * Rules, per key present in both arrays:
 *  - both arrays, at least one associative: merged recursively
 *  - both lists holding arrays/objects: concatenated, every item kept
 *  - both lists of simple values: concatenated and de-duplicated
 *  - existing array, new scalar: the scalar is appended (skipped when an
 *    identical value is already present in a list of simple values)
 *  - existing scalar, new array: the scalar is put in front of the array and
 *    the result de-duplicated
 *  - both scalars: the new value wins
 * Two top-level lists are simply concatenated.
 *
 * De-duplicated lists are re-indexed. array_unique() keeps the original keys,
 * so without re-indexing the result had gaps in its keys and json_encode()
 * stored it as an object instead of a list.
 *
 * @param array $existing Data already stored.
 * @param array $new      Incoming data.
 *
 * @return array
 */
protected function deepMerge(array $existing, array $new): array
{
    if (!$this->isAssociativeArray($existing) && !$this->isAssociativeArray($new)) {
        return array_merge($existing, $new);
    }

    $merged = $existing;

    foreach ($new as $key => $newValue) {
        if (!array_key_exists($key, $existing)) {
            $merged[$key] = $newValue;
            continue;
        }

        $existingValue = $existing[$key];

        if (is_array($existingValue) && is_array($newValue)) {
            if ($this->isAssociativeArray($existingValue) || $this->isAssociativeArray($newValue)) {
                // Recursive merge going deeper, if any of them are associative arrays
                $merged[$key] = $this->deepMerge($existingValue, $newValue);
            } elseif ($this->containsComplexData($existingValue) || $this->containsComplexData($newValue)) {
                // Just merge and re-index - preserves all items from chunks
                $merged[$key] = array_values(array_merge($existingValue, $newValue));
            } else {
                // Simple scalar lists - unique merge, re-indexed to stay a list
                $merged[$key] = array_values(
                    array_unique(array_merge($existingValue, $newValue), SORT_REGULAR)
                );
            }
        } elseif (is_array($existingValue)) {
            // The existing value is an array, but the new one isn't.
            // A strict membership test is only used when the array holds no
            // arrays/objects; otherwise the value is simply appended.
            $isDuplicate = !$this->containsComplexData($existingValue)
                && in_array($newValue, $existingValue, true);

            if (!$isDuplicate) {
                $merged[$key][] = $newValue;
            }
        } elseif (is_array($newValue)) {
            // The existing value is a scalar, the new one an array
            $combined = array_unique(array_merge([$existingValue], $newValue), SORT_REGULAR);

            // Keep string keys of an associative array; re-index plain lists
            $merged[$key] = $this->isAssociativeArray($newValue)
                ? $combined
                : array_values($combined);
        } else {
            // Override the existing with the new, both are scalars
            $merged[$key] = $newValue;
        }
    }

    return $merged;
}
|
|
/**
 * Check if an array is associative
 *
 * An array counts as associative when its keys are anything other than the
 * integers 0..n-1 in that order. An empty array is not associative.
 *
 * @param array $arr
 * @return bool
 */
protected function isAssociativeArray(array $arr): bool
{
    $expectedKey = 0;

    foreach ($arr as $actualKey => $unused) {
        if ($actualKey !== $expectedKey) {
            return true;
        }
        $expectedKey++;
    }

    return false;
}
|
|
/**
 * Check if an array contains complex data (arrays or objects)
 *
 * Only the array's direct items are inspected.
 *
 * @param array $arr
 * @return bool
 */
protected function containsComplexData(array $arr): bool
{
    $complexItems = array_filter(
        $arr,
        static fn ($entry): bool => is_array($entry) || is_object($entry)
    );

    return $complexItems !== [];
}
|
|
/**
 * Returns whichever of two date-time strings is earlier.
 *
 * Both strings are parsed with strtotime(). The existing value is returned
 * only when it is strictly earlier; on a tie the new value is returned.
 *
 * @param string $existing Currently stored date-time string.
 * @param string $new      Newly calculated date-time string.
 *
 * @return string One of the two inputs, unchanged.
 */
protected function getEarliestScheduledTime(string $existing, string $new): string
{
    if (strtotime($existing) < strtotime($new)) {
        return $existing;
    }

    return $new;
}
|
|
/**
 * Turns a delay or a date-time string into a 'Y-m-d H:i:s' scheduled time.
 *
 *  - positive int: that many seconds after the current site time
 *  - non-empty string: parsed with strtotime() and re-formatted
 *  - anything else (0, negative int, empty or unparsable string): the current
 *    site time
 *
 * Previously the string branch returned the raw strtotime() result, i.e. a
 * Unix timestamp (or false) instead of a date-time string, which then ended
 * up in the scheduled_at column.
 *
 * NOTE(review): relative expressions such as '+1 hour' are resolved against
 * PHP's own clock, which may differ from the site-local time used by
 * current_time(); callers are documented to pass absolute date-times.
 *
 * @param int|string $delay Seconds to wait, or a date-time string.
 *
 * @return string Date-time in 'Y-m-d H:i:s' format.
 */
protected function calculateScheduledTime(int|string $delay): string
{
    if (is_int($delay)) {
        return $delay > 0
            ? date('Y-m-d H:i:s', current_time('timestamp') + $delay)
            : current_time('mysql');
    }

    $delay = trim($delay);
    if ($delay !== '') {
        $timestamp = strtotime($delay);
        if ($timestamp !== false) {
            return date('Y-m-d H:i:s', $timestamp);
        }
    }

    return current_time('mysql');
}
|
|
/**
 * Resolves the id an operation should be stored under.
 *
 * With merge mode 'merge', the id of the user's most recent pending operation
 * of the same type and merge mode replaces the requested id, when one exists.
 * A missing id is generated with uniqid('op_'). Finally, ids that do not start
 * with 'u' are prefixed with "u{user_id}_".
 *
 * NOTE(review): the prefix test only looks at the first character, so a
 * caller-supplied id such as 'upload_1' is left without the user prefix -
 * confirm whether the test should be for "u{user_id}_".
 *
 * @param string|null $operation_id Requested id, if any.
 * @param string      $type         Operation type.
 * @param string      $merge        Merge mode (merge, append, replace).
 * @param int         $user_id      Owner of the operation.
 *
 * @return string
 */
protected function checkOperationId(?string $operation_id, string $type, string $merge, int $user_id): string
{
    if ('merge' === $merge) {
        $pending = $this->getUserOperations($user_id, [
            'status' => 'pending',
            'type' => $type,
            'merge' => $merge,
            'limit' => 1
        ]);

        if ($pending) {
            $operation_id = $pending[0]->id;
        }
    }

    $operation_id = $operation_id ?: uniqid('op_');

    return str_starts_with($operation_id, 'u')
        ? $operation_id
        : 'u' . $user_id . '_' . $operation_id;
}
|
|
|
/**
 * Makes sure the queue gets a processing pass on the 'shutdown' hook.
 *
 * The callback is hooked at priority 100, and only when it is not hooked yet.
 *
 * @return void
 */
protected function runQueueOnShutdown(): void
{
    $callback = [$this, 'processQueueOnShutdown'];

    if (has_action('shutdown', $callback)) {
        return;
    }

    add_action('shutdown', $callback, 100);
}
|
|
/**
 * Process queue after request is complete
 *
 * Unhooks itself, flushes the response to the client when running under
 * FastCGI, then runs the regular queue check.
 *
 * NOTE(review): the callback is hooked at priority 100 (see runQueueOnShutdown)
 * but removed here without a priority; remove_action() defaults to priority 10,
 * so the removal presumably does not match. Passing 100 here would let
 * processQueue() re-hook the callback while 'shutdown' is still running -
 * review both together before changing.
 *
 * @return void
 */
public function processQueueOnShutdown(): void
{
    $self = [$this, 'processQueueOnShutdown'];
    remove_action('shutdown', $self);

    // Let the client go before doing background work (PHP-FPM only).
    $canFlush = function_exists('fastcgi_finish_request');
    if ($canFlush) {
        fastcgi_finish_request();
    }

    $this->checkQueue();
}
|
|
/**
 * Loads a single operation row by id.
 *
 * @param string $operation_id Id of the operation.
 * @param bool   $skipCache    Accepted for compatibility; not used by this method.
 *
 * @return object|false The row, or false when it does not exist or an
 *                      Exception was thrown while querying.
 */
public function getOperation(string $operation_id, bool $skipCache = false): object|false
{
    $table = $this->wpdb->prefix . $this->table;

    try {
        $sql = $this->wpdb->prepare(
            "SELECT * FROM $table WHERE id = %s",
            $operation_id
        );
        $row = $this->wpdb->get_row($sql);
    } catch (Exception) {
        return false;
    }

    if (!$row) {
        return false;
    }

    return $row;
}
|
|
/**
 * Processes a batch of operations.
 *
 * When no batch is handed in, one is fetched using the adaptive batch size.
 * Each operation is processed only if it is still within its retry budget and
 * all of its dependencies are met; afterwards its per-operation and per-user
 * cache entries are dropped. Once the batch is done the queue caches are
 * invalidated and another pass is requested for the 'shutdown' hook.
 *
 * Exceptions are logged with error_log() and not re-thrown.
 *
 * @param array $operations Operation rows to process; empty to fetch a batch.
 *
 * @return void
 */
public function processQueue(array $operations = []): void
{
    try {
        if (empty($operations)) {
            $operations = $this->getOperations($this->getAdaptiveBatchSize());
        }

        // Pre-warm cache for dependencies check
        $this->prewarmDependencyCache($operations);

        foreach ($operations as $operation) {
            $runnable = $this->canProcessOperation($operation)
                && $this->areDependenciesMet($operation);

            if ($runnable) {
                $this->processOperation($operation);

                // Invalidate operation cache after processing
                $this->cache->forget(self::CACHE_OPERATION_PREFIX . $operation->id);
                $this->cache->forget(self::CACHE_USER_QUEUE_PREFIX . $operation->user_id);
            }
        }

        // Batch invalidate caches at the end
        $this->invalidateQueueCache();
        $this->runQueueOnShutdown();
    } catch (Exception $e) {
        error_log('Error processing queue: ' . $e->getMessage());
    }
}
|
|
/**
 * Cancels an operation that has not started yet.
 *
 * Only rows whose status is 'pending' or 'scheduled' are cancelled. The
 * previous implementation passed the two statuses as an array in the WHERE
 * argument of wpdb::update(), which only supports scalar values per column,
 * so the condition never matched and nothing was ever cancelled. The update is
 * now a prepared statement with an explicit IN (...) condition.
 *
 * @param string $operation_id Id of the operation to cancel.
 *
 * @return bool True when a row was cancelled, false otherwise.
 */
public function cancelOperation(string $operation_id): bool
{
    $table = $this->wpdb->prefix . $this->table;
    $now = current_time('mysql');

    $affected = $this->wpdb->query($this->wpdb->prepare(
        "UPDATE $table
        SET status = 'cancelled',
            completed_at = %s,
            updated_at = %s,
            error_message = %s
        WHERE id = %s
        AND status IN ('pending', 'scheduled')",
        $now,
        $now,
        'Cancelled by user',
        $operation_id
    ));

    // query() returns false on error and the number of affected rows otherwise.
    if ($affected) {
        $this->invalidateQueueCache('status');
        return true;
    }

    return false;
}
|
|
/**
 * Pre-warm cache for dependency checks
 *
 * Collects the dependency ids of every operation in the batch, loads their
 * statuses with one query and stores each status row in the cache for 300
 * seconds under the operation cache prefix.
 *
 * @param array $operations Operation rows; `dependencies` is read as JSON.
 *
 * @return void
 */
protected function prewarmDependencyCache(array $operations): void
{
    $dependencyIds = [];

    foreach ($operations as $operation) {
        $decoded = json_decode($operation->dependencies ?? '[]', true);
        // A non-array result is wrapped so it can be merged like a list.
        $dependencyIds = array_merge(
            $dependencyIds,
            is_array($decoded) ? $decoded : [$decoded]
        );
    }

    if ($dependencyIds === []) {
        return;
    }

    // One query for all statuses, then one cache entry per operation.
    $statusRows = $this->getOperationStatuses(array_unique($dependencyIds));

    foreach ($statusRows as $dependencyId => $statusRow) {
        $this->cache->set(self::CACHE_OPERATION_PREFIX . $dependencyId, $statusRow, 300);
    }
}
|
|
/**
 * Fetches the next batch of operations that are ready to run.
 *
 * Ready means: status 'pending', or status 'scheduled' with a scheduled_at
 * that is not in the future, and fewer retries than the maximum. Rows are
 * ordered by priority (high, normal, low) and then by scheduled/created time.
 *
 * @param int $batch_size Maximum number of rows to return.
 *
 * @return array Operation rows, or an empty array.
 */
protected function getOperations(int $batch_size): array
{
    $table = $this->wpdb->prefix . $this->table;
    $now = current_time('mysql');

    $sql = $this->wpdb->prepare("
        SELECT * FROM $table
        WHERE (
            status = 'pending' OR
            (status = 'scheduled' AND scheduled_at <= %s)
        )
        AND retries < %d
        ORDER BY
            FIELD(priority, 'high', 'normal', 'low'),
            COALESCE(scheduled_at, created_at)
        LIMIT %d
    ", $now, $this->max_attempts, $batch_size);

    $rows = $this->wpdb->get_results($sql);

    if (!$rows) {
        return [];
    }

    return $rows;
}
|
|
/**
 * Tells whether an operation is still within its retry budget.
 *
 * An operation that has used up its attempts is marked as permanently failed
 * as a side effect.
 *
 * @param object $operation Operation row; `retries` and `id` are read.
 *
 * @return bool
 */
protected function canProcessOperation(object $operation): bool
{
    if ($operation->retries < $this->max_attempts) {
        return true;
    }

    $this->markAsPermanentlyFailed($operation->id, 'Exceeded maximum retry attempts');

    return false;
}
|
|
/**
 * Marks an operation as failed and records why.
 *
 * An error message already stored on the row is kept and the reason is
 * appended to it, separated by " | ". The status cache is invalidated when
 * the update changed a row.
 *
 * @param string $operation_id Id of the operation.
 * @param string $reason       Why the operation is being failed.
 *
 * @return void
 */
protected function markAsPermanentlyFailed(string $operation_id, string $reason): void
{
    $table = $this->wpdb->prefix . $this->table;

    $lookup = $this->wpdb->prepare(
        "SELECT error_message FROM $table WHERE id = %s",
        $operation_id
    );
    $stored = $this->wpdb->get_row($lookup);

    // Keep the original error in front of the new reason, if there is one.
    $message = empty($stored->error_message)
        ? $reason
        : $stored->error_message . " | " . $reason;

    $changes = [
        'status' => 'failed',
        'error_message' => $message,
        'completed_at' => current_time('mysql'),
        'updated_at' => current_time('mysql')
    ];

    $updated = $this->wpdb->update($table, $changes, ['id' => $operation_id]);

    if (!$updated) {
        return;
    }

    $this->invalidateQueueCache('status');
}
|
|
/**
 * Get operations for a specific user
 *
 * Supported filters: `status`, `type` and `merge` (exact match, ignored when
 * empty) and `limit` (default 50). Rows are returned newest first.
 *
 * The result is written to the cache, but this method always queries the
 * database; the cached copy is not read here.
 *
 * @param int   $user_id Owner of the operations.
 * @param array $filters Optional filters, see above.
 *
 * @return array Operation rows, or an empty array.
 */
public function getUserOperations(int $user_id, array $filters = []): array
{
    $cacheKey = self::CACHE_USER_QUEUE_PREFIX . $user_id . '_' . md5(serialize($filters));
    $table = $this->wpdb->prefix . $this->table;

    $conditions = ['user_id = %d'];
    $arguments = [$user_id];

    // Optional exact-match filters, applied in this order.
    foreach (['status', 'type', 'merge'] as $column) {
        if (empty($filters[$column])) {
            continue;
        }
        $conditions[] = $column . ' = %s';
        $arguments[] = $filters[$column];
    }

    $arguments[] = $filters['limit'] ?? 50;

    $sql = "SELECT * FROM $table WHERE " . implode(' AND ', $conditions) .
        " ORDER BY created_at DESC LIMIT %d";

    $rows = $this->wpdb->get_results($this->wpdb->prepare($sql, ...$arguments));

    // Cache user operations
    $this->cache->set($cacheKey, $rows);

    return $rows ?: [];
}
|
|
/**
 * Returns the statuses of a user's 20 most recently created operations.
 *
 * @param int $user_id Owner of the operations.
 *
 * @return array Status rows as returned by getOperationStatuses().
 */
public function getUserOperationStatuses(int $user_id): array
{
    $table = $this->wpdb->prefix . $this->table;

    $sql = $this->wpdb->prepare(
        "SELECT id FROM $table WHERE user_id = %d ORDER BY created_at DESC LIMIT 20",
        $user_id
    );
    $recentIds = $this->wpdb->get_col($sql);

    return $this->getOperationStatuses($recentIds);
}
|
|
/**
 * Check if user's queue has been modified since given timestamp
 *
 * Compares the cache's last-modified value for "user_{id}" with the given
 * timestamp. Returns true if modified, false if not (can send 304).
 *
 * @param int $user_id         User whose queue is checked.
 * @param int $since_timestamp Timestamp the caller last saw.
 *
 * @return bool
 */
public function isUserQueueModified(int $user_id, int $since_timestamp): bool
{
    $lastChange = $this->cache::lastModified('user_' . $user_id);

    return $lastChange > $since_timestamp;
}
|
/**
 * Flushes the cache scope of a single user.
 *
 * NOTE(review): the flush presumably also refreshes the user's last-modified
 * timestamp and invalidates connected caches - confirm against the Cache class.
 *
 * @param int $user_id User whose cache scope is flushed.
 *
 * @return void
 */
protected function invalidateUserQueue(int $user_id): void
{
    $userCache = Cache::for($user_id);
    $userCache->flush();
}
|
|
/**
 * Invalidate queue-related caches
 *
 * Scopes:
 *  - 'status': queue status and queue size
 *  - 'items':  has-items flag and queue info
 *  - 'all' (or any unknown scope): all four keys
 * The cache is touched afterwards. The status counts transient is deleted
 * only when the scope is exactly 'all'.
 *
 * @param string $scope Which group of keys to drop.
 *
 * @return void
 */
protected function invalidateQueueCache(string $scope = 'all'): void
{
    $keys = match ($scope) {
        'status' => [self::CACHE_QUEUE_STATUS, self::CACHE_QUEUE_SIZE],
        'items' => [self::CACHE_HAS_ITEMS, 'queue_info'],
        default => [
            self::CACHE_QUEUE_STATUS,
            self::CACHE_HAS_ITEMS,
            self::CACHE_QUEUE_SIZE,
            'queue_info'
        ],
    };

    foreach ($keys as $cacheKey) {
        $this->cache->forget($cacheKey);
    }

    $this->cache->touch();

    if ('all' === $scope) {
        delete_transient('jvb_queue_status_counts');
    }
}
|
|
/**
 * Re-queues or fails operations that have been 'processing' for too long.
 *
 * Up to five operations whose started_at is more than 30 minutes old are
 * handled per call. Each gets its retry counter increased; when the counter
 * reaches the maximum the operation is marked 'failed_permanent', logged as
 * critical and the admin is notified, otherwise it is reset to 'pending'.
 *
 * The cut-off is computed from the site-local clock (current_time), the same
 * clock used when started_at is written. It used to be computed from PHP's
 * own clock, so on sites with a UTC offset the 30 minute window was shifted
 * by that offset (running operations restarted at once, or stuck ones left
 * for hours).
 *
 * @return void
 */
protected function restartStuckOperations(): void
{
    $table = $this->wpdb->prefix . $this->table;

    // Find operations that have been processing for too long
    $stuck_window = 30 * 60; // seconds
    $cutoff = date('Y-m-d H:i:s', current_time('timestamp') - $stuck_window);

    $stuck_operations = $this->wpdb->get_results($this->wpdb->prepare("
        SELECT * FROM $table
        WHERE status = 'processing'
        AND started_at < %s
        LIMIT 5",
        $cutoff
    ));

    foreach ($stuck_operations as $operation) {
        // A NULL counter casts to 0, so the first restart counts as retry 1.
        $retries = (int) $operation->retries + 1;

        if ($retries >= $this->max_attempts) {
            // Too many retries, mark as permanently failed
            $this->wpdb->update(
                $table,
                [
                    'status' => 'failed_permanent',
                    'error_message' => 'Operation exceeded maximum retry attempts',
                    'updated_at' => current_time('mysql'),
                    'completed_at' => current_time('mysql')
                ],
                ['id' => $operation->id]
            );

            // Log this as a critical error
            JVB()->error()->log(
                '[OperationQueue]:restartStuckOperations',
                "Operation failed after maximum retries",
                [
                    'operation_id' => $operation->id,
                    'type' => $operation->type,
                    'user_id' => $operation->user_id,
                    'retries' => $retries,
                    'data' => $operation->request_data
                ],
                'critical'
            );

            $this->notifyAdmin($operation);
            continue;
        }

        // Reset to pending for another attempt
        $this->wpdb->update(
            $table,
            [
                'status' => 'pending',
                'updated_at' => current_time('mysql'),
                'started_at' => null,
                'retries' => $retries,
            ],
            ['id' => $operation->id]
        );

        JVB()->error()->log(
            '[OperationQueue]:restartStuckOperations',
            "Operation requeued with exponential backoff",
            [
                'operation_id' => $operation->id,
                'retry_number' => $retries,
            ],
            'warning'
        );
    }
}
|
|
/**
 * Implement intelligent retry delays with exponential backoff
 *
 * The delay is 5 seconds doubled per attempt, capped at one hour, plus up to
 * 10% random jitter. The operation is set to 'scheduled' for that time and its
 * retry counter is set to the attempt number.
 *
 * Changes over the previous version:
 *  - scheduled_at is computed from the site-local clock (current_time), the
 *    clock the queue compares scheduled_at against, instead of PHP's own
 *    clock.
 *  - The jitter bound is an integer; a fractional float used to be passed to
 *    rand(), which is deprecated implicit conversion.
 *  - Negative attempt numbers no longer produce a fractional delay.
 *  - updated_at is refreshed like in the other status updates.
 *
 * @param string $operation_id Id of the operation to retry.
 * @param int    $attempt      Retry attempt number, stored as the retry counter.
 *
 * @return void
 */
public function scheduleRetry(string $operation_id, int $attempt): void
{
    $base_delay = 5; // 5 seconds
    $max_delay = 3600; // 1 hour max

    // Cap the exponent: anything above 10 already exceeds the one hour limit,
    // and the cap keeps the arithmetic in integer range.
    $exponent = min(max(0, $attempt), 10);
    $delay = min($base_delay * (2 ** $exponent), $max_delay);

    // Add jitter to prevent thundering herd
    $delay += rand(0, intdiv($delay, 10));

    $scheduled_at = date('Y-m-d H:i:s', current_time('timestamp') + $delay);

    $table = $this->wpdb->prefix . $this->table;

    $this->wpdb->update(
        $table,
        [
            'status' => 'scheduled',
            'scheduled_at' => $scheduled_at,
            'retries' => $attempt,
            'updated_at' => current_time('mysql'),
        ],
        ['id' => $operation_id],
        ['%s', '%s', '%d', '%s'],
        ['%s']
    );
}
|
|
/**
 * Notify admin about repeatedly failed operation
 *
 * Builds a plain-text summary of the operation (id, type, user, creation
 * time, retries and, when present, the error message) and sends it to the
 * site's admin e-mail address through the plugin's e-mail component.
 *
 * @param object $operation The failed operation
 *
 * @return bool Result reported by the e-mail component.
 */
protected function notifyAdmin(object $operation): bool
{
    $recipient = get_option('admin_email');
    $subject = '[' . get_bloginfo('name') . '] Queue Operation Failed After Multiple Attempts';

    // One entry per line of the message; empty entries are blank lines.
    $lines = [
        'A queue operation has failed after multiple retry attempts:',
        '',
        "Operation ID: {$operation->id}",
        "Operation Type: {$operation->type}",
        "User ID: {$operation->user_id}",
        "Created At: {$operation->created_at}",
        "Retries: {$operation->retries}",
        '',
    ];

    if (!empty($operation->error_message)) {
        $lines[] = "Error Message: {$operation->error_message}";
        $lines[] = '';
    }

    $lines[] = 'Please check the error logs for more details.';

    return JVB()->email()->sendEmail($recipient, $subject, implode("\n", $lines));
}
|
|
/**
 * Claim an operation, run its next chunk through the bulk-operation filter,
 * and persist the outcome.
 *
 * The row is claimed with a conditional UPDATE so that only one worker can
 * move it from pending/scheduled to processing. Depending on the handler
 * result and on whether all items have been processed, the row ends up as
 * 'pending' (more chunks to do), 'completed', 'completed_with_errors' or
 * 'failed'.
 *
 * @param object $operation Queue row; this method reads id, user_id, type,
 *                          retries, count, progress_count, failed_items and result.
 *
 * @return array Array with a boolean 'success' key and a 'result' key.
 */
protected function processOperation(object $operation): array
{
    $table = $this->wpdb->prefix . $this->table;

    try {
        $now = current_time('mysql');

        // Mark as processing. The WHERE clause makes this an atomic claim.
        $claimed = $this->wpdb->query($this->wpdb->prepare(
            "UPDATE $table
            SET status = 'processing',
                started_at = %s,
                updated_at = %s
            WHERE id = %s
            AND status IN ('pending', 'scheduled')
            AND (scheduled_at IS NULL OR scheduled_at <= %s)",
            $now,
            $now,
            $operation->id,
            $now
        ));

        if ($claimed === false) {
            // The query itself failed; let the catch block record the failure.
            throw new Exception('Operation no longer available for processing');
        }

        if (!$claimed) {
            // Zero rows matched: the row is no longer pending/scheduled (or not
            // yet due). Leave it untouched so a row that another worker may
            // have claimed is not overwritten with 'failed'.
            return [
                'success' => false,
                'result' => 'Operation no longer available for processing'
            ];
        }

        $this->invalidateUserQueue($operation->user_id);

        $progress_count = (int) $operation->progress_count;
        $count = (int) $operation->count;
        $failed_items = json_decode($operation->failed_items ?: '[]', true);
        if (!is_array($failed_items)) {
            $failed_items = [];
        }

        // Process chunk
        $chunk = $this->getNextChunk($operation);

        // Apply filter - expected to return ['success' => bool, 'result' => mixed]
        $filterResult = apply_filters(
            BASE.'handle_bulk_operation',
            ['success' => false, 'result' => 'No handler for operation item'], // Default if no filter
            $operation,
            $chunk
        );

        // Normalise WP_Error first: it is an object, so it would otherwise be
        // rejected by the array-shape check below and never reach this branch.
        if (is_wp_error($filterResult)) {
            $filterResult = [
                'success' => false,
                'result' => $filterResult->get_error_message()
            ];
        }

        // Ensure we have the expected format
        if (!is_array($filterResult) || !isset($filterResult['success'])) {
            $invalid_response = is_object($filterResult) ? get_class($filterResult) : gettype($filterResult);
            JVB()->error()->log(
                '[OperationQueue]:processOperation',
                "Invalid filter response format - expected array with 'success' key",
                [
                    'operation_id' => $operation->id,
                    'operation_type' => $operation->type,
                    'filter_response_type' => $invalid_response,
                    'filter_response' => $filterResult
                ],
                'error'
            );

            // Mark operation as failed
            $failedAt = current_time('mysql');
            $this->wpdb->update(
                $table,
                [
                    'status' => 'failed',
                    'error_message' => "Invalid filter response: expected array with 'success' key, got {$invalid_response}",
                    'result' => json_encode([
                        'success' => false,
                        'error' => "Invalid filter response format"
                    ]),
                    'updated_at' => $failedAt,
                    'completed_at' => $failedAt
                ],
                ['id' => $operation->id]
            );

            $this->invalidateQueueCache('status');

            return [
                'success' => false,
                'result' => "Invalid filter response format"
            ];
        }

        // A handler may omit 'result'; treat that as null instead of reading an undefined key.
        $filterResult['result'] = $filterResult['result'] ?? null;

        $newCount = $progress_count + (int) ($chunk['progress'] ?? 0);
        $isFinished = $newCount >= $count;
        $timestamp = current_time('mysql');

        if ($filterResult['success']) {
            // Success path: merge this chunk's result into what is already stored.
            $oldResult = json_decode($operation->result ?: '{}', true);
            if (!is_array($oldResult)) {
                $oldResult = [];
            }

            if (!is_array($filterResult['result'])) {
                $filterResult['result'] = [$filterResult['result']];
            }

            $resultToStore = $this->deepMerge($oldResult, $filterResult['result']);
            $resultToStore['processed_at'] = $timestamp;

            if ($isFinished) {
                // Earlier chunks may have recorded failures even though this one succeeded.
                $status = empty($failed_items) ? 'completed' : 'completed_with_errors';

                $this->wpdb->update(
                    $table,
                    [
                        'progress_count' => $newCount,
                        'result' => json_encode($resultToStore),
                        'status' => $status,
                        'completed_at' => $timestamp,
                        'updated_at' => $timestamp
                    ],
                    ['id' => $operation->id],
                    ['%d', '%s', '%s', '%s', '%s'],
                    ['%s']
                );

                // Invalidate before tracking metrics, as the original flow did.
                $this->invalidateUserQueue($operation->user_id);
                $this->trackOperationMetrics($operation->id);
            } else {
                // More work to do - store progress and hand the row back to the queue.
                $this->wpdb->update(
                    $table,
                    [
                        'progress_count' => $newCount,
                        'result' => json_encode($resultToStore),
                        'status' => 'pending', // Back to pending for next chunk
                        'updated_at' => $timestamp
                    ],
                    ['id' => $operation->id],
                    ['%d', '%s', '%s', '%s'],
                    ['%s']
                );
            }
        } else {
            // Error path
            $error_message = is_string($filterResult['result'])
                ? $filterResult['result']
                : json_encode($filterResult['result']);

            $failed_items[] = [
                'index' => $progress_count,
                'error' => $error_message,
                'timestamp' => $timestamp,
                'retry_count' => $operation->retries,
                'type' => $operation->type
            ];

            if ($isFinished) {
                // Failed on the final chunk: the operation ends as failed.
                $this->wpdb->update(
                    $table,
                    [
                        'failed_items' => json_encode($failed_items),
                        'progress_count' => $newCount,
                        'result' => json_encode([
                            'success' => false,
                            'error' => $error_message,
                            'failed_items' => $failed_items
                        ]),
                        'status' => 'failed',
                        'error_message' => $error_message,
                        'completed_at' => $timestamp,
                        'updated_at' => $timestamp
                    ],
                    ['id' => $operation->id],
                    ['%s', '%d', '%s', '%s', '%s', '%s', '%s'],
                    ['%s']
                );
            } else {
                // Failed but more to process - record the failure and continue with the next chunk.
                $this->wpdb->update(
                    $table,
                    [
                        'failed_items' => json_encode($failed_items),
                        'progress_count' => $newCount,
                        'status' => 'pending',
                        'error_message' => $error_message,
                        'updated_at' => $timestamp
                    ],
                    ['id' => $operation->id],
                    ['%s', '%d', '%s', '%s', '%s'],
                    ['%s']
                );
            }
        }

        // Clear the user's queue cache after any update
        $this->invalidateUserQueue($operation->user_id);

        return $filterResult;
    } catch (Exception $e) {
        JVB()->error()->log(
            '[OperationQueue]:processOperation',
            "Exception during operation processing: " . $e->getMessage(),
            [
                'operation_id' => $operation->id,
                'type' => $operation->type,
                'progress_count' => $progress_count ?? 0,
                'user_id' => $operation->user_id,
                'file' => $e->getFile(),
                'line' => $e->getLine(),
                'trace' => $e->getTraceAsString()
            ],
            'error'
        );

        // Mark operation as failed
        $failedAt = current_time('mysql');
        $this->wpdb->update(
            $table,
            [
                'status' => 'failed',
                'error_message' => $e->getMessage(),
                'result' => json_encode([
                    'success' => false,
                    'error' => $e->getMessage()
                ]),
                'updated_at' => $failedAt,
                'completed_at' => $failedAt
            ],
            ['id' => $operation->id]
        );

        $this->invalidateQueueCache('status');

        return [
            'success' => false,
            'result' => $e->getMessage()
        ];
    }
}
|
|
/**
 * Build the payload for the next processing step of an operation.
 *
 * Without a usable `chunk_config` in the operation metadata, the whole request
 * data is returned with a 'progress' of 1. With one, the configured key(s) are
 * replaced by the next slice of at most `size` items (starting at the
 * operation's progress_count) and 'progress' is the number of items in that
 * slice. All other request-data keys are passed through unchanged.
 *
 * @param object $operation The current operation (reads request_data, metadata, progress_count)
 *
 * @return array Payload for the handler; always contains a 'progress' key
 */
protected function getNextChunk(object $operation): array
{
    $data = json_decode((string) ($operation->request_data ?? ''), true);
    if (!is_array($data)) {
        // Invalid or scalar JSON would otherwise reach array_merge()/array_filter() and raise a TypeError.
        $data = [];
    }

    // Get chunk config from metadata
    $metadata = json_decode($operation->metadata ?? '{}', true);
    $chunk_config = is_array($metadata) ? ($metadata['chunk_config'] ?? null) : null;

    // If no (usable) chunk config, return all data with progress of 1
    if (!$chunk_config || !is_array($chunk_config)) {
        return array_merge($data, ['progress' => 1]);
    }

    // Handle single or multiple keys uniformly
    $keys = (array) ($chunk_config['key'] ?? []);
    // A missing or non-positive size would produce an empty batch with zero progress.
    $chunk_size = max(1, (int) ($chunk_config['size'] ?? 1));
    $current_progress = (int) $operation->progress_count;

    $batch = $this->extractBatch($data, $keys, $current_progress, $chunk_size);

    // Keep the non-chunked data as-is
    $result = array_filter($data, function ($k) use ($keys) {
        return !in_array($k, $keys);
    }, ARRAY_FILTER_USE_KEY);

    // Add the batched data (including its 'progress' counter)
    foreach ($batch as $k => $v) {
        $result[$k] = $v;
    }

    return $result;
}
|
|
/**
 * Slice the next batch of items out of one or more list-valued keys.
 *
 * The lists named in $keys are treated as one continuous sequence, in the
 * order given. Items before position $currentProgress are skipped, then up to
 * $batchSize items are collected, spilling over into the next key when the
 * current one runs out. Original item keys are preserved.
 *
 * @param array $data            Decoded request data
 * @param array $keys            Names of the list-valued entries to chunk
 * @param int   $currentProgress Number of items already processed
 * @param int   $batchSize       Maximum number of items to return
 *
 * @return array 'progress' => number of items extracted, plus one entry per key that contributed items
 */
protected function extractBatch(array $data, array $keys, int $currentProgress, int $batchSize): array
{
    $batch = ['progress' => 0];
    $taken = 0;

    // Gather the chunkable lists and count their items
    $lists = [];
    $grandTotal = 0;
    foreach ($keys as $name) {
        if (!isset($data[$name]) || !is_array($data[$name])) {
            continue;
        }
        $lists[$name] = $data[$name];
        $grandTotal += count($data[$name]);
    }

    // Everything has already been processed
    if ($currentProgress >= $grandTotal) {
        return ['progress' => 0];
    }

    // $offset is the global position of the first item of the list being visited
    $offset = 0;
    foreach ($lists as $name => $list) {
        $size = count($list);

        if ($offset + $size <= $currentProgress) {
            // This list lies entirely before the resume point
            $offset += $size;
            continue;
        }

        $slice = array_slice(
            $list,
            max(0, $currentProgress - $offset),
            $batchSize - $taken,
            true
        );

        if (!empty($slice)) {
            $batch[$name] = $slice;
            $taken += count($slice);
            $batch['progress'] += count($slice);
        }

        if ($taken >= $batchSize) {
            break;
        }

        $offset += $size;
    }

    return $batch;
}
|
|
|
/**
 * Touch the cache entry "user_<id>" for the given user.
 *
 * @param int $user_id ID of the user whose cache entry is touched
 */
protected function updateUserQueueTimestamp(int $user_id)
{
    $cacheKey = 'user_' . $user_id;

    Cache::touch($cacheKey);
}
|
|
/**
 * Fold a finished operation into the per-day, per-type metrics table.
 *
 * Creates today's row for the operation type when missing, otherwise updates
 * its counters, running average duration and peak values.
 *
 * @param string $operationId ID of the operation to record
 * @return void
 */
protected function trackOperationMetrics(string $operationId): void
{
    $metrics_table = $this->wpdb->prefix . $this->metricsTable;
    $operation = $this->getOperation($operationId, true);

    // Nothing to record if the operation cannot be loaded
    if (!is_object($operation)) {
        return;
    }

    $today = date('Y-m-d');

    // Calculate duration if both timestamps are present and parseable
    $duration = null;
    if (!empty($operation->started_at) && !empty($operation->completed_at)) {
        $started = strtotime($operation->started_at);
        $completed = strtotime($operation->completed_at);
        if ($started !== false && $completed !== false) {
            $duration = $completed - $started;
        }
    }

    // Same rule as isOperationSuccessful(): explicit 'success' flag in the
    // result JSON, otherwise fall back to the status column (also when the
    // result is empty).
    $success = $this->isOperationSuccessful($operation);

    // sys_getloadavg() may be missing or return false
    $load = function_exists('sys_getloadavg') ? sys_getloadavg() : false;
    $currentLoad = is_array($load) ? $load[0] : null;

    $queueSize = $this->getCurrentQueueSize();
    $peakMemory = memory_get_peak_usage(true);

    // Get or create today's record for this operation type
    $existing = $this->wpdb->get_row($this->wpdb->prepare(
        "SELECT * FROM $metrics_table
        WHERE date = %s AND type = %s",
        $today,
        $operation->type
    ));

    if ($existing) {
        $priorTotal = (int) $existing->total_operations;

        // Compare against null: a duration of 0 seconds is a valid sample and
        // must still be folded into the running average.
        $average = $duration !== null
            ? (((float) $existing->average_duration * $priorTotal) + $duration) / ($priorTotal + 1)
            : $existing->average_duration;

        $this->wpdb->update(
            $metrics_table,
            [
                'total_operations' => $priorTotal + 1,
                'successful_operations' => $existing->successful_operations + ($success ? 1 : 0),
                'failed_operations' => $existing->failed_operations + ($success ? 0 : 1),
                'average_duration' => $average,
                'total_items_processed' => $existing->total_items_processed + $operation->count,
                'peak_queue_size' => max($existing->peak_queue_size, $queueSize),
                'peak_memory_usage' => max($existing->peak_memory_usage, $peakMemory),
                'peak_cpu_usage' => $currentLoad !== null
                    ? max($existing->peak_cpu_usage, $currentLoad)
                    : $existing->peak_cpu_usage
            ],
            ['id' => $existing->id]
        );

        return;
    }

    // Create new record
    $this->wpdb->insert(
        $metrics_table,
        [
            'date' => $today,
            'type' => $operation->type,
            'total_operations' => 1,
            'successful_operations' => $success ? 1 : 0,
            'failed_operations' => $success ? 0 : 1,
            'average_duration' => $duration,
            'total_items_processed' => $operation->count,
            'peak_queue_size' => $queueSize,
            'peak_memory_usage' => $peakMemory,
            'peak_cpu_usage' => $currentLoad
        ]
    );
}
|
|
/**
 * Decide whether an operation counts as successful.
 *
 * For a result array, the 'success' entry decides. For an operation object,
 * the 'success' flag inside its JSON `result` decides when present; otherwise
 * the statuses 'completed' and 'completed_with_errors' count as success.
 * Any other input is treated as not successful.
 *
 * @param object|array $operation Operation object or decoded result array
 * @return bool
 */
public function isOperationSuccessful($operation): bool
{
    if (is_object($operation)) {
        $decoded = empty($operation->result)
            ? null
            : json_decode($operation->result, true);

        $hasFlag = is_array($decoded) && isset($decoded['success']);
        if ($hasFlag) {
            return (bool) $decoded['success'];
        }

        // No explicit flag: judge by the status column
        $successStatuses = ['completed', 'completed_with_errors'];

        return in_array($operation->status, $successStatuses);
    }

    if (!is_array($operation)) {
        return false;
    }

    // Direct result array
    if (!isset($operation['success'])) {
        return false;
    }

    return (bool) $operation['success'];
}
|
|
/**
 * Current size of the queue, taken from the 'total' entry of getQueueInfo().
 *
 * @return int
 */
protected function getCurrentQueueSize(): int
{
    $info = $this->getQueueInfo();

    return $info['total'];
}
|
|
/**
 * Email yesterday's queue metrics to the site admin.
 *
 * Loads all metrics rows dated yesterday, builds an HTML summary (totals,
 * success rate, per-type details, current queue size) and sends it to the
 * `admin_email` option. Does nothing when there are no rows for that date.
 *
 * @return void
 */
public function emailDailyMetricsReport():void
{
    $reportDate = date('Y-m-d', strtotime('-1 day'));
    $source = $this->wpdb->prefix . $this->metricsTable;

    $rows = $this->wpdb->get_results($this->wpdb->prepare(
        "SELECT * FROM $source WHERE date = %s",
        $reportDate
    ));

    if (empty($rows)) {
        return;
    }

    $recipient = get_option('admin_email');
    $siteName = get_bloginfo('name');
    $subject = "[$siteName] Daily Queue Performance - " . $reportDate;

    // Totals across all operation types
    $totals = ['ops' => 0, 'ok' => 0, 'failed' => 0, 'items' => 0];
    foreach ($rows as $row) {
        $totals['ops'] += $row->total_operations;
        $totals['ok'] += $row->successful_operations;
        $totals['failed'] += $row->failed_operations;
        $totals['items'] += $row->total_items_processed;
    }

    $successRate = round(($totals['ok'] / max(1, $totals['ops'])) * 100, 1);

    $email = JVB()->email();

    $body = $email->h1('Daily Queue Performance Report');
    $body .= sprintf('<p>Report for <strong>%s</strong></p>', $reportDate);

    // Summary stats in a four-column grid
    $failedIcon = $totals['failed'] > 0 ? '⚠' : '';
    $body .= $email->grid([
        $email->stat($totals['ops'], 'Operations'),
        $email->stat($totals['ok'], 'Successful', '✓'),
        $email->stat($totals['failed'], 'Failed', $failedIcon),
        $email->stat($successRate . '%', 'Success Rate')
    ], 4);

    $body .= $email->spacer(20);

    // Alert if success rate is low
    if ($successRate < 90) {
        $body .= $email->alert(
            sprintf('Success rate of %s%% is below the 90%% threshold', $successRate),
            'warning'
        );
    }

    $body .= $email->h2('Details by Operation Type');

    // One card per operation type
    foreach ($rows as $row) {
        $failedCell = '0';
        if ($row->failed_operations > 0) {
            $failedCell = $email->badge($row->failed_operations, 'error');
        }

        $lines = [
            ['label' => 'Total', 'value' => $row->total_operations],
            ['label' => 'Success', 'value' => $email->badge($row->successful_operations, 'success')],
            ['label' => 'Failed', 'value' => $failedCell],
        ];

        if ($row->average_duration) {
            $lines[] = ['label' => 'Avg Duration', 'value' => round($row->average_duration, 2) . 's'];
        }

        $lines[] = ['label' => 'Items Processed', 'value' => number_format($row->total_items_processed)];

        if ($row->peak_memory_usage) {
            $megabytes = round($row->peak_memory_usage / 1024 / 1024, 2);
            $lines[] = ['label' => 'Peak Memory', 'value' => $megabytes . ' MB'];
        }

        $body .= $email->card($email->table($lines), esc_html($row->type));
    }

    // Current queue status
    $queueSize = $this->getCurrentQueueSize();
    if ($queueSize > 0) {
        $body .= $email->spacer(20);
        $body .= $email->notice(
            sprintf('<strong>Current Queue:</strong> %d operations pending', $queueSize)
        );
    }

    $body .= $email->spacer(20);
    $body .= $email->button(admin_url('admin.php?page=jvb-queue'), 'View Queue Dashboard');

    $email->sendEmail($recipient, $subject, $body, 'QUEUE REPORT');
}
|
|
/**
 * Approximate CPU usage as a percentage, derived from the 1-minute load average.
 *
 * The load average is normalised for 2 cores (load 2.0 => 100). Returns 0 when
 * the load average is not available on this platform.
 *
 * @return int
 */
protected function getCPUUsage():int
{
    // sys_getloadavg() does not exist on every platform, and error suppression
    // cannot prevent the fatal error raised by calling an undefined function.
    if (!function_exists('sys_getloadavg')) {
        return 0;
    }

    $load = sys_getloadavg();
    if (!is_array($load)) {
        return 0;
    }

    // Explicit cast: returning a fractional float through an int return type
    // is an implicit lossy conversion.
    return (int) ($load[0] * 100 / 2); // Normalize for 2 cores
}
|
|
/**
 * Current memory usage as a percentage of the PHP memory limit.
 *
 * @return int Rounded percentage; 0 when the limit is not a positive number
 */
protected function getMemoryUsage():int
{
    $memory_usage = memory_get_usage(true);
    $memory_limit = $this->getMemoryLimitBytes();

    if ($memory_limit <= 0) {
        return 0;
    }

    // Multiply before converting to int. Casting the ratio first truncates it
    // to 0 for any usage below the limit.
    return (int) round(($memory_usage / $memory_limit) * 100);
}
|
|
/**
 * PHP `memory_limit` setting converted to bytes.
 *
 * Understands plain byte values and the K/M/G shorthand suffixes. A limit of
 * -1 (unlimited) is reported as PHP_INT_MAX. Anything unparseable or
 * non-positive falls back to 128MB.
 *
 * @return int
 */
protected function getMemoryLimitBytes():int
{
    $default = 134217728; // 128MB default
    $raw = trim((string) ini_get('memory_limit'));

    if ($raw === '-1') {
        // No limit configured
        return PHP_INT_MAX;
    }

    // The suffix is optional. Requiring one made the pattern swallow the last
    // digit of a plain byte value such as "134217728".
    if (preg_match('/^(\d+)\s*([kmg]?)$/i', $raw, $matches) !== 1) {
        return $default;
    }

    $multiplier = match (strtolower($matches[2])) {
        'g' => 1024 * 1024 * 1024,
        'm' => 1024 * 1024,
        'k' => 1024,
        default => 1,
    };

    $bytes = (int) $matches[1] * $multiplier;

    return $bytes > 0 ? $bytes : $default;
}
|
|
/**
 * Get per-status operation counts for the queue, with caching.
 *
 * @return array Integer counts keyed by 'pending', 'scheduled', 'scheduled_ready'
 *               (scheduled operations whose scheduled_at has passed), 'processing',
 *               'completed', 'failed' and 'total' (sum over every status found)
 */
public function getQueueStatus(): array
{
    // Try cache first; only an array is a usable hit
    $cached = $this->cache->get(self::CACHE_QUEUE_STATUS);
    if (is_array($cached)) {
        return $cached;
    }

    $table = $this->wpdb->prefix . $this->table;

    // Get counts by status with a single query; the timestamp is bound
    // through prepare() rather than interpolated into the SQL.
    $status_counts = $this->wpdb->get_results($this->wpdb->prepare(
        "SELECT
            status,
            COUNT(*) as count,
            SUM(IF(status = 'scheduled' AND scheduled_at <= %s, 1, 0)) as scheduled_ready
        FROM $table
        GROUP BY status",
        current_time('mysql')
    ), OBJECT_K);

    if (!is_array($status_counts)) {
        $status_counts = [];
    }

    // The database layer returns numeric strings; normalise so every entry is
    // an int, matching the 0 default used for absent statuses.
    $countFor = static fn (string $status): int => (int) ($status_counts[$status]->count ?? 0);

    $result = [
        'pending' => $countFor('pending'),
        'scheduled' => $countFor('scheduled'),
        'scheduled_ready' => (int) ($status_counts['scheduled']->scheduled_ready ?? 0),
        'processing' => $countFor('processing'),
        'completed' => $countFor('completed'),
        'failed' => $countFor('failed'),
        'total' => (int) array_sum(array_column($status_counts, 'count'))
    ];

    // Cache the result
    $this->cache->set(self::CACHE_QUEUE_STATUS, $result);

    return $result;
}
|
|
/**
 * Calculate a combined server load factor.
 *
 * Weighted sum of the per-core 1-minute load average (70%) and the fraction
 * of the PHP memory limit currently in use (30%). When the load average is
 * unavailable the CPU part defaults to 1.0.
 *
 * @return float Load factor (1.0 = normal, >1.0 = high load)
 */
private function calculateServerLoadFactor():float
{
    // Memory usage as a fraction of the limit (guarding against a zero limit)
    $memory_limit = $this->getMemoryLimitBytes();
    $memory_usage = $memory_limit > 0 ? memory_get_usage(true) / $memory_limit : 0.0;

    // CPU load if available. sys_getloadavg() can also return false, which is
    // treated the same as the function not existing.
    $load = function_exists('sys_getloadavg') ? sys_getloadavg() : false;
    $cpu_cores = 2; // Adjust for your server
    $load_factor = is_array($load) ? $load[0] / $cpu_cores : 1.0;

    // Calculate combined factor
    return ( $load_factor * 0.7 ) + ( $memory_usage * 0.3 );
}
|
|
/**
 * Dynamically adjust batch size based on server load.
 *
 * Heavy load (factor > 1.5) uses a fifth of the configured batch size (at
 * least 5), moderate load (factor > 1.0) uses half (at least 10), otherwise
 * the full size is used. The result never exceeds the configured maximum.
 *
 * @return float
 */
protected function getAdaptiveBatchSize():float
{
    $load_factor = $this->calculateServerLoadFactor();
    $ceiling = $this->maxPerBatch;

    $size = match (true) {
        // Server is under heavy load
        $load_factor > 1.5 => max(5, floor($ceiling / 5)),
        // Moderate load
        $load_factor > 1.0 => max(10, floor($ceiling / 2)),
        // Normal load
        default => $ceiling,
    };

    // The minimum floors above must not push the batch past the configured
    // maximum when maxPerBatch is small.
    return (float) min($ceiling, $size);
}
|
|
/**
 * Get a specific column value from an operation by ID
 *
 * @param string      $operation_id The ID of the operation
 * @param string      $column       The column name to retrieve; must be a plain identifier
 *                                  (letters, digits, underscore)
 * @param bool        $decode_json  Whether to JSON decode the retrieved value (default: true)
 * @param string|null $status       Only match the operation if it has this status
 *                                  (default: null - any status)
 *
 * @return mixed|false The requested value (decoded when it is valid JSON and decoding is
 *                     requested), or false if no row matches or the column name is invalid
 */
public function getOperationValue(string $operation_id, string $column, bool $decode_json = true, string|null $status = null):mixed
{
    try {
        // String escaping does not protect an identifier, so only accept a
        // plain column name and quote it.
        if (preg_match('/^[A-Za-z_][A-Za-z0-9_]*$/', $column) !== 1) {
            return false;
        }

        $table = $this->wpdb->prefix . $this->table;

        $sql = "SELECT `{$column}` FROM $table WHERE id = %s";
        $params = [ $operation_id ];

        // Apply the status filter if provided
        if ($status !== null) {
            $sql .= ' AND status = %s';
            $params[] = $status;
        }

        // Get the column value
        $value = $this->wpdb->get_var($this->wpdb->prepare($sql, $params));

        if ($value === null) {
            return false;
        }

        // Decode JSON if requested and the value looks like a JSON object/array
        if ($decode_json && $value) {
            $trimmed = trim($value);
            if (str_starts_with($trimmed, '{') || str_starts_with($trimmed, '[')) {
                $decoded = json_decode($value, true);
                // Return the decoded value only if decode was successful
                if (json_last_error() === JSON_ERROR_NONE) {
                    return $decoded;
                }
            }
        }

        return $value;
    } catch (Exception $e) {
        JVB()->error()->log(
            '[OperationQueue]:getOperationValue',
            'Error retrieving operation value: ' . $e->getMessage(),
            [
                'operation_id' => $operation_id,
                'column' => $column
            ],
            'error'
        );

        return false;
    }
}
|
|
/**
 * Average processing time for an operation type.
 *
 * Returns the value stored in the `average_operation_time` option when one
 * exists for the type; otherwise delegates to buildAverageProcessingTime().
 *
 * @param string $type The type of operation
 *
 * @return float Average seconds per operation
 */
private function getAverageProcessingTime(string $type):float
{
    $known = get_option(BASE . 'average_operation_time', []);

    // Nothing stored for this type yet: build the statistic
    if (empty($known) || !array_key_exists($type, $known)) {
        $known[$type] = $this->buildAverageProcessingTime($type);

        return $known[$type];
    }

    return (float) $known[$type];
}
|
|
/**
 * All stored average processing times, as kept in the
 * `average_operation_time` option (empty array when the option is not set).
 *
 * @return array
 */
private function getAllAverageProcessingTimes():array
{
    $optionName = BASE . 'average_operation_time';

    return get_option($optionName, []);
}
|
|
/**
 * Determine the average processing time for an operation type and store it.
 *
 * A numeric value already stored in the `average_operation_time` option is
 * returned as-is. Otherwise the average seconds per item is measured from
 * completed operations of that type; when no positive average is available a
 * built-in default for the type (or 3.0) is used. The chosen value is saved
 * back to the option.
 *
 * @param string $type Operation type
 *
 * @return float Seconds
 */
private function buildAverageProcessingTime(string $type): float
{
    $optionName = BASE . 'average_operation_time';
    $stored = get_option($optionName, []);

    // Already known for this type
    if (isset($stored[$type]) && is_numeric($stored[$type])) {
        return (float)$stored[$type];
    }

    // Measure from completed operations
    $table = $this->wpdb->prefix . $this->table;
    $query = $this->wpdb->prepare(
        "SELECT AVG(TIMESTAMPDIFF(SECOND, started_at, completed_at) / count)
        FROM {$table}
        WHERE type = %s
        AND status = 'completed'",
        $type
    );
    $measured = $this->wpdb->get_var($query);

    $hasMeasurement = $measured !== null && $measured !== false && (float)$measured > 0;

    if ($hasMeasurement) {
        $seconds = (float)$measured;
    } else {
        // Default times for different operation types
        $fallbacks = [
            'email_notification_digest' => 20.0,
            'image_upload' => 10.0,
            'image_processing' => 5.0,
            'batch_upload' => 10.0,
            'artist_bulk_approval' => 15.0,
            'taxonomy_relationships' => 60.0,
            'rebuild_user_term_index' => 10.0,
            'bio_update' => 12.0,
            'featured_image' => 15.0,
            'content_update' => 8.0,
            'batch_creation' => 20.0,
            'favourites_batch' => 10.0,
            'favourite_list_add' => 5.0,
            'favourite_list_remove' => 5.0,
            'notification_mark_all_as_read' => 15.0,
            'sync_content_taxonomy_tables' => 20.0,
            'temporary_cleanup' => 10.0,
        ];

        // Type-specific default, or the general default
        $seconds = $fallbacks[$type] ?? 3.0;
    }

    // Persist the chosen value for later calls
    $stored[$type] = $seconds;
    update_option($optionName, $stored);

    return $seconds;
}
|
|
/**
 * Repair operations that are stuck in the queue.
 *
 * 1. Operations that have used up their retry budget but are not in a
 *    terminal status are marked as permanently failed.
 * 2. Operations that have been 'processing' for more than an hour are put
 *    back to 'pending' with their retry counter incremented.
 *
 * @return void
 */
public function cleanupStuckOperations(): void
{
    $table = $this->wpdb->prefix . $this->table;

    // Find operations that exceeded retry limits but weren't marked as failed.
    // 'completed_with_errors' is a terminal status written by processOperation()
    // and must not be turned into a failure after the fact.
    $stuck_operations = $this->wpdb->get_results($this->wpdb->prepare("
        SELECT id, type, retries FROM $table
        WHERE retries >= %d
        AND status NOT IN ('failed', 'completed', 'completed_with_errors')
    ", $this->max_attempts));

    foreach ($stuck_operations ?: [] as $operation) {
        $this->markAsPermanentlyFailed(
            $operation->id,
            "Cleanup: Operation exceeded {$this->max_attempts} attempts (had {$operation->retries})"
        );
    }

    // started_at is written with current_time('mysql') (site-local time), so
    // the cutoff is derived from the same clock rather than from date().
    $now = current_time('mysql');
    $cutoff = date('Y-m-d H:i:s', strtotime($now) - HOUR_IN_SECONDS);

    // Release operations stuck in processing for too long. A single conditional
    // UPDATE avoids a read-modify-write per row and cannot touch a row that
    // has left 'processing' in the meantime.
    $released = $this->wpdb->query($this->wpdb->prepare("
        UPDATE $table
        SET status = 'pending',
            started_at = NULL,
            retries = retries + 1,
            updated_at = %s
        WHERE status = 'processing'
        AND started_at < %s
    ", $now, $cutoff));

    if ($released) {
        // Status counts changed; drop the cached queue status
        $this->invalidateQueueCache('status');
    }
}
|
|
/**
 * Render the "Operation Queue Status" admin page.
 *
 * Prints the lock state, per-status counts and, when the health report lists
 * any, a table of stuck operations.
 *
 * @return void
 */
public function renderAdminPage(): void
{
    $status = $this->getQueueStatus();
    $health = $this->getQueueHealthReport();
    ?>
    <div class="wrap">
        <h1>Operation Queue Status</h1>

        <div class="card">
            <h2>Current Status</h2>
            <ul>
                <li>Queue is: <?= ($this->isQueueLocked()) ? 'locked' : 'unlocked'?>.</li>
                <li>Queue: <?= ($this->hasItemsInQueue()) ? 'has items in queue' : 'has nothing in queue'?>.</li>
                <li>Pending: <?php echo (int) $status['pending']; ?></li>
                <li>Processing: <?php echo (int) $status['processing']; ?></li>
                <li>Scheduled: <?php echo (int) $status['scheduled']; ?>
                    (<?php echo (int) $status['scheduled_ready']; ?> ready)</li>
                <li>Completed: <?php echo (int) $status['completed']; ?></li>
                <li>Failed: <?php echo (int) $status['failed']; ?></li>
            </ul>
        </div>

        <?php if (!empty($health['stuck_operations'])): ?>
        <div class="card">
            <h2>⚠️ Stuck Operations</h2>
            <table class="wp-list-table widefat fixed striped">
                <thead>
                    <tr>
                        <th>ID</th>
                        <th>Type</th>
                        <th>Status</th>
                        <th>Age (minutes)</th>
                        <th>Error</th>
                    </tr>
                </thead>
                <tbody>
                    <?php foreach ($health['stuck_operations'] as $op): ?>
                    <tr>
                        <td><?php echo esc_html($op['id']); ?></td>
                        <td><?php echo esc_html($op['type']); ?></td>
                        <td><?php echo esc_html($op['status']); ?></td>
                        <td><?php echo esc_html($op['age_minutes']); ?></td>
                        <?php // Character-aware truncation: a byte-wise cut can split a multibyte character and leave invalid UTF-8 for esc_html(). ?>
                        <td><?php echo esc_html(mb_substr($op['error_message'] ?? '', 0, 50)); ?></td>
                    </tr>
                    <?php endforeach; ?>
                </tbody>
            </table>
        </div>
        <?php endif; ?>
    </div>
    <?php
}
|
|
/**
 * Build a health report for the operation queue.
 *
 * @return array Keys:
 *               - 'status_counts': per status, the row count and average retries
 *               - 'stuck_operations': up to 10 rows that exhausted their retries,
 *                 have been processing for over 30 minutes, or have been pending
 *                 for over 2 hours
 *               - 'error_patterns': up to 5 error-message prefixes seen more than
 *                 once in the last 24 hours, per type
 *               - 'performance_stats': always empty here
 */
public function getQueueHealthReport(): array
{
    // Use the configured table name, like the rest of the class
    $table = $this->wpdb->prefix . $this->table;

    $report = [
        'status_counts' => [],
        'stuck_operations' => [],
        'error_patterns' => [],
        'performance_stats' => []
    ];

    // Status counts
    $status_counts = $this->wpdb->get_results("
        SELECT status, COUNT(*) as count, AVG(retries) as avg_retries
        FROM $table
        GROUP BY status
    ", OBJECT_K);

    foreach ($status_counts ?: [] as $status => $data) {
        $report['status_counts'][$status] = [
            'count' => intval($data->count),
            'avg_retries' => round(floatval($data->avg_retries), 2)
        ];
    }

    // Stuck operations. The retry threshold follows the configured
    // max_attempts instead of a hard-coded 3.
    // NOTE(review): these comparisons use MySQL NOW(), while started_at and
    // updated_at are written with current_time('mysql') elsewhere in this
    // class - confirm both clocks use the same timezone.
    $report['stuck_operations'] = $this->wpdb->get_results($this->wpdb->prepare("
        SELECT id, type, status, retries,
            TIMESTAMPDIFF(MINUTE, created_at, NOW()) as age_minutes,
            error_message
        FROM $table
        WHERE (
            retries >= %d OR
            (status = 'processing' AND started_at < DATE_SUB(NOW(), INTERVAL 30 MINUTE)) OR
            (status = 'pending' AND created_at < DATE_SUB(NOW(), INTERVAL 2 HOUR))
        )
        ORDER BY created_at DESC
        LIMIT 10
    ", $this->max_attempts), ARRAY_A) ?: [];

    // Error patterns
    $report['error_patterns'] = $this->wpdb->get_results("
        SELECT
            SUBSTRING(error_message, 1, 100) as error_pattern,
            COUNT(*) as frequency,
            type
        FROM $table
        WHERE error_message IS NOT NULL
        AND updated_at > DATE_SUB(NOW(), INTERVAL 24 HOUR)
        GROUP BY type, SUBSTRING(error_message, 1, 100)
        HAVING frequency > 1
        ORDER BY frequency DESC
        LIMIT 5
    ", ARRAY_A) ?: [];

    return $report;
}
|
|
/**
 * Aggregate operations from the day just beyond the retention window into the
 * metrics table.
 *
 * Operations completed between ($days_to_keep + 1) and $days_to_keep days ago
 * are grouped by completion date and type; the aggregated counters are
 * inserted, or overwrite the existing row when the insert hits a duplicate key.
 *
 * NOTE(review): the window is relative to NOW(), not to calendar-day
 * boundaries, so a grouped date may hold only part of a day while the
 * ON DUPLICATE KEY UPDATE overwrites that date's totals - confirm this is
 * intended.
 * NOTE(review): average_duration is measured from created_at here but from
 * started_at in trackOperationMetrics() - confirm which one is wanted.
 *
 * @param int $days_to_keep Retention window in days
 *
 * @return void
 */
protected function archiveToMetrics(int $days_to_keep): void
{
    $table = $this->wpdb->prefix . $this->table;
    $metricsTable = $this->wpdb->prefix . $this->metricsTable;

    // Aggregate and insert metrics. The LIKE wildcard is written as %% because
    // a literal percent sign in a prepare() query must be escaped.
    $this->wpdb->query($this->wpdb->prepare("
        INSERT INTO $metricsTable
            (date, type, total_operations, successful_operations,
            failed_operations, average_duration, total_items_processed)
        SELECT
            DATE(completed_at) as date,
            type,
            COUNT(*) as total_operations,
            SUM(IF(status = 'completed', 1, 0)) as successful,
            SUM(IF(status LIKE 'failed%%', 1, 0)) as failed,
            AVG(TIMESTAMPDIFF(SECOND, created_at, completed_at)) as avg_duration,
            SUM(count) as items_processed
        FROM $table
        WHERE completed_at < DATE_SUB(NOW(), INTERVAL %d DAY)
        AND completed_at >= DATE_SUB(NOW(), INTERVAL %d DAY)
        GROUP BY DATE(completed_at), type
        ON DUPLICATE KEY UPDATE
            total_operations = VALUES(total_operations),
            successful_operations = VALUES(successful_operations),
            failed_operations = VALUES(failed_operations),
            average_duration = VALUES(average_duration),
            total_items_processed = VALUES(total_items_processed)
    ", $days_to_keep, $days_to_keep + 1));
}
|
}
|