<?php
|
namespace JVBase\managers\queue\executors;
|
|
use JVBase\managers\queue\{Executor, Operation, Progress, Result};
|
use Exception;
|
|
if (!defined('ABSPATH')) {
|
exit;
|
}
|
|
/**
|
* Executor for integration-related queue operations.
|
*
|
* Routes operations to the correct integration instance based on the
|
* operation type prefix (e.g. 'helcim_sync_product' → Helcim class).
|
*
|
* Handles: {integration}_sync_to, {integration}_delete_from,
|
* {integration}_sync_from, {integration}_sync_customer, etc.
|
*
|
* Registration (in Integrations::registerAdditionalHooks):
|
* $queue = JVB()->queue();
|
* $queue->registry()->register('helcim_sync_to', new TypeConfig(
|
* executor: new IntegrationExecutor(),
|
* chunkKey: 'items',
|
* chunkSize: 10,
|
* maxRetries: 3
|
* ));
|
*/
|
final class IntegrationExecutor implements Executor
|
{
|
/**
|
* Map of integration service names to class identifiers.
|
* Populated on first use from JVB()->connect().
|
*/
|
private array $integrationCache = [];
|
|
public function execute(Operation $operation, Progress $progress): Result
|
{
|
try {
|
[$serviceName, $action] = $this->parseOperationType($operation->type);
|
|
$integration = $this->resolveIntegration($serviceName, $operation->userId);
|
|
if (!$integration || !$integration->isSetUp()) {
|
return new Result(
|
outcome: 'failed',
|
result: ['error' => "Integration '{$serviceName}' not available or not configured"]
|
);
|
}
|
|
$data = $operation->requestData;
|
|
return match ($action) {
|
'sync_to' => $this->processSyncTo($integration, $data, $progress),
|
'sync_from' => $this->processSyncFrom($integration, $data, $progress),
|
'delete_from' => $this->processDeleteFrom($integration, $data, $progress),
|
'sync_customer' => $this->processSyncCustomer($integration, $data, $progress),
|
'import' => $this->processImport($integration, $data, $progress),
|
default => $this->processDynamic($integration, $action, $data, $progress),
|
};
|
|
} catch (Exception $e) {
|
JVB()->error()->log(
|
'[IntegrationExecutor]:execute',
|
$e->getMessage(),
|
[
|
'operation_id' => $operation->id,
|
'operation_type' => $operation->type,
|
'user_id' => $operation->userId,
|
]
|
);
|
|
return new Result(
|
outcome: 'failed',
|
result: ['error' => $e->getMessage()]
|
);
|
}
|
}
|
|
/*****************************************************************
|
* TYPE PARSING
|
*****************************************************************/
|
|
/**
|
* Parse 'helcim_sync_to' → ['helcim', 'sync_to']
|
*/
|
private function parseOperationType(string $type): array
|
{
|
// Remove BASE prefix if present (e.g. 'jvb_helcim_sync_to' → 'helcim_sync_to')
|
$type = str_replace(BASE, '', $type);
|
|
$pos = strpos($type, '_');
|
if ($pos === false) {
|
throw new Exception("Invalid integration operation type: {$type}");
|
}
|
|
$serviceName = substr($type, 0, $pos);
|
$action = substr($type, $pos + 1);
|
|
return [$serviceName, $action];
|
}
|
|
/**
|
* Resolve integration instance, optionally for a specific user
|
*/
|
private function resolveIntegration(string $serviceName, int $userId): ?object
|
{
|
if (!isset($this->integrationCache[$serviceName])) {
|
$this->integrationCache[$serviceName] = JVB()->connect($serviceName);
|
}
|
|
$integration = $this->integrationCache[$serviceName];
|
|
// If operation has a user context, re-instantiate for that user
|
if ($integration && $userId) {
|
$class = get_class($integration);
|
return new $class($userId);
|
}
|
|
return $integration;
|
}
|
|
/*****************************************************************
|
* OPERATION HANDLERS
|
*****************************************************************/
|
|
/**
|
* Sync WordPress posts → external service
|
*/
|
private function processSyncTo(object $integration, array $data, Progress $progress): Result
|
{
|
$items = $data['items'] ?? [];
|
$success = [];
|
$errors = [];
|
|
if (empty($items)) {
|
return new Result(outcome: 'success', result: ['synced' => [], 'message' => 'No items to sync']);
|
}
|
|
foreach ($items as $postID) {
|
try {
|
$result = $integration->syncPostToService((int)$postID);
|
if (is_wp_error($result)) {
|
$errors[$postID] = $result->get_error_message();
|
$progress->failItem($postID, $result->get_error_message());
|
} else {
|
$success[] = $postID;
|
$progress->advance();
|
}
|
} catch (Exception $e) {
|
$errors[$postID] = $e->getMessage();
|
$progress->failItem($postID, $e->getMessage());
|
}
|
}
|
|
$outcome = empty($errors) ? 'success' : (empty($success) ? 'failed' : 'partial');
|
|
return new Result(
|
outcome: $outcome,
|
result: [
|
'synced' => $success,
|
'errors' => $errors,
|
'synced_count' => count($success),
|
'failed_count' => count($errors),
|
]
|
);
|
}
|
|
/**
|
* Sync external service → WordPress posts
|
*/
|
private function processSyncFrom(object $integration, array $data, Progress $progress): Result
|
{
|
$items = $data['items'] ?? $data['external_ids'] ?? [];
|
$success = [];
|
$errors = [];
|
|
foreach ($items as $externalId) {
|
try {
|
$result = $integration->syncFromService($externalId);
|
if (is_wp_error($result)) {
|
$errors[$externalId] = $result->get_error_message();
|
$progress->failItem($externalId, $result->get_error_message());
|
} else {
|
$success[] = $externalId;
|
$progress->advance();
|
}
|
} catch (Exception $e) {
|
$errors[$externalId] = $e->getMessage();
|
$progress->failItem($externalId, $e->getMessage());
|
}
|
}
|
|
$outcome = empty($errors) ? 'success' : (empty($success) ? 'failed' : 'partial');
|
|
return new Result(
|
outcome: $outcome,
|
result: ['imported' => $success, 'errors' => $errors]
|
);
|
}
|
|
/**
|
* Delete items from external service
|
*/
|
private function processDeleteFrom(object $integration, array $data, Progress $progress): Result
|
{
|
$externalIds = $data['external_ids'] ?? [];
|
$success = [];
|
$errors = [];
|
|
foreach ($externalIds as $externalId) {
|
try {
|
$result = $integration->deleteFromService($externalId);
|
if (is_wp_error($result)) {
|
$errors[$externalId] = $result->get_error_message();
|
} else {
|
$success[] = $externalId;
|
}
|
$progress->advance();
|
} catch (Exception $e) {
|
$errors[$externalId] = $e->getMessage();
|
$progress->failItem($externalId, $e->getMessage());
|
}
|
}
|
|
$outcome = empty($errors) ? 'success' : (empty($success) ? 'failed' : 'partial');
|
return new Result(outcome: $outcome, result: ['deleted' => $success, 'errors' => $errors]);
|
}
|
|
/**
|
* Sync a customer record
|
*/
|
private function processSyncCustomer(object $integration, array $data, Progress $progress): Result
|
{
|
try {
|
$result = $integration->syncCustomer($data);
|
$progress->advance();
|
|
if (is_wp_error($result)) {
|
return new Result(outcome: 'failed', result: ['error' => $result->get_error_message()]);
|
}
|
return new Result(outcome: 'success', result: $result);
|
} catch (Exception $e) {
|
return new Result(outcome: 'failed', result: ['error' => $e->getMessage()]);
|
}
|
}
|
|
/**
|
* Bulk import from external service
|
*/
|
private function processImport(object $integration, array $data, Progress $progress): Result
|
{
|
try {
|
$result = $integration->importFromService($data);
|
$progress->advance($data['count'] ?? 1);
|
|
if (is_wp_error($result)) {
|
return new Result(outcome: 'failed', result: ['error' => $result->get_error_message()]);
|
}
|
return new Result(outcome: 'success', result: $result);
|
} catch (Exception $e) {
|
return new Result(outcome: 'failed', result: ['error' => $e->getMessage()]);
|
}
|
}
|
|
/**
|
* Fallback: call processAction on the integration for non-standard actions
|
*/
|
private function processDynamic(object $integration, string $action, array $data, Progress $progress): Result
|
{
|
try {
|
$result = $integration->processAction($action, $data);
|
$progress->advance();
|
|
if (is_wp_error($result)) {
|
return new Result(outcome: 'failed', result: ['error' => $result->get_error_message()]);
|
}
|
|
return new Result(
|
outcome: 'success',
|
result: is_array($result) ? $result : ['result' => $result]
|
);
|
} catch (Exception $e) {
|
return new Result(outcome: 'failed', result: ['error' => $e->getMessage()]);
|
}
|
}
|
}
|