57 lines
1.5 KiB
PHP
57 lines
1.5 KiB
PHP
<?php
|
|
|
|
namespace app\process;
|
|
|
|
use app\service\Embedding\ChunkEmbeddingHandler;
|
|
use app\service\Search\ChunkSearchIndexHandler;
|
|
use app\service\Task\ProofDbTaskQueue;
|
|
use Throwable;
|
|
use Workerman\Timer;
|
|
|
|
class ProofDbTaskWorker
|
|
{
|
|
private ProofDbTaskQueue $queue;
|
|
private ChunkEmbeddingHandler $embeddings;
|
|
private ChunkSearchIndexHandler $searchIndex;
|
|
|
|
public function __construct()
|
|
{
|
|
$this->queue = new ProofDbTaskQueue();
|
|
$this->embeddings = new ChunkEmbeddingHandler();
|
|
$this->searchIndex = new ChunkSearchIndexHandler();
|
|
}
|
|
|
|
public function onWorkerStart(): void
|
|
{
|
|
Timer::add(10, fn (): int => $this->queue->releaseDueDelayed());
|
|
|
|
while (true) {
|
|
$this->queue->releaseDueDelayed();
|
|
$task = $this->queue->pop($this->queue->blockTimeout());
|
|
if ($task === null) {
|
|
sleep($this->queue->idleSleepSeconds());
|
|
continue;
|
|
}
|
|
|
|
$this->handle($task);
|
|
}
|
|
}
|
|
|
|
private function handle(array $task): void
|
|
{
|
|
try {
|
|
if (($task['task_type'] ?? null) === 'embedding') {
|
|
$this->embeddings->handle($task);
|
|
}
|
|
|
|
if (($task['task_type'] ?? null) === 'search_index') {
|
|
$this->searchIndex->handle($task);
|
|
}
|
|
|
|
$this->queue->clearRetry($task);
|
|
} catch (Throwable $exception) {
|
|
$this->queue->retryLater($task, $exception->getMessage());
|
|
}
|
|
}
|
|
}
|