<?php
|
namespace JVBase\managers\queue;
|
// Direct-access guard: stop immediately unless the ABSPATH constant is defined.
defined('ABSPATH') || exit;
|
|
/**
 * Queue processor: fetches a small batch of runnable operations from storage,
 * claims each one and executes it, optionally chunk by chunk.
 *
 * Per operation the flow is:
 *  1. mark it 'processing' and persist that,
 *  2. try to merge it into an already-processing operation of the same
 *     type/user (when the type registry provides a Mergeable),
 *  3. otherwise execute it (chunked when metadata['chunk_key'] is set),
 *  4. persist the final state: completed, rescheduled after a failure, or
 *     rescheduled because a chunked run was paused on resource limits.
 *
 * NOTE(review): timestamps are produced with both current_time('mysql') and
 * date(); confirm that storage compares them in the same timezone.
 */
final class Processor
{
    /** Maximum number of operations fetched per run() call. */
    private const BATCH_SIZE = 3;

    /** Pause after each processed operation, in microseconds (10 ms). */
    private const OPERATION_DELAY_US = 10000;

    /** Pause between chunks of one operation, in microseconds (50 ms). */
    private const CHUNK_DELAY_US = 50000;

    /** Items per chunk when metadata['chunk_size'] is absent. */
    private const DEFAULT_CHUNK_SIZE = 10;

    /** Seconds until a resource-paused chunked operation is due again. */
    private const PAUSE_RESCHEDULE_SECONDS = 5;

    public function __construct(
        private Storage $storage,
        private Executor $defaultExecutor,
        private TypeRegistry $registry,
    ) {}

    /**
     * Processes up to BATCH_SIZE runnable operations.
     *
     * Operations whose dependencies are not satisfied, or that cannot be
     * claimed via markProcessing(), are skipped. The queue cache is always
     * invalidated once processing has started, even if an operation throws,
     * because claimed operations have already changed state in storage.
     */
    public function run(): void
    {
        if (!$this->hasAdequateResources()) {
            error_log('[Processor] Insufficient resources to start processing');
            return;
        }

        $ops = $this->storage->fetchRunnable(self::BATCH_SIZE);

        try {
            foreach ($ops as $op) {
                if (!$this->dependenciesSatisfied($op)) {
                    continue;
                }

                if (!$this->storage->markProcessing($op->id)) {
                    continue;
                }

                $this->processOne($op);
                usleep(self::OPERATION_DELAY_US);
            }
        } finally {
            $this->storage->invalidateQueueCache();
        }
    }

    /**
     * Runs a single claimed operation through merge / execute / finalize.
     *
     * Exceptions thrown while executing are routed to handleFailure(); the
     * operation is then persisted with saveFinal() in whatever state results.
     */
    private function processOne(Operation $op): void
    {
        $progress = new Progress($op);
        $executor = $this->registry->getExecutor($op->type) ?? $this->defaultExecutor;

        $op->startedAt = current_time('mysql');
        $op->state = 'processing';
        $this->storage->saveProgress($op);

        // Merge first; if no merge actually happened the operation must still
        // be executed, otherwise it would stay in 'processing' forever.
        if ($this->mergeIntoExisting($op)) {
            return;
        }

        try {
            // A chunk key in the metadata switches to chunked execution.
            $chunkKey = $op->metadata['chunk_key'] ?? null;

            $result = $chunkKey
                ? $this->executeChunked($op, $executor, $progress, $chunkKey)
                : $this->storage->withTransaction(fn () => $executor->execute($op, $progress));

            // executeChunked() pauses by rescheduling the operation and
            // returning outcome 'pending'. That state must survive: marking
            // the operation completed here would mean it never resumes.
            $paused = $chunkKey
                && $result->outcome === 'pending'
                && $op->state === 'scheduled';

            if (!$paused) {
                $op->state = 'completed';
                $op->outcome = $result->outcome;
                $op->result = $result->result;
                $op->completedAt = current_time('mysql');
            }
        } catch (\Throwable $e) {
            error_log("[Processor] Exception caught: " . $e->getMessage());
            $this->handleFailure($op, $e);
        }

        $this->storage->saveFinal($op);
    }

