From 75a097a018a0090f5902758353c578fce4aa2a25 Mon Sep 17 00:00:00 2001
From: Jake Vanderwerf <get@jakevanderwerf.ca>
Date: Sat, 23 May 2026 18:43:42 +0000
Subject: [PATCH] =CustomBlocks.php overhaul relatively complete. Also refactored the gallery in gallery.min.js and the jvbRenderGallery.
---
inc/managers/queue/Processor.php | 133 +++++++++++++++++++++-----------------------
1 files changed, 64 insertions(+), 69 deletions(-)
diff --git a/inc/managers/queue/Processor.php b/inc/managers/queue/Processor.php
index d743f77..f54fadd 100644
--- a/inc/managers/queue/Processor.php
+++ b/inc/managers/queue/Processor.php
@@ -19,53 +19,43 @@
return;
}
- $ops = $this->storage->fetchRunnable(3);
-
- $lastOpId = null;
- foreach ($ops as $op) {
- if (!$this->storage->markProcessing($op->id)) {
- continue;
+ $op = null;
+ $this->storage->withTransaction(function() use (&$op) {
+ $candidates = $this->storage->fetchRunnable();
+ foreach ($candidates as $candidate) {
+ if ($candidate->state === 'completed') continue;
+ if (!$this->dependenciesSatisfied($candidate)) continue;
+ if ($this->storage->markProcessing($candidate->id)) {
+ $op = $candidate;
+ break;
+ }
}
- $lastOpId = $op->id;
- $this->processOne($op);
- usleep(10000);
- }
+ });
+
+ if (!$op) return;
+
+ $this->processOne($op);
+ usleep(10000);
$this->storage->invalidateQueueCache();
}
+
private function processOne(Operation $op): void
{
$progress = new Progress($op);
-
$executor = $this->registry->getExecutor($op->type) ?? $this->defaultExecutor;
$op->startedAt = current_time('mysql');
$op->state = 'processing';
-
$this->storage->saveProgress($op);
- //Check to see if we can merge first
- $mergeable = $this->registry->getMergeable($op->type);
-
- if ($mergeable) {
- $existing = $this->storage->findMergeable(
- $op->type,
- $op->userId
- );
-
- if ($existing && $mergeable->canMerge($existing, $op)) {
- $this->applyMerge($mergeable, $existing, $op);
- return;
- }
- }
-
try {
- // Check if this operation should be chunked
$chunkKey = $op->metadata['chunk_key'] ?? null;
+ // No transaction wrapping — executor handles its own
$result = $chunkKey
? $this->executeChunked($op, $executor, $progress, $chunkKey)
- : $this->storage->withTransaction(fn() => $executor->execute($op, $progress));
+ : $executor->execute($op, $progress);
$op->state = 'completed';
$op->outcome = $result->outcome;
@@ -77,7 +67,16 @@
$this->handleFailure($op, $e);
}
- $this->storage->saveFinal($op);
+ $this->saveOperation($op);
+ }
+ private function saveOperation(Operation $op): void
+ {
+ if ($op->state === 'completed') {
+ $this->storage->saveFinal($op);
+ } else {
+ // Retryable failure — save as scheduled/failed without requiring 'completed' state
+ $this->storage->save($op);
+ }
}
private function executeChunked(Operation $op, Executor $executor, Progress $progress, string|array $chunkKey): Result
@@ -115,23 +114,20 @@
}
try {
- $chunkResult = $this->storage->withTransaction(function () use ($op, $executor, $progress, $chunk, $keys, $index) {
- // Clone operation with only this chunk's data
- $chunkOp = clone $op;
- $chunkOp->requestData = array_merge(
- array_diff_key($op->requestData, array_flip($keys)),
- $chunk['data']
- );
+ $chunkOp = clone $op;
+ $chunkOp->requestData = array_merge(
+ array_diff_key($op->requestData, array_flip($keys)),
+ $chunk['data']
+ );
- // Execute this chunk
+ $executeChunk = function () use ($op, $executor, $progress, $chunkOp, $index) {
$result = $executor->execute($chunkOp, $progress);
-
- // Update progress
$op->metadata['chunk_offset'] = $index + 1;
$this->storage->saveProgress($op);
-
return $result;
- });
+ };
+
+ $chunkResult = $executeChunk();
// Merge results
if (!empty($chunkResult->result)) {
@@ -242,33 +238,6 @@
return date('Y-m-d H:i:s', time() + $delay + $jitter);
}
- private function applyMerge(
- Mergeable $mergeable,
- Operation $target,
- Operation $incoming
- ): void {
- // Safety: only merge into actively processing ops
- if ($target->state !== 'processing') {
- return;
- }
-
- $this->storage->withTransaction(function () use ($mergeable, $target, $incoming) {
- $mergeable->merge($target, $incoming);
-
- $target->dependencies[] = $incoming->id;
- $target->dependencies = array_values(array_unique($target->dependencies));
-
- $this->storage->saveProgress($target);
-
- $incoming->state = 'completed';
- $incoming->outcome = 'success';
- $incoming->completedAt = current_time('mysql');
- $incoming->result = ['merged_into' => $target->id];
-
- $this->storage->saveFinal($incoming);
- });
- }
-
private function checkResourceLimits(): bool
{
// Check memory (leave 20% buffer)
@@ -329,4 +298,30 @@
return true;
}
+
+ private function dependenciesSatisfied(Operation $op): bool
+ {
+ if (empty($op->dependencies)) {
+ return true;
+ }
+ foreach ($op->dependencies as $depId) {
+ $dep = $this->storage->find($depId);
+
+ // Missing dependency = block (or decide to ignore; your call)
+ if (!$dep) {
+ return false;
+ }
+
+ if ($dep->state !== 'completed') {
+ return false;
+ }
+
+ if (!in_array($dep->outcome, ['success', 'partial'], true)) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
}
--
Gitblit v1.10.0