feat: async clustering job with progress UI

pull/870/head
wuqinchuan 2025-12-29 00:10:53 +08:00 committed by zanllp
parent e522d8e878
commit 2edf9e52d7
7 changed files with 353 additions and 8 deletions

View File

@ -25,6 +25,10 @@ _TOPIC_CLUSTER_JOBS_MAX = 16
_EMBEDDING_MAX_TOKENS_SOFT = 7800
# Some providers enforce a max token budget per *request* (sum of all inputs), not only per input.
# Keep a conservative per-request budget to avoid "context length exceeded" across batched inputs.
_EMBEDDING_REQUEST_MAX_TOKENS_SOFT = 7600
_EMBEDDING_DEBUG = os.getenv("IIB_EMBEDDING_DEBUG", "0").strip().lower() in ["1", "true", "yes", "on"]
def _estimate_tokens_soft(s: str) -> int:
@ -75,6 +79,35 @@ def _truncate_for_embedding_tokens(s: str, max_tokens: int = _EMBEDDING_MAX_TOKE
return ("".join(out)).strip()
def _batched_by_token_budget(
    items: List[Dict],
    *,
    max_items: int,
    max_tokens_sum: int,
) -> List[List[Dict]]:
    """
    Group items (each carrying a 'text' field) into request-sized batches.

    A batch is closed as soon as adding the next item would violate either
    constraint:
      - at most ``max_items`` entries per batch
      - sum of estimated tokens per batch <= ``max_tokens_sum``

    A single oversized item still forms its own batch so iteration always
    makes progress (per-input truncation is expected to happen upstream).
    """
    result: List[List[Dict]] = []
    pending: List[Dict] = []
    pending_tokens = 0
    for entry in items:
        tokens = _estimate_tokens_soft(str(entry.get("text") or ""))
        over_items = len(pending) >= max_items
        over_budget = (pending_tokens + tokens) > max_tokens_sum
        if pending and (over_items or over_budget):
            result.append(pending)
            pending = []
            pending_tokens = 0
        pending.append(entry)
        pending_tokens += tokens
    if pending:
        result.append(pending)
    return result
def _job_now() -> float:
return time.time()
@ -314,7 +347,10 @@ def _call_embeddings_sync(
except requests.RequestException as e:
raise HTTPException(status_code=502, detail=f"Embedding API request failed: {e}")
if resp.status_code != 200:
raise HTTPException(status_code=resp.status_code, detail=resp.text)
# Do not leak upstream 401 to frontend (treat as bad request/config to avoid confusing auth state).
status = 400 if resp.status_code == 401 else resp.status_code
body = (resp.text or "")[:600]
raise HTTPException(status_code=status, detail=body)
data = resp.json()
items = data.get("data") or []
items.sort(key=lambda x: x.get("index", 0))
@ -430,7 +466,8 @@ def _call_chat_title_sync(
if resp.status_code != 200:
# keep response body for debugging (truncated)
body = (resp.text or "")[:600]
raise HTTPException(status_code=resp.status_code, detail=body)
status = 400 if resp.status_code == 401 else resp.status_code
raise HTTPException(status_code=status, detail=body)
try:
data = resp.json()
except Exception as e:
@ -535,6 +572,97 @@ def mount_topic_cluster_routes(
Mount embedding + topic clustering endpoints (MVP: manual, iib_output only).
"""
    async def _run_cluster_job(job_id: str, req) -> None:
        """
        Background task: embed -> cluster -> title for one clustering job.

        Marks the job running, validates the requested folders, builds
        embeddings per folder (incrementally), then runs clustering + LLM
        titling. All state/progress is published through _job_upsert so the
        job_status endpoint can report it; failures are recorded on the job
        record instead of propagating (this coroutine runs detached from any
        HTTP request).
        """
        try:
            _job_upsert(
                job_id,
                {
                    "status": "running",
                    "stage": "init",
                    "created_at": _job_now(),
                    # Pydantic v2 exposes model_dump(); fall back to v1's dict().
                    "req": req.model_dump() if hasattr(req, "model_dump") else req.dict(),
                },
            )
            folders = _extract_and_validate_folders(req)
            _job_upsert(job_id, {"folders": folders, "stage": "embedding"})
            # Aggregate per-folder embedding progress into totals
            per_folder: Dict[str, Dict] = {}

            def _embed_cb(p: Dict) -> None:
                # Progress callback invoked by _build_embeddings_one_folder:
                # keep the latest snapshot per folder and republish summed totals.
                if not isinstance(p, dict):
                    return
                if p.get("stage") != "embedding":
                    return
                f = str(p.get("folder") or "")
                if f:
                    per_folder[f] = dict(p)
                scanned = sum(int(x.get("scanned") or 0) for x in per_folder.values())
                to_embed = sum(int(x.get("to_embed") or 0) for x in per_folder.values())
                embedded_done = sum(int(x.get("embedded_done") or 0) for x in per_folder.values())
                updated = sum(int(x.get("updated") or 0) for x in per_folder.values())
                skipped = sum(int(x.get("skipped") or 0) for x in per_folder.values())
                _job_upsert(
                    job_id,
                    {
                        "stage": "embedding",
                        "progress": {
                            "scanned": scanned,
                            "to_embed": to_embed,
                            "embedded_done": embedded_done,
                            "updated": updated,
                            "skipped": skipped,
                            # folder/batch_n reflect the most recent callback only.
                            "folder": f,
                            "batch_n": int((p or {}).get("batch_n") or 0),
                        },
                    },
                )

            # Ensure embeddings exist (incremental per folder)
            model = req.model or embedding_model
            # Clamp user-supplied knobs into conservative bounds.
            batch_size = max(1, min(int(req.batch_size or 64), 256))
            max_chars = max(256, min(int(req.max_chars or 4000), 8000))
            force = bool(req.force_embed)
            for f in folders:
                await _build_embeddings_one_folder(
                    folder=f,
                    model=model,
                    force=force,
                    batch_size=batch_size,
                    max_chars=max_chars,
                    progress_cb=_embed_cb,
                )

            # Clustering + titling progress
            def _cluster_cb(p: Dict) -> None:
                if not isinstance(p, dict):
                    return
                st = str(p.get("stage") or "")
                if st:
                    patch = {"stage": st, "status": "running"}
                    # keep small progress fields only
                    prog = {}
                    for k in [
                        "items_total",
                        "items_done",
                        "clusters_total",
                        "clusters_done",
                        "folder",
                    ]:
                        if k in p:
                            prog[k] = p.get(k)
                    if prog:
                        # Merge over the previously stored progress so embedding
                        # totals are not dropped when clustering fields arrive.
                        patch["progress"] = {**(_job_get(job_id) or {}).get("progress", {}), **prog}
                    _job_upsert(job_id, patch)

            res = await _cluster_after_embeddings(req, folders, progress_cb=_cluster_cb)
            _job_upsert(job_id, {"status": "done", "stage": "done", "result": res})
        except HTTPException as e:
            # Preserve the API-level error detail for the status endpoint.
            _job_upsert(job_id, {"status": "error", "stage": "error", "error": str(e.detail)})
        except Exception as e:
            # Broad catch is deliberate: this is the top-level boundary of a
            # detached task; record the failure instead of losing it silently.
            _job_upsert(job_id, {"status": "error", "stage": "error", "error": f"{type(e).__name__}: {e}"})
async def _build_embeddings_one_folder(
*,
folder: str,
@ -633,9 +761,19 @@ def mount_topic_cluster_routes(
updated = 0
embedded_done = 0
for i in range(0, len(to_embed), batch_size):
batch = to_embed[i : i + batch_size]
batches = _batched_by_token_budget(
to_embed,
max_items=batch_size,
max_tokens_sum=_EMBEDDING_REQUEST_MAX_TOKENS_SOFT,
)
for bi, batch in enumerate(batches):
inputs = [x["text"] for x in batch]
if _EMBEDDING_DEBUG:
token_sum = sum(_estimate_tokens_soft(s) for s in inputs)
token_max = max((_estimate_tokens_soft(s) for s in inputs), default=0)
print(
f"[iib][embed] folder={folder} batch={bi+1}/{len(batches)} n={len(inputs)} token_sum~={token_sum} token_max~={token_max}"
)
vectors = await _call_embeddings(
inputs=inputs,
model=model,
@ -666,6 +804,7 @@ def mount_topic_cluster_routes(
"embedded_done": embedded_done,
"updated": updated,
"skipped": skipped,
"batch_n": len(inputs),
}
)
# yield between batches
@ -722,6 +861,30 @@ def mount_topic_cluster_routes(
# Output language for titles/keywords (from frontend globalStore.lang)
lang: Optional[str] = None
@app.post(
f"{db_api_base}/cluster_iib_output_job_start",
dependencies=[Depends(verify_secret), Depends(write_permission_required)],
)
async def cluster_iib_output_job_start(req: ClusterIibOutputReq):
"""
Start a background job for embedding + clustering + LLM titling.
Returns job_id immediately; frontend should poll job_status to show progress.
"""
job_id = uuid.uuid4().hex
_job_upsert(job_id, {"status": "queued", "stage": "queued", "created_at": _job_now()})
asyncio.create_task(_run_cluster_job(job_id, req))
return {"job_id": job_id}
@app.get(
f"{db_api_base}/cluster_iib_output_job_status",
dependencies=[Depends(verify_secret), Depends(write_permission_required)],
)
async def cluster_iib_output_job_status(job_id: str):
j = _job_get(job_id)
if not j:
raise HTTPException(status_code=404, detail="job not found")
return j
def _extract_and_validate_folders(req: ClusterIibOutputReq) -> List[str]:
folders: List[str] = []
if req.folder_paths:

View File

@ -222,6 +222,45 @@ export const clusterIibOutput = async (req: ClusterIibOutputReq) => {
return resp.data as ClusterIibOutputResp
}
// ===== Async clustering job (progress polling) =====
/** Response of cluster_iib_output_job_start: the handle used for status polling. */
export interface ClusterIibOutputJobStartResp {
  job_id: string
}
/**
 * Polled job state from cluster_iib_output_job_status.
 * `result` is only meaningful once `status` is 'done'; `error` once it is 'error'.
 */
export interface ClusterIibOutputJobStatusResp {
  job_id: string
  status: 'queued' | 'running' | 'done' | 'error'
  // backend stage name (e.g. 'init' | 'embedding' | 'clustering' | 'titling')
  stage?: string
  folders?: string[]
  progress?: {
    // embedding totals (summed across folders)
    scanned?: number
    to_embed?: number
    embedded_done?: number
    updated?: number
    skipped?: number
    // folder most recently reported by the embedding stage
    folder?: string
    // clustering
    items_total?: number
    items_done?: number
    // titling
    clusters_total?: number
    clusters_done?: number
  }
  error?: string
  result?: ClusterIibOutputResp
}
// Kick off an async clustering job; returns the job_id to poll.
export const startClusterIibOutputJob = async (req: ClusterIibOutputReq) => {
  const { data } = await axiosInst.value.post('/db/cluster_iib_output_job_start', req)
  return data as ClusterIibOutputJobStartResp
}
// Fetch the current state/progress of a previously started clustering job.
export const getClusterIibOutputJobStatus = async (job_id: string) => {
  const { data } = await axiosInst.value.get('/db/cluster_iib_output_job_status', { params: { job_id } })
  return data as ClusterIibOutputJobStatusResp
}
// ===== Natural language prompt query (RAG-like retrieval) =====
export interface PromptSearchReq {
query: string

View File

@ -46,6 +46,17 @@ export const de: Partial<IIBI18nMap> = {
topicSearchGuideStep3: 'Geben Sie einen Satz ein, um zu suchen; ähnliche Bilder werden abgerufen und die Ergebnisse geöffnet',
topicSearchGuideEmptyReasonNoScope: 'Leer, weil: kein Bereich ausgewählt (standardmäßig deaktiviert). Klicken Sie auf „Bereich“, um Ordner zu wählen.',
topicSearchGuideEmptyReasonNoTopics: 'Leer, weil: für diesen Bereich noch keine Themen erzeugt wurden (Aktualisieren oder Min. Cluster/Schwelle senken).',
topicSearchJobFailed: 'Job fehlgeschlagen',
topicSearchJobStage: 'Phase',
topicSearchJobQueued: 'Job in Warteschlange…',
topicSearchJobStageEmbedding: 'Vektorisierung…',
topicSearchJobStageClustering: 'Clustering…',
topicSearchJobStageTitling: 'Titel werden erzeugt…',
topicSearchJobStageDone: 'Fertig',
topicSearchJobStageError: 'Fehler',
topicSearchJobEmbeddingDesc: '{0}/{1} vektorisiert (gescannt {2}); aktuell: {3}',
topicSearchJobClusteringDesc: 'Clustering {0}/{1}',
topicSearchJobTitlingDesc: 'Titel {0}/{1}',
'auto.refreshed': 'Automatische Aktualisierung erfolgreich durchgeführt!',
copied: 'In die Zwischenablage kopiert!',
'index.expired': 'Index abgelaufen, automatische Aktualisierung wird durchgeführt',

View File

@ -46,6 +46,17 @@ export const en: IIBI18nMap = {
topicSearchGuideStep3: 'Type a sentence to search; it will retrieve similar images and open the result page',
topicSearchGuideEmptyReasonNoScope: 'Empty because: no scope selected (disabled by default). Click “Scope” to choose folders.',
topicSearchGuideEmptyReasonNoTopics: 'Empty because: no topics generated yet for this scope (try Refresh or lower Min cluster/Threshold).',
topicSearchJobFailed: 'Job failed',
topicSearchJobStage: 'Stage',
topicSearchJobQueued: 'Job queued…',
topicSearchJobStageEmbedding: 'Embedding…',
topicSearchJobStageClustering: 'Clustering…',
topicSearchJobStageTitling: 'Generating titles…',
topicSearchJobStageDone: 'Done',
topicSearchJobStageError: 'Error',
topicSearchJobEmbeddingDesc: 'Embedded {0}/{1} (scanned {2}); current: {3}',
topicSearchJobClusteringDesc: 'Clustering {0}/{1}',
topicSearchJobTitlingDesc: 'Titling {0}/{1}',
success: 'Success',
setCurrFrameAsVideoPoster: 'Set Current Frame as Video Cover',
sync: 'Sync',

View File

@ -44,6 +44,17 @@ export const zhHans = {
topicSearchGuideStep3: '输入一句话搜索,会召回相似图片并打开结果页',
topicSearchGuideEmptyReasonNoScope: '当前为空:未选择范围(已默认关闭),请先点“范围”选择文件夹',
topicSearchGuideEmptyReasonNoTopics: '当前为空:该范围内还未生成主题(可点刷新,或调低最小组/阈值)',
topicSearchJobFailed: '任务失败',
topicSearchJobStage: '阶段',
topicSearchJobQueued: '已提交任务,准备开始…',
topicSearchJobStageEmbedding: '向量化中Embedding',
topicSearchJobStageClustering: '归类中Clustering',
topicSearchJobStageTitling: '生成标题中LLM',
topicSearchJobStageDone: '完成',
topicSearchJobStageError: '失败',
topicSearchJobEmbeddingDesc: '已向量化 {0}/{1}(扫描 {2});当前:{3}',
topicSearchJobClusteringDesc: '正在归类 {0}/{1}',
topicSearchJobTitlingDesc: '正在生成标题 {0}/{1}',
success: '成功',
setCurrFrameAsVideoPoster: '设置当前帧为视频封面',
sync: '同步',

View File

@ -46,6 +46,17 @@ export const zhHant: Partial<IIBI18nMap> = {
topicSearchGuideStep3: '輸入一句話搜尋,召回相似圖片並打開結果頁',
topicSearchGuideEmptyReasonNoScope: '目前為空:尚未選擇範圍(預設關閉),請先點「範圍」選擇資料夾',
topicSearchGuideEmptyReasonNoTopics: '目前為空:此範圍尚未生成主題(可點刷新,或調低最小組/閾值)',
topicSearchJobFailed: '任務失敗',
topicSearchJobStage: '階段',
topicSearchJobQueued: '已提交任務,準備開始…',
topicSearchJobStageEmbedding: '向量化中Embedding',
topicSearchJobStageClustering: '歸類中Clustering',
topicSearchJobStageTitling: '生成標題中LLM',
topicSearchJobStageDone: '完成',
topicSearchJobStageError: '失敗',
topicSearchJobEmbeddingDesc: '已向量化 {0}/{1}(掃描 {2});目前:{3}',
topicSearchJobClusteringDesc: '正在歸類 {0}/{1}',
topicSearchJobTitlingDesc: '正在生成標題 {0}/{1}',
clearCacheIfNotTakeEffect: '如果更改沒有生效,請嘗試清理頁面緩存',
success: '成功',
setCurrFrameAsVideoPoster: '設置當前幀為視頻封面',

View File

@ -1,9 +1,16 @@
<script setup lang="ts">
import { getGlobalSettingRaw, setAppFeSettingForce } from '@/api'
import { clusterIibOutput, searchIibOutputByPrompt, type ClusterIibOutputResp, type PromptSearchResp } from '@/api/db'
import {
getClusterIibOutputJobStatus,
searchIibOutputByPrompt,
startClusterIibOutputJob,
type ClusterIibOutputJobStatusResp,
type ClusterIibOutputResp,
type PromptSearchResp
} from '@/api/db'
import { t } from '@/i18n'
import { useGlobalStore } from '@/store/useGlobalStore'
import { computed, onMounted, ref, watch } from 'vue'
import { computed, onBeforeUnmount, onMounted, ref, watch } from 'vue'
import { uniqueId } from 'lodash-es'
import { message } from 'ant-design-vue'
@ -15,6 +22,56 @@ const threshold = ref(0.86)
const minClusterSize = ref(2)
const result = ref<ClusterIibOutputResp | null>(null)
// Async clustering job state: latest polled status, the active job id, and
// the polling interval handle (null when no poll is running).
const job = ref<ClusterIibOutputJobStatusResp | null>(null)
const jobId = ref<string>('')
let _jobTimer: any = null
// A job counts as running from submission until it reaches a terminal state.
const jobRunning = computed(() => {
  const status = job.value?.status
  return status === 'queued' || status === 'running'
})
// Human-readable label for the backend stage; unknown stages fall back to
// a generic "Stage: <name>" string.
const jobStageText = computed(() => {
  const stage = String(job.value?.stage || '')
  switch (stage) {
    case '':
    case 'queued':
    case 'init':
      return t('topicSearchJobQueued')
    case 'embedding':
      return t('topicSearchJobStageEmbedding')
    case 'clustering':
      return t('topicSearchJobStageClustering')
    case 'titling':
      return t('topicSearchJobStageTitling')
    case 'done':
      return t('topicSearchJobStageDone')
    case 'error':
      return t('topicSearchJobStageError')
    default:
      return `${t('topicSearchJobStage')}: ${stage}`
  }
})
// Progress percentage, derived from embedding counters only; 0 until the
// backend has reported how many items need embedding.
const jobPercent = computed(() => {
  const progress = job.value?.progress
  if (!progress) return 0
  const total = Number(progress.to_embed ?? 0)
  if (total <= 0) return 0
  const done = Number(progress.embedded_done ?? 0)
  const pct = Math.floor((done / total) * 100)
  return Math.min(100, Math.max(0, pct))
})
// Stage-specific detail line shown under the stage label; empty when there
// is no progress yet or the stage has no counters to display.
const jobDesc = computed(() => {
  const progress = job.value?.progress
  if (!progress) return ''
  switch (job.value?.stage) {
    case 'embedding':
      return t('topicSearchJobEmbeddingDesc', [
        Number(progress.embedded_done ?? 0),
        Number(progress.to_embed ?? 0),
        Number(progress.scanned ?? 0),
        String(progress.folder ?? '')
      ])
    case 'clustering':
      return t('topicSearchJobClusteringDesc', [Number(progress.items_done ?? 0), Number(progress.items_total ?? 0)])
    case 'titling':
      return t('topicSearchJobTitlingDesc', [Number(progress.clusters_done ?? 0), Number(progress.clusters_total ?? 0)])
    default:
      return ''
  }
})
const query = ref('')
const qLoading = ref(false)
const qResult = ref<PromptSearchResp | null>(null)
@ -109,6 +166,29 @@ const scheduleSaveScope = () => {
}, 500)
}
// Stop the status polling timer. Idempotent: safe when no timer is active.
const stopJobPoll = () => {
  if (!_jobTimer) return
  clearInterval(_jobTimer)
  _jobTimer = null
}
const pollJob = async () => {
const id = jobId.value
if (!id) return
const st = await getClusterIibOutputJobStatus(id)
job.value = st
if (st.status === 'done') {
stopJobPoll()
loading.value = false
if (st.result) result.value = st.result
} else if (st.status === 'error') {
stopJobPoll()
loading.value = false
message.error(st.error || t('topicSearchJobFailed'))
}
}
const refresh = async () => {
if (g.conf?.is_readonly) return
if (!scopeCount.value) {
@ -116,16 +196,26 @@ const refresh = async () => {
scopeOpen.value = true
return
}
stopJobPoll()
loading.value = true
job.value = null
jobId.value = ''
try {
result.value = await clusterIibOutput({
const started = await startClusterIibOutputJob({
threshold: threshold.value,
min_cluster_size: minClusterSize.value,
lang: g.lang,
folder_paths: scopeFolders.value
})
} finally {
jobId.value = started.job_id
// poll immediately + interval
await pollJob()
_jobTimer = setInterval(() => {
void pollJob()
}, 800)
} catch (e) {
loading.value = false
throw e
}
}
@ -193,6 +283,10 @@ onMounted(() => {
})()
})
// Stop polling when the component unmounts so no timer leaks.
onBeforeUnmount(stopJobPoll)
watch(
() => scopeFolders.value,
() => {
@ -246,6 +340,11 @@ watch(
show-icon
/>
<div v-if="jobRunning" style="margin: 10px 0 0 0;">
<a-alert type="info" show-icon :message="jobStageText" :description="jobDesc" />
<a-progress :percent="jobPercent" size="small" style="margin-top: 8px;" />
</div>
<a-spin :spinning="loading">
<div v-if="qResult" style="margin-top: 10px;">
<a-alert