queue();
 * $queue->registry()->register('helcim_sync_to', new TypeConfig(
 *     executor: new IntegrationExecutor(),
 *     chunkKey: 'items',
 *     chunkSize: 10,
 *     maxRetries: 3
 * ));
 */
final class IntegrationExecutor implements Executor
{
    /**
     * Values returned by JVB()->connect(), keyed by integration service name.
     * Populated lazily by resolveIntegration() on first use of each service.
     */
    private array $integrationCache = [];

    /**
     * Run one queued integration operation.
     *
     * The operation type is split into a service name and an action; the
     * service's integration is resolved (for the operation's user when one is
     * set) and the action is dispatched to the matching handler. Actions
     * without a dedicated handler are forwarded to processDynamic().
     *
     * Any Exception raised along the way is logged through JVB()->error() and
     * converted into a 'failed' Result instead of propagating.
     *
     * @param Operation $operation Operation to run (reads type, userId, requestData and id).
     * @param Progress  $progress  Progress tracker handed to the action handler.
     *
     * @return Result Outcome produced by the handler, or a 'failed' Result on error.
     */
    public function execute(Operation $operation, Progress $progress): Result
    {
        try {
            [$serviceName, $action] = $this->parseOperationType($operation->type);

            $integration = $this->resolveIntegration($serviceName, $operation->userId);

            if (!$integration || !$integration->isSetUp()) {
                return new Result(
                    outcome: 'failed',
                    result: ['error' => "Integration '{$serviceName}' not available or not configured"]
                );
            }

            $data = $operation->requestData;

            return match ($action) {
                'sync_to' => $this->processSyncTo($integration, $data, $progress),
                'sync_from' => $this->processSyncFrom($integration, $data, $progress),
                'delete_from' => $this->processDeleteFrom($integration, $data, $progress),
                'sync_customer' => $this->processSyncCustomer($integration, $data, $progress),
                'import' => $this->processImport($integration, $data, $progress),
                default => $this->processDynamic($integration, $action, $data, $progress),
            };
        } catch (Exception $e) {
            JVB()->error()->log(
                '[IntegrationExecutor]:execute',
                $e->getMessage(),
                [
                    'operation_id' => $operation->id,
                    'operation_type' => $operation->type,
                    'user_id' => $operation->userId,
                ]
            );

            return new Result(
                outcome: 'failed',
                result: ['error' => $e->getMessage()]
            );
        }
    }

    /*****************************************************************
     * TYPE PARSING
     *****************************************************************/

    /**
     * Parse 'helcim_sync_to' → ['helcim', 'sync_to'].
     *
     * A leading BASE prefix is removed first
     * (e.g. 'jvb_helcim_sync_to' → 'helcim_sync_to'). Everything before the
     * first underscore is the service name; everything after it is the action.
     *
     * @param string $type Operation type, with or without the BASE prefix.
     *
     * @return array{0: string, 1: string} [service name, action].
     *
     * @throws Exception When the type has no underscore, or the service name
     *                   or the action would be empty.
     */
    private function parseOperationType(string $type): array
    {
        // Strip BASE only when it really is a prefix. Removing every occurrence
        // (as str_replace would) can mangle a service or action name that
        // happens to contain the same characters.
        $prefix = (string) BASE;
        if ($prefix !== '' && str_starts_with($type, $prefix)) {
            $type = substr($type, strlen($prefix));
        }

        $pos = strpos($type, '_');

        // No separator, a leading separator (empty service) or a trailing
        // separator (empty action) all describe a malformed type.
        if ($pos === false || $pos === 0 || $pos === strlen($type) - 1) {
            throw new Exception("Invalid integration operation type: {$type}");
        }

        return [substr($type, 0, $pos), substr($type, $pos + 1)];
    }

