<?php
|
namespace JVBase\managers\queue;
|
if (!defined('ABSPATH')) {
|
exit;
|
}
|
final class FilteredExecutor implements Executor
|
{
|
public function __construct(
|
private Storage $storage
|
) {}
|
|
public function execute(Operation $operation, Progress $progress): Result
|
{
|
$chunkKey = $operation->metadata['chunk_key'] ?? null;
|
|
// No chunking — process entire request at once
|
if (!$chunkKey) {
|
return $this->processSingle($operation, $progress);
|
}
|
|
// Chunked processing
|
return $this->processChunked($operation, $progress, $chunkKey);
|
}
|
|
private function processSingle(Operation $operation, Progress $progress): Result
|
{
|
$filterResult = $this->callFilter($operation, $operation->requestData);
|
|
$progress->advance(1);
|
|
return new Result(
|
outcome: $filterResult['success'] ? 'success' : 'failed',
|
result: $filterResult['result'] ?? null
|
);
|
}
|
|
private function processChunked(Operation $operation, Progress $progress, string|array $chunkKey): Result
|
{
|
$keys = (array) $chunkKey;
|
$chunkSize = $operation->metadata['chunk_size'] ?? 10;
|
$chunks = $this->buildChunks($operation->requestData, $keys, $chunkSize);
|
$offset = $operation->metadata['chunk_offset'] ?? 0;
|
$results = [];
|
|
foreach ($chunks as $index => $chunk) {
|
if ($index < $offset) {
|
continue;
|
}
|
|
$chunkData = array_merge(
|
// Non-chunked data
|
array_diff_key($operation->requestData, array_flip($keys)),
|
// This chunk's data
|
$chunk['data']
|
);
|
|
$filterResult = $this->callFilter($operation, $chunkData);
|
|
if (!$filterResult['success']) {
|
// Record failed items but continue
|
foreach ($chunk['data'] as $key => $items) {
|
foreach ($items as $item) {
|
$progress->failItem($item, $filterResult['message'] ?? 'Chunk failed');
|
}
|
}
|
}
|
|
$progress->advance($chunk['count']);
|
$operation->metadata['chunk_offset'] = $index + 1;
|
|
if (isset($filterResult['result'])) {
|
$results = array_merge($results, (array) $filterResult['result']);
|
}
|
|
// Save progress after each chunk
|
$this->storage->save($operation);
|
}
|
|
$outcome = 'success';
|
if ($operation->failedItems) {
|
$outcome = count($operation->failedItems) === $operation->totalItems ? 'failed' : 'partial';
|
}
|
|
return new Result(
|
outcome: $outcome,
|
result: [
|
'processed' => $operation->processedItems,
|
'failed' => $operation->failedItems ? count($operation->failedItems) : 0,
|
'chunks' => count($chunks),
|
'results' => $results,
|
]
|
);
|
}
|
|
private function callFilter(Operation $operation, array $data): array
|
{
|
$filterResult = apply_filters(
|
BASE . 'handle_bulk_operation',
|
['success' => false, 'message' => 'No handler for: ' . $operation->type],
|
(object) [
|
'id' => $operation->id,
|
'type' => $operation->type,
|
'user_id' => $operation->userId,
|
],
|
$data
|
);
|
|
// Normalize WP_Error
|
if (is_wp_error($filterResult)) {
|
return [
|
'success' => false,
|
'message' => $filterResult->get_error_message(),
|
];
|
}
|
|
// Ensure expected format
|
if (!is_array($filterResult) || !isset($filterResult['success'])) {
|
return [
|
'success' => false,
|
'message' => 'Invalid handler response',
|
];
|
}
|
|
return $filterResult;
|
}
|
|
private function buildChunks(array $data, array $keys, int $chunkSize): array
|
{
|
// Collect all items across keys
|
$allItems = [];
|
foreach ($keys as $key) {
|
if (isset($data[$key]) && is_array($data[$key])) {
|
foreach ($data[$key] as $i => $item) {
|
$allItems[] = ['key' => $key, 'index' => $i, 'item' => $item];
|
}
|
}
|
}
|
|
// Split into chunks
|
$chunks = [];
|
foreach (array_chunk($allItems, $chunkSize) as $chunkItems) {
|
$chunkData = [];
|
foreach ($chunkItems as $entry) {
|
$chunkData[$entry['key']][] = $entry['item'];
|
}
|
$chunks[] = [
|
'data' => $chunkData,
|
'count' => count($chunkItems),
|
];
|
}
|
|
return $chunks;
|
}
|
}
|