Jake Vanderwerf
4 days ago 0afb2c0046b55c123eafb4ab9ee77efa68d12463
inc/managers/queue/FilteredExecutor.php
@@ -5,28 +5,12 @@
}
final class FilteredExecutor implements Executor
{
   public function __construct(
      private Storage $storage
   ) {}
   public function execute(Operation $operation, Progress $progress): Result
   {
      $chunkKey = $operation->metadata['chunk_key'] ?? null;
      // No chunking — process entire request at once
      if (!$chunkKey) {
         return $this->processSingle($operation, $progress);
      }
      // Chunked processing
      return $this->processChunked($operation, $progress, $chunkKey);
   }
   private function processSingle(Operation $operation, Progress $progress): Result
   {
      // Process whatever data is passed (Processor handles chunking)
      $filterResult = $this->callFilter($operation, $operation->requestData);
      $progress->advance(1);
      $progress->advance($operation->totalItems);
      return new Result(
         outcome: $filterResult['success'] ? 'success' : 'failed',
@@ -34,64 +18,6 @@
      );
   }
   private function processChunked(Operation $operation, Progress $progress, string|array $chunkKey): Result
   {
      $keys = (array) $chunkKey;
      $chunkSize = $operation->metadata['chunk_size'] ?? 10;
      $chunks = $this->buildChunks($operation->requestData, $keys, $chunkSize);
      $offset = $operation->metadata['chunk_offset'] ?? 0;
      $results = [];
      foreach ($chunks as $index => $chunk) {
         if ($index < $offset) {
            continue;
         }
         $chunkData = array_merge(
         // Non-chunked data
            array_diff_key($operation->requestData, array_flip($keys)),
            // This chunk's data
            $chunk['data']
         );
         $filterResult = $this->callFilter($operation, $chunkData);
         if (!$filterResult['success']) {
            // Record failed items but continue
            foreach ($chunk['data'] as $key => $items) {
               foreach ($items as $item) {
                  $progress->failItem($item, $filterResult['message'] ?? 'Chunk failed');
               }
            }
         }
         $progress->advance($chunk['count']);
         $operation->metadata['chunk_offset'] = $index + 1;
         if (isset($filterResult['result'])) {
            $results = array_merge($results, (array) $filterResult['result']);
         }
         // Save progress after each chunk
         $this->storage->save($operation);
      }
      $outcome = 'success';
      if ($operation->failedItems) {
         $outcome = count($operation->failedItems) === $operation->totalItems ? 'failed' : 'partial';
      }
      return new Result(
         outcome: $outcome,
         result: [
            'processed' => $operation->processedItems,
            'failed'    => $operation->failedItems ? count($operation->failedItems) : 0,
            'chunks'    => count($chunks),
            'results'   => $results,
         ]
      );
   }
   private function callFilter(Operation $operation, array $data): array
   {
      $filterResult = apply_filters(
@@ -123,32 +49,4 @@
      return $filterResult;
   }
   private function buildChunks(array $data, array $keys, int $chunkSize): array
   {
      // Collect all items across keys
      $allItems = [];
      foreach ($keys as $key) {
         if (isset($data[$key]) && is_array($data[$key])) {
            foreach ($data[$key] as $i => $item) {
               $allItems[] = ['key' => $key, 'index' => $i, 'item' => $item];
            }
         }
      }
      // Split into chunks
      $chunks = [];
      foreach (array_chunk($allItems, $chunkSize) as $chunkItems) {
         $chunkData = [];
         foreach ($chunkItems as $entry) {
            $chunkData[$entry['key']][] = $entry['item'];
         }
         $chunks[] = [
            'data'  => $chunkData,
            'count' => count($chunkItems),
         ];
      }
      return $chunks;
   }
}