proofdb/app/service/Embedding/ChunkEmbeddingRepository.php
2026-05-11 15:23:34 +08:00

181 lines
6.6 KiB
PHP

<?php
namespace app\service\Embedding;
use support\Db;
class ChunkEmbeddingRepository
{
public function countChunks(?string $archiveUid = null): int
{
$query = Db::table('chunks');
if ($archiveUid !== null && trim($archiveUid) !== '') {
$query->where('archive_uid', trim($archiveUid));
}
return (int) $query->count();
}
public function countChunksByStatuses(array $statuses, ?string $archiveUid = null): int
{
$query = Db::table('chunks')->whereIn('embedding_status', $statuses);
if ($archiveUid !== null && trim($archiveUid) !== '') {
$query->where('archive_uid', trim($archiveUid));
}
return (int) $query->count();
}
public function resetAllChunksToPending(?string $archiveUid = null): int
{
$query = Db::table('chunks');
if ($archiveUid !== null && trim($archiveUid) !== '') {
$query->where('archive_uid', trim($archiveUid));
}
return $query->update($this->pendingResetPayload());
}
public function resetRecoverableChunksToPending(?string $archiveUid = null): int
{
$query = Db::table('chunks')
->whereIn('embedding_status', [
EmbeddingStatus::QUEUED,
EmbeddingStatus::PROCESSING,
EmbeddingStatus::FAILED_RETRYABLE,
]);
if ($archiveUid !== null && trim($archiveUid) !== '') {
$query->where('archive_uid', trim($archiveUid));
}
return $query->update($this->pendingResetPayload());
}
public function queuePendingArchiveTasks(int $limit): array
{
$staleBefore = date('Y-m-d H:i:s', time() - max(60, (int) config('queue.tasks.stale_after_seconds', 900)));
$archiveUids = Db::table('chunks')
->where(function ($query) use ($staleBefore): void {
$query
->whereIn('embedding_status', [EmbeddingStatus::PENDING, EmbeddingStatus::FAILED_RETRYABLE])
->orWhere(function ($stale) use ($staleBefore): void {
$stale
->whereIn('embedding_status', [EmbeddingStatus::QUEUED, EmbeddingStatus::PROCESSING])
->where(function ($time) use ($staleBefore): void {
$time->whereNull('embedding_updated_at')->orWhere('embedding_updated_at', '<', $staleBefore);
});
});
})
->select('archive_uid')
->groupBy('archive_uid')
->orderByRaw('MIN(id)')
->limit($limit)
->pluck('archive_uid')
->all();
$archiveUids = array_values(array_filter(array_map('strval', $archiveUids)));
foreach ($archiveUids as $archiveUid) {
Db::table('chunks')
->where('archive_uid', $archiveUid)
->where(function ($query) use ($staleBefore): void {
$query
->whereIn('embedding_status', [EmbeddingStatus::PENDING, EmbeddingStatus::FAILED_RETRYABLE])
->orWhere(function ($stale) use ($staleBefore): void {
$stale
->whereIn('embedding_status', [EmbeddingStatus::QUEUED, EmbeddingStatus::PROCESSING])
->where(function ($time) use ($staleBefore): void {
$time->whereNull('embedding_updated_at')->orWhere('embedding_updated_at', '<', $staleBefore);
});
});
})
->update([
'embedding_status' => EmbeddingStatus::QUEUED,
'embedding_error' => null,
'embedding_updated_at' => Db::raw('CURRENT_TIMESTAMP'),
]);
}
return $archiveUids;
}
public function findQueuedChunks(string $archiveUid, int $limit): array
{
$chunks = Db::table('chunks')
->where('archive_uid', $archiveUid)
->whereIn('embedding_status', [EmbeddingStatus::QUEUED, EmbeddingStatus::PROCESSING])
->orderBy('chunk_index')
->limit($limit)
->get(['chunk_uid', 'archive_uid', 'chunk_index', 'text'])
->all();
return array_map(static fn (object $chunk): array => [
'chunk_uid' => (string) $chunk->chunk_uid,
'archive_uid' => (string) $chunk->archive_uid,
'chunk_index' => (int) $chunk->chunk_index,
'text' => (string) $chunk->text,
], $chunks);
}
public function markProcessing(array $chunkUids): void
{
$this->updateStatus($chunkUids, EmbeddingStatus::PROCESSING);
}
public function markEmbedded(string $chunkUid, array $embeddingRef, string $model): void
{
Db::table('chunks')->where('chunk_uid', $chunkUid)->update([
'embedding_status' => EmbeddingStatus::EMBEDDED,
'embedding_ref' => json_encode($embeddingRef, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES),
'embedding_model' => $model,
'embedding_error' => null,
'embedding_updated_at' => Db::raw('CURRENT_TIMESTAMP'),
'search_index_status' => 0,
'search_index_error' => null,
'search_index_updated_at' => null,
]);
}
public function markFailed(array $chunkUids, string $error, bool $retryable): void
{
if ($chunkUids === []) {
return;
}
Db::table('chunks')->whereIn('chunk_uid', $chunkUids)->update([
'embedding_status' => $retryable ? EmbeddingStatus::FAILED_RETRYABLE : EmbeddingStatus::FAILED_TERMINAL,
'embedding_error' => mb_substr($error, 0, 4000),
'embedding_updated_at' => Db::raw('CURRENT_TIMESTAMP'),
]);
}
private function updateStatus(array $chunkUids, int $status): void
{
if ($chunkUids === []) {
return;
}
Db::table('chunks')->whereIn('chunk_uid', $chunkUids)->update([
'embedding_status' => $status,
'embedding_updated_at' => Db::raw('CURRENT_TIMESTAMP'),
]);
}
private function pendingResetPayload(): array
{
return [
'embedding_status' => EmbeddingStatus::PENDING,
'embedding_ref' => null,
'embedding_model' => null,
'embedding_error' => null,
'embedding_updated_at' => null,
'search_index_status' => 0,
'search_index_error' => null,
'search_index_updated_at' => null,
];
}
}