Jake Vanderwerf
5 days ago a9b3b28d001941921aa70d37fdc87c758a163a44
inc/managers/queue/Processor.php
@@ -10,33 +10,52 @@
      private Storage $storage,
      private Executor $defaultExecutor,
      private TypeRegistry $registry,
      private Locker $locker
   ) {}
   public function run(): void
   {
      $this->locker->withLock(function () {
         $ops = $this->storage->fetchRunnable(10);
      if (!$this->hasAdequateResources()) {
         error_log('[Processor] Insufficient resources to start processing');
         return;
      }
         foreach ($ops as $op) {
            if (!$this->storage->markProcessing($op->id)) {
               continue;
      $op = null;
      $this->storage->withTransaction(function() use (&$op) {
         $candidates = $this->storage->fetchRunnable();
         foreach ($candidates as $candidate) {
            if ($candidate->state === 'completed') continue;
            if (!$this->dependenciesSatisfied($candidate)) continue;
            if ($this->storage->markProcessing($candidate->id)) {
               $op = $candidate;
               break;
            }
            $this->processOne($op);
         }
         $this->storage->invalidateQueueCache();
      });
      if (!$op) return;
      $this->processOne($op);
      usleep(10000);
      $this->storage->invalidateQueueCache();
   }
   private function processOne(Operation $op): void
   {
      $progress = new Progress($op);
      $executor = $this->registry->getExecutor($op->type) ?? $this->defaultExecutor;
      $op->startedAt = current_time('mysql');
      $op->state = 'processing';
      $this->storage->saveProgress($op);
      try {
         $result = $executor->execute($op, $progress);
         $chunkKey = $op->metadata['chunk_key'] ?? null;
         // No transaction wrapping — executor handles its own
         $result = $chunkKey
            ? $this->executeChunked($op, $executor, $progress, $chunkKey)
            : $executor->execute($op, $progress);
         $op->state = 'completed';
         $op->outcome = $result->outcome;
@@ -44,10 +63,140 @@
         $op->completedAt = current_time('mysql');
      } catch (\Throwable $e) {
         error_log("[Processor] Exception caught: " . $e->getMessage());
         $this->handleFailure($op, $e);
      }
      $this->storage->save($op);
      $this->saveOperation($op);
   }
   private function saveOperation(Operation $op): void
   {
      if ($op->state === 'completed') {
         $this->storage->saveFinal($op);
      } else {
         // Retryable failure — save as scheduled/failed without requiring 'completed' state
         $this->storage->save($op);
      }
   }
   private function executeChunked(Operation $op, Executor $executor, Progress $progress, string|array $chunkKey): Result
   {
      $keys = (array) $chunkKey;
      $chunkSize = $op->metadata['chunk_size'] ?? 10;
      $offset = $op->metadata['chunk_offset'] ?? 0;
      $chunks = $this->buildChunks($op->requestData, $keys, $chunkSize);
      $totalChunks = count($chunks);
      $allResults = [];
      foreach ($chunks as $index => $chunk) {
         if ($index < $offset) {
            continue;
         }
         // Resource check before each chunk
         if (!$this->checkResourceLimits()) {
            error_log("[Processor] Resource limits reached, pausing at chunk {$index}");
            $op->metadata['chunk_offset'] = $index;
            $op->state = 'scheduled';
            $op->scheduledAt = date('Y-m-d H:i:s', time() + 5);
            $this->storage->save($op);
            return new Result(
               outcome: 'pending',
               result: [
                  'paused_at_chunk' => $index,
                  'total_chunks' => $totalChunks,
                  'partial_results' => $allResults,
               ]
            );
         }
         try {
            $chunkOp = clone $op;
            $chunkOp->requestData = array_merge(
               array_diff_key($op->requestData, array_flip($keys)),
               $chunk['data']
            );
            $executeChunk = function () use ($op, $executor, $progress, $chunkOp, $index) {
               $result = $executor->execute($chunkOp, $progress);
               $op->metadata['chunk_offset'] = $index + 1;
               $this->storage->saveProgress($op);
               return $result;
            };
            $chunkResult = $executeChunk();
            // Merge results
            if (!empty($chunkResult->result)) {
               if (is_array($chunkResult->result)) {
                  $allResults = array_merge_recursive($allResults, $chunkResult->result);
               } else {
                  $allResults = $chunkResult->result;
               }
            }
         } catch (\Throwable $e) {
            error_log("[Processor] Chunk {$index} failed: " . $e->getMessage());
            // Record failed items from this chunk
            foreach ($chunk['data'] as $key => $items) {
               foreach ($items as $item) {
                  $progress->failItem($item, $e->getMessage());
               }
            }
            // Continue to next chunk rather than failing entire operation
            // Remove this if you want fail-fast behavior
         }
         // Delay between chunks (skip after last chunk)
         if ($index < $totalChunks - 1) {
            usleep(50000); // 50ms
         }
      }
      // Determine final outcome
      $outcome = 'success';
      if (!empty($op->failedItems)) {
         $failedCount = count($op->failedItems);
         $outcome = $failedCount === $op->totalItems ? 'failed' : 'partial';
      }
      return new Result(
         outcome: $outcome,
         result: $allResults
      );
   }
   private function buildChunks(array $data, array $keys, int $chunkSize): array
   {
      $chunks = [];
      foreach ($keys as $key) {
         if (!isset($data[$key]) || !is_array($data[$key])) {
            continue;
         }
         $items = $data[$key];
         $itemChunks = array_chunk($items, $chunkSize, true); // preserve keys
         foreach ($itemChunks as $index => $chunkItems) {
            if (!isset($chunks[$index])) {
               $chunks[$index] = [
                  'data' => [],
                  'count' => 0
               ];
            }
            $chunks[$index]['data'][$key] = $chunkItems;
            $chunks[$index]['count'] += count($chunkItems);
         }
      }
      return $chunks;
   }
   private function handleFailure(Operation $op, \Throwable $e): void
@@ -88,4 +237,91 @@
      $jitter = rand(0, (int)($delay * 0.1));
      return date('Y-m-d H:i:s', time() + $delay + $jitter);
   }
   private function checkResourceLimits(): bool
   {
      // Check memory (leave 20% buffer)
      $memoryLimit = $this->getMemoryLimitBytes();
      $memoryUsage = memory_get_usage(true);
      if ($memoryUsage > $memoryLimit * 0.8) {
         error_log('[Processor] Memory limit approaching, pausing');
         return false;
      }
      // Check time (leave 30s buffer for cleanup)
      $maxTime = (int) ini_get('max_execution_time');
      if ($maxTime > 0) {
         $elapsed = microtime(true) - $_SERVER['REQUEST_TIME_FLOAT'];
         if ($elapsed > $maxTime - 30) {
            error_log('[Processor] Time limit approaching, pausing');
            return false;
         }
      }
      return true;
   }
   private function getMemoryLimitBytes(): int
   {
      $limit = ini_get('memory_limit');
      if ($limit === '-1') {
         return PHP_INT_MAX;
      }
      $unit = strtolower(substr($limit, -1));
      $value = (int) $limit;
      return match($unit) {
         'g' => $value * 1024 * 1024 * 1024,
         'm' => $value * 1024 * 1024,
         'k' => $value * 1024,
         default => $value,
      };
   }
   private function hasAdequateResources(): bool
   {
      // Stricter thresholds for starting (50% memory, 60s minimum time)
      $memoryLimit = $this->getMemoryLimitBytes();
      $memoryUsage = memory_get_usage(true);
      if ($memoryUsage > $memoryLimit * 0.5) {
         return false;
      }
      $maxTime = (int) ini_get('max_execution_time');
      if ($maxTime > 0) {
         $elapsed = microtime(true) - $_SERVER['REQUEST_TIME_FLOAT'];
         if ($maxTime - $elapsed < 60) { // Need at least 60s
            return false;
         }
      }
      return true;
   }
   private function dependenciesSatisfied(Operation $op): bool
   {
      if (empty($op->dependencies)) {
         return true;
      }
      foreach ($op->dependencies as $depId) {
         $dep = $this->storage->find($depId);
         // Missing dependency = block (or decide to ignore; your call)
         if (!$dep) {
            return false;
         }
         if ($dep->state !== 'completed') {
            return false;
         }
         if (!in_array($dep->outcome, ['success', 'partial'], true)) {
            return false;
         }
      }
      return true;
   }
}