From 3b83905603d44b1a08f8b2b36a605808ce686ad6 Mon Sep 17 00:00:00 2001
From: Jake Vanderwerf <get@jakevanderwerf.ca>
Date: Tue, 02 Jun 2026 00:46:48 +0000
Subject: [PATCH] =double checking schema outputs for legacytattooremoval
---
inc/managers/queue/Processor.php | 260 +++++++++++++++++++++++++++++++++++++++++++++++++--
1 files changed, 248 insertions(+), 12 deletions(-)
diff --git a/inc/managers/queue/Processor.php b/inc/managers/queue/Processor.php
index 0f02650..f54fadd 100644
--- a/inc/managers/queue/Processor.php
+++ b/inc/managers/queue/Processor.php
@@ -10,33 +10,52 @@
private Storage $storage,
private Executor $defaultExecutor,
private TypeRegistry $registry,
- private Locker $locker
) {}
public function run(): void
{
- $this->locker->withLock(function () {
- $ops = $this->storage->fetchRunnable(10);
+ if (!$this->hasAdequateResources()) {
+ error_log('[Processor] Insufficient resources to start processing');
+ return;
+ }
- foreach ($ops as $op) {
- if (!$this->storage->markProcessing($op->id)) {
- continue;
+ $op = null;
+ $this->storage->withTransaction(function() use (&$op) {
+ $candidates = $this->storage->fetchRunnable();
+ foreach ($candidates as $candidate) {
+ if ($candidate->state === 'completed') continue;
+ if (!$this->dependenciesSatisfied($candidate)) continue;
+ if ($this->storage->markProcessing($candidate->id)) {
+ $op = $candidate;
+ break;
}
-
- $this->processOne($op);
}
-
- $this->storage->invalidateQueueCache();
});
+
+ if (!$op) return;
+
+ $this->processOne($op);
+ usleep(10000);
+
+ $this->storage->invalidateQueueCache();
}
+
private function processOne(Operation $op): void
{
$progress = new Progress($op);
$executor = $this->registry->getExecutor($op->type) ?? $this->defaultExecutor;
+ $op->startedAt = current_time('mysql');
+ $op->state = 'processing';
+ $this->storage->saveProgress($op);
try {
- $result = $executor->execute($op, $progress);
+ $chunkKey = $op->metadata['chunk_key'] ?? null;
+
+ // No transaction wrapping — executor handles its own
+ $result = $chunkKey
+ ? $this->executeChunked($op, $executor, $progress, $chunkKey)
+ : $executor->execute($op, $progress);
$op->state = 'completed';
$op->outcome = $result->outcome;
@@ -44,10 +63,140 @@
$op->completedAt = current_time('mysql');
} catch (\Throwable $e) {
+ error_log("[Processor] Exception caught: " . $e->getMessage());
$this->handleFailure($op, $e);
}
- $this->storage->save($op);
+ $this->saveOperation($op);
+ }
+ private function saveOperation(Operation $op): void
+ {
+ if ($op->state === 'completed') {
+ $this->storage->saveFinal($op);
+ } else {
+ // Retryable failure — save as scheduled/failed without requiring 'completed' state
+ $this->storage->save($op);
+ }
+ }
+
+ private function executeChunked(Operation $op, Executor $executor, Progress $progress, string|array $chunkKey): Result
+ {
+ $keys = (array) $chunkKey;
+ $chunkSize = $op->metadata['chunk_size'] ?? 10;
+ $offset = $op->metadata['chunk_offset'] ?? 0;
+ $chunks = $this->buildChunks($op->requestData, $keys, $chunkSize);
+ $totalChunks = count($chunks);
+
+ $allResults = [];
+
+ foreach ($chunks as $index => $chunk) {
+ if ($index < $offset) {
+ continue;
+ }
+
+ // Resource check before each chunk
+ if (!$this->checkResourceLimits()) {
+ error_log("[Processor] Resource limits reached, pausing at chunk {$index}");
+
+ $op->metadata['chunk_offset'] = $index;
+ $op->state = 'scheduled';
+ $op->scheduledAt = date('Y-m-d H:i:s', time() + 5);
+ $this->storage->save($op);
+
+ return new Result(
+ outcome: 'pending',
+ result: [
+ 'paused_at_chunk' => $index,
+ 'total_chunks' => $totalChunks,
+ 'partial_results' => $allResults,
+ ]
+ );
+ }
+
+ try {
+ $chunkOp = clone $op;
+ $chunkOp->requestData = array_merge(
+ array_diff_key($op->requestData, array_flip($keys)),
+ $chunk['data']
+ );
+
+ $executeChunk = function () use ($op, $executor, $progress, $chunkOp, $index) {
+ $result = $executor->execute($chunkOp, $progress);
+ $op->metadata['chunk_offset'] = $index + 1;
+ $this->storage->saveProgress($op);
+ return $result;
+ };
+
+ $chunkResult = $executeChunk();
+
+ // Merge results
+ if (!empty($chunkResult->result)) {
+ if (is_array($chunkResult->result)) {
+ $allResults = array_merge_recursive($allResults, $chunkResult->result);
+ } else {
+ $allResults = $chunkResult->result;
+ }
+ }
+
+ } catch (\Throwable $e) {
+ error_log("[Processor] Chunk {$index} failed: " . $e->getMessage());
+
+ // Record failed items from this chunk
+ foreach ($chunk['data'] as $key => $items) {
+ foreach ($items as $item) {
+ $progress->failItem($item, $e->getMessage());
+ }
+ }
+
+ // Continue to next chunk rather than failing entire operation
+ // Remove this if you want fail-fast behavior
+ }
+
+ // Delay between chunks (skip after last chunk)
+ if ($index < $totalChunks - 1) {
+ usleep(50000); // 50ms
+ }
+ }
+
+ // Determine final outcome
+ $outcome = 'success';
+ if (!empty($op->failedItems)) {
+ $failedCount = count($op->failedItems);
+ $outcome = $failedCount === $op->totalItems ? 'failed' : 'partial';
+ }
+
+ return new Result(
+ outcome: $outcome,
+ result: $allResults
+ );
+ }
+
+ private function buildChunks(array $data, array $keys, int $chunkSize): array
+ {
+ $chunks = [];
+
+ foreach ($keys as $key) {
+ if (!isset($data[$key]) || !is_array($data[$key])) {
+ continue;
+ }
+
+ $items = $data[$key];
+ $itemChunks = array_chunk($items, $chunkSize, true); // preserve keys
+
+ foreach ($itemChunks as $index => $chunkItems) {
+ if (!isset($chunks[$index])) {
+ $chunks[$index] = [
+ 'data' => [],
+ 'count' => 0
+ ];
+ }
+
+ $chunks[$index]['data'][$key] = $chunkItems;
+ $chunks[$index]['count'] += count($chunkItems);
+ }
+ }
+
+ return $chunks;
}
private function handleFailure(Operation $op, \Throwable $e): void
@@ -88,4 +237,91 @@
$jitter = rand(0, (int)($delay * 0.1));
return date('Y-m-d H:i:s', time() + $delay + $jitter);
}
+
+ private function checkResourceLimits(): bool
+ {
+ // Check memory (leave 20% buffer)
+ $memoryLimit = $this->getMemoryLimitBytes();
+ $memoryUsage = memory_get_usage(true);
+ if ($memoryUsage > $memoryLimit * 0.8) {
+ error_log('[Processor] Memory limit approaching, pausing');
+ return false;
+ }
+
+ // Check time (leave 30s buffer for cleanup)
+ $maxTime = (int) ini_get('max_execution_time');
+ if ($maxTime > 0) {
+ $elapsed = microtime(true) - $_SERVER['REQUEST_TIME_FLOAT'];
+ if ($elapsed > $maxTime - 30) {
+ error_log('[Processor] Time limit approaching, pausing');
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ private function getMemoryLimitBytes(): int
+ {
+ $limit = ini_get('memory_limit');
+ if ($limit === '-1') {
+ return PHP_INT_MAX;
+ }
+
+ $unit = strtolower(substr($limit, -1));
+ $value = (int) $limit;
+
+ return match($unit) {
+ 'g' => $value * 1024 * 1024 * 1024,
+ 'm' => $value * 1024 * 1024,
+ 'k' => $value * 1024,
+ default => $value,
+ };
+ }
+
+ private function hasAdequateResources(): bool
+ {
+ // Stricter thresholds for starting (50% memory, 60s minimum time)
+ $memoryLimit = $this->getMemoryLimitBytes();
+ $memoryUsage = memory_get_usage(true);
+ if ($memoryUsage > $memoryLimit * 0.5) {
+ return false;
+ }
+
+ $maxTime = (int) ini_get('max_execution_time');
+ if ($maxTime > 0) {
+ $elapsed = microtime(true) - $_SERVER['REQUEST_TIME_FLOAT'];
+ if ($maxTime - $elapsed < 60) { // Need at least 60s
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ private function dependenciesSatisfied(Operation $op): bool
+ {
+ if (empty($op->dependencies)) {
+ return true;
+ }
+ foreach ($op->dependencies as $depId) {
+ $dep = $this->storage->find($depId);
+
+ // Missing dependency = block (or decide to ignore; your call)
+ if (!$dep) {
+ return false;
+ }
+
+ if ($dep->state !== 'completed') {
+ return false;
+ }
+
+ if (!in_array($dep->outcome, ['success', 'partial'], true)) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
}
--
Gitblit v1.10.0