| | |
| | | } |
| | | |
| | | use JVBase\managers\Cache; |
| | | use JVBase\managers\CustomTable; |
| | | use LogicException; |
| | | |
| | | class Storage |
| | | { |
/** WordPress database handle; still read by getLastError(). */
private \wpdb $wpdb;

/** Operation queue table; assigned in defineTables(). */
private CustomTable $table;

/** Daily statistics table; assigned in defineTables(). */
private CustomTable $stats;

/** Cache instance built in the constructor. */
private Cache $cache;
| | | |
| | | private const CACHE_QUEUE_INFO = 'queue_info'; |
| | | |
/**
 * Wires up the database handle, the table definitions and the cache.
 *
 * The table property is typed as CustomTable and is populated by
 * defineTables(); it must not be assigned a plain table-name string.
 */
public function __construct()
{
    global $wpdb;

    $this->wpdb = $wpdb;
    $this->defineTables();
    $this->cache = Cache::for('queue', DAY_IN_SECONDS)->user();
}
| | | |
/**
 * Tells whether at least one queue row is currently in the 'processing' state.
 *
 * NOTE(review): the {table} placeholder is presumably substituted with the
 * real table name by CustomTable — confirm.
 *
 * @return bool True when the lookup yields a truthy value.
 */
public function hasProcessingOperations(): bool
{
    return (bool) $this->table->queryVar(
        "SELECT 1 FROM {table} WHERE state = 'processing' LIMIT 1"
    );
}
| | | |
| | | public function fetchRunnable(int $offset = 0): array |
| | | { |
| | | $now = current_time('mysql'); |
| | | |
| | | $rows = $this->wpdb->get_results( |
| | | $this->wpdb->prepare(" |
| | | SELECT * |
| | | FROM {$this->table} |
| | | WHERE state IN ('pending', 'scheduled') |
| | | AND scheduled_at <= %s |
| | | ORDER BY |
| | | FIELD(priority, 'high', 'normal', 'low'), |
| | | scheduled_at |
| | | LIMIT 10 OFFSET %d |
| | | FOR UPDATE SKIP LOCKED |
| | | ", $now, $offset) |
| | | ); |
| | | $rows = $this->table->queryResults(" |
| | | SELECT * FROM {table} |
| | | WHERE state IN ('pending', 'scheduled') |
| | | AND scheduled_at <= %s |
| | | ORDER BY FIELD(priority, 'high', 'normal', 'low'), scheduled_at |
| | | LIMIT 10 OFFSET %d |
| | | FOR UPDATE SKIP LOCKED |
| | | ", [$now, $offset]); |
| | | |
| | | $total = count($rows); |
| | | foreach ($rows as $row) { |
| | |
/**
 * Moves an operation from 'pending'/'scheduled' into 'processing'.
 *
 * The state filter in the WHERE clause means an operation that is already
 * processing or completed is left untouched.
 *
 * @param string $id Operation id.
 * @return bool True only when at least one row was updated.
 */
public function markProcessing(string $id): bool
{
    $now = current_time('mysql');

    $affected = $this->table->query("
        UPDATE {table}
        SET state = 'processing', started_at = %s, updated_at = %s
        WHERE id = %s AND state IN ('pending', 'scheduled')
    ", [$now, $now, $id]);

    if ($affected > 0) {
        // Look the row up again to learn which user's cache must be dropped.
        $op = $this->find($id);
        if ($op) {
            $this->invalidateUser($op->userId);
        }
    }

    return $affected > 0;
}
| | | |
| | | public function save(Operation $op): bool |
| | | { |
| | | $data = [ |
| | | $result = $this->table->update([ |
| | | 'request_data' => json_encode($op->requestData), |
| | | 'total_items' => $op->totalItems, |
| | | 'processed_items' => $op->processedItems, |
| | |
| | | 'retries' => $op->retries, |
| | | 'last_error_hash' => $op->lastErrorHash, |
| | | 'error_message' => $op->errorMessage, |
| | | 'scheduled_at' => $op->scheduledAt, |
| | | 'started_at' => $op->startedAt, |
| | | 'completed_at' => $op->completedAt, |
| | | 'metadata' => json_encode($op->metadata), |
| | | 'result' => $op->result ? json_encode($op->result) : null, |
| | | 'dependencies' => json_encode($op->dependencies), |
| | | 'merged_into' => $op->merged_into, |
| | | 'user_dismissed' => $op->userDismissed ? 1 : 0, |
| | | 'updated_at' => current_time('mysql'), |
| | | ]; |
| | | ], |
| | | ['id' => $op->id] |
| | | ); |
| | | |
| | | $result = $this->wpdb->update($this->table, $data, ['id' => $op->id]); |
| | | |
| | | if ($result !== false) { |
| | | $this->invalidateUser($op->userId); |
| | | } |
| | | |
| | | if ($result !== false) $this->invalidateUser($op->userId); |
| | | return $result !== false; |
| | | } |
| | | |
| | |
/**
 * Persists the progress fields of an operation that is still processing.
 *
 * Only processed_items, failed_items, metadata and result are written, and
 * the WHERE clause requires state = 'processing', so a row that has already
 * left that state is never modified by this call.
 *
 * @param Operation $op Operation whose progress should be stored.
 * @return bool False when the update call reports failure (false); true
 *              otherwise, including when no row matched.
 */
public function saveProgress(Operation $op): bool
{
    $result = $this->table->update(
        [
            'processed_items' => $op->processedItems ?? 0,
            'failed_items'    => $op->failedItems ? json_encode($op->failedItems) : null,
            'metadata'        => $op->metadata ? json_encode($op->metadata) : null,
            'result'          => $op->result ? json_encode($op->result) : null,
        ],
        // State guard: never touch a row that is no longer processing.
        ['id' => $op->id, 'state' => 'processing']
    );

    if ($result === false) {
        return false;
    }

    $this->invalidateUser($op->userId);

    return true;
}
| | | |
| | |
/**
 * Writes the terminal 'completed' state of an operation.
 *
 * The WHERE clause requires state = 'processing', so a row that is already
 * completed cannot be overwritten. When no row is affected the call is
 * treated as a success and caches are left alone.
 *
 * @param Operation $op Operation whose state is already 'completed'.
 * @return bool False when the update call reports failure (false), true otherwise.
 * @throws LogicException When $op->state is not 'completed'.
 */
public function saveFinal(Operation $op): bool
{
    if ($op->state !== 'completed') {
        throw new LogicException('saveFinal called without completed state');
    }

    $result = $this->table->update(
        [
            'state'           => 'completed',
            'outcome'         => $op->outcome ?? 'success',
            'processed_items' => $op->processedItems ?? 0,
            'failed_items'    => $op->failedItems ? json_encode($op->failedItems) : null,
            'result'          => isset($op->result) ? wp_json_encode($op->result) : null,
            'completed_at'    => $op->completedAt ?? current_time('mysql'),
        ],
        // Hard guard: only a processing row may become completed.
        ['id' => $op->id, 'state' => 'processing']
    );

    if ($result === 0) {
        // Nothing matched the guard (e.g. already completed): not an error.
        return true;
    }

    if ($result === false) {
        return false;
    }

    $this->invalidateQueueCache();
    $this->invalidateUser($op->userId);

    return true;
}
| | | |
| | | public function insert(Operation $op): bool |
| | | { |
| | | $result = $this->wpdb->insert($this->table, [ |
| | | $result = $this->table->insert([ |
| | | 'id' => $op->id, |
| | | 'type' => $op->type, |
| | | 'user_id' => $op->userId, |
| | |
| | | 'state' => $op->state, |
| | | 'outcome' => $op->outcome, |
| | | 'retries' => 0, |
| | | 'last_error_hash' => null, |
| | | 'error_message' => null, |
| | | 'scheduled_at' => $op->scheduledAt ?? current_time('mysql'), |
| | | 'started_at' => null, |
| | | 'completed_at' => null, |
| | | 'metadata' => json_encode($op->metadata), |
| | | 'result' => null, |
| | | 'dependencies' => json_encode($op->dependencies), |
| | | 'user_dismissed' => 0, |
| | | 'merged_into' => null, |
| | | 'created_at' => current_time('mysql'), |
| | | 'updated_at' => current_time('mysql'), |
| | | ]); |
| | | |
| | | if ($result) { |
| | |
| | | |
/**
 * Loads a single operation by id.
 *
 * @param string $id Operation id.
 * @return Operation|null The hydrated operation, or null when the lookup
 *                        returns a falsy value.
 */
public function find(string $id): ?Operation
{
    $row = $this->table->get(['id' => $id]);

    return $row ? $this->rowToOperation($row) : null;
}
| | | |
/**
 * Finds the newest pending/scheduled operation of a type for a user that
 * matches the given request-data criteria.
 *
 * Each non-null criterion is compared, as a string, against the value found
 * at the JSON path "$.<key>" of the request_data column. Null criteria are
 * skipped.
 *
 * @param string $type     Operation type.
 * @param int    $userId   Owner of the operation.
 * @param array  $criteria Map of request_data key => expected value.
 * @return Operation|null The most recently created match, or null.
 */
public function findMergeable(string $type, int $userId, array $criteria = []): ?Operation
{
    $sql = "SELECT * FROM {table} WHERE type = %s AND user_id = %d AND state IN ('pending', 'scheduled')";
    $params = [$type, $userId];

    foreach ($criteria as $key => $value) {
        if ($value === null) {
            continue;
        }

        // Both the JSON path and the expected value travel as bound parameters.
        $sql .= " AND JSON_UNQUOTE(JSON_EXTRACT(request_data, %s)) = %s";
        $params[] = '$.' . $key;
        $params[] = (string) $value;
    }

    $sql .= " ORDER BY created_at DESC LIMIT 1";

    $rows = $this->table->queryResults($sql, $params);

    return !empty($rows) ? $this->rowToOperation($rows[0]) : null;
}
| | | |
| | | public function getUserOperations(int $userId, array $filters = []): array |
| | |
| | | // Order by state priority, then created_at |
| | | $orderBy = $filters['order_by'] ?? "FIELD(state, 'processing', 'pending', 'scheduled', 'completed'), created_at DESC"; |
| | | |
| | | $limit = $filters['limit'] ?? 50; |
| | | $params[] = $limit; |
| | | $params[] = $filters['limit'] ?? 50; |
| | | |
| | | $rows = $this->wpdb->get_results($this->wpdb->prepare( |
| | | "SELECT * FROM {$this->table} WHERE " . implode(' AND ', $where) . |
| | | " ORDER BY {$orderBy} LIMIT %d", |
| | | ...$params |
| | | )); |
| | | $rows = $this->table->queryResults( |
| | | "SELECT * FROM {table} WHERE " . implode(' AND ', $where) . " ORDER BY {$orderBy} LIMIT %d", |
| | | $params |
| | | ); |
| | | |
| | | return array_map([$this, 'rowToOperation'], $rows ?: []); |
| | | } |
| | |
| | | public function getQueueInfo(): array |
| | | { |
| | | $cached = $this->cache->get(self::CACHE_QUEUE_INFO); |
| | | if ($cached !== false) { |
| | | return $cached; |
| | | } |
| | | if ($cached !== false) return $cached; |
| | | |
| | | $now = current_time('mysql'); |
| | | $row = $this->wpdb->get_row($this->wpdb->prepare(" |
| | | SELECT |
| | | COUNT(*) as total, |
| | | SUM(IF(state IN ('pending', 'processing'), 1, 0)) as active, |
| | | SUM(IF(state = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready_scheduled |
| | | FROM {$this->table} |
| | | WHERE state IN ('pending', 'processing', 'scheduled') |
| | | ", $now)); |
| | | $row = $this->table->queryResults(" |
| | | SELECT COUNT(*) as total, |
| | | SUM(IF(state IN ('pending', 'processing'), 1, 0)) as active, |
| | | SUM(IF(state = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready_scheduled |
| | | FROM {table} |
| | | WHERE state IN ('pending', 'processing', 'scheduled') |
| | | ", [$now]); |
| | | $row = $row[0] ?? null; |
| | | |
| | | $info = [ |
| | | 'total' => (int) ($row->total ?? 0), |
| | |
/**
 * Returns per-state row counts for the whole queue.
 *
 * 'scheduled_ready' counts scheduled rows whose scheduled_at is not later
 * than the current time. States with no rows are reported as 0.
 *
 * @return array{pending:int, scheduled:int, scheduled_ready:int, processing:int, completed:int}
 */
public function getQueueStatus(): array
{
    $now = current_time('mysql');

    $rows = $this->table->queryResults("
        SELECT state, COUNT(*) as count,
               SUM(IF(state = 'scheduled' AND scheduled_at <= %s, 1, 0)) as ready
        FROM {table} GROUP BY state
    ", [$now]);

    // Re-key the result by state; fall back to an empty list so that a falsy
    // query result cannot reach array_column().
    $indexed = array_column($rows ?: [], null, 'state');

    return [
        'pending'         => (int) ($indexed['pending']->count ?? 0),
        'scheduled'       => (int) ($indexed['scheduled']->count ?? 0),
        'scheduled_ready' => (int) ($indexed['scheduled']->ready ?? 0),
        'processing'      => (int) ($indexed['processing']->count ?? 0),
        'completed'       => (int) ($indexed['completed']->count ?? 0),
    ];
}
| | | |
| | | public function getUserStats(int $userId): array |
| | | { |
| | | $rows = $this->wpdb->get_results($this->wpdb->prepare(" |
| | | SELECT state, outcome, COUNT(*) as count |
| | | FROM {$this->table} |
| | | WHERE user_id = %d |
| | | GROUP BY state, outcome |
| | | ", $userId)); |
| | | $rows = $this->table->queryResults( |
| | | "SELECT state, outcome, COUNT(*) as count FROM {table} WHERE user_id = %d GROUP BY state, outcome", |
| | | [$userId] |
| | | ); |
| | | |
| | | $stats = [ |
| | | 'pending' => 0, |
| | |
| | | |
/**
 * Flags an operation as dismissed by the user.
 *
 * @param string $id Operation id.
 * @return bool False only when the update call reports failure (false).
 */
public function dismiss(string $id): bool
{
    $result = $this->table->update(['user_dismissed' => 1], ['id' => $id]);

    return $result !== false;
}
| | | |
| | | /** |
| | |
/**
 * Deletes an operation row and drops the owning user's cache.
 *
 * The operation is loaded before the delete because its user id is needed
 * for the cache invalidation afterwards.
 *
 * @param string $id Operation id.
 * @return bool False only when the delete call reports failure (false).
 */
public function delete(string $id): bool
{
    $op = $this->find($id);

    $result = $this->table->delete(['id' => $id]);

    if ($result && $op) {
        $this->invalidateUser($op->userId);
    }

    return $result !== false;
}
| | | |
| | |
| | | { |
| | | Cache::for($userId.'_queue')->flush(); |
| | | } |
/**
 * Exposes the last error message recorded on the wpdb handle.
 *
 * @return string Value of wpdb's last_error property.
 */
public function getLastError(): string
{
    $lastError = $this->wpdb->last_error;

    return $lastError;
}
| | | |
| | | public function withTransaction(callable $callback): mixed |
| | | { |
| | | $this->wpdb->query('START TRANSACTION'); |
| | | |
| | | $this->table->startTransaction(); |
| | | try { |
| | | $result = $callback(); |
| | | $this->wpdb->query('COMMIT'); |
| | | $this->table->commit(); |
| | | return $result; |
| | | } catch (\Throwable $e) { |
| | | $this->wpdb->query('ROLLBACK'); |
| | | $this->table->rollback(); |
| | | error_log('[Storage] Transaction rolled back: ' . $e->getMessage()); |
| | | throw $e; |
| | | } |
| | |
| | | return $this->withTransaction(function () use ($stuckMinutes) { |
| | | $cutoff = date('Y-m-d H:i:s', strtotime("-{$stuckMinutes} minutes")); |
| | | |
| | | // Get IDs first for logging |
| | | $stuckIds = $this->wpdb->get_col($this->wpdb->prepare(" |
| | | SELECT id FROM {$this->table} |
| | | WHERE state = 'processing' |
| | | AND started_at < %s |
| | | FOR UPDATE |
| | | ", $cutoff)); |
| | | $stuckIds = $this->table->queryResults( |
| | | "SELECT id FROM {table} WHERE state = 'processing' AND started_at < %s FOR UPDATE", |
| | | [$cutoff] |
| | | ); |
| | | $stuckIds = array_column($stuckIds, 'id'); |
| | | |
| | | if (empty($stuckIds)) { |
| | | return 0; |
| | | } |
| | | if (empty($stuckIds)) return 0; |
| | | |
| | | // Reset them |
| | | $affected = $this->wpdb->query($this->wpdb->prepare(" |
| | | UPDATE {$this->table} |
| | | SET state = 'scheduled', |
| | | scheduled_at = %s, |
| | | retries = retries + 1, |
| | | updated_at = %s |
| | | WHERE id IN (" . implode(',', array_fill(0, count($stuckIds), '%s')) . ") |
| | | ", |
| | | date('Y-m-d H:i:s', time() + 60), |
| | | current_time('mysql'), |
| | | ...$stuckIds |
| | | )); |
| | | |
| | | // Optional: Log to audit table |
| | | // $this->logStuckReset($stuckIds); |
| | | |
| | | return (int) $affected; |
| | | $placeholders = implode(',', array_fill(0, count($stuckIds), '%s')); |
| | | return (int) $this->table->query( |
| | | "UPDATE {table} SET state = 'scheduled', scheduled_at = %s, retries = retries + 1, updated_at = %s WHERE id IN ({$placeholders})", |
| | | array_merge([date('Y-m-d H:i:s', time() + 60), current_time('mysql')], $stuckIds) |
| | | ); |
| | | }); |
| | | } |
| | | |
| | |
/**
 * Rewrites references to one operation id into another inside the
 * dependencies column of pending/scheduled operations.
 *
 * The ids are matched and replaced in their quoted form ("<id>"), so only
 * whole quoted values in the stored text are touched. The statement runs
 * inside withTransaction().
 *
 * @param string $fromId Id to be replaced.
 * @param string $toId   Id to put in its place.
 * @return int Result of the UPDATE cast to int.
 */
public function replaceDependency(string $fromId, string $toId): int
{
    return $this->withTransaction(function () use ($fromId, $toId) {
        $quotedFrom = '"' . $fromId . '"';
        $quotedTo = '"' . $toId . '"';

        // Only pending/scheduled operations are affected.
        return (int) $this->table->query(
            "UPDATE {table} SET dependencies = REPLACE(dependencies, %s, %s), updated_at = %s WHERE state IN ('pending', 'scheduled') AND dependencies LIKE %s",
            [$quotedFrom, $quotedTo, current_time('mysql'), '%' . $quotedFrom . '%']
        );
    });
}
| | | |
/**
 * Declares the operation queue table and its daily statistics table, then
 * stores both CustomTable handles on this instance.
 */
public function defineTables(): void
{
    // --- Operation queue -------------------------------------------------
    $queueTable = CustomTable::for('_operation_queue');

    $queueColumns = [
        'id'              => 'VARCHAR(64) NOT NULL',
        'type'            => 'VARCHAR(50) NOT NULL',
        'user_id'         => $queueTable->getUserIDType() . ' NOT NULL',

        'request_data'    => 'JSON NOT NULL CHECK (JSON_VALID(request_data))',

        'total_items'     => 'INT(11) NOT NULL DEFAULT 1',
        'processed_items' => 'INT(11) DEFAULT 0',
        'failed_items'    => 'JSON',

        'priority'        => "ENUM('high','normal','low') DEFAULT 'normal'",
        'state'           => "ENUM('pending', 'scheduled', 'processing', 'completed') DEFAULT 'pending'",
        'outcome'         => "ENUM('pending', 'success','partial','merged','failed','failed_permanent') DEFAULT 'pending'",

        'retries'         => 'INT(11) DEFAULT 0',
        'last_error_hash' => 'CHAR(32) DEFAULT NULL',
        'error_message'   => 'TEXT',

        'scheduled_at'    => 'DATETIME DEFAULT NULL',
        'started_at'      => 'DATETIME DEFAULT CURRENT_TIMESTAMP',
        'completed_at'    => 'DATETIME DEFAULT NULL',

        'metadata'        => 'JSON DEFAULT NULL',
        'result'          => 'JSON',
        'dependencies'    => 'JSON',
        'merged_into'     => 'VARCHAR(64) DEFAULT NULL',

        'user_dismissed'  => 'tinyint(1) DEFAULT 0',
        'created_at'      => 'DATETIME DEFAULT CURRENT_TIMESTAMP',
        'updated_at'      => 'DATETIME DEFAULT CURRENT_TIMESTAMP',
    ];

    $queueKeys = [
        ['key' => 'PRIMARY', 'value' => '(`id`)'],
        '`idx_run_queue` (`state`, `priority`, `scheduled_at`)',
        '`idx_user_ops` (`user_id`, `state`)',
        '`idx_user_type_pending` (`user_id`, `type`, `state`)',
        '`idx_completed_at` (`completed_at`)',
        '`idx_processing_stuck` (`state`, `started_at`)',
    ];

    $queueTable->setColumns($queueColumns);
    $queueTable->setKeys($queueKeys);
    $queueTable->defineTable();
    $this->table = $queueTable;

    // --- Daily statistics ------------------------------------------------
    $statsTable = CustomTable::for('stats__operation_queue');

    $statsColumns = [
        'id'   => 'BIGINT unsigned AUTO_INCREMENT',
        'date' => 'DATE NOT NULL',
        'type' => 'VARCHAR(50) NOT NULL',

        // Only store what can't be queried from the main table later
        'peak_queue_size'   => 'INT NOT NULL DEFAULT 0',
        'peak_memory_bytes' => 'BIGINT DEFAULT NULL',

        // Snapshot totals for post-purge historical view
        'total_operations'            => 'INT NOT NULL DEFAULT 0',
        'successful_operations'       => 'INT NOT NULL DEFAULT 0',
        'failed_permanent_operations' => 'INT NOT NULL DEFAULT 0',
        'total_items_processed'       => 'INT NOT NULL DEFAULT 0',

        'created_at' => 'TIMESTAMP DEFAULT CURRENT_TIMESTAMP',
        'updated_at' => 'TIMESTAMP DEFAULT CURRENT_TIMESTAMP',
    ];

    $statsKeys = [
        ['key' => 'PRIMARY', 'value' => '(`id`)'],
        ['key' => 'UNIQUE', 'value' => '(`date`, `type`)'],
        '`date_idx` (`date`)',
        '`type_idx` (`type`)',
    ];

    $statsTable->setColumns($statsColumns);
    $statsTable->setKeys($statsKeys);
    $statsTable->defineTable();
    $this->stats = $statsTable;
}
| | | |
/**
 * Upserts today's per-type totals of completed operations into the
 * statistics table.
 *
 * One aggregate query groups today's completed rows by type; each group is
 * then written with INSERT ... ON DUPLICATE KEY UPDATE, so running the
 * snapshot again on the same day refreshes the existing (date, type) row.
 *
 * The write goes through $this->stats directly: that property already holds
 * the CustomTable for the statistics table.
 */
public function snapshotDaily(): void
{
    $today = current_time('Y-m-d');

    $perType = $this->table->queryResults("
        SELECT
            type,
            COUNT(*) as total,
            SUM(outcome = 'success') as successful,
            SUM(outcome = 'failed_permanent') as failed_permanent,
            SUM(processed_items) as items_processed
        FROM {table}
        WHERE DATE(completed_at) = %s
        GROUP BY type
    ", [$today]);

    foreach ($perType ?: [] as $row) {
        $this->stats->query("
            INSERT INTO {table} (date, type, total_operations, successful_operations, failed_permanent_operations, total_items_processed)
            VALUES (%s, %s, %d, %d, %d, %d)
            ON DUPLICATE KEY UPDATE
                total_operations = VALUES(total_operations),
                successful_operations = VALUES(successful_operations),
                failed_permanent_operations = VALUES(failed_permanent_operations),
                total_items_processed = VALUES(total_items_processed),
                updated_at = NOW()
        ", [
            $today,
            $row->type,
            // SUM() yields NULL when every summed value is NULL; cast to 0.
            (int) $row->total,
            (int) $row->successful,
            (int) $row->failed_permanent,
            (int) $row->items_processed,
        ]);
    }
}
| | | } |