# sd-webui-infinite-image-bro.../scripts/iib/organize_files.py
#
# 693 lines
# 25 KiB
# Python

"""
Smart file organization based on topic clustering.
Organizes scattered image files into folders based on semantic clustering.
"""
import os
import uuid
import asyncio
import threading
import time
import shutil
from typing import Dict, List, Optional, Callable, Any
from collections import defaultdict
from pydantic import BaseModel
from fastapi import FastAPI, Depends, HTTPException
from scripts.iib.logger import logger
from scripts.iib.db.datamodel import DataBase, Image as DbImg
from scripts.iib.tool import get_img_geninfo_txt_path, is_media_file
# ========== Job Storage ==========
# In-memory registry of organize jobs, keyed by job_id. Each value is a plain
# dict that _organize_job_upsert patches in place (status, req, progress, ...).
_ORGANIZE_JOBS: Dict[str, Dict] = {}
# Held around every read and write of _ORGANIZE_JOBS in this module.
_ORGANIZE_JOBS_LOCK = threading.Lock()
# Maximum number of job records kept; _organize_job_trim drops the least
# recently updated ones beyond this count.
_ORGANIZE_JOBS_MAX = 16
def _job_now() -> float:
return time.time()
def _organize_job_get(job_id: str) -> Optional[Dict]:
    """Return a shallow copy of the stored job record, or None if there is none.

    The copy is taken while the registry lock is held. Only the top level is
    copied: nested values (e.g. the "progress" dict) are still the objects
    stored in the registry.
    """
    with _ORGANIZE_JOBS_LOCK:
        record = _ORGANIZE_JOBS.get(job_id)
        if not isinstance(record, dict):
            return None
        return dict(record)
def _organize_job_upsert(job_id: str, patch: Dict) -> None:
    """Create or patch the record of ``job_id`` and refresh its timestamp.

    ``patch`` is merged into the record with ``dict.update``, so a key such as
    "progress" is replaced as a whole rather than merged recursively. A falsy
    ``patch`` only refreshes "updated_at". The registry is trimmed afterwards.
    """
    changes = patch if patch else {}
    with _ORGANIZE_JOBS_LOCK:
        record = _ORGANIZE_JOBS.get(job_id)
        if not isinstance(record, dict):
            record = {"job_id": job_id}
        record.update(changes)
        record["updated_at"] = _job_now()
        _ORGANIZE_JOBS[job_id] = record
    # _organize_job_trim takes _ORGANIZE_JOBS_LOCK itself and threading.Lock is
    # not reentrant, so the trim has to happen after the lock is released.
    _organize_job_trim()
def _organize_job_trim() -> None:
    """Keep only the ``_ORGANIZE_JOBS_MAX`` most recently updated job records."""

    def _last_update(entry):
        # entry is a (job_id, record) pair; records without a timestamp sort last.
        return entry[1].get("updated_at", 0)

    with _ORGANIZE_JOBS_LOCK:
        if len(_ORGANIZE_JOBS) > _ORGANIZE_JOBS_MAX:
            newest_first = sorted(_ORGANIZE_JOBS.items(), key=_last_update, reverse=True)
            survivors = dict(newest_first[:_ORGANIZE_JOBS_MAX])
            # Mutate the shared dict in place so module-level references stay valid.
            _ORGANIZE_JOBS.clear()
            _ORGANIZE_JOBS.update(survivors)