    /**
     * Attempts to fold $op into an existing operation of the same type/user.
     *
     * @return bool true only when the merge was really applied and $op has
     *              been finalized as merged; false when $op still has to run.
     */
    private function mergeIntoExisting(Operation $op): bool
    {
        $mergeable = $this->registry->getMergeable($op->type);
        if (!$mergeable) {
            return false;
        }

        $existing = $this->storage->findMergeable($op->type, $op->userId);
        if (!$existing) {
            return false;
        }

        // $op was just saved as 'processing', so a lookup may hand it back;
        // merging an operation into itself would skip its execution.
        if ((string) $existing->id === (string) $op->id) {
            return false;
        }

        if (!$mergeable->canMerge($existing, $op)) {
            return false;
        }

        return $this->applyMerge($mergeable, $existing, $op);
    }

    /**
     * Executes an operation in chunks built from the request-data keys named
     * by $chunkKey, one storage transaction per chunk.
     *
     * Before each chunk the resource limits are checked; when they are hit the
     * operation is rescheduled (state 'scheduled', chunk_offset = next chunk)
     * and a Result with outcome 'pending' is returned. A failing chunk is
     * logged, its items are reported through $progress->failItem(), and
     * processing continues with the next chunk.
     *
     * @param string|array $chunkKey One key or a list of keys of requestData
     *                               whose array values are split into chunks.
     * @return Result outcome 'pending' when paused; otherwise 'success',
     *                'partial' or 'failed' depending on $op->failedItems.
     */
    private function executeChunked(Operation $op, Executor $executor, Progress $progress, string|array $chunkKey): Result
    {
        $keys = (array) $chunkKey;
        $chunkSize = (int) ($op->metadata['chunk_size'] ?? self::DEFAULT_CHUNK_SIZE);
        $offset = (int) ($op->metadata['chunk_offset'] ?? 0);
        $chunks = $this->buildChunks($op->requestData, $keys, $chunkSize);
        $totalChunks = count($chunks);

        $allResults = [];

        foreach ($chunks as $index => $chunk) {
            // Chunks before the stored offset were handled by an earlier run.
            if ($index < $offset) {
                continue;
            }

            if (!$this->checkResourceLimits()) {
                error_log("[Processor] Resource limits reached, pausing at chunk {$index}");

                $op->metadata['chunk_offset'] = $index;
                $op->state = 'scheduled';
                $op->scheduledAt = date('Y-m-d H:i:s', time() + self::PAUSE_RESCHEDULE_SECONDS);
                $this->storage->save($op);

                return new Result(
                    outcome: 'pending',
                    result: [
                        'paused_at_chunk' => $index,
                        'total_chunks' => $totalChunks,
                        'partial_results' => $allResults,
                    ]
                );
            }

            try {
                $chunkResult = $this->storage->withTransaction(
                    function () use ($op, $executor, $progress, $chunk, $keys, $index) {
                        // Shallow copy carrying only this chunk's slice of the chunked keys.
                        $chunkOp = clone $op;
                        $chunkOp->requestData = array_merge(
                            array_diff_key($op->requestData, array_flip($keys)),
                            $chunk['data']
                        );

                        $result = $executor->execute($chunkOp, $progress);

                        // Record that this chunk is done so a resumed run skips it.
                        $op->metadata['chunk_offset'] = $index + 1;
                        $this->storage->saveProgress($op);

                        return $result;
                    }
                );

                $allResults = $this->mergeChunkResult($allResults, $chunkResult->result);
            } catch (\Throwable $e) {
                error_log("[Processor] Chunk {$index} failed: " . $e->getMessage());

                foreach ($chunk['data'] as $items) {
                    foreach ($items as $item) {
                        $progress->failItem($item, $e->getMessage());
                    }
                }
                // Deliberately continue with the next chunk instead of failing
                // the whole operation; rethrow here for fail-fast behavior.
            }

            // Breathe between chunks, but not after the last one.
            if ($index < $totalChunks - 1) {
                usleep(self::CHUNK_DELAY_US);
            }
        }

        $outcome = 'success';
        if (!empty($op->failedItems)) {
            $outcome = count($op->failedItems) === $op->totalItems ? 'failed' : 'partial';
        }

        return new Result(
            outcome: $outcome,
            result: $allResults
        );
    }

