127 lines
3.5 KiB
PHP
127 lines
3.5 KiB
PHP
<?php
|
|
|
|
namespace app\service\Task;
|
|
|
|
use support\Redis;
|
|
|
|
/**
 * Redis-backed task queue made of three structures: a pending list, a delayed
 * sorted set (scored by the Unix time at which a task becomes due again) and
 * a failed list for tasks that exhausted their retries.
 *
 * Tasks are associative arrays stored as JSON strings. Key names and tuning
 * values are read from the `queue.tasks.*` configuration entries; every entry
 * has a built-in default.
 */
class ProofDbTaskQueue
{
    /** Flags used for every stored payload so that all writers produce the same encoding. */
    private const JSON_FLAGS = JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES;

    /**
     * Appends a task to the pending list.
     *
     * @param array $task Task payload; must be JSON-encodable.
     *
     * @throws \InvalidArgumentException When the task cannot be encoded as JSON.
     */
    public function push(array $task): void
    {
        Redis::lPush($this->pendingKey(), $this->encode($task));
    }

    /**
     * Takes the next task from the pending list, waiting up to $timeout seconds.
     *
     * @param int $timeout Timeout passed to BRPOP, in seconds.
     *
     * @return array|null The decoded task, or null when BRPOP returned no
     *                    key/value pair or the stored payload does not decode
     *                    to an array.
     */
    public function pop(int $timeout = 5): ?array
    {
        $result = Redis::brPop([$this->pendingKey()], $timeout);
        if (!is_array($result) || count($result) < 2) {
            return null;
        }

        // BRPOP replies with [key, value]; the payload is the second element.
        $task = json_decode((string) $result[1], true);

        return is_array($task) ? $task : null;
    }

    /**
     * Records a failure for the task and either schedules it for a delayed
     * retry or moves it to the failed list once the retry budget is used up.
     *
     * The delay doubles with every recorded failure, starting at the
     * configured base delay.
     *
     * NOTE(review): the retry counter key is written without a TTL (unlike
     * the error key, which expires after 86400 seconds) and is only removed
     * by clearRetry(); confirm that this is intended.
     *
     * @param array  $task  The task that failed.
     * @param string $error Error text stored under the task's error key.
     *
     * @throws \InvalidArgumentException When the task cannot be encoded as JSON.
     */
    public function retryLater(array $task, string $error): void
    {
        $taskId = $this->taskId($task);
        $retryCount = (int) Redis::incr($this->retryKey($taskId));
        Redis::setEx($this->errorKey($taskId), 86400, $error);

        $task['attempt'] = $retryCount + 1;
        $payload = $this->encode($task);

        if ($retryCount > $this->maxRetries()) {
            Redis::lPush($this->failedKey(), $payload);

            return;
        }

        $delay = $this->baseDelaySeconds() * (2 ** max(0, $retryCount - 1));
        Redis::zAdd($this->delayedKey(), time() + $delay, $payload);
    }

    /**
     * Moves delayed tasks whose due time has passed back to the pending list,
     * at most 100 per call.
     *
     * A task is pushed to the pending list only when this caller's ZREM
     * actually removed it from the delayed set. Without that check, two
     * callers running at the same time would both read the same due items and
     * both enqueue them, duplicating the task.
     *
     * NOTE(review): ZREM and LPUSH are still two separate commands, so a
     * crash between them loses the task; making this atomic would need a
     * transaction or script.
     *
     * @return int Number of tasks this call moved to the pending list.
     */
    public function releaseDueDelayed(): int
    {
        $delayedKey = $this->delayedKey();
        $items = Redis::zRangeByScore($delayedKey, '-inf', (string) time(), ['limit' => [0, 100]]);
        if (!is_array($items) || $items === []) {
            return 0;
        }

        $pendingKey = $this->pendingKey();
        $released = 0;
        foreach ($items as $task) {
            // ZREM reports how many members it removed; zero means another
            // caller already claimed this item.
            if ((int) Redis::zRem($delayedKey, $task) < 1) {
                continue;
            }

            Redis::lPush($pendingKey, $task);
            $released++;
        }

        return $released;
    }