# ========== Request/Response Models ==========
class OrganizeFilesReq(BaseModel):
    """Request body of organize_files_start: what to cluster and how to file it."""
    # Source folders; organize_files_start drops blank entries, normalizes the
    # rest and rejects the request unless every one is an existing directory.
    folder_paths: List[str]
    # Root that receives the generated folders; folder_paths[0] is used when unset.
    dest_folder: Optional[str] = None
    # threshold, min_cluster_size, lang and recursive are forwarded unchanged
    # to the cluster job.
    threshold: float = 0.90
    min_cluster_size: int = 2
    lang: str = "en"
    recursive: bool = False  # If True, treat all files in subfolders as files to organize
    # Folder naming options
    # Any value other than "title" or "keywords" is treated as "id".
    folder_naming: str = "title"  # "title" | "keywords" | "id"
    max_folder_name_length: int = 50
    # Operation options
    # Only "move" moves files; any other value copies them.
    action: str = "move"  # "move" | "copy"
    # "unsorted": noise files go to noise_folder_name; "leave": they are listed
    # in the preview but stay in place; anything else: they are ignored.
    handle_noise: str = "unsorted"  # "skip" | "unsorted" | "leave"
    noise_folder_name: str = "未分类"
class OrganizeFolderEdit(BaseModel):
    """A user-supplied rename of one previewed cluster's destination folder."""
    # cluster_id as shown in the preview ("__noise__" for the noise group).
    cluster_id: str
    # Replacement folder name; it is sanitized before use.
    new_folder_name: str
class OrganizeFilesConfirmReq(BaseModel):
    """Request body of organize_files_confirm: run a previewed job."""
    # Job to execute; it must be in the "preview_ready" state.
    job_id: str
    # Optional folder renames, applied per cluster_id.
    folder_edits: Optional[List[OrganizeFolderEdit]] = None
    # Clusters whose files must not be transferred.
    skip_cluster_ids: Optional[List[str]] = None
# ========== Utility Functions ==========
def _sanitize_folder_name(name: str, max_len: int = 50) -> str:
"""Sanitize folder name, remove illegal characters but keep spaces for readability."""
if not name:
return "cluster"
import re
# Remove illegal characters for Windows/Unix
illegal = r'<>:"/\|?*'
for c in illegal:
name = name.replace(c, ' ')
# Remove leading/trailing spaces and dots
name = name.strip(' .')
# Replace multiple consecutive spaces with single space (keep spaces, don't convert to underscore)
name = re.sub(r'\s+', ' ', name)
# Truncate
if len(name) > max_len:
name = name[:max_len].rstrip(' ._')
return name or "cluster"
def _generate_folder_name(
    cluster: Dict,
    naming: str,
    existing_disk_names: set,
    max_len: int
) -> str:
    """
    Generate folder name for a cluster (without uniqueness check).

    Args:
        cluster: cluster info dict ("id", and optionally "title" / "keywords")
        naming: naming strategy ("title" | "keywords" | "id"); unknown values
            are treated as "id"
        existing_disk_names: set of folder names already existing on disk
            (matched case-insensitively, for merging)
        max_len: max folder name length

    Returns:
        folder name string

    Note: This function may return the same name for different clusters.
    The caller (_build_file_mappings) is responsible for merging clusters with the same name.
    """
    fallback = f"cluster_{cluster.get('id', '')}"
    if naming == "title":
        title = cluster.get("title")
        # A missing, non-string or blank title falls back to the id-based name;
        # otherwise every such cluster would be sanitized to the same generic
        # "cluster" folder and unrelated clusters would be merged.
        base = title if isinstance(title, str) and title.strip() else fallback
    elif naming == "keywords":
        # "keywords" may be absent or None; use the first three, skipping blanks.
        raw_keywords = list(cluster.get("keywords") or [])[:3]
        kws = [str(kw).strip() for kw in raw_keywords if kw is not None]
        base = "_".join(kw for kw in kws if kw) or fallback
    else:  # id
        base = fallback
    sanitized = _sanitize_folder_name(base, max_len)
    if sanitized == "cluster" and base != fallback:
        # The label consisted only of characters the sanitizer removes (e.g.
        # "???"); keep the cluster distinguishable through its id instead.
        sanitized = _sanitize_folder_name(fallback, max_len)
    base = sanitized
    # Check if this name matches an existing folder on disk.
    # An exact match wins, so the result does not depend on set iteration order
    # when the disk holds several names that differ only by case.
    if base in existing_disk_names:
        return base
    # Otherwise match case-insensitively and use the disk name (preserving
    # case) for merging; sorted() keeps the choice deterministic.
    wanted = base.lower()
    matches = sorted(n for n in existing_disk_names if n.lower() == wanted)
    return matches[0] if matches else base
