queue = new AiMetadataQueue(); $this->archives = new ArchiveRepository(); $this->enrichment = new ArchiveMetadataEnrichmentService(); } public function onWorkerStart(): void { Timer::add(10, fn (): int => $this->queue->releaseDueDelayed()); Timer::add(max(30, (int) config('queue.ai_metadata.dispatcher_interval_seconds', 60)), function (): void { try { if (!$this->enrichment->isEnabled()) { return; } foreach ($this->archives->queuePendingAiMetadataArchives((int) config('queue.ai_metadata.dispatcher_batch_size', 20)) as $archiveUid) { $this->queue->push($archiveUid); } } catch (Throwable) { } }); while (true) { $this->queue->releaseDueDelayed(); $archiveUid = $this->queue->pop($this->queue->blockTimeout()); if ($archiveUid === null) { sleep($this->queue->idleSleepSeconds()); continue; } $this->handle($archiveUid); } } private function handle(string $archiveUid): void { try { $archive = $this->archives->findArchive($archiveUid); if ($archive === null) { $this->queue->clearRetry($archiveUid); return; } if (!$this->archives->archiveNeedsMetadata($archive)) { $this->archives->markAiMetadataStatus($archiveUid, 'completed', null, ['attempted' => false]); $this->queue->clearRetry($archiveUid); return; } if (!$this->enrichment->isEnabled()) { $this->archives->markAiMetadataStatus($archiveUid, 'disabled', 'AI metadata enrichment is not enabled.'); $this->queue->clearRetry($archiveUid); return; } $this->archives->markAiMetadataStatus($archiveUid, 'processing'); $payload = $archive; $payload['content'] = $this->archives->findChunksText($archiveUid); $enriched = $this->enrichment->enrich($payload); $aiMeta = $enriched['metadata']['ai_enrichment'] ?? []; if (($aiMeta['attempted'] ?? false) !== true || ($aiMeta['error'] ?? null)) { $retryable = ($aiMeta['enabled'] ?? true) === true && ($aiMeta['error'] ?? null) !== null; $status = $retryable ? 'failed_retryable' : 'failed_terminal'; $error = $aiMeta['error'] ?? 'AI metadata enrichment did not complete.'; $this->archives->markAiMetadataStatus($archiveUid, $status, $error, $aiMeta); if ($retryable) { $this->queue->retryLater($archiveUid, $error); } else { $this->queue->clearRetry($archiveUid); } return; } $fields = []; foreach (['title', 'year', 'author', 'tags', 'summary'] as $field) { if (array_key_exists($field, $enriched)) { $fields[$field] = $enriched[$field]; } } $this->archives->updateMetadata($archiveUid, $fields, $aiMeta); $this->archives->markAiMetadataStatus($archiveUid, 'completed', null, $aiMeta); $this->queue->clearRetry($archiveUid); } catch (Throwable $exception) { $status = $this->archives->isAiMetadataRetryable($exception) ? 'failed_retryable' : 'failed_terminal'; $this->archives->markAiMetadataStatus($archiveUid, $status, $exception->getMessage()); if ($status === 'failed_retryable') { $this->queue->retryLater($archiveUid, $exception->getMessage()); } else { $this->queue->clearRetry($archiveUid); } } } }