diff --git a/scripts/iib/topic_cluster.py b/scripts/iib/topic_cluster.py index ae89fe5..a384722 100644 --- a/scripts/iib/topic_cluster.py +++ b/scripts/iib/topic_cluster.py @@ -397,27 +397,91 @@ def _call_embeddings_sync( base_url: str, api_key: str, ) -> List[List[float]]: + logger.info("[embeddings] === _call_embeddings_sync START ===") + logger.info("[embeddings] base_url=%s model=%s n_inputs=%s", base_url, model, len(inputs)) + if not api_key: + logger.error("[embeddings] API Key not configured") raise HTTPException(status_code=500, detail="OpenAI API Key not configured") + logger.info("[embeddings] API Key configured (length=%s)", len(api_key)) + url = f"{_normalize_base_url(base_url)}/embeddings" headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} payload = {"model": model, "input": inputs} + + logger.info("[embeddings] Request URL: %s", url) + logger.info("[embeddings] Request model: %s", model) + logger.info("[embeddings] Request n_inputs: %s", len(inputs)) + + if "localhost" in base_url or "127.0.0.1" in base_url: + logger.info("[embeddings] Detected local API request (Ollama/LocalAI): %s", base_url) + + logger.debug( + "[embeddings] Request payload: %s", + json.dumps({"model": model, "input": ["(truncated)" if len(s) > 50 else s for s in inputs]}, ensure_ascii=False) + ) + try: + logger.info("[embeddings] Initiating HTTP request, timeout=120s") resp = requests.post(url, json=payload, headers=headers, timeout=120) + logger.info("[embeddings] HTTP request completed") + except requests.exceptions.Timeout as e: + logger.error("[embeddings] Request timeout: url=%s error=%s", url, str(e)) + logger.error("[embeddings] If using Ollama, ensure service is running: ollama serve") + raise HTTPException(status_code=504, detail=f"Embedding API request timeout: {e}") + except requests.exceptions.ConnectionError as e: + logger.error("[embeddings] Connection failed: url=%s error=%s", url, str(e)) + logger.error("[embeddings] Please check if API address is correct and Ollama is running") + raise HTTPException(status_code=502, detail=f"Embedding API connection failed: {e}") except requests.RequestException as e: + logger.error( + "[embeddings] Request failed: url=%s error=%s type=%s", + url, str(e), type(e).__name__ + ) raise HTTPException(status_code=502, detail=f"Embedding API request failed: {e}") + + logger.info( + "[embeddings] Response: status=%s content_length=%s", + resp.status_code, len(resp.content) + ) + if resp.status_code != 200: - # Do not leak upstream 401 to frontend (treat as bad request/config to avoid confusing auth state). status = 400 if resp.status_code == 401 else resp.status_code body = (resp.text or "")[:600] + logger.error( + "[embeddings] Non-200 response: status=%s body=%s", + status, body + ) + if resp.status_code == 404: + logger.error("[embeddings] 404 error, please check API address and model name") raise HTTPException(status_code=status, detail=body) - data = resp.json() + + try: + data = resp.json() + except Exception as e: + logger.error( + "[embeddings] Failed to parse JSON: error=%s text=%s", + str(e), resp.text[:500] + ) + raise HTTPException(status_code=500, detail="Invalid JSON response") + items = data.get("data") or [] items.sort(key=lambda x: x.get("index", 0)) embeddings = [x.get("embedding") for x in items] + if any((not isinstance(v, list) for v in embeddings)): + logger.error( + "[embeddings] Invalid embeddings format: n_items=%s valid_embeddings=%s", + len(items), sum(1 for v in embeddings if isinstance(v, list)) + ) raise HTTPException(status_code=500, detail="Invalid embeddings response format") + + logger.info( + "[embeddings] Success: n_embeddings=%s dim=%s", + len(embeddings), len(embeddings[0]) if embeddings else 0 + ) + return embeddings @@ -454,14 +518,30 @@ def _call_chat_title_sync( """ Ask LLM to generate a short topic title and a few keywords. Returns dict or None. """ + logger.info("[chat_title] === _call_chat_title_sync START ===") + logger.info("[chat_title] base_url=%s model=%s lang=%s", base_url, model, output_lang) + if not api_key: + logger.error("[chat_title] API Key not configured") raise HTTPException(status_code=500, detail="OpenAI API Key not configured") + + logger.info("[chat_title] API Key configured (length=%s)", len(api_key)) + url = f"{_normalize_base_url(base_url)}/chat/completions" headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} + logger.info("[chat_title] Request URL: %s", url) + logger.info("[chat_title] Request model: %s", model) + + if "localhost" in base_url or "127.0.0.1" in base_url: + logger.info("[chat_title] Detected local API request (Ollama/LocalAI): %s", base_url) + samples = [(_clean_prompt_for_semantic(_clean_for_title(s) or s) or s).strip() for s in prompt_samples if (s or "").strip()] samples = [s[:400] for s in samples][:6] + logger.info("[chat_title] Prompt samples count: %s", len(samples)) + if not samples: + logger.error("[chat_title] No prompt samples for title generation") raise HTTPException(status_code=400, detail="No prompt samples for title generation") json_example = '{"title":"...","keywords":["...","..."]}' @@ -697,7 +777,11 @@ def mount_topic_cluster_routes( # We only fail at specific API endpoints so users without deps can still use other features. async def _run_cluster_job(job_id: str, req) -> None: + logger.info("[cluster_job] === _run_cluster_job START ===") + logger.info("[cluster_job] job_id=%s", job_id) + try: + logger.info("[cluster_job] Initializing job") _job_upsert( job_id, { @@ -707,8 +791,11 @@ def mount_topic_cluster_routes( "req": req.model_dump() if hasattr(req, "model_dump") else req.dict(), }, ) + logger.info("[cluster_job] Job initialized successfully") + logger.info("[cluster_job] Extracting and validating folders") folders = _extract_and_validate_folders(req) + logger.info("[cluster_job] Folders validated: %s", folders) _job_upsert(job_id, {"folders": folders, "stage": "embedding"}) # Aggregate per-folder embedding progress into totals @@ -855,13 +942,26 @@ def mount_topic_cluster_routes( - stage: "embedding" - folder, scanned, to_embed, embedded_done, updated, skipped """ + logger.info("[build_embeddings] === _build_embeddings_one_folder START ===") + logger.info("[build_embeddings] folder=%s model=%s force=%s batch_size=%s max_chars=%s", + folder, model, force, batch_size, max_chars) + if not openai_api_key: + logger.error("[build_embeddings] OpenAI API Key not configured") raise HTTPException(status_code=500, detail="OpenAI API Key not configured") + if not openai_base_url: + logger.error("[build_embeddings] OpenAI Base URL not configured") raise HTTPException(status_code=500, detail="OpenAI Base URL not configured") + logger.info("[build_embeddings] Configuration check passed") + logger.info("[build_embeddings] API URL: %s", openai_base_url) + folder = os.path.normpath(folder) + logger.info("[build_embeddings] Normalized folder path: %s", folder) + if not os.path.exists(folder) or not os.path.isdir(folder): + logger.error("[build_embeddings] Folder not found: %s", folder) raise HTTPException(status_code=400, detail=f"Folder not found: {folder}") conn = DataBase.get_conn() @@ -962,13 +1062,21 @@ def mount_topic_cluster_routes( f"[iib][embed] folder={folder} batch={bi+1}/{len(batches)} n={len(inputs)} token_sum~={token_sum} token_max~={token_max}" ) try: + logger.info("[build_embeddings] Calling embedding API for batch %d/%d, size=%d", + bi+1, len(batches), len(inputs)) vectors = await _call_embeddings( inputs=inputs, model=model, base_url=openai_base_url, api_key=openai_api_key, ) + logger.info("[build_embeddings] Embedding API success for batch %d/%d", + bi+1, len(batches)) except HTTPException as e: + logger.error("[build_embeddings] Embedding API failed for batch %d/%d: %s", + bi+1, len(batches), str(e.detail)) + if "localhost" in openai_base_url or "127.0.0.1" in openai_base_url: + logger.error("[build_embeddings] Local API request failed, please check if Ollama is running: ollama serve") # Cache failures for this batch and continue (skip these images for now). err = str(e.detail) for it in batch: @@ -1274,6 +1382,9 @@ def mount_topic_cluster_routes( folders: List[str], progress_cb: Optional[Callable[[Dict], None]] = None, ) -> Dict: + logger.info("[cluster_after] === _cluster_after_embeddings START ===") + logger.info("[cluster_after] folders=%s", folders) + folder = folders[0] model = req.model or embedding_model threshold = float(req.threshold or 0.90) @@ -1282,17 +1393,24 @@ def mount_topic_cluster_routes( title_model = req.title_model or os.getenv("TOPIC_TITLE_MODEL") or ai_model output_lang = _normalize_output_lang(req.lang) assign_noise_threshold = req.assign_noise_threshold + + logger.info("[cluster_after] model=%s threshold=%s min_cluster_size=%s", model, threshold, min_cluster_size) + logger.info("[cluster_after] title_model=%s output_lang=%s", title_model, output_lang) + if assign_noise_threshold is None: - # More conservative: noise reassignment is helpful to reduce noise, - # but if too permissive it can "glue" loosely-related themes into one big topic. - # Keep it >= threshold (or slightly higher) so we only reassign when very confident. assign_noise_threshold = max(0.88, min(threshold + 0.02, 0.97)) else: assign_noise_threshold = max(0.0, min(float(assign_noise_threshold), 0.999)) + + logger.info("[cluster_after] assign_noise_threshold=%s", assign_noise_threshold) + use_title_cache = bool(True if req.use_title_cache is None else req.use_title_cache) force_title = bool(req.force_title) + logger.info("[cluster_after] use_title_cache=%s force_title=%s", use_title_cache, force_title) + if progress_cb: + logger.info("[cluster_after] Calling progress callback with clustering stage") progress_cb({"stage": "clustering", "folder": folder, "folders": folders}) conn = DataBase.get_conn()