    /**
     * Folds one chunk's result into the results accumulated so far.
     *
     * Empty results are ignored, array results are merged recursively and a
     * non-array result replaces the accumulator. When the accumulator is not
     * an array (because an earlier chunk returned a scalar) it is wrapped
     * first, so array_merge_recursive() cannot raise a TypeError that would
     * wrongly flag an already committed chunk as failed.
     */
    private function mergeChunkResult(mixed $accumulated, mixed $chunkResult): mixed
    {
        if (empty($chunkResult)) {
            return $accumulated;
        }

        if (!is_array($chunkResult)) {
            return $chunkResult;
        }

        return array_merge_recursive(
            is_array($accumulated) ? $accumulated : [$accumulated],
            $chunkResult
        );
    }

    /**
     * Splits the array values found under $keys into parallel chunks.
     *
     * Chunk N holds the N-th slice of every key (original item keys are
     * preserved). Keys that are missing or not arrays are skipped. A chunk
     * size below 1 is treated as 1 because array_chunk() rejects it.
     *
     * @return array<int, array{data: array, count: int}>
     */
    private function buildChunks(array $data, array $keys, int $chunkSize): array
    {
        $chunkSize = max(1, $chunkSize);
        $chunks = [];

        foreach ($keys as $key) {
            if (!isset($data[$key]) || !is_array($data[$key])) {
                continue;
            }

            foreach (array_chunk($data[$key], $chunkSize, true) as $index => $slice) {
                $chunks[$index] ??= ['data' => [], 'count' => 0];
                $chunks[$index]['data'][$key] = $slice;
                $chunks[$index]['count'] += count($slice);
            }
        }

        return $chunks;
    }

    /**
     * Decides between retry and permanent failure after an exception.
     *
     * The same error message twice in a row (compared by md5 hash) completes
     * the operation as 'failed_permanent'; a different message schedules a
     * retry with exponential backoff. The failure is also reported through
     * the JVB() error logger.
     *
     * NOTE(review): there is no retry cap here, so an operation whose error
     * message changes on every attempt keeps being rescheduled.
     */
    private function handleFailure(Operation $op, \Throwable $e): void
    {
        $hash = md5($e->getMessage());

        if ($op->lastErrorHash === $hash) {
            $op->state = 'completed';
            $op->outcome = 'failed_permanent';
            $op->completedAt = current_time('mysql');
        } else {
            $op->retries++;
            $op->lastErrorHash = $hash;
            $op->state = 'scheduled';
            $op->scheduledAt = $this->calculateBackoff($op->retries);
        }

        $op->errorMessage = $e->getMessage();

        JVB()->error()->log(
            '[Queue]:processOne',
            $e->getMessage(),
            [
                'operation_id' => $op->id,
                'type' => $op->type,
                'user_id' => $op->userId,
                'retries' => $op->retries,
            ],
            $op->outcome === 'failed_permanent' ? 'critical' : 'warning'
        );
    }

    /**
     * Returns the 'Y-m-d H:i:s' time of the next retry: 30 s doubled per
     * attempt, capped at 3600 s, plus up to 10 % random jitter.
     *
     * Attempts below 1 are treated as the first attempt, which keeps the
     * arithmetic in integers.
     */
    private function calculateBackoff(int $attempt): string
    {
        // 30 * 2^7 already exceeds the cap, so a larger exponent is pointless.
        $exponent = min(max($attempt - 1, 0), 7);
        $delay = min(30 * (2 ** $exponent), 3600);
        $jitter = rand(0, (int) ($delay * 0.1));

        return date('Y-m-d H:i:s', time() + $delay + $jitter);
    }

