| | |
| | | <?php |
| | | namespace JVBase\managers; |
| | | |
| | | use JVBase\managers\CacheManager; |
| | | use Exception; |
| | | use JVBase\utility\Features; |
| | | use WP_Error; |
| | | use WP_REST_Response; |
| | | use WP_REST_Request; |
| | |
| | | |
| | | protected ?CacheManager $cache = null; |
| | | protected int $ttl = 300; |
| | | protected string $cacheGroup = 'queue'; |
| | | // Cache keys for different data types |
| | | private const CACHE_QUEUE_STATUS = 'status'; |
| | | private const CACHE_HAS_ITEMS = 'has_items'; |
| | | private const CACHE_OPERATION_PREFIX = 'op_'; |
| | | private const CACHE_USER_QUEUE_PREFIX = 'user_queue_'; |
| | | private const CACHE_METRICS_PREFIX = 'metrics_'; |
| | | private const CACHE_QUEUE_SIZE = 'queue_size'; |
| | | |
| | | // Prepared statement cache |
| | | protected array $preparedStatements = []; |
| | | |
| | | public function __construct() |
| | | { |
| | |
| | | * |
| | | * @return bool|WP_REST_Response |
| | | */ |
| | | public function adminActionFilter(WP_REST_Response $response, WP_REST_Request $request, string $action) |
| | | public function adminActionFilter(WP_REST_Response $response, WP_REST_Request $request, string $action):WP_REST_Response|bool |
| | | { |
| | | switch ($action) { |
| | | case 'unlock-operation-queue': |
| | |
| | | $result = $this->wpdb->get_row($this->wpdb->prepare(" |
| | | SELECT |
| | | COUNT(*) as total, |
| | | SUM(CASE WHEN status IN ('pending', 'processing') THEN 1 ELSE 0 END) as active, |
| | | SUM(CASE WHEN status = 'scheduled' AND scheduled_at <= %s THEN 1 ELSE 0 END) as ready_scheduled |
| | | SUM(IF(status IN ('pending', 'processing'), 1, 0)) as active, |
| | | SUM(IF(status = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready_scheduled |
| | | FROM $table |
| | | WHERE status IN ('pending', 'processing', 'scheduled') |
| | | ", $current_time)); |
| | |
| | | // $this->pruneOldMetrics(); // Optional: clean old metrics |
| | | } |
| | | |
/**
 * Delete rows from the metrics table whose `date` is more than 90 days old.
 *
 * The table name is built from the wpdb prefix plus $this->metricsTable;
 * the cutoff date is bound through wpdb::prepare().
 */
protected function pruneOldMetrics(): void
{
    $metricsTable = $this->wpdb->prefix . $this->metricsTable;

    // Anything dated before this day falls outside the 90-day retention window.
    $cutoffDate = date('Y-m-d', strtotime('-90 days'));

    $deleteSql = $this->wpdb->prepare(
        "DELETE FROM $metricsTable WHERE date < %s",
        $cutoffDate
    );

    $this->wpdb->query($deleteSql);
}
| | | |
| | | protected function isHeavyOperation(string $type): bool |
| | | { |
| | | $heavyOperations = [ |
| | |
| | | */ |
protected function isQueueLocked(): bool
{
    // The queue lock is a MySQL named lock. The same name must be used by
    // lockQueue() and unlockQueue(), otherwise they would not see each other.
    $lock_name = BASE . '_queue_lock';

    // IS_USED_LOCK() returns the connection id of the holder, or NULL when
    // nobody holds the lock.
    $result = $this->wpdb->get_var($this->wpdb->prepare(
        "SELECT IS_USED_LOCK(%s)",
        $lock_name
    ));

    return $result !== null;
}

/**
 * Try to acquire the queue lock without blocking.
 *
 * Uses MySQL GET_LOCK() with a timeout of 0, so the call returns immediately
 * when another connection already holds the lock. MySQL releases named locks
 * when the owning session ends, so a crashed worker does not leave a stale
 * lock behind and no manual expiry check is needed.
 *
 * @return bool True when this connection now holds the lock, false when the
 *              lock is held elsewhere or the query failed.
 */
protected function lockQueue(): bool
{
    $lock_name = BASE . '_queue_lock';

    // GET_LOCK() yields 1 on success, 0 on timeout and NULL on error.
    $result = $this->wpdb->get_var($this->wpdb->prepare(
        "SELECT GET_LOCK(%s, 0)",
        $lock_name
    ));

    // Cast before comparing so both '1' and 1 count as success;
    // NULL and '0' both become 0 and are treated as "not acquired".
    return (int) $result === 1;
}

/**
 * Release the queue lock held by the current database connection.
 *
 * RELEASE_LOCK() only affects a lock owned by this connection; calling it
 * when no lock is held is harmless.
 */
protected function unlockQueue(): void
{
    $lock_name = BASE . '_queue_lock';

    $this->wpdb->query($this->wpdb->prepare(
        "SELECT RELEASE_LOCK(%s)",
        $lock_name
    ));
}
| | | |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | $this->updateLastModified($user_id); |
| | | $this->invalidateQueueCache(); |
| | | $this->cache->delete(self::CACHE_USER_QUEUE_PREFIX . $user_id); |
| | | $this->invalidateUserQueue($user_id); |
| | | $this->runQueueOnShutdown(); |
| | | |
| | | return [ |
| | |
| | | } |
| | | } |
| | | |
/**
 * Refresh the CacheManager timestamp for a single user's scope.
 *
 * @param int $user_id User whose "user_<id>" timestamp is updated.
 */
protected function updateLastModified(int $user_id)
{
    $scope = "user_{$user_id}";

    CacheManager::updateTimestamp($scope);
}
| | | |
| | | protected function deepMerge(array $existing, array $new): array |
| | | { |
| | | $merged = $existing; |
| | |
| | | AND retries < %d |
| | | ORDER BY |
| | | FIELD(priority, 'high', 'normal', 'low'), |
| | | COALESCE(scheduled_at, created_at) ASC |
| | | COALESCE(scheduled_at, created_at) |
| | | LIMIT %d |
| | | ", $current_time, $this->max_attempts, $batch_size)); |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Check if user's queue has been modified since given timestamp |
| | | * Returns true if modified, false if not (can send 304) |
| | | */ |
public function isUserQueueModified(int $user_id, int $since_timestamp): bool
{
    // Call the static timestamp API on the class itself rather than through
    // $this->cache: that property is nullable, and a static call on null
    // raises an Error. This also matches the CacheManager::updateTimestamp()
    // call used when the timestamp is written.
    $last_modified = CacheManager::getTimestamp("user_{$user_id}");

    // Strictly newer than the caller's copy means the queue changed.
    return $last_modified > $since_timestamp;
}
/**
 * Invalidate everything cached under a user's scope.
 *
 * Delegates to CacheManager::invalidateAll() with the key "user_<id>".
 * According to the notes kept with this method, that single call:
 *   1. updates the HTTP timestamp for the user scope,
 *   2. flushes the user-specific caches,
 *   3. triggers invalidation of connected caches.
 *
 * @param int $user_id User whose cached queue data is invalidated.
 */
protected function invalidateUserQueue(int $user_id): void
{
    $scope = "user_{$user_id}";

    CacheManager::invalidateAll($scope);
}
| | | |
| | | /** |
| | | * Invalidate all queue-related caches |
| | | */ |
| | | protected function invalidateQueueCache(string $scope = 'all'): void |
| | |
| | | $this->cache->delete($key); |
| | | } |
| | | |
| | | $this->cache->touch(); |
| | | |
| | | if ($scope === 'all') { |
| | | // Clear entire group for complete refresh |
| | | $this->cache->invalidate(); |
| | | delete_transient('jvb_queue_status_counts'); |
| | | } |
| | | } |
| | |
| | | if (!$updated) { |
| | | throw new Exception('Operation no longer available for processing'); |
| | | } |
| | | $this->updateLastModified($operation->user_id); |
| | | $this->invalidateUserQueue($operation->user_id); |
| | | |
| | | $data = json_decode($operation->request_data, true); |
| | | $progress_count = (int) $operation->progress_count; |
| | |
| | | ); |
| | | |
| | | // Now do post-completion tasks |
| | | $this->invalidateQueueCache('status'); |
| | | $this->updateLastModified($operation->user_id); |
| | | $this->updateUserQueueTimestamp($operation->user_id); |
| | | $this->invalidateUserQueue($operation->user_id); |
| | | |
| | | $this->trackOperationMetrics($operation->id); |
| | | |
| | |
| | | ['%s'] |
| | | ); |
| | | |
| | | $this->updateLastModified($operation->user_id); |
| | | $this->invalidateUserQueue($operation->user_id); |
| | | } |
| | | |
| | | } else { |
| | |
| | | ['%s'] |
| | | ); |
| | | |
| | | $this->invalidateQueueCache('status'); |
| | | $this->updateLastModified($operation->user_id); |
| | | $this->invalidateUserQueue($operation->user_id); |
| | | } else { |
| | | // Failed but more to process - continue with next chunk |
| | | $this->wpdb->update( |
| | |
| | | } |
| | | } |
| | | // Clear operation cache after any update |
| | | $this->cache->delete(self::CACHE_OPERATION_PREFIX . $operation->id); |
| | | $this->updateLastModified($operation->user_id); |
| | | $this->invalidateUserQueue($operation->user_id); |
| | | return $filterResult; |
| | | |
| | | } catch (Exception $e) { |
| | |
| | | $batch = $this->extractBatch($data, $keys, $current_progress, $chunk_size); |
| | | |
| | | // Merge non-chunked data with chunked batch |
| | | $result = []; |
| | | foreach ($data as $k => $v) { |
| | | if (!in_array($k, $keys)) { |
| | | $result[$k] = $v; // Keep non-batch data |
| | | } |
| | | } |
| | | $result = array_filter($data, function ($k) use ($keys) { |
| | | return !in_array($k, $keys); |
| | | }, ARRAY_FILTER_USE_KEY); |
| | | |
| | | // Add the batched data |
| | | foreach ($batch as $k => $v) { |
| | |
| | | SELECT |
| | | status, |
| | | COUNT(*) as count, |
| | | SUM(CASE WHEN status = 'scheduled' AND scheduled_at <= '$current_time' THEN 1 ELSE 0 END) as scheduled_ready |
| | | SUM(IF(status = 'scheduled' AND scheduled_at <= '$current_time', 1, 0)) as scheduled_ready |
| | | FROM $table |
| | | GROUP BY status |
| | | ", OBJECT_K); |
| | |
| | | DATE(completed_at) as date, |
| | | type, |
| | | COUNT(*) as total_operations, |
| | | SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as successful, |
| | | SUM(CASE WHEN status LIKE 'failed%' THEN 1 ELSE 0 END) as failed, |
| | | SUM(IF(status = 'completed', 1, 0)) as successful, |
| | | SUM(IF(status LIKE 'failed%', 1, 0)) as failed, |
| | | AVG(TIMESTAMPDIFF(SECOND, created_at, completed_at)) as avg_duration, |
| | | SUM(count) as items_processed |
| | | FROM $table |