101 lines
3.1 KiB
PHP
101 lines
3.1 KiB
PHP
<?php
|
|
|
|
namespace app\service\Search;
|
|
|
|
use Throwable;
|
|
|
|
class ChunkSearchIndexHandler
{
    /** Repository used to load queued chunk documents and update their indexing status. */
    private ChunkSearchIndexRepository $chunks;

    /** OpenSearch client used to ensure the index exists and to send bulk index requests. */
    private OpenSearchChunkIndex $index;

    /**
     * @param ChunkSearchIndexRepository|null $chunks Repository override; a default instance is created when null.
     * @param OpenSearchChunkIndex|null       $index  Index client override; a default instance is created when null.
     */
    public function __construct(
        ?ChunkSearchIndexRepository $chunks = null,
        ?OpenSearchChunkIndex $index = null
    ) {
        $this->chunks = $chunks ?? new ChunkSearchIndexRepository();
        $this->index = $index ?? new OpenSearchChunkIndex();
    }

    /**
     * Index one batch of queued chunk documents belonging to an archive.
     *
     * Only tasks with `target_type === 'archive'` and a non-blank `target_uid` are processed. Up to
     * `opensearch.bulk.chunk_size` (default 500) queued documents are loaded and marked as indexing.
     * Documents whose embedding dimension differs from `opensearch.vector.dimensions` are marked failed
     * (with `false` as the third markFailed argument) and dropped. The rest are bulk-indexed: items the
     * bulk response reports as errors are marked failed (third argument `true`), the remainder indexed.
     * NOTE(review): the third markFailed argument presumably means "retryable" — confirm in the repository.
     *
     * @param array $task Task payload; only `target_type` and `target_uid` are read.
     * @return int Number of documents that passed validation and were sent to OpenSearch (0 if none).
     * @throws Throwable Re-thrown after the documents of the current batch have been marked failed.
     */
    public function handle(array $task): int
    {
        if (($task['target_type'] ?? null) !== 'archive') {
            return 0;
        }

        $archiveUid = trim((string) ($task['target_uid'] ?? ''));
        if ($archiveUid === '') {
            return 0;
        }

        $documents = $this->chunks->findQueuedDocuments($archiveUid, (int) config('opensearch.bulk.chunk_size', 500));
        if ($documents === []) {
            return 0;
        }

        $chunkUids = array_column($documents, 'chunk_uid');
        $this->chunks->markIndexing($chunkUids);

        try {
            // Invalid documents are marked failed inside validatedDocuments().
            $documents = $this->validatedDocuments($documents);
            if ($documents === []) {
                return 0;
            }

            // From here on only the valid subset is tracked, so the catch block below
            // does not overwrite the dimension-mismatch failures recorded above.
            $chunkUids = array_column($documents, 'chunk_uid');

            $this->index->ensureExists();
            $response = $this->index->bulkIndex($documents);
            $failedChunkUids = $this->failedChunkUids($response);

            if ($failedChunkUids !== []) {
                $this->chunks->markFailed($failedChunkUids, 'OpenSearch bulk index returned item errors.', true);
            }

            $indexedChunkUids = array_values(array_diff($chunkUids, $failedChunkUids));
            // Skip the call entirely when every item failed, instead of passing an empty list.
            if ($indexedChunkUids !== []) {
                $this->chunks->markIndexed($indexedChunkUids);
            }

            return count($chunkUids);
        } catch (Throwable $exception) {
            $this->chunks->markFailed($chunkUids, $exception->getMessage(), true);

            throw $exception;
        }
    }

    /**
     * Keep only documents whose embedding dimension matches the configured vector size.
     *
     * Mismatching documents that carry a chunk uid are marked failed (third markFailed argument `false`);
     * mismatching documents without a chunk uid are simply dropped, since there is no row to mark.
     *
     * @param array $documents Documents as returned by findQueuedDocuments().
     * @return array The matching documents, in their original order, as a list.
     */
    private function validatedDocuments(array $documents): array
    {
        $dimensions = (int) config('opensearch.vector.dimensions', 2048);
        $valid = [];

        foreach ($documents as $document) {
            // Cast before comparing: integer columns are often fetched as numeric strings,
            // and a strict '2048' !== 2048 comparison would reject every document.
            $actualDimensions = (int) ($document['embedding_dimensions'] ?? 0);

            if ($actualDimensions === $dimensions) {
                $valid[] = $document;
                continue;
            }

            $chunkUid = $document['chunk_uid'] ?? null;
            if ($chunkUid === null || $chunkUid === '') {
                continue;
            }

            $this->chunks->markFailed([$chunkUid], sprintf(
                'Chunk %s embedding dimension mismatch: expected %d, got %d.',
                $chunkUid,
                $dimensions,
                $actualDimensions
            ), false);
        }

        return $valid;
    }

    /**
     * Extract the `_id`s of items that an OpenSearch bulk response reports as failed.
     *
     * Each bulk response item is an object keyed by its action name (index, create, update or
     * delete), so the single action entry is read regardless of which action was used.
     *
     * @param array $response Decoded bulk response (`errors` flag plus `items` list).
     * @return list<string> Non-empty ids of failed items; empty when `errors` is not exactly true.
     */
    private function failedChunkUids(array $response): array
    {
        if (($response['errors'] ?? false) !== true) {
            return [];
        }

        $failed = [];

        foreach ($response['items'] ?? [] as $item) {
            $result = is_array($item) ? reset($item) : false;
            if (!is_array($result) || !isset($result['error'])) {
                continue;
            }

            $id = (string) ($result['_id'] ?? '');
            // Explicit comparison: a callback-less array_filter() would also drop a valid '0' id.
            if ($id !== '') {
                $failed[] = $id;
            }
        }

        return $failed;
    }
}
|