    /**
     * Merges $incoming into $target and finalizes $incoming as merged.
     *
     * @return bool false when $target is not in state 'processing' and nothing
     *              was changed; true once the merge transaction has run.
     */
    private function applyMerge(
        Mergeable $mergeable,
        Operation $target,
        Operation $incoming
    ): bool {
        // Safety: only merge into actively processing ops.
        if ($target->state !== 'processing') {
            return false;
        }

        $this->storage->withTransaction(function () use ($mergeable, $target, $incoming): void {
            $mergeable->merge($target, $incoming);

            $target->dependencies[] = $incoming->id;
            $target->dependencies = array_values(array_unique($target->dependencies));
            $this->storage->saveProgress($target);

            $incoming->state = 'completed';
            $incoming->outcome = 'success';
            $incoming->completedAt = current_time('mysql');
            $incoming->result = ['merged_into' => $target->id];
            $this->storage->saveFinal($incoming);
        });

        return true;
    }

    /**
     * Mid-run check: false (with a log line) once memory use exceeds 80 % of
     * the limit or fewer than 30 s of execution time remain.
     */
    private function checkResourceLimits(): bool
    {
        $shortfall = $this->resourceShortfall(0.8, 30);

        if ($shortfall === 'memory') {
            error_log('[Processor] Memory limit approaching, pausing');
            return false;
        }

        if ($shortfall === 'time') {
            error_log('[Processor] Time limit approaching, pausing');
            return false;
        }

        return true;
    }

    /**
     * Returns the PHP memory limit in bytes, PHP_INT_MAX when unlimited.
     */
    private function getMemoryLimitBytes(): int
    {
        return self::parseByteSize(ini_get('memory_limit'));
    }

    /**
     * Converts an ini size such as '128M', '1G', '512k' or '1048576' to bytes.
     *
     * Values that are unavailable (false), empty or not positive, including
     * '-1', yield PHP_INT_MAX: a limit of 0 bytes would make every resource
     * check fail permanently. Results too large for an int also yield
     * PHP_INT_MAX.
     */
    private static function parseByteSize(string|false $raw): int
    {
        $raw = $raw === false ? '' : trim($raw);
        $value = (int) $raw;

        if ($raw === '' || $value <= 0) {
            return PHP_INT_MAX;
        }

        $multiplier = match (strtolower(substr($raw, -1))) {
            'g' => 1024 ** 3,
            'm' => 1024 ** 2,
            'k' => 1024,
            default => 1,
        };

        $bytes = $value * $multiplier;

        // Integer overflow turns the product into a float.
        return is_int($bytes) ? $bytes : PHP_INT_MAX;
    }

    /**
     * Start-of-run check with stricter thresholds than the mid-run check:
     * at most 50 % of the memory limit in use and at least 60 s of execution
     * time left.
     *
     * NOTE(review): with max_execution_time at or below 60 (PHP's default is
     * 30) this never passes; confirm the processor only runs where the limit
     * is raised or disabled.
     */
    private function hasAdequateResources(): bool
    {
        return $this->resourceShortfall(0.5, 60) === null;
    }

    /**
     * Shared resource probe for the start-of-run and mid-run checks.
     *
     * @param float $memoryFraction    Share of the memory limit that may be in use.
     * @param int   $timeBufferSeconds Execution time that must remain; ignored
     *                                 when max_execution_time is 0 (unlimited).
     * @return string|null 'memory' or 'time' for the exhausted resource, null when fine.
     */
    private function resourceShortfall(float $memoryFraction, int $timeBufferSeconds): ?string
    {
        if (memory_get_usage(true) > $this->getMemoryLimitBytes() * $memoryFraction) {
            return 'memory';
        }

        $maxTime = (int) ini_get('max_execution_time');
        if ($maxTime > 0) {
            $requestStart = $_SERVER['REQUEST_TIME_FLOAT'] ?? microtime(true);
            $elapsed = microtime(true) - $requestStart;

            if ($elapsed > $maxTime - $timeBufferSeconds) {
                return 'time';
            }
        }

        return null;
    }

    /**
     * True when every dependency of $op exists, is 'completed' and finished
     * with outcome 'success' or 'partial'. A missing dependency blocks.
     */
    private function dependenciesSatisfied(Operation $op): bool
    {
        if (empty($op->dependencies)) {
            return true;
        }

        foreach ($op->dependencies as $depId) {
            $dep = $this->storage->find($depId);

            $satisfied = $dep
                && $dep->state === 'completed'
                && in_array($dep->outcome, ['success', 'partial'], true);

            if (!$satisfied) {
                return false;
            }
        }

        return true;
    }
}
|