def _build_file_mappings(
    cluster_result: Dict,
    dest_folder: str,
    folder_naming: str,
    max_len: int,
    handle_noise: str,
    noise_folder_name: str
) -> Dict:
    """
    Build file mappings from cluster result.

    Args:
        cluster_result: dict with "clusters" (each with "id" and optionally
            "title", "keywords", "paths") and "noise" (list of file paths)
        dest_folder: root directory that receives the generated folders
        folder_naming: naming strategy, passed to _generate_folder_name
        max_len: max folder name length
        handle_noise: "unsorted" files noise into a dedicated folder, "leave"
            lists noise in the preview but keeps it in place, anything else
            ("skip") ignores it
        noise_folder_name: requested name of the noise folder

    Returns: {clusters, noise, all_mappings}. all_mappings only contains files
    that actually have to be transferred ("leave" noise is not included).

    Note: Clusters whose generated folder names are equal ignoring case are
    merged into one, consistent with the case-insensitive matching against
    folders on disk (on case-insensitive filesystems they would share one
    directory anyway). The spelling of the first such cluster is used.
    """
    # Get existing folder names on disk (for merge detection)
    existing_disk_names = set()
    if os.path.isdir(dest_folder):
        existing_disk_names = {
            item for item in os.listdir(dest_folder)
            if os.path.isdir(os.path.join(dest_folder, item))
        }
    existing_disk_lower_map = {n.lower(): n for n in existing_disk_names}

    all_mappings = []

    # First pass: generate folder names and group clusters by folder name.
    # This merges clusters that would end up with the same folder name.
    merged_by_key: Dict[str, Dict] = {}  # lower-cased folder name -> merged cluster info
    for cluster in cluster_result.get("clusters") or []:
        folder_name = _generate_folder_name(cluster, folder_naming, existing_disk_names, max_len)
        keywords = cluster.get("keywords") or []
        paths = cluster.get("paths") or []
        merged = merged_by_key.get(folder_name.lower())
        if merged is None:
            # New folder
            merged_by_key[folder_name.lower()] = {
                "folder_name": folder_name,
                "title": cluster.get("title", ""),
                "keywords": list(keywords),
                "paths": list(paths),
                "cluster_ids": [cluster["id"]],
            }
            continue
        # Merge into existing
        merged["paths"].extend(paths)
        merged["cluster_ids"].append(cluster["id"])
        # Merge keywords (dedupe)
        for kw in keywords:
            if kw not in merged["keywords"]:
                merged["keywords"].append(kw)

    # Second pass: build file mappings from merged clusters
    clusters_preview = []
    for merged in merged_by_key.values():
        folder_name = merged["folder_name"]
        ids = merged["cluster_ids"]
        # A single cluster keeps its own id; merged clusters get a combined id.
        # str() keeps the join from failing on non-string ids.
        cluster_id = ids[0] if len(ids) == 1 else "_".join(str(i) for i in ids)
        file_mappings = [
            {
                "src_path": path,
                "dest_folder_name": folder_name,
                "dest_path": os.path.join(dest_folder, folder_name, os.path.basename(path)),
                "cluster_id": cluster_id,
                "is_noise": False
            }
            for path in merged["paths"]
        ]
        # The same dict objects are shared between the per-cluster list and
        # the flat list.
        all_mappings.extend(file_mappings)
        clusters_preview.append({
            "cluster_id": cluster_id,
            "suggested_folder_name": folder_name,
            "title": merged["title"],
            "keywords": merged["keywords"],
            "size": len(file_mappings),
            "file_mappings": file_mappings
        })

    # Process noise files
    noise_mappings = []
    noise_paths = cluster_result.get("noise") or []
    noise_folder = ""
    if handle_noise == "unsorted" and noise_paths:
        # Put into "unsorted" folder
        noise_folder = _pick_noise_folder_name(
            _sanitize_folder_name(noise_folder_name, max_len),
            set(merged_by_key),
            existing_disk_lower_map,
        )
        for path in noise_paths:
            mapping = {
                "src_path": path,
                "dest_folder_name": noise_folder,
                "dest_path": os.path.join(dest_folder, noise_folder, os.path.basename(path)),
                "cluster_id": "__noise__",
                "is_noise": True
            }
            noise_mappings.append(mapping)
            all_mappings.append(mapping)
    elif handle_noise == "leave":
        # Keep in original location, don't move
        for path in noise_paths:
            noise_mappings.append({
                "src_path": path,
                "dest_folder_name": "",
                "dest_path": path,  # Keep original
                "cluster_id": "__noise__",
                "is_noise": True
            })
            # Don't add to all_mappings since no move needed
    # "skip": completely ignore noise files

    noise_cluster = {
        "cluster_id": "__noise__",
        "suggested_folder_name": noise_folder,
        "title": "未分类" if noise_folder_name == "未分类" else "Unsorted",
        "keywords": [],
        "size": len(noise_paths),
        "file_mappings": noise_mappings
    }
    return {
        "clusters": clusters_preview,
        "noise": noise_cluster,
        "all_mappings": all_mappings
    }


