diff --git a/scripts/iib/topic_cluster.py b/scripts/iib/topic_cluster.py index 7a09ca0..cfb551b 100644 --- a/scripts/iib/topic_cluster.py +++ b/scripts/iib/topic_cluster.py @@ -25,6 +25,10 @@ _TOPIC_CLUSTER_JOBS_MAX = 16 _EMBEDDING_MAX_TOKENS_SOFT = 7800 +# Some providers enforce a max token budget per *request* (sum of all inputs), not only per input. +# Keep a conservative per-request budget to avoid "context length exceeded" across batched inputs. +_EMBEDDING_REQUEST_MAX_TOKENS_SOFT = 7600 +_EMBEDDING_DEBUG = os.getenv("IIB_EMBEDDING_DEBUG", "0").strip().lower() in ["1", "true", "yes", "on"] def _estimate_tokens_soft(s: str) -> int: @@ -75,6 +79,35 @@ def _truncate_for_embedding_tokens(s: str, max_tokens: int = _EMBEDDING_MAX_TOKE return ("".join(out)).strip() +def _batched_by_token_budget( + items: List[Dict], + *, + max_items: int, + max_tokens_sum: int, +) -> List[List[Dict]]: + """ + Split a list of items (each contains 'text') into batches constrained by: + - max_items + - sum(estimated_tokens(text)) <= max_tokens_sum + """ + batches: List[List[Dict]] = [] + cur: List[Dict] = [] + cur_tokens = 0 + for it in items: + txt = str(it.get("text") or "") + t = _estimate_tokens_soft(txt) + # ensure progress even if one item is huge (should already be truncated per-input) + if cur and (len(cur) >= max_items or (cur_tokens + t) > max_tokens_sum): + batches.append(cur) + cur = [] + cur_tokens = 0 + cur.append(it) + cur_tokens += t + if cur: + batches.append(cur) + return batches + + def _job_now() -> float: return time.time() @@ -314,7 +347,10 @@ def _call_embeddings_sync( except requests.RequestException as e: raise HTTPException(status_code=502, detail=f"Embedding API request failed: {e}") if resp.status_code != 200: - raise HTTPException(status_code=resp.status_code, detail=resp.text) + # Do not leak upstream 401 to frontend (treat as bad request/config to avoid confusing auth state). + status = 400 if resp.status_code == 401 else resp.status_code + body = (resp.text or "")[:600] + raise HTTPException(status_code=status, detail=body) data = resp.json() items = data.get("data") or [] items.sort(key=lambda x: x.get("index", 0)) @@ -430,7 +466,8 @@ def _call_chat_title_sync( if resp.status_code != 200: # keep response body for debugging (truncated) body = (resp.text or "")[:600] - raise HTTPException(status_code=resp.status_code, detail=body) + status = 400 if resp.status_code == 401 else resp.status_code + raise HTTPException(status_code=status, detail=body) try: data = resp.json() except Exception as e: @@ -535,6 +572,97 @@ def mount_topic_cluster_routes( Mount embedding + topic clustering endpoints (MVP: manual, iib_output only). """ + async def _run_cluster_job(job_id: str, req) -> None: + try: + _job_upsert( + job_id, + { + "status": "running", + "stage": "init", + "created_at": _job_now(), + "req": req.model_dump() if hasattr(req, "model_dump") else req.dict(), + }, + ) + + folders = _extract_and_validate_folders(req) + _job_upsert(job_id, {"folders": folders, "stage": "embedding"}) + + # Aggregate per-folder embedding progress into totals + per_folder: Dict[str, Dict] = {} + + def _embed_cb(p: Dict) -> None: + if not isinstance(p, dict): + return + if p.get("stage") != "embedding": + return + f = str(p.get("folder") or "") + if f: + per_folder[f] = dict(p) + scanned = sum(int(x.get("scanned") or 0) for x in per_folder.values()) + to_embed = sum(int(x.get("to_embed") or 0) for x in per_folder.values()) + embedded_done = sum(int(x.get("embedded_done") or 0) for x in per_folder.values()) + updated = sum(int(x.get("updated") or 0) for x in per_folder.values()) + skipped = sum(int(x.get("skipped") or 0) for x in per_folder.values()) + _job_upsert( + job_id, + { + "stage": "embedding", + "progress": { + "scanned": scanned, + "to_embed": to_embed, + "embedded_done": embedded_done, + "updated": updated, + "skipped": skipped, + "folder": f, + "batch_n": int((p or {}).get("batch_n") or 0), + }, + }, + ) + + # Ensure embeddings exist (incremental per folder) + model = req.model or embedding_model + batch_size = max(1, min(int(req.batch_size or 64), 256)) + max_chars = max(256, min(int(req.max_chars or 4000), 8000)) + force = bool(req.force_embed) + for f in folders: + await _build_embeddings_one_folder( + folder=f, + model=model, + force=force, + batch_size=batch_size, + max_chars=max_chars, + progress_cb=_embed_cb, + ) + + # Clustering + titling progress + def _cluster_cb(p: Dict) -> None: + if not isinstance(p, dict): + return + st = str(p.get("stage") or "") + if st: + patch = {"stage": st, "status": "running"} + # keep small progress fields only + prog = {} + for k in [ + "items_total", + "items_done", + "clusters_total", + "clusters_done", + "folder", + ]: + if k in p: + prog[k] = p.get(k) + if prog: + patch["progress"] = {**(_job_get(job_id) or {}).get("progress", {}), **prog} + _job_upsert(job_id, patch) + + res = await _cluster_after_embeddings(req, folders, progress_cb=_cluster_cb) + _job_upsert(job_id, {"status": "done", "stage": "done", "result": res}) + except HTTPException as e: + _job_upsert(job_id, {"status": "error", "stage": "error", "error": str(e.detail)}) + except Exception as e: + _job_upsert(job_id, {"status": "error", "stage": "error", "error": f"{type(e).__name__}: {e}"}) + async def _build_embeddings_one_folder( *, folder: str, @@ -633,9 +761,19 @@ def mount_topic_cluster_routes( updated = 0 embedded_done = 0 - for i in range(0, len(to_embed), batch_size): - batch = to_embed[i : i + batch_size] + batches = _batched_by_token_budget( + to_embed, + max_items=batch_size, + max_tokens_sum=_EMBEDDING_REQUEST_MAX_TOKENS_SOFT, + ) + for bi, batch in enumerate(batches): inputs = [x["text"] for x in batch] + if _EMBEDDING_DEBUG: + token_sum = sum(_estimate_tokens_soft(s) for s in inputs) + token_max = max((_estimate_tokens_soft(s) for s in inputs), default=0) + print( + f"[iib][embed] folder={folder} batch={bi+1}/{len(batches)} n={len(inputs)} token_sum~={token_sum} token_max~={token_max}" + ) vectors = await _call_embeddings( inputs=inputs, model=model, @@ -666,6 +804,7 @@ def mount_topic_cluster_routes( "embedded_done": embedded_done, "updated": updated, "skipped": skipped, + "batch_n": len(inputs), } ) # yield between batches @@ -722,6 +861,30 @@ def mount_topic_cluster_routes( # Output language for titles/keywords (from frontend globalStore.lang) lang: Optional[str] = None + @app.post( + f"{db_api_base}/cluster_iib_output_job_start", + dependencies=[Depends(verify_secret), Depends(write_permission_required)], + ) + async def cluster_iib_output_job_start(req: ClusterIibOutputReq): + """ + Start a background job for embedding + clustering + LLM titling. + Returns job_id immediately; frontend should poll job_status to show progress. + """ + job_id = uuid.uuid4().hex + _job_upsert(job_id, {"status": "queued", "stage": "queued", "created_at": _job_now()}) + asyncio.create_task(_run_cluster_job(job_id, req)) + return {"job_id": job_id} + + @app.get( + f"{db_api_base}/cluster_iib_output_job_status", + dependencies=[Depends(verify_secret), Depends(write_permission_required)], + ) + async def cluster_iib_output_job_status(job_id: str): + j = _job_get(job_id) + if not j: + raise HTTPException(status_code=404, detail="job not found") + return j + def _extract_and_validate_folders(req: ClusterIibOutputReq) -> List[str]: folders: List[str] = [] if req.folder_paths: diff --git a/vue/src/api/db.ts b/vue/src/api/db.ts index 76d31f1..b36aca5 100644 --- a/vue/src/api/db.ts +++ b/vue/src/api/db.ts @@ -222,6 +222,45 @@ export const clusterIibOutput = async (req: ClusterIibOutputReq) => { return resp.data as ClusterIibOutputResp } +// ===== Async clustering job (progress polling) ===== +export interface ClusterIibOutputJobStartResp { + job_id: string +} + +export interface ClusterIibOutputJobStatusResp { + job_id: string + status: 'queued' | 'running' | 'done' | 'error' + stage?: string + folders?: string[] + progress?: { + // embedding totals + scanned?: number + to_embed?: number + embedded_done?: number + updated?: number + skipped?: number + folder?: string + // clustering + items_total?: number + items_done?: number + // titling + clusters_total?: number + clusters_done?: number + } + error?: string + result?: ClusterIibOutputResp +} + +export const startClusterIibOutputJob = async (req: ClusterIibOutputReq) => { + const resp = await axiosInst.value.post('/db/cluster_iib_output_job_start', req) + return resp.data as ClusterIibOutputJobStartResp +} + +export const getClusterIibOutputJobStatus = async (job_id: string) => { + const resp = await axiosInst.value.get('/db/cluster_iib_output_job_status', { params: { job_id } }) + return resp.data as ClusterIibOutputJobStatusResp +} + // ===== Natural language prompt query (RAG-like retrieval) ===== export interface PromptSearchReq { query: string diff --git a/vue/src/i18n/de.ts b/vue/src/i18n/de.ts index dc84ef0..fd8e995 100644 --- a/vue/src/i18n/de.ts +++ b/vue/src/i18n/de.ts @@ -46,6 +46,17 @@ export const de: Partial = { topicSearchGuideStep3: 'Geben Sie einen Satz ein, um zu suchen; ähnliche Bilder werden abgerufen und die Ergebnisse geöffnet', topicSearchGuideEmptyReasonNoScope: 'Leer, weil: kein Bereich ausgewählt (standardmäßig deaktiviert). Klicken Sie auf „Bereich“, um Ordner zu wählen.', topicSearchGuideEmptyReasonNoTopics: 'Leer, weil: für diesen Bereich noch keine Themen erzeugt wurden (Aktualisieren oder Min. Cluster/Schwelle senken).', + topicSearchJobFailed: 'Job fehlgeschlagen', + topicSearchJobStage: 'Phase', + topicSearchJobQueued: 'Job in Warteschlange…', + topicSearchJobStageEmbedding: 'Vektorisierung…', + topicSearchJobStageClustering: 'Clustering…', + topicSearchJobStageTitling: 'Titel werden erzeugt…', + topicSearchJobStageDone: 'Fertig', + topicSearchJobStageError: 'Fehler', + topicSearchJobEmbeddingDesc: '{0}/{1} vektorisiert (gescannt {2}); aktuell: {3}', + topicSearchJobClusteringDesc: 'Clustering {0}/{1}', + topicSearchJobTitlingDesc: 'Titel {0}/{1}', 'auto.refreshed': 'Automatische Aktualisierung erfolgreich durchgeführt!', copied: 'In die Zwischenablage kopiert!', 'index.expired': 'Index abgelaufen, automatische Aktualisierung wird durchgeführt', diff --git a/vue/src/i18n/en.ts b/vue/src/i18n/en.ts index 36bba67..91aa3c5 100644 --- a/vue/src/i18n/en.ts +++ b/vue/src/i18n/en.ts @@ -46,6 +46,17 @@ export const en: IIBI18nMap = { topicSearchGuideStep3: 'Type a sentence to search; it will retrieve similar images and open the result page', topicSearchGuideEmptyReasonNoScope: 'Empty because: no scope selected (disabled by default). Click “Scope” to choose folders.', topicSearchGuideEmptyReasonNoTopics: 'Empty because: no topics generated yet for this scope (try Refresh or lower Min cluster/Threshold).', + topicSearchJobFailed: 'Job failed', + topicSearchJobStage: 'Stage', + topicSearchJobQueued: 'Job queued…', + topicSearchJobStageEmbedding: 'Embedding…', + topicSearchJobStageClustering: 'Clustering…', + topicSearchJobStageTitling: 'Generating titles…', + topicSearchJobStageDone: 'Done', + topicSearchJobStageError: 'Error', + topicSearchJobEmbeddingDesc: 'Embedded {0}/{1} (scanned {2}); current: {3}', + topicSearchJobClusteringDesc: 'Clustering {0}/{1}', + topicSearchJobTitlingDesc: 'Titling {0}/{1}', success: 'Success', setCurrFrameAsVideoPoster: 'Set Current Frame as Video Cover', sync: 'Sync', diff --git a/vue/src/i18n/zh-hans.ts b/vue/src/i18n/zh-hans.ts index 460c2b7..35a0240 100644 --- a/vue/src/i18n/zh-hans.ts +++ b/vue/src/i18n/zh-hans.ts @@ -44,6 +44,17 @@ export const zhHans = { topicSearchGuideStep3: '输入一句话搜索,会召回相似图片并打开结果页', topicSearchGuideEmptyReasonNoScope: '当前为空:未选择范围(已默认关闭),请先点“范围”选择文件夹', topicSearchGuideEmptyReasonNoTopics: '当前为空:该范围内还未生成主题(可点刷新,或调低最小组/阈值)', + topicSearchJobFailed: '任务失败', + topicSearchJobStage: '阶段', + topicSearchJobQueued: '已提交任务,准备开始…', + topicSearchJobStageEmbedding: '向量化中(Embedding)', + topicSearchJobStageClustering: '归类中(Clustering)', + topicSearchJobStageTitling: '生成标题中(LLM)', + topicSearchJobStageDone: '完成', + topicSearchJobStageError: '失败', + topicSearchJobEmbeddingDesc: '已向量化 {0}/{1}(扫描 {2});当前:{3}', + topicSearchJobClusteringDesc: '正在归类 {0}/{1}', + topicSearchJobTitlingDesc: '正在生成标题 {0}/{1}', success: '成功', setCurrFrameAsVideoPoster: '设置当前帧为视频封面', sync: '同步', diff --git a/vue/src/i18n/zh-hant.ts b/vue/src/i18n/zh-hant.ts index 729febd..7459a87 100644 --- a/vue/src/i18n/zh-hant.ts +++ b/vue/src/i18n/zh-hant.ts @@ -46,6 +46,17 @@ export const zhHant: Partial = { topicSearchGuideStep3: '輸入一句話搜尋,召回相似圖片並打開結果頁', topicSearchGuideEmptyReasonNoScope: '目前為空:尚未選擇範圍(預設關閉),請先點「範圍」選擇資料夾', topicSearchGuideEmptyReasonNoTopics: '目前為空:此範圍尚未生成主題(可點刷新,或調低最小組/閾值)', + topicSearchJobFailed: '任務失敗', + topicSearchJobStage: '階段', + topicSearchJobQueued: '已提交任務,準備開始…', + topicSearchJobStageEmbedding: '向量化中(Embedding)', + topicSearchJobStageClustering: '歸類中(Clustering)', + topicSearchJobStageTitling: '生成標題中(LLM)', + topicSearchJobStageDone: '完成', + topicSearchJobStageError: '失敗', + topicSearchJobEmbeddingDesc: '已向量化 {0}/{1}(掃描 {2});目前:{3}', + topicSearchJobClusteringDesc: '正在歸類 {0}/{1}', + topicSearchJobTitlingDesc: '正在生成標題 {0}/{1}', clearCacheIfNotTakeEffect: '如果更改沒有生效,請嘗試清理頁面緩存', success: '成功', setCurrFrameAsVideoPoster: '設置當前幀為視頻封面', diff --git a/vue/src/page/TopicSearch/TopicSearch.vue b/vue/src/page/TopicSearch/TopicSearch.vue index ec231a3..f26a172 100644 --- a/vue/src/page/TopicSearch/TopicSearch.vue +++ b/vue/src/page/TopicSearch/TopicSearch.vue @@ -1,9 +1,16 @@