proofdb/app/service/Search/ChunkSearchIndexHandler.php
2026-05-11 15:23:34 +08:00

101 lines
3.1 KiB
PHP

<?php
namespace app\service\Search;
use Throwable;
/**
 * Sends the queued chunk documents of one archive to the OpenSearch chunk index
 * and records the per-chunk outcome through the chunk repository.
 */
class ChunkSearchIndexHandler
{
    private ChunkSearchIndexRepository $chunks;

    private OpenSearchChunkIndex $index;

    /**
     * Each collaborator is optional; a default instance is created for any left null.
     *
     * @param ChunkSearchIndexRepository|null $chunks Repository used to load chunks and record their state.
     * @param OpenSearchChunkIndex|null       $index  Index client used for the bulk request.
     */
    public function __construct(
        ?ChunkSearchIndexRepository $chunks = null,
        ?OpenSearchChunkIndex $index = null
    ) {
        $this->chunks = $chunks ?? new ChunkSearchIndexRepository();
        $this->index = $index ?? new OpenSearchChunkIndex();
    }

    /**
     * Indexes one batch of queued chunks for the archive named by the task.
     *
     * Tasks whose `target_type` is not 'archive', or whose `target_uid` is blank,
     * are ignored. The batch size comes from `opensearch.bulk.chunk_size` (default 500).
     *
     * @param array $task Task payload; reads the `target_type` and `target_uid` keys.
     *
     * @return int Number of chunks submitted in the bulk request (items rejected by
     *             OpenSearch are included in the count), or 0 when nothing was submitted.
     *
     * @throws Throwable Any error raised while validating or indexing is rethrown after
     *                   the affected chunks have been marked as failed.
     */
    public function handle(array $task): int
    {
        if (($task['target_type'] ?? null) !== 'archive') {
            return 0;
        }

        $archiveUid = trim((string) ($task['target_uid'] ?? ''));
        if ($archiveUid === '') {
            return 0;
        }

        $documents = $this->chunks->findQueuedDocuments(
            $archiveUid,
            (int) config('opensearch.bulk.chunk_size', 500)
        );
        if ($documents === []) {
            return 0;
        }

        $chunkUids = array_column($documents, 'chunk_uid');
        $this->chunks->markIndexing($chunkUids);

        try {
            // Documents rejected here have already been marked failed by validatedDocuments().
            $documents = $this->validatedDocuments($documents);
            if ($documents === []) {
                return 0;
            }

            // From here on only the surviving documents are tracked, so the catch
            // block below does not overwrite the validation failures recorded above.
            $chunkUids = array_column($documents, 'chunk_uid');

            $this->index->ensureExists();
            $response = $this->index->bulkIndex($documents);

            $failedChunkUids = $this->failedChunkUids($response);
            if ($failedChunkUids !== []) {
                // NOTE(review): the third argument is presumably a "retryable" flag — confirm against the repository.
                $this->chunks->markFailed($failedChunkUids, 'OpenSearch bulk index returned item errors.', true);
            }

            $indexedChunkUids = array_values(array_diff($chunkUids, $failedChunkUids));
            if ($indexedChunkUids !== []) {
                // Skip the repository call when every item was rejected; there is nothing to mark.
                $this->chunks->markIndexed($indexedChunkUids);
            }

            return count($chunkUids);
        } catch (Throwable $exception) {
            $this->chunks->markFailed($chunkUids, $exception->getMessage(), true);

            throw $exception;
        }
    }

    /**
     * Keeps the documents whose embedding dimension equals `opensearch.vector.dimensions`
     * (default 2048) and marks every other document as failed.
     *
     * The stored dimension is normalised to an integer before comparing, so a correct
     * value delivered as a numeric string is not rejected.
     *
     * @param array $documents Documents; reads the `chunk_uid` and `embedding_dimensions` keys.
     *
     * @return array The documents that passed validation, in their original order.
     */
    private function validatedDocuments(array $documents): array
    {
        $dimensions = (int) config('opensearch.vector.dimensions', 2048);
        $valid = [];

        foreach ($documents as $document) {
            // filter_var() yields an int for ints and integer-like strings, false otherwise.
            $actual = filter_var($document['embedding_dimensions'] ?? 0, FILTER_VALIDATE_INT);

            if ($actual !== false && $actual === $dimensions) {
                $valid[] = $document;
                continue;
            }

            $chunkUid = $document['chunk_uid'] ?? null;
            if ($chunkUid === null || $chunkUid === '') {
                // Without an identifier there is no row to mark; just drop the document.
                continue;
            }

            $this->chunks->markFailed([$chunkUid], sprintf(
                'Chunk %s embedding dimension mismatch: expected %d, got %d.',
                $chunkUid,
                $dimensions,
                $actual === false ? 0 : $actual
            ), false);
        }

        return $valid;
    }

    /**
     * Extracts the ids of the items that a bulk response reports as failed.
     *
     * @param array $response Decoded bulk response; reads the `errors` flag and the `items` list.
     *
     * @return array List of non-empty `_id` strings of items carrying an `error` entry;
     *               empty when the response does not flag errors.
     */
    private function failedChunkUids(array $response): array
    {
        if (($response['errors'] ?? false) !== true) {
            return [];
        }

        $failed = [];
        foreach ($response['items'] ?? [] as $item) {
            // Each bulk item is keyed by its action; accept `create` as well as `index`.
            $result = $item['index'] ?? $item['create'] ?? [];
            if (!isset($result['error'])) {
                continue;
            }

            $id = (string) ($result['_id'] ?? '');
            // Compare against '' explicitly so that an id of "0" is not discarded.
            if ($id !== '') {
                $failed[] = $id;
            }
        }

        return $failed;
    }
}