| | |
| | | { |
| | | switch ($action) { |
| | | case 'unlock-operation-queue': |
| | | error_log('Unlocking Queue from admin action'); |
| | | $this->unlockQueue(); |
| | | return new WP_REST_Response([ |
| | | 'success' => true, |
| | |
| | | return; |
| | | } |
| | | |
| | | error_log('[queue] Checking queue...'); |
| | | |
| | | // Peek at what operations we might process |
| | | $batch_size = $this->getAdaptiveBatchSize(); |
| | |
| | | { |
| | | try { |
| | | $table = $this->wpdb->prefix . $this->table; |
| | | error_log('QUEUING OPERATION: '.print_r([ |
| | | 'type' => $type, |
| | | 'user_id' => $user_id, |
| | | 'data' => $data, |
| | | 'options' => $options, |
| | | ],true)); |
| | | |
| | | // Extract options |
| | | $priority = (array_key_exists('priority', $options) && in_array($options['priority'], ['low', 'normal', 'high'])) ? $options['priority'] : 'normal'; |
| | | $delay = (array_key_exists('delay', $options)) ? (int) $options['delay'] : 0; |
| | |
| | | // Generate operation ID |
| | | $operation_id = $this->checkOperationId($operation_id, $type, $merge, $user_id); |
| | | |
| | | error_log('Checked OperationId: '.print_r($operation_id, true)); |
| | | // Check for existing operation |
| | | $existing = $this->getOperation($operation_id); |
| | | error_log('Existing: '.print_r($existing, true)); |
| | | |
| | | $dependencies = $options['depends_on'] ?? []; |
| | | if (is_string($dependencies)) { |
| | |
| | | } |
| | | |
| | | if ($existing) { |
| | | error_log('[queue]:queueOperation - Already Existing operation... updating'); |
| | | |
| | | if ($merge !== 'append' && ($existing->status === 'pending' || $existing->status === 'scheduled')) { |
| | | error_log('Not appending operation, checking for merge'); |
| | | |
| | | if ($merge === 'merge') { |
| | | $existing_data = json_decode($existing->request_data ?? '{}', true); |
| | | $data = $this->deepMerge($existing_data, $data); |
| | |
| | | } |
| | | |
| | | } else if ($merge === 'append') { |
| | | error_log('Append operation, creating a new one'); |
| | | $operation_id = $operation_id . '_' . time() . '_' . substr(uniqid(), -4); |
| | | $existing = null; |
| | | } |
| | | } |
| | | |
| | | if (!$existing) { |
| | | error_log('Inserting new operation into table'); |
| | | // Prepare metadata with chunk config |
| | | $metadata = $chunk_config ? ['chunk_config' => $chunk_config] : []; |
| | | |
| | |
| | | ]; |
| | | |
| | | } catch (Exception $e) { |
| | | error_log("Error queueing operation: " . $e->getMessage()); |
| | | JVB()->error()->log('queue', $e->getMessage(), $data, 'high'); |
| | | return new WP_Error('queue_failed', $e->getMessage()); |
| | | } |
| | |
| | | $merged = $existing; |
| | | |
| | | if (!$this->isAssociativeArray($existing) && !$this->isAssociativeArray($new)) { |
| | | error_log('It is an associative array!'); |
| | | return array_merge($existing, $new); |
| | | } |
| | | error_log('Not an associative array... moving on!'); |
| | | foreach ($new as $key => $newValue) { |
| | | if (!array_key_exists($key, $existing)) { |
| | | $merged[$key] = $newValue; |
| | |
| | | 'limit' => 1 |
| | | ]); |
| | | if ($existing) { |
| | | error_log('Found existing User operations: '.print_r($existing, true)); |
| | | $operation_id = $existing[0]->id; |
| | | } |
| | | } |
| | | |
| | | if (!$operation_id) { |
| | | $operation_id = uniqid('op_'); |
| | | error_log('Generated operation ID: '.print_r($operation_id, true)); |
| | | } |
| | | |
| | | if (!str_starts_with($operation_id, 'u')) { |
| | | $operation_id = 'u' . $user_id . '_' . $operation_id; |
| | | error_log('Added user ID to operation ID: '.print_r($operation_id, true)); |
| | | } |
| | | |
| | | return $operation_id; |
| | |
| | | { |
| | | // Only add if not already scheduled AND if operation is high priority or small |
| | | if (!has_action('shutdown', [$this, 'processQueueOnShutdown'])) { |
| | | error_log('Adding shutdown call'); |
| | | add_action('shutdown', [$this, 'processQueueOnShutdown'], 100); |
| | | } |
| | | } |
| | |
| | | protected function canProcessOperation(object $operation): bool |
| | | { |
| | | if ($operation->retries >= $this->max_attempts) { |
| | | error_log("Operation {$operation->id} exceeded retry limit: {$operation->retries} >= {$this->max_attempts}"); |
| | | $this->markAsPermanentlyFailed($operation->id, 'Exceeded maximum retry attempts'); |
| | | return false; |
| | | } |
| | |
| | | |
| | | if ($result) { |
| | | $this->invalidateQueueCache('status'); |
| | | error_log("Marked operation {$operation_id} as permanently failed: {$final_error_message}"); |
| | | } |
| | | } |
| | | |
| | |
| | | ['%s'] |
| | | ); |
| | | |
| | | // Log retry schedule |
| | | error_log(sprintf( |
| | | "Operation %s scheduled for retry #%d at %s (delay: %ds)", |
| | | $operation_id, |
| | | $attempt, |
| | | $scheduled_at, |
| | | $delay |
| | | )); |
| | | } |
| | | |
| | | /** |
| | |
| | | ]; |
| | | } |
| | | |
| | | error_log('Filtered Result: '.print_r($filterResult, true)); |
| | | |
| | | $newCount = $progress_count + $chunk['progress']; |
| | | |
| | | if ($filterResult['success']) { |
| | |
| | | $filterResult['result'] = [$filterResult['result']]; |
| | | } |
| | | // Store the result data |
| | | error_log('Merging Old Result: '. print_r($oldResult, true)); |
| | | error_log('With Newer Result: '. print_r($filterResult['result'], true)); |
| | | $resultToStore = $this->deepMerge($oldResult, $filterResult['result']); |
| | | |
| | | error_log('Merged Result: '.print_r($resultToStore, true)); |
| | | |
| | | $resultToStore['processed_at'] = current_time('mysql'); |
| | | |
| | | // Check if operation is complete |
| | |
| | | ['%s'] |
| | | ); |
| | | |
| | | |
| | | |
| | | |
| | | error_log('Completion result: '.print_r($result, true)); |
| | | // Now do post-completion tasks |
| | | $this->invalidateQueueCache('status'); |
| | | $this->updateLastModified($operation->user_id); |
| | |
| | | ", $this->max_attempts)); |
| | | |
| | | if (!empty($stuck_operations)) { |
| | | error_log("Found " . count($stuck_operations) . " stuck operations to clean up"); |
| | | |
| | | foreach ($stuck_operations as $operation) { |
| | | $this->markAsPermanentlyFailed( |
| | | $operation->id, |
| | |
| | | ", date('Y-m-d H:i:s', strtotime('-1 hour')))); |
| | | |
| | | if (!empty($long_processing)) { |
| | | error_log("Found " . count($long_processing) . " long-running operations to reset"); |
| | | |
| | | foreach ($long_processing as $operation) { |
| | | $this->wpdb->update($table, [ |
| | | 'status' => 'pending', |