    /**
     * Resolve integration instance, optionally for a specific user.
     *
     * The value returned by JVB()->connect() is cached per service name. When
     * a non-zero user ID is given, a new instance of the same class is created
     * with that user ID as its only constructor argument.
     *
     * @param string $serviceName Integration service name (e.g. 'helcim').
     * @param int    $userId      User to scope the integration to; 0 for none.
     *
     * @return object|null The integration, or null when connect() did not yield an object.
     */
    private function resolveIntegration(string $serviceName, int $userId): ?object
    {
        if (!isset($this->integrationCache[$serviceName])) {
            $this->integrationCache[$serviceName] = JVB()->connect($serviceName);
        }

        $integration = $this->integrationCache[$serviceName];

        // Anything that is not an object cannot satisfy the ?object return
        // type; report it as "not available" instead.
        if (!is_object($integration)) {
            return null;
        }

        // If operation has a user context, re-instantiate for that user
        if ($userId) {
            $class = get_class($integration);

            return new $class($userId);
        }

        return $integration;
    }

    /*****************************************************************
     * OPERATION HANDLERS
     *****************************************************************/

    /**
     * Sync WordPress posts → external service.
     *
     * Calls syncPostToService() once per entry of $data['items'] (cast to int).
     *
     * @param object   $integration Resolved integration instance.
     * @param array    $data        Request data; reads 'items'.
     * @param Progress $progress    Progress tracker updated per item.
     *
     * @return Result 'success', 'partial' or 'failed', with synced IDs, errors and counts.
     */
    private function processSyncTo(object $integration, array $data, Progress $progress): Result
    {
        $items = $data['items'] ?? [];

        if (empty($items)) {
            return new Result(outcome: 'success', result: ['synced' => [], 'message' => 'No items to sync']);
        }

        [$synced, $errors] = $this->processItems(
            $items,
            static fn ($postID) => $integration->syncPostToService((int) $postID),
            $progress
        );

        return new Result(
            outcome: $this->summarizeOutcome($synced, $errors),
            result: [
                'synced' => $synced,
                'errors' => $errors,
                'synced_count' => count($synced),
                'failed_count' => count($errors),
            ]
        );
    }

    /**
     * Sync external service → WordPress posts.
     *
     * Calls syncFromService() once per entry of $data['items'], falling back
     * to $data['external_ids'] when 'items' is not set.
     *
     * @param object   $integration Resolved integration instance.
     * @param array    $data        Request data; reads 'items' or 'external_ids'.
     * @param Progress $progress    Progress tracker updated per item.
     *
     * @return Result 'success', 'partial' or 'failed', with imported IDs and errors.
     */
    private function processSyncFrom(object $integration, array $data, Progress $progress): Result
    {
        [$imported, $errors] = $this->processItems(
            $data['items'] ?? $data['external_ids'] ?? [],
            static fn ($externalId) => $integration->syncFromService($externalId),
            $progress
        );

        return new Result(
            outcome: $this->summarizeOutcome($imported, $errors),
            result: ['imported' => $imported, 'errors' => $errors]
        );
    }

    /**
     * Delete items from external service.
     *
     * Calls deleteFromService() once per entry of $data['external_ids'].
     * A WP_Error result is reported through Progress::failItem(), the same way
     * the sync handlers report a failed item.
     *
     * @param object   $integration Resolved integration instance.
     * @param array    $data        Request data; reads 'external_ids'.
     * @param Progress $progress    Progress tracker updated per item.
     *
     * @return Result 'success', 'partial' or 'failed', with deleted IDs and errors.
     */
    private function processDeleteFrom(object $integration, array $data, Progress $progress): Result
    {
        [$deleted, $errors] = $this->processItems(
            $data['external_ids'] ?? [],
            static fn ($externalId) => $integration->deleteFromService($externalId),
            $progress
        );

        return new Result(
            outcome: $this->summarizeOutcome($deleted, $errors),
            result: ['deleted' => $deleted, 'errors' => $errors]
        );
    }

    /**
     * Sync a customer record.
     *
     * Passes the whole request data to syncCustomer(). Progress is advanced
     * once after the call returns, before the result is inspected.
     *
     * @param object   $integration Resolved integration instance.
     * @param array    $data        Request data forwarded unchanged.
     * @param Progress $progress    Progress tracker.
     *
     * @return Result 'success' carrying the integration's return value, or
     *                'failed' with an error message.
     */
    private function processSyncCustomer(object $integration, array $data, Progress $progress): Result
    {
        try {
            $result = $integration->syncCustomer($data);
            $progress->advance();

            if (is_wp_error($result)) {
                return new Result(outcome: 'failed', result: ['error' => $result->get_error_message()]);
            }

            return new Result(outcome: 'success', result: $result);
        } catch (Exception $e) {
            return new Result(outcome: 'failed', result: ['error' => $e->getMessage()]);
        }
    }

