| | |
| | | } |
| | | final class FilteredExecutor implements Executor |
| | | { |
| | | public function __construct( |
| | | private Storage $storage |
| | | ) {} |
| | | |
| | | public function execute(Operation $operation, Progress $progress): Result |
| | | { |
| | | $chunkKey = $operation->metadata['chunk_key'] ?? null; |
| | | |
| | | // No chunking — process entire request at once |
| | | if (!$chunkKey) { |
| | | return $this->processSingle($operation, $progress); |
| | | } |
| | | |
| | | // Chunked processing |
| | | return $this->processChunked($operation, $progress, $chunkKey); |
| | | } |
| | | |
| | | private function processSingle(Operation $operation, Progress $progress): Result |
| | | { |
| | | // Process whatever data is passed (Processor handles chunking) |
| | | $filterResult = $this->callFilter($operation, $operation->requestData); |
| | | |
| | | $progress->advance(1); |
| | | $progress->advance($operation->totalItems); |
| | | |
| | | return new Result( |
| | | outcome: $filterResult['success'] ? 'success' : 'failed', |
| | |
| | | ); |
| | | } |
| | | |
| | | private function processChunked(Operation $operation, Progress $progress, string|array $chunkKey): Result |
| | | { |
| | | $keys = (array) $chunkKey; |
| | | $chunkSize = $operation->metadata['chunk_size'] ?? 10; |
| | | $chunks = $this->buildChunks($operation->requestData, $keys, $chunkSize); |
| | | $offset = $operation->metadata['chunk_offset'] ?? 0; |
| | | $results = []; |
| | | |
| | | foreach ($chunks as $index => $chunk) { |
| | | if ($index < $offset) { |
| | | continue; |
| | | } |
| | | |
| | | $chunkData = array_merge( |
| | | // Non-chunked data |
| | | array_diff_key($operation->requestData, array_flip($keys)), |
| | | // This chunk's data |
| | | $chunk['data'] |
| | | ); |
| | | |
| | | $filterResult = $this->callFilter($operation, $chunkData); |
| | | |
| | | if (!$filterResult['success']) { |
| | | // Record failed items but continue |
| | | foreach ($chunk['data'] as $key => $items) { |
| | | foreach ($items as $item) { |
| | | $progress->failItem($item, $filterResult['message'] ?? 'Chunk failed'); |
| | | } |
| | | } |
| | | } |
| | | |
| | | $progress->advance($chunk['count']); |
| | | $operation->metadata['chunk_offset'] = $index + 1; |
| | | |
| | | if (isset($filterResult['result'])) { |
| | | $results = array_merge($results, (array) $filterResult['result']); |
| | | } |
| | | |
| | | // Save progress after each chunk |
| | | $this->storage->save($operation); |
| | | } |
| | | |
| | | $outcome = 'success'; |
| | | if ($operation->failedItems) { |
| | | $outcome = count($operation->failedItems) === $operation->totalItems ? 'failed' : 'partial'; |
| | | } |
| | | |
| | | return new Result( |
| | | outcome: $outcome, |
| | | result: [ |
| | | 'processed' => $operation->processedItems, |
| | | 'failed' => $operation->failedItems ? count($operation->failedItems) : 0, |
| | | 'chunks' => count($chunks), |
| | | 'results' => $results, |
| | | ] |
| | | ); |
| | | } |
| | | |
| | | private function callFilter(Operation $operation, array $data): array |
| | | { |
| | | $filterResult = apply_filters( |
| | |
| | | |
| | | return $filterResult; |
| | | } |
| | | |
| | | private function buildChunks(array $data, array $keys, int $chunkSize): array |
| | | { |
| | | // Collect all items across keys |
| | | $allItems = []; |
| | | foreach ($keys as $key) { |
| | | if (isset($data[$key]) && is_array($data[$key])) { |
| | | foreach ($data[$key] as $i => $item) { |
| | | $allItems[] = ['key' => $key, 'index' => $i, 'item' => $item]; |
| | | } |
| | | } |
| | | } |
| | | |
| | | // Split into chunks |
| | | $chunks = []; |
| | | foreach (array_chunk($allItems, $chunkSize) as $chunkItems) { |
| | | $chunkData = []; |
| | | foreach ($chunkItems as $entry) { |
| | | $chunkData[$entry['key']][] = $entry['item']; |
| | | } |
| | | $chunks[] = [ |
| | | 'data' => $chunkData, |
| | | 'count' => count($chunkItems), |
| | | ]; |
| | | } |
| | | |
| | | return $chunks; |
| | | } |
| | | } |