Jake Vanderwerf
2026-02-14 27fb820ae9081fb56957cf75e79eccd8a99edd52
inc/managers/queue/Processor.php
@@ -19,9 +19,14 @@
         return;
      }
      $ops = $this->storage->fetchRunnable(3);
      $ops = $this->storage->fetchRunnable();
      if (empty($ops)) {
         return;
      }
      foreach ($ops as $op) {
         if ($op->state === 'completed') {
            return;
         }
         if (!$this->dependenciesSatisfied($op)) {
            continue;
         }
@@ -45,28 +50,13 @@
      $this->storage->saveProgress($op);
      //Check to see if we can merge first
      $mergeable = $this->registry->getMergeable($op->type);
      if ($mergeable) {
         $existing = $this->storage->findMergeable(
            $op->type,
            $op->userId
         );
         if ($existing && $mergeable->canMerge($existing, $op)) {
            $this->applyMerge($mergeable, $existing, $op);
            return;
         }
      }
      try {
         // Check if this operation should be chunked
         $chunkKey = $op->metadata['chunk_key'] ?? null;
         // No transaction wrapping — executor handles its own
         $result = $chunkKey
            ? $this->executeChunked($op, $executor, $progress, $chunkKey)
            : $this->storage->withTransaction(fn() => $executor->execute($op, $progress));
            : $executor->execute($op, $progress);
         $op->state = 'completed';
         $op->outcome = $result->outcome;
@@ -78,7 +68,16 @@
         $this->handleFailure($op, $e);
      }
      $this->storage->saveFinal($op);
      $this->saveOperation($op);
   }
   private function saveOperation(Operation $op): void
   {
      if ($op->state === 'completed') {
         $this->storage->saveFinal($op);
      } else {
         // Retryable failure — save as scheduled/failed without requiring 'completed' state
         $this->storage->save($op);
      }
   }
   private function executeChunked(Operation $op, Executor $executor, Progress $progress, string|array $chunkKey): Result
@@ -116,23 +115,20 @@
         }
         try {
            $chunkResult = $this->storage->withTransaction(function () use ($op, $executor, $progress, $chunk, $keys, $index) {
               // Clone operation with only this chunk's data
               $chunkOp = clone $op;
               $chunkOp->requestData = array_merge(
                  array_diff_key($op->requestData, array_flip($keys)),
                  $chunk['data']
               );
            $chunkOp = clone $op;
            $chunkOp->requestData = array_merge(
               array_diff_key($op->requestData, array_flip($keys)),
               $chunk['data']
            );
               // Execute this chunk
            $executeChunk = function () use ($op, $executor, $progress, $chunkOp, $index) {
               $result = $executor->execute($chunkOp, $progress);
               // Update progress
               $op->metadata['chunk_offset'] = $index + 1;
               $this->storage->saveProgress($op);
               return $result;
            });
            };
            $chunkResult = $executeChunk();
            // Merge results
            if (!empty($chunkResult->result)) {
@@ -243,33 +239,6 @@
      return date('Y-m-d H:i:s', time() + $delay + $jitter);
   }
   private function applyMerge(
      Mergeable $mergeable,
      Operation $target,
      Operation $incoming
   ): void {
      // Safety: only merge into actively processing ops
      if ($target->state !== 'processing') {
         return;
      }
      $this->storage->withTransaction(function () use ($mergeable, $target, $incoming) {
         $mergeable->merge($target, $incoming);
         $target->dependencies[] = $incoming->id;
         $target->dependencies = array_values(array_unique($target->dependencies));
         $this->storage->saveProgress($target);
         $incoming->state = 'completed';
         $incoming->outcome = 'success';
         $incoming->completedAt = current_time('mysql');
         $incoming->result = ['merged_into' => $target->id];
         $this->storage->saveFinal($incoming);
      });
   }
   private function checkResourceLimits(): bool
   {
      // Check memory (leave 20% buffer)
@@ -336,7 +305,6 @@
      if (empty($op->dependencies)) {
         return true;
      }
      foreach ($op->dependencies as $depId) {
         $dep = $this->storage->find($depId);