    /**
     * Deletes the retry counter and the stored error text for the task.
     *
     * @param array $task Task whose retry bookkeeping should be removed.
     */
    public function clearRetry(array $task): void
    {
        $taskId = $this->taskId($task);
        Redis::del($this->retryKey($taskId), $this->errorKey($taskId));
    }

    /**
     * @return int Configured `queue.tasks.block_timeout` (default 5).
     */
    public function blockTimeout(): int
    {
        return (int) config('queue.tasks.block_timeout', 5);
    }

    /**
     * @return int Configured `queue.tasks.idle_sleep_seconds` (default 1), never below 1.
     */
    public function idleSleepSeconds(): int
    {
        return max(1, (int) config('queue.tasks.idle_sleep_seconds', 1));
    }

    /**
     * @return int Configured `queue.tasks.dispatcher_interval_seconds` (default 15), never below 1.
     */
    public function dispatcherIntervalSeconds(): int
    {
        return max(1, (int) config('queue.tasks.dispatcher_interval_seconds', 15));
    }

    /**
     * @return int Configured `queue.tasks.dispatcher_batch_size` (default 20), never below 1.
     */
    public function dispatcherBatchSize(): int
    {
        return max(1, (int) config('queue.tasks.dispatcher_batch_size', 20));
    }

    /**
     * Encodes a task for storage.
     *
     * json_encode() returns false on failure; passing that to Redis would
     * store an empty string that pop() later discards, silently losing the
     * task, so the failure is raised instead.
     *
     * @throws \InvalidArgumentException When the task cannot be encoded as JSON.
     */
    private function encode(array $task): string
    {
        $payload = json_encode($task, self::JSON_FLAGS);
        if ($payload === false) {
            throw new \InvalidArgumentException('Task cannot be encoded as JSON: ' . json_last_error_msg());
        }

        return $payload;
    }

    /**
     * Builds the identifier used in the retry and error key names from the
     * task's `task_type`, `target_type` and `target_uid` fields, substituting
     * 'unknown' for any field that is missing or null.
     */
    private function taskId(array $task): string
    {
        return implode(':', [
            (string) ($task['task_type'] ?? 'unknown'),
            (string) ($task['target_type'] ?? 'unknown'),
            (string) ($task['target_uid'] ?? 'unknown'),
        ]);
    }

    /** Key of the list holding tasks that are ready to run. */
    private function pendingKey(): string
    {
        return config('queue.tasks.pending', 'proofdb:tasks:pending');
    }

    /** Key of the sorted set holding tasks that are waiting for a retry. */
    private function delayedKey(): string
    {
        return config('queue.tasks.delayed', 'proofdb:tasks:delayed');
    }

    /** Key of the list holding tasks that exceeded the retry limit. */
    private function failedKey(): string
    {
        return config('queue.tasks.failed', 'proofdb:tasks:failed');
    }

    /** Key of the failure counter for the given task identifier. */
    private function retryKey(string $taskId): string
    {
        return config('queue.tasks.retry_prefix', 'proofdb:tasks:retry:') . $taskId;
    }

    /** Key of the stored error text for the given task identifier. */
    private function errorKey(string $taskId): string
    {
        return config('queue.tasks.error_prefix', 'proofdb:tasks:error:') . $taskId;
    }

    /** Number of recorded failures after which a task goes to the failed list (default 5). */
    private function maxRetries(): int
    {
        return (int) config('queue.tasks.max_retries', 5);
    }

    /** Delay, in seconds, applied after the first recorded failure (default 60). */
    private function baseDelaySeconds(): int
    {
        return (int) config('queue.tasks.base_delay_seconds', 60);
    }
}
|