From 547cbbb4a519df7a680847efa40ee95f319825fd Mon Sep 17 00:00:00 2001 From: Ziki Shay Date: Mon, 11 May 2026 15:23:34 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9A=82=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apidoc/adminapi.md | 2 - apidoc/evidenceapi.md | 4 - .../Api/ArticleImportController.php | 20 ++- app/process/AiMetadata.php | 38 ++++- .../AdminConsole/ArchiveAdminService.php | 4 +- .../AdminConsole/MaintenanceScriptService.php | 16 +-- .../ArchiveMetadataEnrichmentService.php | 6 +- app/service/ArchiveRepository.php | 97 ++++++++++++- app/service/ArticleImportService.php | 63 -------- .../Embedding/ChunkEmbeddingHandler.php | 14 +- .../Embedding/ChunkEmbeddingRepository.php | 89 +++++++++++- .../Search/ChunkSearchIndexHandler.php | 11 +- .../Search/ChunkSearchIndexRepository.php | 77 ++++++++-- app/view/admin/dashboard.html | 12 +- config/queue.php | 4 + readme.md | 3 +- scriptdoc/README.md | 4 +- scriptdoc/backfill_archive_content.md | 79 ++-------- scriptdoc/reembed_chunks.md | 113 +++++++++++++++ scriptdoc/reindex_opensearch.md | 53 ++++++- scriptdoc/setup_database.md | 1 + scripts/backfill_archive_content.php | 116 +-------------- scripts/reembed_chunks.php | 135 ++++++++++++++++++ scripts/reindex_opensearch.php | 88 ++++++++++-- scripts/setup_database.php | 4 +- 25 files changed, 721 insertions(+), 332 deletions(-) create mode 100644 scriptdoc/reembed_chunks.md create mode 100644 scripts/reembed_chunks.php diff --git a/apidoc/adminapi.md b/apidoc/adminapi.md index c963f63..7232133 100644 --- a/apidoc/adminapi.md +++ b/apidoc/adminapi.md @@ -128,8 +128,6 @@ PATCH /api/admin/archives/{archive_uid} - `series` - `tags` - `metadata` -- `content` -- `raw` 其中: diff --git a/apidoc/evidenceapi.md b/apidoc/evidenceapi.md index 23c8a99..523e73e 100644 --- a/apidoc/evidenceapi.md +++ b/apidoc/evidenceapi.md @@ -52,8 +52,6 @@ curl /api/archives/01KQHVREB6XPYF604RVZAP9NNY "provider": "bigmodel" } }, - "content": "full normalized archive content ...", - "raw": "# 1.test ...", "chunks": [ "01KQHVREB6XPYF604RVZAP9NNY_1_39003", "01KQHVREB6XPYF604RVZAP9NNY_2_12345" @@ -65,8 +63,6 @@ curl /api/archives/01KQHVREB6XPYF604RVZAP9NNY 说明: -- `content` 是归一化后的 archive 正文。 -- `raw` 是导入时保存的原始 Markdown。 - `chunks` 是当前 archive 关联的 `chunk_uid` 列表。 - `chunk_count` 方便调用方快速判断档案规模,而不必自己数数组长度。 diff --git a/app/controller/Api/ArticleImportController.php b/app/controller/Api/ArticleImportController.php index 7d605ca..7acc7c7 100644 --- a/app/controller/Api/ArticleImportController.php +++ b/app/controller/Api/ArticleImportController.php @@ -37,9 +37,23 @@ class ArticleImportController try { $service->persistSnapshot($result['data']); (new \app\service\ArchiveRepository())->saveImport($result['data']); - if (($result['data']['queue']['needs_ai_metadata'] ?? false) === true) { - (new \app\service\AiMetadataQueue())->push($result['data']['archive']['archive_uid']); - $result['data']['queue']['ai_metadata_enqueued'] = true; + if (($result['data']['queue']['needs_ai_metadata'] ?? false) === true && (new \app\service\ArchiveMetadataEnrichmentService())->isEnabled()) { + $archiveUid = $result['data']['archive']['archive_uid']; + (new \app\service\ArchiveRepository())->markAiMetadataStatus($archiveUid, 'queued'); + + try { + (new \app\service\AiMetadataQueue())->push($archiveUid); + $result['data']['queue']['ai_metadata_enqueued'] = true; + } catch (Throwable $queueException) { + (new \app\service\ArchiveRepository())->markAiMetadataStatus($archiveUid, 'failed_retryable', $queueException->getMessage()); + $result['data']['queue']['ai_metadata_enqueue_error'] = $queueException->getMessage(); + } + } elseif (($result['data']['queue']['needs_ai_metadata'] ?? false) === true) { + (new \app\service\ArchiveRepository())->markAiMetadataStatus( + $result['data']['archive']['archive_uid'], + 'disabled', + 'AI metadata enrichment is not enabled.' + ); } } catch (Throwable $exception) { return $this->jsonResponse([ diff --git a/app/process/AiMetadata.php b/app/process/AiMetadata.php index a1610c8..a3206a6 100644 --- a/app/process/AiMetadata.php +++ b/app/process/AiMetadata.php @@ -24,6 +24,17 @@ class AiMetadata public function onWorkerStart(): void { Timer::add(10, fn (): int => $this->queue->releaseDueDelayed()); + Timer::add(max(30, (int) config('queue.ai_metadata.dispatcher_interval_seconds', 60)), function (): void { + try { + if (!$this->enrichment->isEnabled()) { + return; + } + foreach ($this->archives->queuePendingAiMetadataArchives((int) config('queue.ai_metadata.dispatcher_batch_size', 20)) as $archiveUid) { + $this->queue->push($archiveUid); + } + } catch (Throwable) { + } + }); while (true) { $this->queue->releaseDueDelayed(); @@ -47,10 +58,18 @@ class AiMetadata } if (!$this->archives->archiveNeedsMetadata($archive)) { + $this->archives->markAiMetadataStatus($archiveUid, 'completed', null, ['attempted' => false]); $this->queue->clearRetry($archiveUid); return; } + if (!$this->enrichment->isEnabled()) { + $this->archives->markAiMetadataStatus($archiveUid, 'disabled', 'AI metadata enrichment is not enabled.'); + $this->queue->clearRetry($archiveUid); + return; + } + + $this->archives->markAiMetadataStatus($archiveUid, 'processing'); $payload = $archive; $payload['content'] = $this->archives->findChunksText($archiveUid); @@ -58,7 +77,15 @@ class AiMetadata $aiMeta = $enriched['metadata']['ai_enrichment'] ?? []; if (($aiMeta['attempted'] ?? false) !== true || ($aiMeta['error'] ?? null)) { - $this->queue->retryLater($archiveUid, $aiMeta['error'] ?? 'AI metadata enrichment did not complete.'); + $retryable = ($aiMeta['enabled'] ?? true) === true && ($aiMeta['error'] ?? null) !== null; + $status = $retryable ? 'failed_retryable' : 'failed_terminal'; + $error = $aiMeta['error'] ?? 'AI metadata enrichment did not complete.'; + $this->archives->markAiMetadataStatus($archiveUid, $status, $error, $aiMeta); + if ($retryable) { + $this->queue->retryLater($archiveUid, $error); + } else { + $this->queue->clearRetry($archiveUid); + } return; } @@ -70,9 +97,16 @@ class AiMetadata } $this->archives->updateMetadata($archiveUid, $fields, $aiMeta); + $this->archives->markAiMetadataStatus($archiveUid, 'completed', null, $aiMeta); $this->queue->clearRetry($archiveUid); } catch (Throwable $exception) { - $this->queue->retryLater($archiveUid, $exception->getMessage()); + $status = $this->archives->isAiMetadataRetryable($exception) ? 'failed_retryable' : 'failed_terminal'; + $this->archives->markAiMetadataStatus($archiveUid, $status, $exception->getMessage()); + if ($status === 'failed_retryable') { + $this->queue->retryLater($archiveUid, $exception->getMessage()); + } else { + $this->queue->clearRetry($archiveUid); + } } } } diff --git a/app/service/AdminConsole/ArchiveAdminService.php b/app/service/AdminConsole/ArchiveAdminService.php index 5976920..31620a1 100644 --- a/app/service/AdminConsole/ArchiveAdminService.php +++ b/app/service/AdminConsole/ArchiveAdminService.php @@ -72,7 +72,7 @@ class ArchiveAdminService } $updates = []; - foreach (['title', 'summary', 'author', 'source', 'series', 'content', 'raw'] as $field) { + foreach (['title', 'summary', 'author', 'source', 'series'] as $field) { if (array_key_exists($field, $payload)) { $updates[$field] = $this->nullableText($payload[$field]); } @@ -134,8 +134,6 @@ class ArchiveAdminService { $data = $this->listItem($row); $data['metadata'] = $this->decodeJson($row->metadata ?? null, []); - $data['content'] = $row->content; - $data['raw'] = $row->raw; $data['chunks'] = $this->decodeJson($row->chunks ?? null, []); return $data; diff --git a/app/service/AdminConsole/MaintenanceScriptService.php b/app/service/AdminConsole/MaintenanceScriptService.php index 5fbb3a5..b383450 100644 --- a/app/service/AdminConsole/MaintenanceScriptService.php +++ b/app/service/AdminConsole/MaintenanceScriptService.php @@ -119,6 +119,14 @@ class MaintenanceScriptService 'doc_name' => 'setup_opensearch.md', 'args_hint' => '无参数', ], + 'reembed_chunks' => [ + 'name' => 'reembed_chunks', + 'file' => 'reembed_chunks.php', + 'label' => '重新生成向量', + 'description' => '对 chunks 重新执行 embedding,支持 resume 与 --reset。', + 'doc_name' => 'reembed_chunks.md', + 'args_hint' => '--archive_uid=01... 或 --reset', + ], 'reindex_opensearch' => [ 'name' => 'reindex_opensearch', 'file' => 'reindex_opensearch.php', @@ -127,14 +135,6 @@ class MaintenanceScriptService 'doc_name' => 'reindex_opensearch.md', 'args_hint' => '--archive_uid=01...', ], - 'backfill_archive_content' => [ - 'name' => 'backfill_archive_content', - 'file' => 'backfill_archive_content.php', - 'label' => '回填 archive content', - 'description' => '从 raw 或 chunks 回填 archives.content。', - 'doc_name' => 'backfill_archive_content.md', - 'args_hint' => '--archive_uid=01...', - ], 'setup_admin_users' => [ 'name' => 'setup_admin_users', 'file' => 'setup_admin_users.php', diff --git a/app/service/ArchiveMetadataEnrichmentService.php b/app/service/ArchiveMetadataEnrichmentService.php index 3fe935b..8a9a80b 100644 --- a/app/service/ArchiveMetadataEnrichmentService.php +++ b/app/service/ArchiveMetadataEnrichmentService.php @@ -20,9 +20,9 @@ class ArchiveMetadataEnrichmentService public function enrich(array $payload): array { $missing = $this->missingFields($payload); - if ($missing === [] || !$this->enabled()) { + if ($missing === [] || !$this->isEnabled()) { return $this->withAiMeta($payload, [ - 'enabled' => $this->enabled(), + 'enabled' => $this->isEnabled(), 'attempted' => false, 'filled' => [], 'missing' => $missing, @@ -80,7 +80,7 @@ class ArchiveMetadataEnrichmentService return array_values(array_filter($fields, fn (string $field): bool => !$this->hasUsefulValue($payload, $field))); } - private function enabled(): bool + public function isEnabled(): bool { return (bool) config('LLMapi.metadata.enabled', true) && $this->client->isConfigured(); } diff --git a/app/service/ArchiveRepository.php b/app/service/ArchiveRepository.php index c8222b6..9aad9ac 100644 --- a/app/service/ArchiveRepository.php +++ b/app/service/ArchiveRepository.php @@ -2,6 +2,7 @@ namespace app\service; +use Throwable; use support\Db; class ArchiveRepository @@ -24,8 +25,6 @@ class ArchiveRepository 'series' => $archive['series'] ?? null, 'tags' => json_encode($archive['tags'] ?? [], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES), 'metadata' => json_encode($archive['metadata'] ?? [], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES), - 'content' => $archive['content'] ?? null, - 'raw' => $archive['raw'] ?? null, 'chunks' => json_encode($chunkUids, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES), ] ); @@ -194,6 +193,79 @@ class ArchiveRepository Db::table('archives')->where('archive_uid', $archiveUid)->update($updates); } + public function queuePendingAiMetadataArchives(int $limit): array + { + $limit = max(1, $limit); + $rows = Db::table('archives') + ->where(function ($query): void { + $query + ->whereNull('title') + ->orWhere('title', '') + ->orWhereNull('summary') + ->orWhere('summary', '') + ->orWhereNull('author') + ->orWhere('author', '') + ->orWhereNull('year') + ->orWhere('year', '<=', 0) + ->orWhereRaw("tags = '[]'::jsonb") + ->orWhereRaw("metadata #>> '{title_source}' = 'fallback'"); + }) + ->orderByDesc('updated_time') + ->limit(max($limit * 5, 50)) + ->get() + ->all(); + + $archiveUids = []; + foreach ($rows as $row) { + $archive = $this->archiveToArray($row); + if (!$this->archiveNeedsMetadata($archive) || !$this->shouldQueueAiMetadata($archive)) { + continue; + } + + $archiveUids[] = (string) $archive['archive_uid']; + $this->markAiMetadataStatus((string) $archive['archive_uid'], 'queued', null); + + if (count($archiveUids) >= $limit) { + break; + } + } + + return $archiveUids; + } + + public function markAiMetadataStatus(string $archiveUid, string $status, ?string $error = null, array $extra = []): void + { + $archive = $this->findArchive($archiveUid); + if ($archive === null) { + return; + } + + $metadata = is_array($archive['metadata'] ?? null) ? $archive['metadata'] : []; + $aiMeta = is_array($metadata['ai_enrichment'] ?? null) ? $metadata['ai_enrichment'] : []; + $attempt = ((int) ($aiMeta['attempt'] ?? 0)) + ($status === 'processing' ? 1 : 0); + + $metadata['ai_enrichment'] = array_merge($aiMeta, $extra, [ + 'status' => $status, + 'attempt' => $attempt, + 'updated_at' => date(DATE_ATOM), + 'error' => $error, + ]); + + Db::table('archives')->where('archive_uid', $archiveUid)->update([ + 'metadata' => json_encode($metadata, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES), + ]); + } + + public function isAiMetadataRetryable(Throwable $exception): bool + { + if (!$exception instanceof \app\service\LLM\LLMRequestException) { + return true; + } + + $statusCode = $exception->statusCode(); + return $statusCode === null || $statusCode === 429 || $statusCode >= 500; + } + public function archiveNeedsMetadata(array $archive): bool { foreach (['title', 'year', 'author', 'tags', 'summary'] as $field) { @@ -227,12 +299,29 @@ class ArchiveRepository 'series' => $archive->series, 'tags' => json_decode($archive->tags ?? '[]', true) ?: [], 'metadata' => json_decode($archive->metadata ?? '{}', true) ?: [], - 'content' => $archive->content, - 'raw' => $archive->raw, 'chunks' => json_decode($archive->chunks ?? '[]', true) ?: [], ]; } + private function shouldQueueAiMetadata(array $archive): bool + { + $aiMeta = is_array($archive['metadata']['ai_enrichment'] ?? null) ? $archive['metadata']['ai_enrichment'] : []; + $status = (string) ($aiMeta['status'] ?? ''); + if ($status === 'failed_terminal') { + return false; + } + + $updatedAt = strtotime((string) ($aiMeta['updated_at'] ?? '')) ?: 0; + $staleAfter = max(60, (int) config('queue.ai_metadata.stale_after_seconds', 900)); + $isFresh = $updatedAt > 0 && (time() - $updatedAt) < $staleAfter; + + if (in_array($status, ['queued', 'processing'], true) && $isFresh) { + return false; + } + + return true; + } + private function chunkRowToArray(object $row): array { return [ diff --git a/app/service/ArticleImportService.php b/app/service/ArticleImportService.php index c515d47..f334d86 100644 --- a/app/service/ArticleImportService.php +++ b/app/service/ArticleImportService.php @@ -70,16 +70,6 @@ class ArticleImportService } } - public function normalizeArchiveContentString(string $content): ?string - { - return $this->nullableClean($this->cleanMarkdownPage($content)); - } - - public function normalizeArchiveRawString(string $content): ?string - { - return $this->nullableClean($content); - } - private function validate(array $payload): array { $errors = []; @@ -192,8 +182,6 @@ class ArticleImportService 'tags' => is_array($payload['tags'] ?? null) ? array_values($payload['tags']) : [], 'summary' => $this->nullableClean($payload['summary'] ?? null), 'metadata' => $payload['metadata'] ?? [], - 'content' => $this->normalizedArchiveContent($payload), - 'raw' => $this->rawArchiveContent($payload), ]; } @@ -210,57 +198,6 @@ class ArticleImportService return $this->pageBlocksFromItems($payload, preg_split('/\R{2,}/u', $payload['content'])); } - private function normalizedArchiveContent(array $payload): ?string - { - if (isset($payload['pages']) && is_array($payload['pages'])) { - $parts = []; - foreach ($payload['pages'] as $page) { - if (!is_array($page) || !isset($page['content']) || !is_string($page['content'])) { - continue; - } - - $content = $this->cleanMarkdownPage($page['content']); - if ($content !== '') { - $parts[] = $content; - } - } - - return $this->nullableClean(implode("\n\n", $parts)); - } - - if (isset($payload['paragraphs']) && is_array($payload['paragraphs'])) { - $parts = []; - foreach ($payload['paragraphs'] as $paragraph) { - $content = is_array($paragraph) ? ($paragraph['content'] ?? '') : $paragraph; - if (!is_string($content)) { - continue; - } - - $content = $this->clean($content); - if ($content !== '') { - $parts[] = $content; - } - } - - return $this->nullableClean(implode("\n\n", $parts)); - } - - if (isset($payload['content']) && is_string($payload['content'])) { - return $this->normalizeArchiveContentString($payload['content']); - } - - return null; - } - - private function rawArchiveContent(array $payload): ?string - { - if (isset($payload['content']) && is_string($payload['content'])) { - return $this->normalizeArchiveRawString($payload['content']); - } - - return null; - } - private function pageBlocksFromPages(array $payload): array { $pageBlocks = []; diff --git a/app/service/Embedding/ChunkEmbeddingHandler.php b/app/service/Embedding/ChunkEmbeddingHandler.php index 0ca1815..aaf8c02 100644 --- a/app/service/Embedding/ChunkEmbeddingHandler.php +++ b/app/service/Embedding/ChunkEmbeddingHandler.php @@ -22,24 +22,29 @@ class ChunkEmbeddingHandler $this->retryQueue = $retryQueue ?? new LLMRetryQueue(); } - public function handle(array $task): void + public function handle(array $task): int { if (($task['target_type'] ?? null) !== 'archive') { - return; + return 0; } $archiveUid = trim((string) ($task['target_uid'] ?? '')); if ($archiveUid === '') { - return; + return 0; } $batchSize = (int) config('LLMapi.embedding.batch_size', 32); $chunks = $this->chunks->findQueuedChunks($archiveUid, $batchSize); if ($chunks === []) { - return; + return 0; } $chunkUids = array_column($chunks, 'chunk_uid'); + if (!$this->client->isConfigured()) { + $this->chunks->markFailed($chunkUids, 'BigModel embedding API is not configured.', false); + return count($chunkUids); + } + $this->chunks->markProcessing($chunkUids); try { @@ -52,6 +57,7 @@ class ChunkEmbeddingHandler ); $this->persistEmbeddings($chunks, $payload); + return count($chunkUids); } catch (Throwable $exception) { $this->chunks->markFailed($chunkUids, $exception->getMessage(), $this->isRetryable($exception)); throw $exception; diff --git a/app/service/Embedding/ChunkEmbeddingRepository.php b/app/service/Embedding/ChunkEmbeddingRepository.php index 270b3e8..11e20f0 100644 --- a/app/service/Embedding/ChunkEmbeddingRepository.php +++ b/app/service/Embedding/ChunkEmbeddingRepository.php @@ -6,11 +6,70 @@ use support\Db; class ChunkEmbeddingRepository { + public function countChunks(?string $archiveUid = null): int + { + $query = Db::table('chunks'); + + if ($archiveUid !== null && trim($archiveUid) !== '') { + $query->where('archive_uid', trim($archiveUid)); + } + + return (int) $query->count(); + } + + public function countChunksByStatuses(array $statuses, ?string $archiveUid = null): int + { + $query = Db::table('chunks')->whereIn('embedding_status', $statuses); + + if ($archiveUid !== null && trim($archiveUid) !== '') { + $query->where('archive_uid', trim($archiveUid)); + } + + return (int) $query->count(); + } + + public function resetAllChunksToPending(?string $archiveUid = null): int + { + $query = Db::table('chunks'); + + if ($archiveUid !== null && trim($archiveUid) !== '') { + $query->where('archive_uid', trim($archiveUid)); + } + + return $query->update($this->pendingResetPayload()); + } + + public function resetRecoverableChunksToPending(?string $archiveUid = null): int + { + $query = Db::table('chunks') + ->whereIn('embedding_status', [ + EmbeddingStatus::QUEUED, + EmbeddingStatus::PROCESSING, + EmbeddingStatus::FAILED_RETRYABLE, + ]); + + if ($archiveUid !== null && trim($archiveUid) !== '') { + $query->where('archive_uid', trim($archiveUid)); + } + + return $query->update($this->pendingResetPayload()); + } + public function queuePendingArchiveTasks(int $limit): array { - $statuses = [EmbeddingStatus::PENDING, EmbeddingStatus::QUEUED, EmbeddingStatus::FAILED_RETRYABLE]; + $staleBefore = date('Y-m-d H:i:s', time() - max(60, (int) config('queue.tasks.stale_after_seconds', 900))); $archiveUids = Db::table('chunks') - ->whereIn('embedding_status', $statuses) + ->where(function ($query) use ($staleBefore): void { + $query + ->whereIn('embedding_status', [EmbeddingStatus::PENDING, EmbeddingStatus::FAILED_RETRYABLE]) + ->orWhere(function ($stale) use ($staleBefore): void { + $stale + ->whereIn('embedding_status', [EmbeddingStatus::QUEUED, EmbeddingStatus::PROCESSING]) + ->where(function ($time) use ($staleBefore): void { + $time->whereNull('embedding_updated_at')->orWhere('embedding_updated_at', '<', $staleBefore); + }); + }); + }) ->select('archive_uid') ->groupBy('archive_uid') ->orderByRaw('MIN(id)') @@ -22,7 +81,17 @@ class ChunkEmbeddingRepository foreach ($archiveUids as $archiveUid) { Db::table('chunks') ->where('archive_uid', $archiveUid) - ->whereIn('embedding_status', $statuses) + ->where(function ($query) use ($staleBefore): void { + $query + ->whereIn('embedding_status', [EmbeddingStatus::PENDING, EmbeddingStatus::FAILED_RETRYABLE]) + ->orWhere(function ($stale) use ($staleBefore): void { + $stale + ->whereIn('embedding_status', [EmbeddingStatus::QUEUED, EmbeddingStatus::PROCESSING]) + ->where(function ($time) use ($staleBefore): void { + $time->whereNull('embedding_updated_at')->orWhere('embedding_updated_at', '<', $staleBefore); + }); + }); + }) ->update([ 'embedding_status' => EmbeddingStatus::QUEUED, 'embedding_error' => null, @@ -94,4 +163,18 @@ class ChunkEmbeddingRepository 'embedding_updated_at' => Db::raw('CURRENT_TIMESTAMP'), ]); } + + private function pendingResetPayload(): array + { + return [ + 'embedding_status' => EmbeddingStatus::PENDING, + 'embedding_ref' => null, + 'embedding_model' => null, + 'embedding_error' => null, + 'embedding_updated_at' => null, + 'search_index_status' => 0, + 'search_index_error' => null, + 'search_index_updated_at' => null, + ]; + } } diff --git a/app/service/Search/ChunkSearchIndexHandler.php b/app/service/Search/ChunkSearchIndexHandler.php index 74b9884..660dd65 100644 --- a/app/service/Search/ChunkSearchIndexHandler.php +++ b/app/service/Search/ChunkSearchIndexHandler.php @@ -17,20 +17,20 @@ class ChunkSearchIndexHandler $this->index = $index ?? new OpenSearchChunkIndex(); } - public function handle(array $task): void + public function handle(array $task): int { if (($task['target_type'] ?? null) !== 'archive') { - return; + return 0; } $archiveUid = trim((string) ($task['target_uid'] ?? '')); if ($archiveUid === '') { - return; + return 0; } $documents = $this->chunks->findQueuedDocuments($archiveUid, (int) config('opensearch.bulk.chunk_size', 500)); if ($documents === []) { - return; + return 0; } $chunkUids = array_column($documents, 'chunk_uid'); @@ -39,7 +39,7 @@ class ChunkSearchIndexHandler try { $documents = $this->validatedDocuments($documents); if ($documents === []) { - return; + return 0; } $chunkUids = array_column($documents, 'chunk_uid'); @@ -53,6 +53,7 @@ class ChunkSearchIndexHandler $indexedChunkUids = array_values(array_diff($chunkUids, $failedChunkUids)); $this->chunks->markIndexed($indexedChunkUids); + return count($chunkUids); } catch (Throwable $exception) { $this->chunks->markFailed($chunkUids, $exception->getMessage(), true); throw $exception; diff --git a/app/service/Search/ChunkSearchIndexRepository.php b/app/service/Search/ChunkSearchIndexRepository.php index 404a408..0c780d5 100644 --- a/app/service/Search/ChunkSearchIndexRepository.php +++ b/app/service/Search/ChunkSearchIndexRepository.php @@ -7,6 +7,31 @@ use support\Db; class ChunkSearchIndexRepository { + public function countEmbeddedChunks(?string $archiveUid = null): int + { + $query = Db::table('chunks') + ->where('embedding_status', EmbeddingStatus::EMBEDDED); + + if ($archiveUid !== null && trim($archiveUid) !== '') { + $query->where('archive_uid', trim($archiveUid)); + } + + return (int) $query->count(); + } + + public function countIndexedChunks(?string $archiveUid = null): int + { + $query = Db::table('chunks') + ->where('embedding_status', EmbeddingStatus::EMBEDDED) + ->where('search_index_status', SearchIndexStatus::INDEXED); + + if ($archiveUid !== null && trim($archiveUid) !== '') { + $query->where('archive_uid', trim($archiveUid)); + } + + return (int) $query->count(); + } + public function resetEmbeddedChunksToPending(?string $archiveUid = null): int { $query = Db::table('chunks') @@ -23,18 +48,44 @@ class ChunkSearchIndexRepository ]); } + public function resetRecoverableChunksToPending(?string $archiveUid = null): int + { + $query = Db::table('chunks') + ->where('embedding_status', EmbeddingStatus::EMBEDDED) + ->whereIn('search_index_status', [ + SearchIndexStatus::QUEUED, + SearchIndexStatus::INDEXING, + SearchIndexStatus::FAILED_RETRYABLE, + ]); + + if ($archiveUid !== null && trim($archiveUid) !== '') { + $query->where('archive_uid', trim($archiveUid)); + } + + return $query->update([ + 'search_index_status' => SearchIndexStatus::PENDING, + 'search_index_error' => null, + 'search_index_updated_at' => null, + ]); + } + public function queuePendingArchiveTasks(int $limit): array { - $statuses = [ - SearchIndexStatus::PENDING, - SearchIndexStatus::QUEUED, - SearchIndexStatus::INDEXING, - SearchIndexStatus::FAILED_RETRYABLE, - ]; + $staleBefore = date('Y-m-d H:i:s', time() - max(60, (int) config('queue.tasks.stale_after_seconds', 900))); $archiveUids = Db::table('chunks') ->where('embedding_status', EmbeddingStatus::EMBEDDED) - ->whereIn('search_index_status', $statuses) + ->where(function ($query) use ($staleBefore): void { + $query + ->whereIn('search_index_status', [SearchIndexStatus::PENDING, SearchIndexStatus::FAILED_RETRYABLE]) + ->orWhere(function ($stale) use ($staleBefore): void { + $stale + ->whereIn('search_index_status', [SearchIndexStatus::QUEUED, SearchIndexStatus::INDEXING]) + ->where(function ($time) use ($staleBefore): void { + $time->whereNull('search_index_updated_at')->orWhere('search_index_updated_at', '<', $staleBefore); + }); + }); + }) ->select('archive_uid') ->groupBy('archive_uid') ->orderByRaw('MIN(id)') @@ -47,7 +98,17 @@ class ChunkSearchIndexRepository Db::table('chunks') ->where('archive_uid', $archiveUid) ->where('embedding_status', EmbeddingStatus::EMBEDDED) - ->whereIn('search_index_status', $statuses) + ->where(function ($query) use ($staleBefore): void { + $query + ->whereIn('search_index_status', [SearchIndexStatus::PENDING, SearchIndexStatus::FAILED_RETRYABLE]) + ->orWhere(function ($stale) use ($staleBefore): void { + $stale + ->whereIn('search_index_status', [SearchIndexStatus::QUEUED, SearchIndexStatus::INDEXING]) + ->where(function ($time) use ($staleBefore): void { + $time->whereNull('search_index_updated_at')->orWhere('search_index_updated_at', '<', $staleBefore); + }); + }); + }) ->update([ 'search_index_status' => SearchIndexStatus::QUEUED, 'search_index_error' => null, diff --git a/app/view/admin/dashboard.html b/app/view/admin/dashboard.html index 0840982..e1dec60 100644 --- a/app/view/admin/dashboard.html +++ b/app/view/admin/dashboard.html @@ -139,12 +139,6 @@ - - - - - -
@@ -525,8 +519,6 @@ async function loadArchiveDetail(archiveUid) { field(els.archiveForm, 'tags').value = (data.tags || []).join(', '); field(els.archiveForm, 'summary').value = data.summary || ''; field(els.archiveForm, 'metadata').value = JSON.stringify(data.metadata || {}, null, 2); - field(els.archiveForm, 'content').value = data.content || ''; - field(els.archiveForm, 'raw').value = data.raw || ''; } async function saveArchive(event) { @@ -553,9 +545,7 @@ async function saveArchive(event) { series: field(els.archiveForm, 'series').value, tags: field(els.archiveForm, 'tags').value, summary: field(els.archiveForm, 'summary').value, - metadata, - content: field(els.archiveForm, 'content').value, - raw: field(els.archiveForm, 'raw').value + metadata }; await api(`/api/admin/archives/${encodeURIComponent(state.archiveUid)}`, { diff --git a/config/queue.php b/config/queue.php index b38e4c1..9f08f0b 100644 --- a/config/queue.php +++ b/config/queue.php @@ -11,6 +11,9 @@ return [ 'base_delay_seconds' => (int) (getenv('AI_METADATA_QUEUE_BASE_DELAY_SECONDS') ?: 60), 'block_timeout' => (int) (getenv('AI_METADATA_QUEUE_BLOCK_TIMEOUT') ?: 5), 'idle_sleep_seconds' => (int) (getenv('AI_METADATA_QUEUE_IDLE_SLEEP_SECONDS') ?: 1), + 'dispatcher_interval_seconds' => (int) (getenv('AI_METADATA_QUEUE_DISPATCHER_INTERVAL_SECONDS') ?: 60), + 'dispatcher_batch_size' => (int) (getenv('AI_METADATA_QUEUE_DISPATCHER_BATCH_SIZE') ?: 20), + 'stale_after_seconds' => (int) (getenv('AI_METADATA_QUEUE_STALE_AFTER_SECONDS') ?: 900), ], 'tasks' => [ 'pending' => 'proofdb:tasks:pending', @@ -24,5 +27,6 @@ return [ 'idle_sleep_seconds' => (int) (getenv('PROOFDB_TASK_QUEUE_IDLE_SLEEP_SECONDS') ?: 1), 'dispatcher_interval_seconds' => (int) (getenv('PROOFDB_TASK_DISPATCHER_INTERVAL_SECONDS') ?: 15), 'dispatcher_batch_size' => (int) (getenv('PROOFDB_TASK_DISPATCHER_BATCH_SIZE') ?: 20), + 'stale_after_seconds' => (int) (getenv('PROOFDB_TASK_STALE_AFTER_SECONDS') ?: 900), ], ]; diff --git a/readme.md b/readme.md index c14b870..f330d63 100644 --- a/readme.md +++ b/readme.md @@ -262,7 +262,8 @@ GET /api/evidence/{chunk_uid} - [x] Archive detail API is implemented with external documentation: `GET /api/archives/{archive_uid}`. - [x] Archive chunk-list and archive evidence-list APIs are implemented with external documentation: `GET /api/archives/{archive_uid}/chunks` and `GET /api/archives/{archive_uid}/evidence`. - [x] Evidence smoke test passed for `01KQHVREB6XPYF604RVZAP9NNY_1_39003`, returning page label, citation string, and chunk quote. -- [x] Historical `archives.content` can now be repaired with `php scripts/backfill_archive_content.php`, using normalized `raw` when available and ordered chunk text as fallback. +- [x] `archives` rows should no longer persist redundant `content` or `raw` bodies. Archive body reconstruction should come from chunks or the original Markdown source outside PostgreSQL. +- [x] AI metadata, embedding, and OpenSearch indexing paths now have resumable recovery logic for stale queued/processing work instead of relying only on in-memory progress. - [x] OpenSearch repair/reindex maintenance script exists: `php scripts/reindex_opensearch.php`, with optional `--archive_uid=...` targeting. ### Partially Done diff --git a/scriptdoc/README.md b/scriptdoc/README.md index 7f77757..b7d1499 100644 --- a/scriptdoc/README.md +++ b/scriptdoc/README.md @@ -5,8 +5,8 @@ - [setup_database.md](/www/proofdb/scriptdoc/setup_database.md): PostgreSQL 结构初始化与升级 - [setup_admin_users.md](/www/proofdb/scriptdoc/setup_admin_users.md): 管理员用户表与首个管理员账号初始化 - [setup_opensearch.md](/www/proofdb/scriptdoc/setup_opensearch.md): OpenSearch 索引初始化 +- [reembed_chunks.md](/www/proofdb/scriptdoc/reembed_chunks.md): chunk 向量重新生成与断点续跑 - [reindex_opensearch.md](/www/proofdb/scriptdoc/reindex_opensearch.md): OpenSearch 重建索引与回灌 -- [backfill_archive_content.md](/www/proofdb/scriptdoc/backfill_archive_content.md): 历史 archive `content` 正文字段回填 ## 当前运维脚本 @@ -14,8 +14,8 @@ scripts/setup_database.php scripts/setup_admin_users.php scripts/setup_opensearch.php +scripts/reembed_chunks.php scripts/reindex_opensearch.php -scripts/backfill_archive_content.php ``` ## 推荐顺序 diff --git a/scriptdoc/backfill_archive_content.md b/scriptdoc/backfill_archive_content.md index 8cbc8f0..eb5d172 100644 --- a/scriptdoc/backfill_archive_content.md +++ b/scriptdoc/backfill_archive_content.md @@ -1,76 +1,13 @@ -# Archive Content 回填脚本 +# Archive Content 回填脚本(已废弃) -## 脚本路径 +`archives.content` 与 `archives.raw` 已经从 PostgreSQL 设计中移除。 -```text -scripts/backfill_archive_content.php -``` +因此: -## 脚本作用 +- `scripts/backfill_archive_content.php` 不再执行任何回填逻辑 +- 该脚本现在只会输出废弃提示并退出 -回填历史 `archives.content` 字段。 +当前约定是: -这个脚本主要用于修复旧数据中 `content` 为空的问题。它会按下面顺序尝试生成 `content`: - -1. 如果 archive 有 `raw`,就按当前导入规则把原始 Markdown 规范化成正文文本。 -2. 如果 `raw` 为空,就按 `chunk_index` 顺序拼接现有 chunk 的 `text` 作为回退正文。 - -脚本不会伪造 `raw`。如果历史数据里 `raw` 丢了,脚本只会尽力补 `content`。 - -## 运行前提 - -- 当前环境中的 PostgreSQL 配置可用。 -- 项目依赖已安装完成。 -- 从项目根目录执行命令。 - -## 运行命令 - -默认只处理 `content` 为空的 archive: - -```bash -php scripts/backfill_archive_content.php -``` - -只处理一个 archive: - -```bash -php scripts/backfill_archive_content.php --archive_uid=01KQHVREB6XPYF604RVZAP9NNY -``` - -强制重算,即使 `content` 已经有值: - -```bash -php scripts/backfill_archive_content.php --force -``` - -只预览,不写数据库: - -```bash -php scripts/backfill_archive_content.php --dry-run -``` - -## 成功输出示例 - -```text -[updated] 01KQHVREB6XPYF604RVZAP9NNY source=chunks content_length=6375 -Archive content backfill completed. -Archive filter: auto -Force mode: no -Dry run: no -Scanned: 1 -Updated: 1 -From raw: 0 -From chunks: 1 -Skipped: 0 -``` - -## 适用场景 - -- 修复旧版本导入留下的 `archives.content` 为空问题。 -- 导入逻辑更新后,希望重算归一化正文。 -- 为后续 AI / RAG / archive 级读取补齐正文字段。 - -## 重要限制 - -- 如果历史数据既没有 `raw`,也没有 chunks,脚本会跳过该 archive。 -- 用 chunks 回填时,得到的是拼接后的正文文本,不会恢复原始 Markdown 结构。 +- archive 正文从 `chunks` 重建 +- 原始 Markdown 由 PostgreSQL 外部的原始来源负责保存 diff --git a/scriptdoc/reembed_chunks.md b/scriptdoc/reembed_chunks.md new file mode 100644 index 0000000..3fcfe2e --- /dev/null +++ b/scriptdoc/reembed_chunks.md @@ -0,0 +1,113 @@ +# Chunk 重新 Embedding 脚本 + +## 脚本路径 + +```text +scripts/reembed_chunks.php +``` + +## 脚本作用 + +根据 PostgreSQL 中已有的 `chunks.text`,重新生成 embedding,并把结果写回 `chunks.embedding_*` 字段。 + +脚本会做这些事: + +1. 判断当前是“断点续跑”还是“全量重建”。 +2. 把需要恢复的 chunk 状态置回 `pending`。 +3. 逐个 archive 拉取待处理 chunk,并按 embedding request batch 调用现有 handler。 +4. 输出基于 chunk 数的进度条、batch 明细和最终统计结果。 + +## 断点续跑语义 + +默认模式下,脚本会尽量**保留已经完成的 `embedded` 进度**: + +- 只把 `queued / processing / failed_retryable` 的 chunk 重新置回 `pending` +- 已经是 `embedded` 的 chunk 不会重复生成 +- `failed_terminal` 也不会自动重试 + +这意味着如果中途断电、断网、进程被杀: + +- 已经成功写回 embedding 的 chunk 进度不会丢 +- 未完成的那部分在下次重跑脚本时会继续 + +如果你明确想从头开始全部重新 embedding,可以手动加 `--reset`。 + +## 运行前提 + +- PostgreSQL 可连接。 +- BigModel / Zhipu embedding API 配置可用。 +- 项目依赖已安装完成。 +- 从项目根目录执行命令。 + +## 运行命令 + +默认断点续跑: + +```bash +php scripts/reembed_chunks.php +``` + +强制从头全量重新 embedding: + +```bash +php scripts/reembed_chunks.php --reset +``` + +只处理一个 archive: + +```bash +php scripts/reembed_chunks.php --archive_uid=01KQHVREB6XPYF604RVZAP9NNY +``` + +## 成功输出示例 + +```text +Progress granularity: embedding request batches (up to 32 chunks each) +Re-embedding [================================] 100.0% (14/14) +Batch #1 archive=01KQHVREB6XPYF604RVZAP9NNY chunks=14 progress=14/14 +Chunk re-embedding completed. +Archive filter: (all chunks) +Mode: resume +Eligible chunks: 14 +Embedding batch size: 32 +Reset chunks: 2 +Processed archives: 1 +Processed batches: 1 +Embedded chunk rows now marked embedded: 14 +Terminal failures: 0 +Archives: 01KQHVREB6XPYF604RVZAP9NNY +Next step: refresh OpenSearch vectors with `php scripts/reindex_opensearch.php` +``` + +## 关于进度条为什么可能“0 到 100” + +这个脚本的进度条是按 **chunk 数** 计算的,但刷新粒度是 **一次 embedding request batch 完成后**。 + +所以如果: + +- 当前只有一个 archive 需要处理 +- 且该 archive 的待处理 chunk 数小于等于 `LLM_EMBEDDING_BATCH_SIZE` + +那么整个 archive 会在 1 次请求里完成,终端看起来就会像是从 `0` 直接跳到 `100`。这不是进度丢失,而是因为这次实际只跑了 `1` 个 batch。 + +## reset 行为 + +加 `--reset` 后,会把目标范围内的 chunk: + +- `embedding_status` 重置为 `pending` +- 清空 `embedding_ref` +- 清空 `embedding_model` +- 清空 `embedding_error` +- 同时把 `search_index_status` 置回 `pending` + +所以 `--reset` 的语义是:**把 embedding 和后续索引链路都当成需要重建**。 + +## 重要限制 + +这个脚本不会直接重建 OpenSearch。 + +重新 embedding 完成后,如果你希望 OpenSearch 中也使用新的向量,请继续执行: + +```bash +php scripts/reindex_opensearch.php +``` diff --git a/scriptdoc/reindex_opensearch.md b/scriptdoc/reindex_opensearch.md index d515257..49206c7 100644 --- a/scriptdoc/reindex_opensearch.md +++ b/scriptdoc/reindex_opensearch.md @@ -13,10 +13,29 @@ scripts/reindex_opensearch.php 脚本会做这些事: 1. 确保 OpenSearch 索引存在。 -2. 把已向量化 chunk 的 `search_index_status` 重置为待索引。 -3. 按 archive 批量重新投递索引任务。 -4. 调用现有 OpenSearch indexing handler 批量写入 chunk 文档。 -5. 输出重建统计结果。 +2. 判断当前是“断点续跑”还是“全量重建”。 +3. 按 archive 选取待处理数据,并按 OpenSearch bulk batch 调用现有 indexing handler。 +4. 批量写入 chunk 文档。 +5. 输出基于 chunk 数的进度条、batch 明细和最终统计结果。 + +## 断点续跑语义 + +默认模式下,脚本会尽量**保留已经完成的 `indexed` 进度**: + +- 如果 OpenSearch 索引还在,就只把 `queued / indexing / failed_retryable` 的 chunk 重新置回 `pending`,然后继续处理。 +- 如果 OpenSearch 索引已经不存在,脚本会自动切换到全量重建模式,把所有已向量化 chunk 重新置回 `pending`。 + +这意味着如果中途断电、断网、进程被杀: + +- 已经标记为 `indexed` 的 chunk 进度不会丢。 +- 未完成的那部分在下次重跑脚本时会继续。 + +如果你明确想从头开始全部重建,可以手动加 `--reset`。 + +更具体地说: + +- 如果 PostgreSQL 里该 chunk 已经是 `indexed`,默认 `resume` 不会重写它,即使 OpenSearch 里同 `_id` 文档已经存在。 +- `--reset` 会把目标范围内所有已向量化 chunk 的 `search_index_status` 统一重置为 `pending`,然后重新 upsert 到 OpenSearch。 ## 运行前提 @@ -40,6 +59,12 @@ OPENSEARCH_SSL_VERIFY=false php scripts/reindex_opensearch.php php scripts/reindex_opensearch.php ``` +强制从头全量重建: + +```bash +php scripts/reindex_opensearch.php --reset +``` + 只重建一个 archive: ```bash @@ -49,15 +74,33 @@ php scripts/reindex_opensearch.php --archive_uid=01KQHVREB6XPYF604RVZAP9NNY ## 成功输出示例 ```text +Progress granularity: OpenSearch bulk batches (up to 500 chunks each) +Reindexing [================================] 100.0% (14/14) +Batch #1 archive=01KQHVREB6XPYF604RVZAP9NNY chunks=14 progress=14/14 OpenSearch reindex completed. Index: proofdb_chunks Archive filter: (all embedded archives) -Reset chunks: 14 +Mode: resume +Eligible embedded chunks: 14 +OpenSearch bulk size: 500 +Reset chunks: 3 Indexed archives: 1 +Processed batches: 1 Indexed chunk rows now marked indexed: 14 Archives: 01KQHVREB6XPYF604RVZAP9NNY ``` +## 关于进度条为什么可能“0 到 100” + +这个脚本的进度条是按 **chunk 数** 计算的,但刷新粒度是 **一次 OpenSearch bulk batch 完成后**。 + +所以如果: + +- 当前只有一个 archive 需要处理 +- 且该 archive 的待处理 chunk 数小于等于 `opensearch.bulk.chunk_size` + +那么终端上就会像是从 `0` 直接跳到 `100`。这不是脚本没工作,而是因为这次实际只跑了 `1` 个 bulk batch。 + ## 适用场景 - `proofdb_chunks` 被误删后恢复。 diff --git a/scriptdoc/setup_database.md b/scriptdoc/setup_database.md index d5c5ab2..6da4516 100644 --- a/scriptdoc/setup_database.md +++ b/scriptdoc/setup_database.md @@ -14,6 +14,7 @@ scripts/setup_database.php - `archives` 表存在。 - `chunks` 表存在。 +- `archives.content` 与 `archives.raw` 冗余列会被移除。 - 档案与 chunk 的常用索引存在。 - embedding / search index 相关状态字段存在。 - `updated_time` 自动更新时间触发器存在。 diff --git a/scripts/backfill_archive_content.php b/scripts/backfill_archive_content.php index afa5219..98347fe 100644 --- a/scripts/backfill_archive_content.php +++ b/scripts/backfill_archive_content.php @@ -1,117 +1,5 @@ #!/usr/bin/env php orderBy('id'); -if ($archiveUid !== null && trim($archiveUid) !== '') { - $query->where('archive_uid', trim($archiveUid)); -} - -if (!$force) { - $query->where(function ($builder) { - $builder->whereNull('content')->orWhere('content', ''); - }); -} - -$archives = $query->get(['archive_uid', 'title', 'content', 'raw'])->all(); -$normalizer = new ArticleImportService(); - -$scanned = 0; -$updated = 0; -$fromRaw = 0; -$fromChunks = 0; -$skipped = 0; - -foreach ($archives as $archive) { - $scanned++; - $archiveUidValue = (string) $archive->archive_uid; - $raw = is_string($archive->raw ?? null) ? $archive->raw : null; - $content = null; - $source = 'none'; - - if (is_string($raw) && trim($raw) !== '') { - $content = $normalizer->normalizeArchiveContentString($raw); - $source = 'raw'; - } else { - $chunks = Db::table('chunks') - ->where('archive_uid', $archiveUidValue) - ->orderBy('chunk_index') - ->pluck('text') - ->all(); - - $chunks = array_values(array_filter(array_map( - static fn ($value): string => trim((string) $value), - $chunks - ), static fn (string $value): bool => $value !== '')); - - if ($chunks !== []) { - $content = trim(implode("\n\n", $chunks)); - $source = 'chunks'; - } - } - - if ($content === null || $content === '') { - $skipped++; - echo "[skip] {$archiveUidValue} no usable raw/chunks" . PHP_EOL; - continue; - } - - if ($dryRun) { - echo "[dry-run] {$archiveUidValue} source={$source} content_length=" . mb_strlen($content) . PHP_EOL; - if ($source === 'raw') { - $fromRaw++; - } else { - $fromChunks++; - } - continue; - } - - Db::table('archives') - ->where('archive_uid', $archiveUidValue) - ->update(['content' => $content]); - - $updated++; - if ($source === 'raw') { - $fromRaw++; - } else { - $fromChunks++; - } - - echo "[updated] {$archiveUidValue} source={$source} content_length=" . mb_strlen($content) . PHP_EOL; -} - -echo 'Archive content backfill completed.' . PHP_EOL; -echo 'Archive filter: ' . ($archiveUid ?: 'auto') . PHP_EOL; -echo 'Force mode: ' . ($force ? 'yes' : 'no') . PHP_EOL; -echo 'Dry run: ' . ($dryRun ? 'yes' : 'no') . PHP_EOL; -echo 'Scanned: ' . $scanned . PHP_EOL; -echo 'Updated: ' . $updated . PHP_EOL; -echo 'From raw: ' . $fromRaw . PHP_EOL; -echo 'From chunks: ' . $fromChunks . PHP_EOL; -echo 'Skipped: ' . $skipped . PHP_EOL; +fwrite(STDERR, "Deprecated: archives.content and archives.raw have been removed from PostgreSQL. This script is no longer supported.\n"); +exit(1); diff --git a/scripts/reembed_chunks.php b/scripts/reembed_chunks.php new file mode 100644 index 0000000..a95bbeb --- /dev/null +++ b/scripts/reembed_chunks.php @@ -0,0 +1,135 @@ +#!/usr/bin/env php +countChunks($archiveUid); + if ($totalChunks === 0) { + echo 'Chunk re-embedding completed.' . PHP_EOL; + echo 'Archive filter: ' . ($archiveUid ?: '(all chunks)') . PHP_EOL; + echo 'Mode: nothing-to-do' . PHP_EOL; + echo 'Eligible chunks: 0' . PHP_EOL; + exit(0); + } + + $mode = $forceReset ? 'reset' : 'resume'; + $resetCount = $mode === 'reset' + ? $repository->resetAllChunksToPending($archiveUid) + : $repository->resetRecoverableChunksToPending($archiveUid); + + $batchCount = 0; + $processedArchives = []; + $progress = completedCount($repository, $archiveUid); + echo 'Progress granularity: embedding request batches (up to ' . $batchSize . ' chunks each)' . PHP_EOL; + renderProgress($progress, $totalChunks, 'Re-embedding'); + + while (true) { + $archiveUids = $repository->queuePendingArchiveTasks(100); + if ($archiveUid !== null && trim($archiveUid) !== '') { + $archiveUids = array_values(array_filter($archiveUids, static fn (string $uid): bool => $uid === trim($archiveUid))); + } + + if ($archiveUids === []) { + break; + } + + foreach ($archiveUids as $uid) { + $processedChunkCount = $handler->handle([ + 'task_type' => 'embedding', + 'target_type' => 'archive', + 'target_uid' => $uid, + 'attempt' => 1, + ]); + if ($processedChunkCount <= 0) { + continue; + } + + $batchCount++; + $processedArchives[] = $uid; + $progress = completedCount($repository, $archiveUid); + renderProgress($progress, $totalChunks, 'Re-embedding'); + fwrite(STDOUT, PHP_EOL . sprintf( + 'Batch #%d archive=%s chunks=%d progress=%d/%d%s', + $batchCount, + $uid, + $processedChunkCount, + $progress, + $totalChunks, + PHP_EOL + )); + } + } + + $embeddedChunks = $repository->countChunksByStatuses([EmbeddingStatus::EMBEDDED], $archiveUid); + $terminalFailures = $repository->countChunksByStatuses([EmbeddingStatus::FAILED_TERMINAL], $archiveUid); + renderProgress($embeddedChunks + $terminalFailures, $totalChunks, 'Re-embedding', true); + + echo 'Chunk re-embedding completed.' . PHP_EOL; + echo 'Archive filter: ' . ($archiveUid ?: '(all chunks)') . PHP_EOL; + echo 'Mode: ' . $mode . ($forceReset ? ' (--reset)' : '') . PHP_EOL; + echo 'Eligible chunks: ' . $totalChunks . PHP_EOL; + echo 'Embedding batch size: ' . $batchSize . PHP_EOL; + echo 'Reset chunks: ' . $resetCount . PHP_EOL; + echo 'Processed archives: ' . count(array_unique($processedArchives)) . PHP_EOL; + echo 'Processed batches: ' . $batchCount . PHP_EOL; + echo 'Embedded chunk rows now marked embedded: ' . $embeddedChunks . PHP_EOL; + echo 'Terminal failures: ' . $terminalFailures . PHP_EOL; + if ($processedArchives !== []) { + echo 'Archives: ' . implode(', ', $processedArchives) . PHP_EOL; + } + echo 'Next step: refresh OpenSearch vectors with `php scripts/reindex_opensearch.php' + . ($archiveUid ? ' --archive_uid=' . $archiveUid : '') + . ($forceReset ? ' --reset' : '') + . '`' . PHP_EOL; +} catch (Throwable $exception) { + fwrite(STDERR, PHP_EOL . $exception::class . ': ' . $exception->getMessage() . PHP_EOL); + exit(1); +} + +function completedCount(ChunkEmbeddingRepository $repository, ?string $archiveUid): int +{ + return $repository->countChunksByStatuses([ + EmbeddingStatus::EMBEDDED, + EmbeddingStatus::FAILED_TERMINAL, + ], $archiveUid); +} + +function renderProgress(int $done, int $total, string $label, bool $final = false): void +{ + $total = max(1, $total); + $done = max(0, min($done, $total)); + $width = 32; + $filled = (int) floor(($done / $total) * $width); + $bar = str_repeat('=', $filled) . str_repeat(' ', max(0, $width - $filled)); + $percent = str_pad(number_format(($done / $total) * 100, 1), 5, ' ', STR_PAD_LEFT); + $line = sprintf("\r%s [%s] %s%% (%d/%d)", $label, $bar, $percent, $done, $total); + fwrite(STDOUT, $line); + + if ($final || $done >= $total) { + fwrite(STDOUT, PHP_EOL); + } +} diff --git a/scripts/reindex_opensearch.php b/scripts/reindex_opensearch.php index c285a03..86eb783 100644 --- a/scripts/reindex_opensearch.php +++ b/scripts/reindex_opensearch.php @@ -3,31 +3,59 @@ use app\service\Search\ChunkSearchIndexHandler; use app\service\Search\ChunkSearchIndexRepository; +use app\service\Search\OpenSearchClientFactory; use app\service\Search\OpenSearchChunkIndex; -use support\Db; require __DIR__ . '/../vendor/autoload.php'; require __DIR__ . '/../support/bootstrap.php'; -require __DIR__ . '/../vendor/webman/database/src/support/Db.php'; $archiveUid = null; +$forceReset = false; foreach (array_slice($argv, 1) as $argument) { if (str_starts_with($argument, '--archive_uid=')) { $archiveUid = substr($argument, strlen('--archive_uid=')); + continue; + } + + if ($argument === '--reset') { + $forceReset = true; } } $repository = new ChunkSearchIndexRepository(); $handler = new ChunkSearchIndexHandler(); $index = new OpenSearchChunkIndex(); +$clientFactory = new OpenSearchClientFactory(); +$bulkSize = max(1, (int) config('opensearch.bulk.chunk_size', 500)); try { + $client = $clientFactory->make(); + $indexName = config('opensearch.indices.chunks', 'proofdb_chunks'); + $indexExists = (bool) $client->indices()->exists(['index' => $indexName]); + $index->ensureExists(); - $resetCount = $repository->resetEmbeddedChunksToPending($archiveUid); - $archiveCount = 0; + $totalChunks = $repository->countEmbeddedChunks($archiveUid); + if ($totalChunks === 0) { + echo 'OpenSearch reindex completed.' . PHP_EOL; + echo 'Index: ' . $indexName . PHP_EOL; + echo 'Archive filter: ' . ($archiveUid ?: '(all embedded archives)') . PHP_EOL; + echo 'Mode: nothing-to-do' . PHP_EOL; + echo 'Eligible embedded chunks: 0' . PHP_EOL; + exit(0); + } + + $mode = $forceReset || !$indexExists ? 'reset' : 'resume'; + $resetCount = $mode === 'reset' + ? $repository->resetEmbeddedChunksToPending($archiveUid) + : $repository->resetRecoverableChunksToPending($archiveUid); + + $batchCount = 0; $indexedArchives = []; - $indexedChunks = 0; + $progress = $repository->countIndexedChunks($archiveUid); + + echo 'Progress granularity: OpenSearch bulk batches (up to ' . $bulkSize . ' chunks each)' . PHP_EOL; + renderProgress($progress, $totalChunks, 'Reindexing'); while (true) { $archiveUids = $repository->queuePendingArchiveTasks(100); @@ -36,28 +64,44 @@ try { } foreach ($archiveUids as $uid) { - $handler->handle([ + $processedChunkCount = $handler->handle([ 'task_type' => 'search_index', 'target_type' => 'archive', 'target_uid' => $uid, 'attempt' => 1, ]); - $archiveCount++; + if ($processedChunkCount <= 0) { + continue; + } + + $batchCount++; $indexedArchives[] = $uid; + $progress = $repository->countIndexedChunks($archiveUid); + renderProgress($progress, $totalChunks, 'Reindexing'); + fwrite(STDOUT, PHP_EOL . sprintf( + 'Batch #%d archive=%s chunks=%d progress=%d/%d%s', + $batchCount, + $uid, + $processedChunkCount, + $progress, + $totalChunks, + PHP_EOL + )); } } - $indexedChunksQuery = Db::table('chunks')->where('search_index_status', 3); - if ($archiveUid !== null && trim($archiveUid) !== '') { - $indexedChunksQuery->where('archive_uid', trim($archiveUid)); - } - $indexedChunks = (int) $indexedChunksQuery->count(); + $indexedChunks = $repository->countIndexedChunks($archiveUid); + renderProgress($indexedChunks, $totalChunks, 'Reindexing', true); echo 'OpenSearch reindex completed.' . PHP_EOL; - echo 'Index: ' . config('opensearch.indices.chunks', 'proofdb_chunks') . PHP_EOL; + echo 'Index: ' . $indexName . PHP_EOL; echo 'Archive filter: ' . ($archiveUid ?: '(all embedded archives)') . PHP_EOL; + echo 'Mode: ' . $mode . ($forceReset ? ' (--reset)' : (!$indexExists ? ' (index was missing)' : '')) . PHP_EOL; + echo 'Eligible embedded chunks: ' . $totalChunks . PHP_EOL; + echo 'OpenSearch bulk size: ' . $bulkSize . PHP_EOL; echo 'Reset chunks: ' . $resetCount . PHP_EOL; - echo 'Indexed archives: ' . $archiveCount . PHP_EOL; + echo 'Indexed archives: ' . count(array_unique($indexedArchives)) . PHP_EOL; + echo 'Processed batches: ' . $batchCount . PHP_EOL; echo 'Indexed chunk rows now marked indexed: ' . $indexedChunks . PHP_EOL; if ($indexedArchives !== []) { echo 'Archives: ' . implode(', ', $indexedArchives) . PHP_EOL; @@ -66,3 +110,19 @@ try { fwrite(STDERR, $exception::class . ': ' . $exception->getMessage() . PHP_EOL); exit(1); } + +function renderProgress(int $done, int $total, string $label, bool $final = false): void +{ + $total = max(1, $total); + $done = max(0, min($done, $total)); + $width = 32; + $filled = (int) floor(($done / $total) * $width); + $bar = str_repeat('=', $filled) . str_repeat(' ', max(0, $width - $filled)); + $percent = str_pad(number_format(($done / $total) * 100, 1), 5, ' ', STR_PAD_LEFT); + $line = sprintf("\r%s [%s] %s%% (%d/%d)", $label, $bar, $percent, $done, $total); + fwrite(STDOUT, $line); + + if ($final || $done >= $total) { + fwrite(STDOUT, PHP_EOL); + } +} diff --git a/scripts/setup_database.php b/scripts/setup_database.php index 91ec1d6..3d6b235 100644 --- a/scripts/setup_database.php +++ b/scripts/setup_database.php @@ -20,14 +20,14 @@ CREATE TABLE IF NOT EXISTS archives ( series TEXT, tags JSONB NOT NULL DEFAULT '[]'::jsonb, metadata JSONB NOT NULL DEFAULT '{}'::jsonb, - content TEXT, - raw TEXT, chunks JSONB NOT NULL DEFAULT '[]'::jsonb, created_time TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_time TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP ) SQL, 'ALTER TABLE archives ADD COLUMN IF NOT EXISTS summary TEXT', + 'ALTER TABLE archives DROP COLUMN IF EXISTS content', + 'ALTER TABLE archives DROP COLUMN IF EXISTS raw', <<