proofdb/app/process/ProofDbTaskWorker.php
2026-05-07 01:40:58 +08:00

57 lines
1.5 KiB
PHP

<?php
namespace app\process;
use app\service\Embedding\ChunkEmbeddingHandler;
use app\service\Search\ChunkSearchIndexHandler;
use app\service\Task\ProofDbTaskQueue;
use Throwable;
use Workerman\Timer;
/**
 * Long-running worker process that drains the ProofDB task queue.
 *
 * Every popped task is dispatched on its `task_type` to either the chunk
 * embedding handler or the chunk search-index handler. Any failure raised
 * while processing is reported back to the queue through retryLater().
 */
class ProofDbTaskWorker
{
    /** Value of `task_type` routed to the embedding handler. */
    private const TASK_EMBEDDING = 'embedding';

    /** Value of `task_type` routed to the search-index handler. */
    private const TASK_SEARCH_INDEX = 'search_index';

    /** Seconds between timer-driven releases of due delayed tasks. */
    private const RELEASE_INTERVAL_SECONDS = 10;

    private ProofDbTaskQueue $queue;
    private ChunkEmbeddingHandler $embeddings;
    private ChunkSearchIndexHandler $searchIndex;

    /**
     * Builds the queue client and both task handlers with their default wiring.
     */
    public function __construct()
    {
        $this->queue = new ProofDbTaskQueue();
        $this->embeddings = new ChunkEmbeddingHandler();
        $this->searchIndex = new ChunkSearchIndexHandler();
    }

    /**
     * Worker entry point: polls the queue forever and processes each task.
     *
     * This method never returns. On every iteration it first releases delayed
     * tasks that have become due, then pops with the queue's configured block
     * timeout. When nothing is available it sleeps for the queue's idle
     * interval before polling again.
     */
    public function onWorkerStart(): void
    {
        // NOTE(review): the loop below never returns control to the caller, so
        // this timer can only fire if the runtime still gets a chance to run its
        // event loop. The explicit releaseDueDelayed() call at the top of each
        // iteration already covers the same work — confirm the timer is needed.
        Timer::add(self::RELEASE_INTERVAL_SECONDS, fn (): int => $this->queue->releaseDueDelayed());

        while (true) {
            $this->queue->releaseDueDelayed();

            $task = $this->queue->pop($this->queue->blockTimeout());
            if ($task === null) {
                // Nothing to do right now; back off before polling again.
                sleep($this->queue->idleSleepSeconds());
                continue;
            }

            $this->handle($task);
        }
    }

    /**
     * Processes a single task and reports the outcome to the queue.
     *
     * On success the task's retry state is cleared via clearRetry(). Any
     * Throwable raised along the way is caught and the task is passed to
     * retryLater() together with the exception message, so this method itself
     * only throws if retryLater() does.
     *
     * A task whose `task_type` is missing or not recognised is treated as a
     * failure instead of being silently acknowledged, so malformed or
     * not-yet-supported tasks stay visible through the retry path.
     *
     * @param array $task Task payload as returned by the queue; `task_type`
     *                    selects the handler.
     */
    private function handle(array $task): void
    {
        $type = $task['task_type'] ?? null;

        try {
            if ($type === self::TASK_EMBEDDING) {
                $this->embeddings->handle($task);
            } elseif ($type === self::TASK_SEARCH_INDEX) {
                $this->searchIndex->handle($task);
            } else {
                // Previously such tasks fell through both checks and were
                // marked as done without any handler having run.
                throw new \UnexpectedValueException(
                    'Unsupported task type: ' . var_export($type, true)
                );
            }

            $this->queue->clearRetry($task);
        } catch (Throwable $exception) {
            $this->queue->retryLater($task, $exception->getMessage());
        }
    }
}