def _pick_noise_folder_name(base: str, cluster_keys: set, disk_lower_map: Dict[str, str]) -> str:
    """Choose the noise folder name so noise never mixes with a cluster folder.

    Args:
        base: sanitized requested noise folder name
        cluster_keys: lower-cased folder names already used by clusters
        disk_lower_map: lower-cased name -> actual name of folders on disk
    """
    key = base.lower()
    if key in cluster_keys:
        # The name is taken by a cluster: add a numeric suffix until it
        # collides with neither a cluster folder nor a folder on disk.
        candidate = base
        counter = 1
        while candidate.lower() in cluster_keys or candidate.lower() in disk_lower_map:
            candidate = f"{base}_{counter}"
            counter += 1
        return candidate
    # Merge into an existing folder on disk (case-preserved) if there is one
    return disk_lower_map.get(key, base)
# ========== Route Mounting ==========
def mount_organize_routes(
    app: FastAPI,
    db_api_base: str,
    verify_secret,
    write_permission_required,
    # Cluster job functions from topic_cluster
    start_cluster_job_func: Callable,
    get_cluster_job_status_func: Callable,
):
    """
    Mount file organization routes.

    Registers three endpoints under ``db_api_base``:
      - POST organize_files_start: validate the request, start clustering in
        the background and return a job_id.
      - GET organize_files_status: return the stored job record.
      - POST organize_files_confirm: execute a previewed plan in the background.

    Args:
        app: FastAPI application the routes are added to.
        db_api_base: URL prefix of the routes.
        verify_secret: dependency applied to every route.
        write_permission_required: dependency applied to the two POST routes.
        start_cluster_job_func: awaited with folder_paths, threshold,
            min_cluster_size, lang, recursive and existing_folder_names;
            its result is used as the cluster job id.
        get_cluster_job_status_func: awaited with the cluster job id; its
            result is read as a dict with "status" and optionally "stage",
            "progress", "result" and "error".
    """
    # The event loop only keeps weak references to tasks, so a fire-and-forget
    # task may be garbage-collected before it finishes. Hold a strong reference
    # until the task is done.
    background_tasks = set()

    def _spawn_background(coro) -> None:
        """Schedule ``coro`` as a task that stays referenced until it completes."""
        task = asyncio.create_task(coro)
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

    async def _run_organize_job(job_id: str, req: OrganizeFilesReq):
        """
        Background task for file organization.
        Uses topic_cluster job API internally.

        Starts a cluster job, mirrors its progress into the organize job while
        polling, then stores the preview and sets the status to
        "preview_ready". Any exception ends the job with status "error".
        """
        try:
            logger.info(f"[organize_files][{job_id}] Starting organize job")
            _organize_job_upsert(job_id, {
                "status": "running",
                "progress": {
                    "stage": "clustering",
                    "embedded_done": 0,
                    "to_embed": 0,
                    "clusters_done": 0,
                    "clusters_total": 0,
                    "moved_done": 0,
                    "moved_total": 0,
                    "current_file": "",
                    "created_folders": [],
                    "errors": []
                }
            })

            # 1. Start cluster job using topic_cluster API
            # Get existing folder names in dest directory for AI to consider reusing
            dest_folder = os.path.abspath(req.dest_folder or req.folder_paths[0])
            existing_folder_names = []
            if os.path.isdir(dest_folder):
                existing_folder_names = [
                    item for item in os.listdir(dest_folder)
                    if os.path.isdir(os.path.join(dest_folder, item))
                ]
            logger.info(f"[organize_files][{job_id}] Starting cluster job (recursive={req.recursive}, existing_folders={len(existing_folder_names)})")
            cluster_job_id = await start_cluster_job_func(
                folder_paths=req.folder_paths,
                threshold=req.threshold,
                min_cluster_size=req.min_cluster_size,
                lang=req.lang,
                recursive=req.recursive,
                existing_folder_names=existing_folder_names,
            )

            # 2. Poll cluster job status until done
            logger.info(f"[organize_files][{job_id}] Polling cluster job {cluster_job_id}")
            max_wait = 3600  # 1 hour max
            # Monotonic clock: the timeout must not be affected by wall-clock
            # adjustments.
            start_time = time.monotonic()
            poll_interval = 0.5
            while True:
                if time.monotonic() - start_time > max_wait:
                    raise Exception("Cluster job timeout")
                cluster_status = await get_cluster_job_status_func(cluster_job_id)
                status = cluster_status.get("status", "")
                stage = cluster_status.get("stage", "")
                # "progress" may be missing or None before the job reports any.
                progress = cluster_status.get("progress") or {}
                # Update organize job progress based on cluster job
                _organize_job_upsert(job_id, {
                    "progress": {
                        "stage": stage,
                        "embedded_done": progress.get("embedded_done", 0),
                        "to_embed": progress.get("to_embed", 0),
                        "clusters_done": progress.get("clusters_done", 0),
                        "clusters_total": progress.get("clusters_total", 0),
                        "moved_done": 0,
                        "moved_total": 0,
                        "current_file": "",
                        "created_folders": [],
                        "errors": []
                    }
                })
                if status == "done":
                    cluster_result = cluster_status.get("result", {})
                    break
                elif status == "error":
                    raise Exception(f"Cluster job failed: {cluster_status.get('error', 'Unknown error')}")
                await asyncio.sleep(poll_interval)
                # Increase poll interval gradually
                poll_interval = min(poll_interval * 1.2, 2.0)

            # 3. Build file mappings for preview
            logger.info(f"[organize_files][{job_id}] Building file mappings")
            preview_data = _build_file_mappings(
                cluster_result,
                dest_folder,
                req.folder_naming,
                req.max_folder_name_length,
                req.handle_noise,
                req.noise_folder_name
            )
            preview = {
                "job_id": job_id,
                "dest_folder": dest_folder,
                "total_files": len(preview_data["all_mappings"]),
                "clusters": preview_data["clusters"],
                "noise": preview_data["noise"],
                "all_mappings": preview_data["all_mappings"]
            }
            _organize_job_upsert(job_id, {
                "status": "preview_ready",
                "preview": preview,
                "progress": {
                    "stage": "preview_ready",
                    "embedded_done": 0,
                    "to_embed": 0,
                    "clusters_done": len(preview_data["clusters"]),
                    "clusters_total": len(preview_data["clusters"]),
                    "moved_done": 0,
                    "moved_total": 0,
                    "current_file": "",
                    "created_folders": [],
                    "errors": []
                }
            })
            logger.info(f"[organize_files][{job_id}] Preview ready: {len(preview_data['all_mappings'])} files")
        except Exception as e:
            # Top-level boundary of the background task: record the failure on
            # the job so the status endpoint can report it.
            import traceback
            logger.error(f"[organize_files][{job_id}] Error: {e}")
            logger.error(traceback.format_exc())
            _organize_job_upsert(job_id, {
                "status": "error",
                "error": str(e),
            })

    async def _execute_organize(job_id: str, req: OrganizeFilesConfirmReq):
        """
        Execute file organization (move/copy files).

        Applies the user's folder renames and skipped clusters to the stored
        preview, then transfers the files folder by folder. Per-file failures
        are collected in "errors" and do not abort the job. An existing
        destination file is never overwritten; it is reported as an error.
        """
        try:
            job = _organize_job_get(job_id)
            if not job:
                return
            original_req = job.get("req", {})
            action = original_req.get("action", "move")
            preview = job.get("preview", {})
            all_mappings = list(preview.get("all_mappings", []))
            dest_folder = preview.get("dest_folder", "")
            logger.info(f"[organize_files][{job_id}] Executing organize: action={action}, mappings={len(all_mappings)}")

            # Apply user edits
            folder_edits = {
                edit.cluster_id: edit.new_folder_name
                for edit in (req.folder_edits or [])
            }
            skip_ids = set(req.skip_cluster_ids or [])

            # Filter and update mappings
            final_mappings = []
            for m in all_mappings:
                # Skip if cluster is skipped
                if m["cluster_id"] in skip_ids:
                    continue
                # Apply folder name edits
                if m["cluster_id"] in folder_edits:
                    new_name = _sanitize_folder_name(folder_edits[m["cluster_id"]])
                    m["dest_folder_name"] = new_name
                    m["dest_path"] = os.path.join(dest_folder, new_name, os.path.basename(m["src_path"]))
                # Skip files that don't need moving (same src and dest)
                if m["dest_path"] != m["src_path"]:
                    final_mappings.append(m)

            total = len(final_mappings)
            _organize_job_upsert(job_id, {
                "status": "moving",
                "progress": {
                    "stage": "moving",
                    "moved_total": total,
                    "moved_done": 0,
                    "current_file": "",
                    "created_folders": [],
                    "errors": []
                }
            })

            # Group by destination folder
            by_dest: Dict[str, List[Dict]] = defaultdict(list)
            for m in final_mappings:
                by_dest[os.path.dirname(m["dest_path"])].append(m)

            created_folders = []
            errors = []
            moved_done = 0
            conn = DataBase.get_conn()

            # Execute moves/copies
            for dest_dir, mappings in by_dest.items():
                try:
                    # Update progress
                    current_folder = os.path.basename(dest_dir)
                    _organize_job_upsert(job_id, {
                        "progress": {
                            "stage": "moving",
                            "moved_done": moved_done,
                            "moved_total": total,
                            "current_file": f"-> {current_folder}/",
                            "created_folders": created_folders,
                            "errors": errors
                        }
                    })
                    # Create destination folder
                    if not os.path.exists(dest_dir):
                        os.makedirs(dest_dir, exist_ok=True)
                        created_folders.append(dest_dir)

                    # Move or copy each file
                    for m in mappings:
                        src_path = m["src_path"]
                        dest_path = m["dest_path"]
                        try:
                            # Never overwrite a different file that already
                            # sits at the destination (two sources with the
                            # same basename, or a file left from an earlier
                            # run): shutil.copy2 always replaces it, and
                            # shutil.move replaces it where os.rename does.
                            if os.path.exists(dest_path) and not os.path.samefile(src_path, dest_path):
                                raise FileExistsError(f"Destination already exists: {dest_path}")
                            txt_path = get_img_geninfo_txt_path(src_path)
                            if action == "move":
                                if txt_path:
                                    shutil.move(txt_path, dest_dir)
                                img = DbImg.get(conn, src_path)
                                if img:
                                    img.update_path(conn, dest_path, force=True)
                                shutil.move(src_path, dest_path)
                            else:
                                shutil.copy2(src_path, dest_path)
                                if txt_path:
                                    shutil.copy2(txt_path, dest_dir)
                            moved_done += 1
                        except Exception as e:
                            # Best effort per file: record the failure and go on.
                            errors.append({
                                "src": src_path,
                                "dest": dest_path,
                                "error": str(e)
                            })
                            logger.error(f"[organize_files][{job_id}] Error moving {src_path}: {e}")
                    conn.commit()
                    logger.info(f"[organize_files][{job_id}] Processed {len(mappings)} files to {dest_dir}")
                except Exception as e:
                    conn.rollback()
                    error_info = {"dest": dest_dir, "count": len(mappings), "error": str(e)}
                    errors.append(error_info)
                    logger.error(f"[organize_files][{job_id}] Error with folder {dest_dir}: {e}")
                # Yield control
                await asyncio.sleep(0)

            # Done
            _organize_job_upsert(job_id, {
                "status": "done",
                "progress": {
                    "stage": "done",
                    "moved_done": moved_done,
                    "moved_total": total,
                    "current_file": "",
                    "created_folders": created_folders,
                    "errors": errors
                },
                "result": {
                    "moved_count": moved_done,
                    "created_folders": created_folders,
                    "errors": errors
                }
            })
            logger.info(f"[organize_files][{job_id}] Completed: moved {moved_done} files to {len(created_folders)} folders")
        except Exception as e:
            # Top-level boundary of the background task: record the failure on
            # the job so the status endpoint can report it.
            import traceback
            logger.error(f"[organize_files][{job_id}] Execute error: {e}")
            logger.error(traceback.format_exc())
            _organize_job_upsert(job_id, {
                "status": "error",
                "error": str(e),
            })

    @app.post(
        f"{db_api_base}/organize_files_start",
        dependencies=[Depends(verify_secret), Depends(write_permission_required)],
    )
    async def organize_files_start(req: OrganizeFilesReq):
        """
        Start file organization task (background).
        Returns job_id for progress polling.
        """
        # Validate folders
        folders = [
            os.path.normpath(p.strip())
            for p in req.folder_paths
            if isinstance(p, str) and p.strip()
        ]
        if not folders:
            raise HTTPException(400, "folder_paths is required")
        for f in folders:
            if not os.path.exists(f) or not os.path.isdir(f):
                raise HTTPException(400, f"Folder not found: {f}")
        # Update req with normalized folders
        req.folder_paths = folders

        job_id = uuid.uuid4().hex
        _organize_job_upsert(job_id, {
            "status": "queued",
            "req": req.dict(),
            "created_at": _job_now()
        })
        _spawn_background(_run_organize_job(job_id, req))
        logger.info(f"[organize_files] Started job {job_id} for folders: {folders}")
        return {"job_id": job_id}

    @app.get(
        f"{db_api_base}/organize_files_status",
        dependencies=[Depends(verify_secret)],
    )
    async def organize_files_status(job_id: str):
        """
        Query task status and progress.
        Status flow: queued -> running -> preview_ready -> (confirm) -> moving -> done
        """
        job = _organize_job_get(job_id)
        if not job:
            raise HTTPException(404, "Job not found")
        return job

    @app.post(
        f"{db_api_base}/organize_files_confirm",
        dependencies=[Depends(verify_secret), Depends(write_permission_required)],
    )
    async def organize_files_confirm(req: OrganizeFilesConfirmReq):
        """
        Confirm and execute file organization.
        - Can modify folder names via folder_edits
        - Can skip clusters via skip_cluster_ids
        """
        job = _organize_job_get(req.job_id)
        if not job:
            raise HTTPException(404, "Job not found")
        if job.get("status") != "preview_ready":
            raise HTTPException(400, f"Job not ready for confirmation, current status: {job.get('status')}")
        # Leave "preview_ready" immediately so a repeated confirm is rejected
        # even before the background task gets its first turn.
        _organize_job_upsert(req.job_id, {"status": "moving"})
        _spawn_background(_execute_organize(req.job_id, req))
        return {"ok": True, "job_id": req.job_id}