    /**
     * Bulk import from external service.
     *
     * Passes the whole request data to importFromService(), then advances
     * progress by $data['count'] (1 when not set) before the result is
     * inspected.
     *
     * @param object   $integration Resolved integration instance.
     * @param array    $data        Request data forwarded unchanged; reads 'count'.
     * @param Progress $progress    Progress tracker.
     *
     * @return Result 'success' carrying the integration's return value, or
     *                'failed' with an error message.
     */
    private function processImport(object $integration, array $data, Progress $progress): Result
    {
        try {
            $result = $integration->importFromService($data);
            $progress->advance($data['count'] ?? 1);

            if (is_wp_error($result)) {
                return new Result(outcome: 'failed', result: ['error' => $result->get_error_message()]);
            }

            return new Result(outcome: 'success', result: $result);
        } catch (Exception $e) {
            return new Result(outcome: 'failed', result: ['error' => $e->getMessage()]);
        }
    }

    /**
     * Fallback: call processAction on the integration for non-standard actions.
     *
     * A non-array return value is wrapped as ['result' => $value].
     *
     * @param object   $integration Resolved integration instance.
     * @param string   $action      Action name parsed from the operation type.
     * @param array    $data        Request data forwarded unchanged.
     * @param Progress $progress    Progress tracker, advanced once after the call.
     *
     * @return Result 'success' with the (possibly wrapped) return value, or
     *                'failed' with an error message.
     */
    private function processDynamic(object $integration, string $action, array $data, Progress $progress): Result
    {
        try {
            $result = $integration->processAction($action, $data);
            $progress->advance();

            if (is_wp_error($result)) {
                return new Result(outcome: 'failed', result: ['error' => $result->get_error_message()]);
            }

            return new Result(
                outcome: 'success',
                result: is_array($result) ? $result : ['result' => $result]
            );
        } catch (Exception $e) {
            return new Result(outcome: 'failed', result: ['error' => $e->getMessage()]);
        }
    }

    /*****************************************************************
     * SHARED HELPERS
     *****************************************************************/

    /**
     * Run $handler once per item and sort the items into successes and errors.
     *
     * An item fails when the handler returns a WP_Error or throws an
     * Exception; the failure is recorded under the item's key and reported via
     * Progress::failItem(). Otherwise the item is recorded as a success and
     * progress is advanced by one.
     *
     * @param mixed    $items    Items to process; a non-iterable value is treated as empty.
     * @param callable $handler  Receives one item and returns the integration's result.
     * @param Progress $progress Progress tracker updated per item.
     *
     * @return array{0: array, 1: array} [successful items, error messages keyed by item].
     */
    private function processItems(mixed $items, callable $handler, Progress $progress): array
    {
        $succeeded = [];
        $errors = [];

        if (!is_iterable($items)) {
            return [$succeeded, $errors];
        }

        foreach ($items as $item) {
            try {
                $result = $handler($item);

                if (is_wp_error($result)) {
                    $message = $result->get_error_message();
                    $errors[$item] = $message;
                    $progress->failItem($item, $message);
                    continue;
                }

                $succeeded[] = $item;
                $progress->advance();
            } catch (Exception $e) {
                $errors[$item] = $e->getMessage();
                $progress->failItem($item, $e->getMessage());
            }
        }

        return [$succeeded, $errors];
    }

    /**
     * Derive the batch outcome: 'success' when nothing failed, 'failed' when
     * nothing succeeded, 'partial' otherwise.
     *
     * @param array $succeeded Items processed successfully.
     * @param array $errors    Error messages keyed by item.
     */
    private function summarizeOutcome(array $succeeded, array $errors): string
    {
        return match (true) {
            $errors === [] => 'success',
            $succeeded === [] => 'failed',
            default => 'partial',
        };
    }
}