#!/usr/bin/env python """ Caption API Test Suite Comprehensive tests for all Caption API endpoints and parameters: - GET/POST /sdapi/v1/openclip (OpenCLIP direct) - POST /sdapi/v1/caption (Unified dispatch: openclip, tagger, vlm) - POST /sdapi/v1/vqa (VLM direct) - GET /sdapi/v1/vqa/models, /sdapi/v1/vqa/prompts - POST /sdapi/v1/tagger (Tagger direct) - GET /sdapi/v1/tagger/models Usage: python cli/test-caption-api.py [--url URL] [--image PATH] Examples: # Test against local server with default test image python cli/test-caption-api.py # Test against custom URL with specific image python cli/test-caption-api.py --url http://127.0.0.1:7860 --image html/sdnext-robot-2k.jpg """ import os import re import sys import time import base64 import argparse import requests import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # Default test images (in order of preference) DEFAULT_TEST_IMAGES = [ 'html/sdnext-robot-2k.jpg', 'html/favicon.png', 'extensions-builtin/sdnext-modernui/html/logo.png', ] # OCR test image (must have readable text) OCR_TEST_IMAGE = 'models/Reference/HiDream-ai--HiDream-I1-Fast.jpg' # Bracket test image (must produce tags with parentheses, e.g. 
# Bracket test image (must produce tags with parentheses, e.g. pokemon_(creature))
BRACKET_TEST_IMAGE = 'models/Reference/SDXL-Flash_Mini.jpg'

# Custom prefill text used for dual-prefill verification across tests
CUSTOM_PREFILL = "I'm looking at robot which"


class CaptionAPITest:
    """Test harness for Caption API endpoints."""

    # VQA model families for architecture testing; values are substring patterns
    # matched against lowercased model names (see get_model_family)
    VQA_FAMILIES = {
        'qwen': ['qwen'],
        'gemma': ['gemma'],  # excluding paligemma
        'smolvlm': ['smol'],
        'florence': ['florence'],
        'promptgen': ['promptgen'],
        'moondream': ['moondream'],
        'fastvlm': ['fastvlm'],
        'git': ['git'],
        'blip': ['blip'],
        'pix2struct': ['pix'],
        'paligemma': ['paligemma'],
        'vilt': ['vilt'],
        'ovis': ['ovis'],
        'sa2va': ['sa2'],
        'toriigate': ['torii'],
        'mimo': ['mimo'],
        'joytag': ['joytag'],
        'joycaption': ['joycaption'],
    }

    # BLIP model types for caption testing (smaller models only to avoid reloading large models)
    BLIP_MODELS = [
        'blip-base',
        'blip-large',
        'blip2-opt-2.7b',
    ]

    def __init__(self, base_url, image_path=None, username=None, password=None, timeout=300):
        """Initialize the harness.

        base_url: server root, trailing slash stripped.
        image_path: optional explicit test image; otherwise setup() searches DEFAULT_TEST_IMAGES.
        username/password: optional HTTP basic auth pair.
        timeout: per-request timeout in seconds.
        """
        self.base_url = base_url.rstrip('/')
        self.image_path = image_path
        self.image_b64 = None
        self.ocr_image_b64 = None  # Separate image with text for OCR tests
        self.bracket_image_b64 = None  # Separate image that produces bracket-containing tags
        self.timeout = timeout  # Request timeout in seconds
        # Categorized results tracking: per-category counters plus (status, message) tuples
        self.results = {
            'openclip': {'passed': 0, 'failed': 0, 'skipped': 0, 'tests': []},
            'vqa': {'passed': 0, 'failed': 0, 'skipped': 0, 'tests': []},
            'tagger': {'passed': 0, 'failed': 0, 'skipped': 0, 'tests': []},
            'dispatch': {'passed': 0, 'failed': 0, 'skipped': 0, 'tests': []},
            'parity': {'passed': 0, 'failed': 0, 'skipped': 0, 'tests': []},
        }
        self._current_category = 'openclip'  # Default category
        # Track critical errors per backend to skip subsequent tests
        self._critical_errors = {
            'openclip': None,
            'tagger': None,
            'vqa': None,
        }
        self.auth = None
        if username and password:
            self.auth = (username, password)
        # Cache for model lists to avoid repeated calls
        self._caption_models = None
        self._vqa_models = None
        self._tagger_models = None

    def set_category(self, category):
        """Set the current test category for result tracking."""
        self._current_category = category

    def record_result(self, status, message):
        """Record a test result in the current category.

        status must be one of 'passed'/'failed'/'skipped' (used as a counter key).
        """
        cat = self._current_category
        self.results[cat]['tests'].append((status, message))
        self.results[cat][status] += 1

    def log_pass(self, msg):
        # Print and record a passing test
        print(f" [PASS] {msg}")
        self.record_result('passed', msg)

    def log_fail(self, msg):
        # Print and record a failing test
        print(f" [FAIL] {msg}")
        self.record_result('failed', msg)

    def log_skip(self, msg):
        # Print and record a skipped test
        print(f" [SKIP] {msg}")
        self.record_result('skipped', msg)

    def log_info(self, msg):
        # Informational trace only; not recorded in results
        print(f" [INFO] {msg}")

    def log_critical(self, backend, msg):
        """Log a critical error and mark the backend as failed."""
        print(f" [CRITICAL] {msg}")
        self._critical_errors[backend] = msg
        self.record_result('failed', f"CRITICAL: {msg}")

    def has_critical_error(self, backend):
        """Check if a backend has a critical error and should skip tests."""
        if self._critical_errors.get(backend):
            return True
        return False

    def skip_if_critical(self, backend, test_name):
        """Skip test if backend has critical error. Returns True if skipped."""
        if self.has_critical_error(backend):
            self.log_skip(f"{test_name}: skipped due to prior critical error")
            return True
        return False
    def is_critical_error(self, response_text):
        """Check if a response indicates a critical/fatal error that should stop testing."""
        if not response_text:
            return False
        text_lower = str(response_text).lower()
        # Critical error patterns that indicate backend is broken
        # Note: patterns are substring matches, so be careful with short strings that could match common words
        critical_patterns = [
            'runtimeerror',
            'cuda error',
            'out of memory',  # 'oom' removed - matches words like "room", "zoom", "bloom"; 'out of memory' covers this case
            'device-side assert',
            'cublas',
            'cudnn',
            'nccl',
            'input type',  # tensor type mismatch
            'weight type',  # tensor type mismatch
            'cannot be performed',
            'illegal memory access',
            'segmentation fault',
        ]
        # Patterns that need word boundary checking (could match common words)
        word_boundary_patterns = [
            'killed',  # could match "skilled", "thrilled"
            'critical',  # could match "critical thinking"
        ]
        for pattern in critical_patterns:
            if pattern in text_lower:
                return True
        # Check word boundary patterns with regex
        for pattern in word_boundary_patterns:
            if re.search(rf'\b{pattern}\b', text_lower):
                return True
        return False

    def check_critical_error(self, data, backend):
        """Check response for critical errors and mark backend if found. Returns error message or None."""
        if not data:
            return None
        # Check various response fields for critical errors
        fields_to_check = ['caption', 'tags', 'answer', 'error', 'reason', 'detail']
        for field in fields_to_check:
            value = data.get(field)
            if value and self.is_critical_error(value):
                error_msg = f"{field}: {self.truncate(str(value), 100)}"
                self.log_critical(backend, error_msg)
                return error_msg
        return None

    @staticmethod
    def truncate(text, max_len=80):
        """Truncate text for display, adding ... if truncated."""
        if text and len(str(text)) > max_len:
            return str(text)[:max_len] + "..."
        return str(text) if text else ""

    def log_response(self, response, key_fields=None):
        """Print response trace with key fields."""
        if key_fields is None:
            key_fields = ['caption', 'tags', 'answer', 'backend']
        for field in key_fields:
            if response.get(field):
                value = response[field]
                if isinstance(value, str):
                    print(f" Response {field}: \"{self.truncate(value)}\"")
                elif isinstance(value, dict):
                    # For scores dict, show first few entries
                    preview = dict(list(value.items())[:3])
                    print(f" Response {field}: {preview}")
                else:
                    print(f" Response {field}: {value}")

    def is_error_answer(self, answer):
        """Check if an answer string indicates an error occurred."""
        if not answer:
            return False
        answer_lower = answer.lower().strip()
        # Common error patterns in VQA/caption responses
        error_patterns = [
            'error',
            'exception',
            'failed',
            'traceback',
            'cannot',
            'unable to',
        ]
        # Check if answer is just an error keyword or starts with one
        for pattern in error_patterns:
            if answer_lower == pattern or answer_lower.startswith(f'{pattern}:') or answer_lower.startswith(f'{pattern} '):
                return True
        return False

    def is_meaningful_answer(self, answer, min_length=3):
        """Check if an answer is meaningful (not just punctuation or too short)."""
        if not answer:
            return False
        # Strip whitespace and check length
        stripped = answer.strip()
        if len(stripped) < min_length:
            return False
        # Check if it's just punctuation
        if all(c in '.,!?;:\'"()-_' for c in stripped):
            return False
        return True

    def _check_prefill(self, base_request: dict, test_label: str):
        """Re-run a VQA request with custom prefill and verify it appears in output."""
        req = {**base_request, 'prefill': CUSTOM_PREFILL, 'keep_prefill': True}
        data = self.post('/sdapi/v1/vqa', req)
        if 'error' in data:
            self.log_skip(f"{test_label} prefill: API error")
        elif data.get('answer') and not self.is_error_answer(data['answer']):
            if data['answer'].startswith(CUSTOM_PREFILL):
                self.log_pass(f"{test_label} prefill: output starts with custom prefill")
            else:
                self.log_fail(f"{test_label} prefill: expected '{CUSTOM_PREFILL[:30]}...' but got '{data['answer'][:30]}...'")
        else:
            self.log_fail(f"{test_label} prefill: empty/error")
    def get_model_family(self, model_name):
        """Determine model family from model name.

        Matches VQA_FAMILIES substring patterns against the lowercased name;
        returns 'unknown' when nothing matches.
        """
        name_lower = model_name.lower()
        for family, patterns in self.VQA_FAMILIES.items():
            for pattern in patterns:
                if pattern in name_lower:
                    # Special case: gemma but not paligemma
                    if family == 'gemma' and 'pali' in name_lower:
                        continue
                    return family
        return 'unknown'

    def get_tagger_type(self, model):
        """Determine tagger type and version from model info.

        Returns a (type, version) tuple; version is None except for waifudiffusion.
        """
        model_type = model.get('type', 'unknown')
        model_name = model.get('name', '').lower()
        if model_type == 'deepbooru':
            return 'deepbooru', None
        elif model_type == 'waifudiffusion':
            # Determine WD version
            if 'v3' in model_name:
                return 'waifudiffusion', 'v3'
            elif 'v2' in model_name:
                return 'waifudiffusion', 'v2'
            else:
                return 'waifudiffusion', 'v1'
        return model_type, None

    # =========================================================================
    # HTTP Helpers
    # =========================================================================

    def get(self, endpoint, params=None):
        """Make GET request and return JSON response.

        Never raises: failures are normalized into {'error': ..., 'reason': ...} dicts.
        """
        url = f"{self.base_url}{endpoint}"
        try:
            resp = requests.get(url, params=params, auth=self.auth, timeout=self.timeout, verify=False)
            resp.raise_for_status()
            return resp.json()
        except requests.exceptions.Timeout:
            return {'error': 'timeout', 'reason': f'Request timed out after {self.timeout}s'}
        except requests.exceptions.HTTPError as e:
            # Prefer the API's own 'detail' field when the error body is JSON
            try:
                return {'error': 'http', 'status': e.response.status_code, 'reason': e.response.json().get('detail', str(e))}
            except Exception:
                return {'error': 'http', 'status': e.response.status_code, 'reason': str(e)}
        except Exception as e:
            return {'error': 'exception', 'reason': str(e)}

    def _infer_backend_from_endpoint(self, endpoint, json_data):
        """Infer the backend from the endpoint URL or request data."""
        if '/openclip' in endpoint:
            return 'openclip'
        elif '/tagger' in endpoint:
            return 'tagger'
        elif '/vqa' in endpoint:
            return 'vlm'
        elif '/caption' in endpoint:
            # Dispatch endpoint - check backend field in request
            return json_data.get('backend', 'openclip') if json_data else 'openclip'
        return None

    def post(self, endpoint, json_data, check_critical=True):
        """Make POST request and return JSON response. Auto-checks for critical errors unless check_critical=False."""
        url = f"{self.base_url}{endpoint}"
        backend = self._infer_backend_from_endpoint(endpoint, json_data)
        try:
            resp = requests.post(url, json=json_data, auth=self.auth, timeout=self.timeout, verify=False)
            resp.raise_for_status()
            data = resp.json()
            # Auto-check for critical errors in the response (skip for deliberate error tests)
            if check_critical:
                if backend and backend != 'vlm':  # VLM backend name differs
                    self._auto_check_critical(data, backend)
                elif backend == 'vlm':
                    # _critical_errors keys use 'vqa', not 'vlm'
                    self._auto_check_critical(data, 'vqa')
            return data
        except requests.exceptions.Timeout:
            return {'error': 'timeout', 'reason': f'Request timed out after {self.timeout}s'}
        except requests.exceptions.HTTPError as e:
            try:
                return {'error': 'http', 'status': e.response.status_code, 'reason': e.response.json().get('detail', str(e))}
            except Exception:
                return {'error': 'http', 'status': e.response.status_code, 'reason': str(e)}
        except Exception as e:
            return {'error': 'exception', 'reason': str(e)}

    def _auto_check_critical(self, data, backend):
        """Auto-check response for critical errors (called by post method).

        Unlike check_critical_error, this does not record a failed test result;
        it only prints and latches the backend's critical-error flag.
        """
        if not data or self.has_critical_error(backend):
            return
        # Check various response fields for critical errors
        fields_to_check = ['caption', 'tags', 'answer', 'error', 'reason', 'detail']
        for field in fields_to_check:
            value = data.get(field)
            if value and self.is_critical_error(value):
                error_msg = f"{field}: {self.truncate(str(value), 100)}"
                print(f" [CRITICAL] {backend} backend error: {error_msg}")
                self._critical_errors[backend] = error_msg
                return
    # =========================================================================
    # Setup and Teardown
    # =========================================================================

    def setup(self):
        """Load test image and verify server connectivity.

        Returns True on success, False when the server is unreachable or no
        primary test image can be loaded. OCR/bracket images are optional.
        """
        print("=" * 70)
        print("CAPTION API TEST SUITE")
        print("=" * 70)
        print(f"\nServer: {self.base_url}")
        print(f"Timeout: {self.timeout}s")
        # Check server connectivity
        print("\nChecking server connectivity...")
        try:
            resp = requests.get(f"{self.base_url}/sdapi/v1/options", auth=self.auth, timeout=10, verify=False)
            if resp.status_code == 200:
                print(" Server is reachable")
            else:
                print(f" Warning: Server returned status {resp.status_code}")
        except Exception as e:
            print(f" ERROR: Cannot connect to server: {e}")
            print(" Make sure the server is running with --docs flag")
            return False
        # Find and load test image
        if self.image_path:
            if os.path.exists(self.image_path):
                print(f"\nUsing provided image: {self.image_path}")
            else:
                print(f"\nERROR: Provided image not found: {self.image_path}")
                return False
        else:
            # Find default test image relative to the repository root (parent of this script's dir)
            script_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
            for img in DEFAULT_TEST_IMAGES:
                full_path = os.path.join(script_dir, img)
                if os.path.exists(full_path):
                    self.image_path = full_path
                    print(f"\nUsing default test image: {img}")
                    break
            if not self.image_path:
                print("\nERROR: No test image found")
                return False
        # Load and encode image
        try:
            with open(self.image_path, 'rb') as f:
                image_data = f.read()
            self.image_b64 = base64.b64encode(image_data).decode('utf-8')
            print(f" Image loaded: {len(image_data)} bytes")
        except Exception as e:
            print(f" ERROR: Failed to load image: {e}")
            return False
        # Load OCR test image (image with readable text)
        script_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        ocr_image_path = os.path.join(script_dir, OCR_TEST_IMAGE)
        if os.path.exists(ocr_image_path):
            try:
                with open(ocr_image_path, 'rb') as f:
                    ocr_data = f.read()
                self.ocr_image_b64 = base64.b64encode(ocr_data).decode('utf-8')
                print(f" OCR test image loaded: {OCR_TEST_IMAGE} ({len(ocr_data)} bytes)")
            except Exception as e:
                print(f" Warning: Failed to load OCR test image: {e}")
        else:
            print(f" Warning: OCR test image not found: {OCR_TEST_IMAGE}")
        # Load bracket test image (image that produces tags with parentheses)
        bracket_image_path = os.path.join(script_dir, BRACKET_TEST_IMAGE)
        if os.path.exists(bracket_image_path):
            try:
                with open(bracket_image_path, 'rb') as f:
                    bracket_data = f.read()
                self.bracket_image_b64 = base64.b64encode(bracket_data).decode('utf-8')
                print(f" Bracket test image loaded: {BRACKET_TEST_IMAGE} ({len(bracket_data)} bytes)")
            except Exception as e:
                print(f" Warning: Failed to load bracket test image: {e}")
        else:
            print(f" Warning: Bracket test image not found: {BRACKET_TEST_IMAGE}")
        return True

    def print_summary(self):
        """Print test summary by category."""
        print("\n" + "=" * 70)
        print("TEST SUMMARY BY CATEGORY")
        print("=" * 70)
        total_passed = 0
        total_failed = 0
        total_skipped = 0
        for category, data in self.results.items():
            cat_passed = data['passed']
            cat_failed = data['failed']
            cat_skipped = data['skipped']
            cat_total = cat_passed + cat_failed + cat_skipped
            if cat_total == 0:
                continue
            total_passed += cat_passed
            total_failed += cat_failed
            total_skipped += cat_skipped
            # Calculate success rate (excluding skipped)
            cat_run = cat_passed + cat_failed
            if cat_run > 0:
                pct = cat_passed / cat_run * 100
                print(f"\n {category.upper():10} {cat_passed:3}/{cat_run:3} passed ({pct:5.1f}%), {cat_skipped} skipped")
            else:
                print(f"\n {category.upper():10} 0/0 tests, {cat_skipped} skipped")
            # Show failures for this category
            failures = [(s, m) for s, m in data['tests'] if s == 'failed']
            if failures:
                for _, msg in failures:
                    print(f" [FAIL] {msg}")
            # Show skipped tests for this category
            skipped = [(s, m) for s, m in data['tests'] if s == 'skipped']
            if skipped:
                for _, msg in skipped:
                    print(f" [SKIP] {msg}")
        # Overall totals
        print("\n" + "-" * 70)
        overall_run = total_passed + total_failed
        if overall_run > 0:
            overall_pct = total_passed / overall_run * 100
            print(f"\n TOTAL: {total_passed}/{overall_run} passed ({overall_pct:.1f}%), {total_skipped} skipped")
        else:
            print(f"\n TOTAL: 0/0 tests, {total_skipped} skipped")
        print("\n" + "=" * 70)
= total_passed + total_failed if overall_run > 0: overall_pct = total_passed / overall_run * 100 print(f"\n TOTAL: {total_passed}/{overall_run} passed ({overall_pct:.1f}%), {total_skipped} skipped") else: print(f"\n TOTAL: 0/0 tests, {total_skipped} skipped") print("\n" + "=" * 70) # ========================================================================= # TEST: GET /sdapi/v1/openclip - List Models # ========================================================================= def test_openclip_list_models(self): """Test GET /sdapi/v1/openclip returns model list.""" self.set_category('openclip') print("\n" + "=" * 70) print("TEST: GET /sdapi/v1/openclip") print("=" * 70) data = self.get('/sdapi/v1/openclip') # Test 1: Returns list if 'error' in data: self.log_fail(f"Request failed: {data.get('reason', data)}") return if isinstance(data, list): self.log_pass(f"Returns list with {len(data)} models") self._caption_models = data else: self.log_fail(f"Expected list, got {type(data)}") return # Test 2: Contains OpenCLIP models (format: arch/dataset) clip_models = [m for m in data if '/' in m] if clip_models: self.log_pass(f"Contains {len(clip_models)} OpenCLIP models") self.log_info(f"Examples: {clip_models[:3]}") else: self.log_skip("No OpenCLIP models found (may need to download)") # ========================================================================= # TEST: POST /sdapi/v1/openclip - OpenCLIP Modes # ========================================================================= def test_openclip_post_modes(self): """Test all 5 interrogation modes via direct endpoint.""" self.set_category('openclip') print("\n" + "=" * 70) print("TEST: POST /sdapi/v1/openclip (modes)") print("=" * 70) # Skip if critical error already occurred if self.skip_if_critical('openclip', 'openclip modes'): return # Check if we have OpenCLIP models if not self._caption_models: self._caption_models = self.get('/sdapi/v1/openclip') clip_models = [m for m in self._caption_models if '/' in m] if 
isinstance(self._caption_models, list) else [] if not clip_models: self.log_skip("No OpenCLIP models available") return model = 'ViT-L-14/openai' if 'ViT-L-14/openai' in clip_models else clip_models[0] self.log_info(f"Using model: {model}") modes = ['best', 'fast', 'classic', 'caption', 'negative'] for mode in modes: # Check for critical error before each mode test if self.has_critical_error('openclip'): self.log_skip(f"mode='{mode}': skipped due to critical error") continue t0 = time.time() data = self.post('/sdapi/v1/openclip', { 'image': self.image_b64, 'model': model, 'mode': mode }) elapsed = time.time() - t0 # Check for critical error in response if self.check_critical_error(data, 'openclip'): continue if 'error' in data: self.log_skip(f"mode='{mode}': {data.get('reason', 'failed')}") elif data.get('caption') and not self.is_error_answer(data['caption']): self.log_pass(f"mode='{mode}' returns caption ({len(data['caption'])} chars, {elapsed:.1f}s)") self.log_info(f"Caption: {self.truncate(data['caption'], 60)}") elif self.is_error_answer(data.get('caption', '')): self.log_fail(f"mode='{mode}' returned error: {data['caption']}") else: self.log_fail(f"mode='{mode}' returned empty caption") # ========================================================================= # TEST: POST /sdapi/v1/openclip - Analyze # ========================================================================= def test_openclip_analyze(self): """Test analyze=True returns breakdown fields.""" self.set_category('openclip') print("\n" + "=" * 70) print("TEST: POST /sdapi/v1/openclip (analyze)") print("=" * 70) # Skip if critical error already occurred if self.skip_if_critical('openclip', 'openclip analyze'): return # Check if we have OpenCLIP models if not self._caption_models: self._caption_models = self.get('/sdapi/v1/openclip') clip_models = [m for m in self._caption_models if '/' in m] if isinstance(self._caption_models, list) else [] if not clip_models: self.log_skip("No OpenCLIP models 
    # =========================================================================
    # TEST: POST /sdapi/v1/openclip - Invalid Inputs
    # =========================================================================

    def test_openclip_invalid_inputs(self):
        """Test error handling for invalid inputs."""
        self.set_category('openclip')
        print("\n" + "=" * 70)
        print("TEST: POST /sdapi/v1/openclip (invalid inputs)")
        print("=" * 70)
        # Test missing image (check_critical=False since we expect errors)
        data = self.post('/sdapi/v1/openclip', {
            'image': '',
            'model': 'ViT-L-14/openai'
        }, check_critical=False)
        if 'error' in data and data.get('status') == 404:
            self.log_pass("Missing image returns 404")
        else:
            self.log_fail(f"Missing image should return 404, got: {data}")
        # Test invalid model (check_critical=False since we expect errors)
        data = self.post('/sdapi/v1/openclip', {
            'image': self.image_b64,
            'model': 'invalid-nonexistent-model'
        }, check_critical=False)
        if 'error' in data:
            self.log_pass(f"Invalid model returns error: {data.get('status', 'error')}")
        else:
            self.log_fail("Invalid model should return error")

    # =========================================================================
    # TEST: POST /sdapi/v1/openclip - CLIP/BLIP Models
    # =========================================================================

    def test_openclip_clip_blip_models(self):
        """Test clip_model and blip_model parameter overrides."""
        self.set_category('openclip')
        print("\n" + "=" * 70)
        print("TEST: POST /sdapi/v1/openclip (clip_model, blip_model)")
        print("=" * 70)
        # Skip if critical error already occurred
        if self.skip_if_critical('openclip', 'openclip clip_blip_models'):
            return
        # Check if we have OpenCLIP models
        if not self._caption_models:
            self._caption_models = self.get('/sdapi/v1/openclip')
        clip_models = [m for m in self._caption_models if '/' in m] if isinstance(self._caption_models, list) else []
        if not clip_models:
            self.log_skip("No OpenCLIP models available")
            return
        # Test with explicit clip_model override
        model = clip_models[0]
        t0 = time.time()
        data = self.post('/sdapi/v1/openclip', {
            'image': self.image_b64,
            'model': model,
            'clip_model': model,  # Explicit CLIP model
            'mode': 'fast'
        })
        elapsed = time.time() - t0
        # Check for critical error
        if self.check_critical_error(data, 'openclip'):
            return
        if 'error' in data:
            self.log_skip(f"clip_model override: {data.get('reason', 'failed')}")
        elif data.get('caption') and not self.is_error_answer(data['caption']):
            self.log_pass(f"clip_model override accepted ({elapsed:.1f}s)")
        else:
            self.log_fail(f"clip_model override returned empty/error: {data.get('caption', '')}")
        # Check for critical error before continuing
        if self.has_critical_error('openclip'):
            return
        # Test with blip_model override (uses 'caption' mode internally)
        # Valid blip_model values: 'blip-base', 'blip-large', 'blip2-opt-2.7b', 'blip2-opt-6.7b', 'blip2-flip-t5-xl', 'blip2-flip-t5-xxl'
        t0 = time.time()
        data = self.post('/sdapi/v1/openclip', {
            'image': self.image_b64,
            'model': model,
            'blip_model': 'blip-base',  # Use smaller model to test override
            'mode': 'caption'
        })
        elapsed = time.time() - t0
        # Check for critical error
        if self.check_critical_error(data, 'openclip'):
            return
        if 'error' in data:
            self.log_skip(f"blip_model override: {data.get('reason', 'failed')}")
        elif data.get('caption') and not self.is_error_answer(data['caption']):
            self.log_pass(f"blip_model='blip-base' override accepted ({elapsed:.1f}s)")
        else:
            self.log_fail(f"blip_model override returned empty/error: {data.get('caption', '')}")
    # =========================================================================
    # TEST: POST /sdapi/v1/openclip - Caption Length
    # =========================================================================

    def test_openclip_length(self):
        """Test max_length constraints."""
        self.set_category('openclip')
        print("\n" + "=" * 70)
        print("TEST: POST /sdapi/v1/openclip (max_length)")
        print("=" * 70)
        # Skip if critical error already occurred
        if self.skip_if_critical('openclip', 'openclip length'):
            return
        # Check if we have OpenCLIP models
        if not self._caption_models:
            self._caption_models = self.get('/sdapi/v1/openclip')
        clip_models = [m for m in self._caption_models if '/' in m] if isinstance(self._caption_models, list) else []
        if not clip_models:
            self.log_skip("No OpenCLIP models available")
            return
        model = 'ViT-L-14/openai' if 'ViT-L-14/openai' in clip_models else clip_models[0]
        # Test max_length effect by comparing short vs long limits
        data_max_short = self.post('/sdapi/v1/openclip', {
            'image': self.image_b64,
            'model': model,
            'mode': 'caption',
            'max_length': 10  # Very short
        })
        # Check for critical error
        if self.check_critical_error(data_max_short, 'openclip'):
            return
        data_max_long = self.post('/sdapi/v1/openclip', {
            'image': self.image_b64,
            'model': model,
            'mode': 'caption',
            'max_length': 100  # Longer
        })
        # Check for critical error
        if self.check_critical_error(data_max_long, 'openclip'):
            return
        if 'error' in data_max_short or 'error' in data_max_long:
            self.log_skip("max_length test: API error")
        elif data_max_short.get('caption') and data_max_long.get('caption'):
            len_short = len(data_max_short['caption'])
            len_long = len(data_max_long['caption'])
            self.log_info(f"max_length=10: {len_short} chars - '{data_max_short['caption'][:50]}...'")
            self.log_info(f"max_length=100: {len_long} chars - '{data_max_long['caption'][:50]}...'")
            if len_short < len_long:
                self.log_pass(f"max_length has effect: {len_short} < {len_long} chars")
            elif len_short == len_long:
                self.log_skip(f"max_length no effect detected (both {len_short} chars, may be model limit)")
            else:
                self.log_fail(f"max_length reversed: short={len_short}, long={len_long}")
        else:
            self.log_fail("max_length test returned empty captions")

    # =========================================================================
    # TEST: POST /sdapi/v1/openclip - Flavors
    # =========================================================================

    def test_openclip_flavors(self):
        """Test min_flavors and max_flavors controls."""
        # NOTE(review): unlike the other openclip tests, this one has no
        # skip_if_critical guard — confirm whether that is intentional
        self.set_category('openclip')
        print("\n" + "=" * 70)
        print("TEST: POST /sdapi/v1/openclip (min_flavors, max_flavors)")
        print("=" * 70)
        # Check if we have OpenCLIP models
        if not self._caption_models:
            self._caption_models = self.get('/sdapi/v1/openclip')
        clip_models = [m for m in self._caption_models if '/' in m] if isinstance(self._caption_models, list) else []
        if not clip_models:
            self.log_skip("No OpenCLIP models available")
            return
        model = 'ViT-L-14/openai' if 'ViT-L-14/openai' in clip_models else clip_models[0]
        # Test max_flavors effect by comparing few vs many
        data_few = self.post('/sdapi/v1/openclip', {
            'image': self.image_b64,
            'model': model,
            'mode': 'fast',
            'max_flavors': 3  # Fewer flavor tags
        })
        data_many = self.post('/sdapi/v1/openclip', {
            'image': self.image_b64,
            'model': model,
            'mode': 'fast',
            'max_flavors': 20  # More flavor tags
        })
        if 'error' in data_few or 'error' in data_many:
            self.log_skip("max_flavors test: API error")
        elif data_few.get('caption') and data_many.get('caption'):
            len_few = len(data_few['caption'])
            len_many = len(data_many['caption'])
            self.log_info(f"max_flavors=3: {len_few} chars - '{data_few['caption'][:50]}...'")
            self.log_info(f"max_flavors=20: {len_many} chars - '{data_many['caption'][:50]}...'")
            if len_many > len_few:
                self.log_pass(f"max_flavors has effect: {len_few} < {len_many} chars")
            elif len_many == len_few:
                self.log_skip(f"max_flavors no effect detected (both {len_few} chars)")
            else:
                self.log_fail(f"max_flavors reversed: few={len_few}, many={len_many}")
        else:
            self.log_fail("max_flavors test returned empty captions")
        # Test min_flavors effect: only applies in mode='best' which iterates from min to max flavors
        # Use a narrow max_flavors window so min_flavors has a visible floor effect
        data_min_low = self.post('/sdapi/v1/openclip', {
            'image': self.image_b64,
            'model': model,
            'mode': 'best',
            'min_flavors': 1,
            'max_flavors': 3
        })
        data_min_high = self.post('/sdapi/v1/openclip', {
            'image': self.image_b64,
            'model': model,
            'mode': 'best',
            'min_flavors': 8,
            'max_flavors': 10
        })
        if 'error' in data_min_low or 'error' in data_min_high:
            self.log_skip("min_flavors test: API error")
        elif data_min_low.get('caption') and data_min_high.get('caption'):
            len_low = len(data_min_low['caption'])
            len_high = len(data_min_high['caption'])
            self.log_info(f"min_flavors=1,max=3: {len_low} chars - '{data_min_low['caption'][:50]}...'")
            self.log_info(f"min_flavors=8,max=10: {len_high} chars - '{data_min_high['caption'][:50]}...'")
            if len_high > len_low:
                self.log_pass(f"min_flavors has effect: {len_low} < {len_high} chars")
            elif len_high == len_low:
                self.log_fail(f"min_flavors has no effect (both {len_low} chars)")
            else:
                self.log_fail(f"min_flavors reversed: low={len_low}, high={len_high}")
        else:
            self.log_fail("min_flavors test returned empty captions")
    # =========================================================================
    # TEST: POST /sdapi/v1/openclip - Advanced Settings
    # =========================================================================

    def test_openclip_advanced_settings(self):
        """Test chunk_size, flavor_count, and num_beams parameters."""
        # NOTE(review): no skip_if_critical guard here (unlike most openclip
        # tests) — confirm whether that is intentional
        self.set_category('openclip')
        print("\n" + "=" * 70)
        print("TEST: POST /sdapi/v1/openclip (chunk_size, flavor_count, num_beams)")
        print("=" * 70)
        # Check if we have OpenCLIP models
        if not self._caption_models:
            self._caption_models = self.get('/sdapi/v1/openclip')
        clip_models = [m for m in self._caption_models if '/' in m] if isinstance(self._caption_models, list) else []
        if not clip_models:
            self.log_skip("No OpenCLIP models available")
            return
        model = 'ViT-L-14/openai' if 'ViT-L-14/openai' in clip_models else clip_models[0]
        # Test chunk_size override
        t0 = time.time()
        data = self.post('/sdapi/v1/openclip', {
            'image': self.image_b64,
            'model': model,
            'mode': 'fast',
            'chunk_size': 1024  # Batch size for processing candidates
        })
        elapsed = time.time() - t0
        if 'error' in data:
            self.log_skip(f"chunk_size override: {data.get('reason', 'failed')}")
        elif data.get('caption') and not self.is_error_answer(data['caption']):
            self.log_pass(f"chunk_size=1024 accepted ({elapsed:.1f}s)")
            self.log_info("NOTE: acceptance-only test, does not verify output effect")
        else:
            self.log_fail("chunk_size override returned empty/error")
        # Test flavor_count override
        t0 = time.time()
        data = self.post('/sdapi/v1/openclip', {
            'image': self.image_b64,
            'model': model,
            'mode': 'fast',
            'flavor_count': 16  # Intermediate candidate pool size
        })
        elapsed = time.time() - t0
        if 'error' in data:
            self.log_skip(f"flavor_count override: {data.get('reason', 'failed')}")
        elif data.get('caption') and not self.is_error_answer(data['caption']):
            self.log_pass(f"flavor_count=16 accepted ({elapsed:.1f}s)")
            self.log_info("NOTE: acceptance-only test, does not verify output effect")
        else:
            self.log_fail("flavor_count override returned empty/error")
        # Test num_beams override (beam search for caption generation)
        t0 = time.time()
        data = self.post('/sdapi/v1/openclip', {
            'image': self.image_b64,
            'model': model,
            'mode': 'caption',
            'num_beams': 3  # Beam search paths
        })
        elapsed = time.time() - t0
        if 'error' in data:
            self.log_skip(f"num_beams override: {data.get('reason', 'failed')}")
        elif data.get('caption') and not self.is_error_answer(data['caption']):
            self.log_pass(f"num_beams=3 accepted ({elapsed:.1f}s)")
            self.log_info("NOTE: acceptance-only test, does not verify output effect")
        else:
            self.log_fail("num_beams override returned empty/error")
{data.get('reason', 'failed')}") elif data.get('caption') and not self.is_error_answer(data['caption']): self.log_pass(f"flavor_count=16 accepted ({elapsed:.1f}s)") self.log_info("NOTE: acceptance-only test, does not verify output effect") else: self.log_fail("flavor_count override returned empty/error") # Test num_beams override (beam search for caption generation) t0 = time.time() data = self.post('/sdapi/v1/openclip', { 'image': self.image_b64, 'model': model, 'mode': 'caption', 'num_beams': 3 # Beam search paths }) elapsed = time.time() - t0 if 'error' in data: self.log_skip(f"num_beams override: {data.get('reason', 'failed')}") elif data.get('caption') and not self.is_error_answer(data['caption']): self.log_pass(f"num_beams=3 accepted ({elapsed:.1f}s)") self.log_info("NOTE: acceptance-only test, does not verify output effect") else: self.log_fail("num_beams override returned empty/error") # ========================================================================= # TEST: GET /sdapi/v1/vqa/models - VLM Models List # ========================================================================= def test_vqa_models_list(self): """Test GET /sdapi/v1/vqa/models returns model details.""" self.set_category('vqa') print("\n" + "=" * 70) print("TEST: GET /sdapi/v1/vqa/models") print("=" * 70) data = self.get('/sdapi/v1/vqa/models') if 'error' in data: self.log_fail(f"Request failed: {data.get('reason', data)}") return # Test 1: Returns list if isinstance(data, list) and len(data) > 0: self.log_pass(f"Returns list with {len(data)} models") self._vqa_models = data else: self.log_fail(f"Expected non-empty list, got {type(data)}") return # Test 2: Check model structure model = data[0] required_fields = ['name', 'repo', 'prompts', 'capabilities'] for field in required_fields: if field in model: self.log_pass(f"Model has '{field}' field") else: self.log_fail(f"Model missing '{field}' field") # Test 3: Capabilities include expected values capabilities_found = set() for m in data: 
capabilities_found.update(m.get('capabilities', [])) expected = ['caption', 'vqa', 'detection', 'ocr', 'thinking'] for cap in expected: if cap in capabilities_found: self.log_pass(f"Capability '{cap}' found in models") # Log some model names model_names = [m['name'] for m in data[:5]] self.log_info(f"Sample models: {model_names}") # ========================================================================= # TEST: GET /sdapi/v1/vqa/prompts - VLM Prompts List # ========================================================================= def test_vqa_prompts_list(self): """Test GET /sdapi/v1/vqa/prompts returns prompt categories.""" self.set_category('vqa') print("\n" + "=" * 70) print("TEST: GET /sdapi/v1/vqa/prompts") print("=" * 70) # Test without model filter data = self.get('/sdapi/v1/vqa/prompts') if 'error' in data: self.log_fail(f"Request failed: {data.get('reason', data)}") return # Verify categories expected_categories = ['common', 'florence', 'promptgen', 'moondream'] for cat in expected_categories: if cat in data and isinstance(data[cat], list): self.log_pass(f"Has '{cat}' category with {len(data[cat])} prompts") else: self.log_skip(f"Category '{cat}' missing or empty") # Test with model filter if self._vqa_models and len(self._vqa_models) > 0: model_name = self._vqa_models[0]['name'] data_filtered = self.get('/sdapi/v1/vqa/prompts', params={'model': model_name}) if 'available' in data_filtered: self.log_pass(f"Model filter returns 'available' prompts for '{model_name}'") else: self.log_fail("Model filter should return 'available' field") # ========================================================================= # TEST: POST /sdapi/v1/vqa - Basic Caption # ========================================================================= def test_vqa_caption_basic(self): """Test basic VQA captioning.""" self.set_category('vqa') print("\n" + "=" * 70) print("TEST: POST /sdapi/v1/vqa (basic)") print("=" * 70) if self.skip_if_critical('vqa', 'vqa basic'): return t0 = 
time.time() data = self.post('/sdapi/v1/vqa', { 'image': self.image_b64, 'question': 'describe the image' }) elapsed = time.time() - t0 if 'error' in data: self.log_skip(f"VQA: {data.get('reason', 'failed')} (model may not be loaded)") return answer = data.get('answer', '') if answer and not self.is_error_answer(answer): answer_preview = answer[:100] + '...' if len(answer) > 100 else answer self.log_pass(f"VQA returns answer ({elapsed:.1f}s)") self.log_info(f"Answer: {answer_preview}") elif self.is_error_answer(answer): self.log_fail(f"VQA returned error: {answer}") else: self.log_fail("VQA returned empty answer") # ========================================================================= # TEST: POST /sdapi/v1/vqa - Different Prompts # ========================================================================= def test_vqa_different_prompts(self): """Test different VQA prompts.""" self.set_category('vqa') print("\n" + "=" * 70) print("TEST: POST /sdapi/v1/vqa (prompts)") print("=" * 70) if self.skip_if_critical('vqa', 'vqa prompts'): return prompts = ['Short Caption', 'Normal Caption', 'Long Caption'] results = {} for prompt in prompts: t0 = time.time() data = self.post('/sdapi/v1/vqa', { 'image': self.image_b64, 'question': prompt }) elapsed = time.time() - t0 if 'error' in data: self.log_skip(f"prompt='{prompt}': {data.get('reason', 'failed')}") elif data.get('answer') and not self.is_error_answer(data['answer']): results[prompt] = len(data['answer']) self.log_pass(f"prompt='{prompt}' returns answer ({len(data['answer'])} chars, {elapsed:.1f}s)") elif self.is_error_answer(data.get('answer', '')): self.log_fail(f"prompt='{prompt}' returned error: {data['answer']}") else: self.log_fail(f"prompt='{prompt}' returned empty answer") # Length sanity check: Short should be noticeably shorter than Normal/Long if 'Short Caption' in results and 'Normal Caption' in results and 'Long Caption' in results: if results['Short Caption'] >= results['Normal Caption'] or 
results['Short Caption'] >= results['Long Caption']: self.log_info(f"NOTE: Short ({results['Short Caption']}) >= Normal ({results['Normal Caption']}) or Long ({results['Long Caption']}); LLM output length is non-deterministic and prompt-dependent") if results['Long Caption'] < results['Normal Caption']: self.log_info(f"NOTE: Long ({results['Long Caption']}) < Normal ({results['Normal Caption']}); LLM may interpret length prompts differently per run") # Dual prefill: re-run 'Normal Caption' with custom prefill self._check_prefill({'image': self.image_b64, 'question': 'Normal Caption'}, "different_prompts") # ========================================================================= # TEST: POST /sdapi/v1/vqa - Annotated Image # ========================================================================= def test_vqa_annotated_image(self): """Test include_annotated=True returns annotated image for detection.""" self.set_category('vqa') print("\n" + "=" * 70) print("TEST: POST /sdapi/v1/vqa (annotated image)") print("=" * 70) if self.skip_if_critical('vqa', 'vqa annotated'): return # Find a Florence model for detection florence_model = None if self._vqa_models: for m in self._vqa_models: if 'florence' in m['name'].lower(): florence_model = m['name'] break if not florence_model: florence_model = 'Microsoft Florence 2 Base' # Default self.log_info(f"Using default model: {florence_model}") # Test without annotation data_no_annot = self.post('/sdapi/v1/vqa', { 'image': self.image_b64, 'model': florence_model, 'question': '', 'include_annotated': False }) # Test with annotation t0 = time.time() data_annot = self.post('/sdapi/v1/vqa', { 'image': self.image_b64, 'model': florence_model, 'question': '', 'include_annotated': True }) elapsed = time.time() - t0 if 'error' in data_annot: self.log_skip(f"Detection test: {data_annot.get('reason', 'failed')} (model may not be loaded)") return # Verify answer present and not an error answer = data_annot.get('answer', '') if 
self.is_meaningful_answer(answer) and not self.is_error_answer(answer): answer_preview = answer[:100] + '...' if len(answer) > 100 else answer self.log_pass(f"Detection returns answer ({elapsed:.1f}s): {answer_preview}") elif self.is_error_answer(answer): self.log_fail(f"Detection task returned error: {answer}") return else: self.log_fail(f"Detection returned non-meaningful answer: '{answer}'") return # Verify annotated_image field if data_annot.get('annotated_image'): # Verify it's valid base64 try: img_data = base64.b64decode(data_annot['annotated_image']) if len(img_data) > 1000: # Reasonable image size self.log_pass(f"annotated_image returned ({len(img_data)} bytes)") else: self.log_fail("annotated_image too small") except Exception as e: self.log_fail(f"annotated_image invalid base64: {e}") else: # Check if answer contains detection results (bounding boxes) if ' 100 else answer self.log_pass(f"thinking_mode=True ({elapsed:.1f}s, {len(answer)} chars)") self.log_info(f"Answer: {answer_preview}") else: self.log_fail("thinking_mode=True returned empty/error") # Test with keep_thinking=True t0 = time.time() data_keep = self.post('/sdapi/v1/vqa', { 'image': self.image_b64, 'model': thinking_model, 'question': 'What is happening in this image?', 'thinking_mode': True, 'keep_thinking': True }) elapsed = time.time() - t0 if 'error' in data_keep: self.log_skip(f"keep_thinking=True test: {data_keep.get('reason', 'failed')}") elif data_keep.get('answer') and not self.is_error_answer(data_keep['answer']): answer = data_keep['answer'] # Thinking trace is reformatted: →"Reasoning:" and →"Answer:" by strip_think_xml_tags() has_thinking = 'reasoning:' in answer.lower() or ' tags for this input") # Show first part of answer (may include thinking trace) answer_preview = answer[:150] + '...' 
if len(answer) > 150 else answer self.log_info(f"Answer: {answer_preview}") else: self.log_fail("keep_thinking=True returned empty/error") # ========================================================================= # TEST: POST /sdapi/v1/vqa - Prefill # ========================================================================= def test_vqa_prefill(self): """Test prefill and keep_prefill parameters.""" self.set_category('vqa') print("\n" + "=" * 70) print("TEST: POST /sdapi/v1/vqa (prefill, keep_prefill)") print("=" * 70) if self.skip_if_critical('vqa', 'vqa prefill'): return prefill_text = "Vlado is the best, and I'm looking at his robot which" # Test with prefill to guide response start t0 = time.time() data_prefill = self.post('/sdapi/v1/vqa', { 'image': self.image_b64, 'question': 'describe the image', 'prefill': prefill_text }) elapsed = time.time() - t0 if 'error' in data_prefill: self.log_skip(f"prefill test: {data_prefill.get('reason', 'failed')}") return answer = data_prefill.get('answer', '') if answer and not self.is_error_answer(answer): self.log_pass(f"prefill accepted ({elapsed:.1f}s)") self.log_info(f"Prefill: '{prefill_text}'") self.log_info(f"Answer: {answer[:100]}...") elif self.is_error_answer(answer): self.log_fail(f"prefill returned error: {answer}") else: self.log_fail("prefill returned empty answer") # Test with keep_prefill=True (include prefill in output) t0 = time.time() data_keep = self.post('/sdapi/v1/vqa', { 'image': self.image_b64, 'question': 'describe the image', 'prefill': prefill_text, 'keep_prefill': True }) elapsed = time.time() - t0 if 'error' in data_keep: self.log_skip(f"keep_prefill=True test: {data_keep.get('reason', 'failed')}") elif data_keep.get('answer') and not self.is_error_answer(data_keep['answer']): answer_keep = data_keep['answer'] self.log_info(f"keep_prefill=True answer: {answer_keep[:80]}...") if answer_keep.startswith(prefill_text): self.log_pass(f"keep_prefill=True includes prefill in output ({elapsed:.1f}s)") 
else: self.log_fail(f"keep_prefill=True should start with '{prefill_text}' but got: '{answer_keep[:40]}...'") else: self.log_fail("keep_prefill=True returned empty/error") # Test with keep_prefill=False (strip prefill from output) t0 = time.time() data_strip = self.post('/sdapi/v1/vqa', { 'image': self.image_b64, 'question': 'describe the image', 'prefill': prefill_text, 'keep_prefill': False }) elapsed = time.time() - t0 if 'error' in data_strip: self.log_skip(f"keep_prefill=False test: {data_strip.get('reason', 'failed')}") elif data_strip.get('answer') and not self.is_error_answer(data_strip['answer']): answer_strip = data_strip['answer'] self.log_info(f"keep_prefill=False answer: {answer_strip[:80]}...") if not answer_strip.startswith(prefill_text): self.log_pass(f"keep_prefill=False strips prefill from output ({elapsed:.1f}s)") else: self.log_fail("keep_prefill=False should strip prefill but answer still starts with it") else: self.log_fail("keep_prefill=False returned empty/error") # ========================================================================= # TEST: VQA Model Architectures # ========================================================================= def test_vqa_model_architectures(self): """Test all VQA model architecture families.""" self.set_category('vqa') print("\n" + "=" * 70) print("TEST: VQA Model Architectures") print("=" * 70) if self.skip_if_critical('vqa', 'vqa architectures'): return if not self._vqa_models: self._vqa_models = self.get('/sdapi/v1/vqa/models') if 'error' in self._vqa_models or not isinstance(self._vqa_models, list): self.log_skip("Cannot get VQA model list") return # Group models by family families_found = {} for model in self._vqa_models: family = self.get_model_family(model['name']) if family not in families_found: families_found[family] = model['name'] self.log_info(f"Found {len(families_found)} model families: {list(families_found.keys())}") # Report which families are present vs absent for family in 
self.VQA_FAMILIES.keys(): if family in families_found: self.log_pass(f"Architecture '{family}' available: {families_found[family]}") else: self.log_skip(f"Architecture '{family}' not available") # Report unknown models if 'unknown' in families_found: unknown_models = [m['name'] for m in self._vqa_models if self.get_model_family(m['name']) == 'unknown'] self.log_info(f"Unrecognized models: {unknown_models[:5]}") # ========================================================================= # TEST: VQA Florence Special Prompts # ========================================================================= def test_vqa_florence_special_prompts(self): """Test Florence-2 specific prompts for detection, OCR, etc.""" self.set_category('vqa') print("\n" + "=" * 70) print("TEST: VQA Florence Special Prompts") print("=" * 70) if self.skip_if_critical('vqa', 'vqa florence prompts'): return # Find Florence models: base and PromptGen (which supports extra prompts) florence_model = None promptgen_model = None if self._vqa_models: for m in self._vqa_models: name_lower = m['name'].lower() if 'promptgen' in name_lower and promptgen_model is None: promptgen_model = m['name'] elif 'florence' in name_lower and 'promptgen' not in name_lower and 'cog' not in name_lower and florence_model is None: florence_model = m['name'] if not florence_model: self.log_skip("No Florence model available") return self.log_info(f"Using Florence model: {florence_model}") if promptgen_model: self.log_info(f"Using PromptGen model: {promptgen_model}") # Base Florence prompts (supported by all Florence models) base_prompts = { '': 'Object Detection', '': 'Optical Character Recognition', '': 'Dense Region Captioning', '': 'Standard Caption', '': 'Detailed Caption', } # PromptGen-only prompts (require MiaoshouAI PromptGen fine-tune) promptgen_prompts = { '': 'Tag Generation', } def run_florence_prompt(model, prompt, description): # Use OCR test image for OCR prompts (image with readable text) if prompt == '' and 
self.ocr_image_b64: test_image = self.ocr_image_b64 else: test_image = self.image_b64 t0 = time.time() data = self.post('/sdapi/v1/vqa', { 'image': test_image, 'model': model, 'question': prompt }) elapsed = time.time() - t0 if 'error' in data: self.log_skip(f"{description} ({prompt}): {data.get('reason', 'failed')}") elif self.is_meaningful_answer(data.get('answer')) and not self.is_error_answer(data['answer']): answer_preview = data['answer'][:60] + '...' if len(data['answer']) > 60 else data['answer'] self.log_pass(f"{description} ({prompt}): {elapsed:.1f}s") self.log_info(f" Answer: {answer_preview}") elif data.get('answer'): # Got an answer but it's not meaningful (e.g., just punctuation) self.log_fail(f"{description} ({prompt}): non-meaningful response: '{data['answer']}'") else: self.log_fail(f"{description} ({prompt}): empty/error response") for prompt, description in base_prompts.items(): run_florence_prompt(florence_model, prompt, description) for prompt, description in promptgen_prompts.items(): if promptgen_model: run_florence_prompt(promptgen_model, prompt, f"{description} [PromptGen]") else: self.log_skip(f"{description} ({prompt}): requires PromptGen model, none available") # ========================================================================= # TEST: VQA Moondream Detection Features # ========================================================================= def test_vqa_moondream_detection(self): """Test Moondream detection and pointing features.""" self.set_category('vqa') print("\n" + "=" * 70) print("TEST: VQA Moondream Detection Features") print("=" * 70) if self.skip_if_critical('vqa', 'vqa moondream'): return # Find a Moondream model moondream_model = None moondream_version = None if self._vqa_models: for m in self._vqa_models: if 'moondream' in m['name'].lower(): moondream_model = m['name'] # Detect version if '3' in m['name']: moondream_version = 3 elif '2' in m['name']: moondream_version = 2 else: moondream_version = 1 break if not 
moondream_model: self.log_skip("No Moondream model available") return self.log_info(f"Using Moondream model: {moondream_model} (v{moondream_version})") # Moondream-specific prompts moondream_prompts = [ ('Point at the main subject', 'Point detection'), ('Detect all objects', 'Object detection'), ('What is in the center of the image?', 'Region query'), ] # Add gaze detection for Moondream 2+ if moondream_version and moondream_version >= 2: moondream_prompts.append(('Detect Gaze', 'Gaze detection')) for prompt, description in moondream_prompts: t0 = time.time() data = self.post('/sdapi/v1/vqa', { 'image': self.image_b64, 'model': moondream_model, 'question': 'Use Prompt', 'prompt': prompt, 'include_annotated': True }) elapsed = time.time() - t0 if 'error' in data: self.log_skip(f"{description}: {data.get('reason', 'failed')}") elif self.is_meaningful_answer(data.get('answer')) and not self.is_error_answer(data['answer']): answer_preview = data['answer'][:60] + '...' if len(data['answer']) > 60 else data['answer'] has_annotated = bool(data.get('annotated_image')) self.log_pass(f"{description}: {elapsed:.1f}s (annotated={has_annotated})") self.log_info(f" Answer: {answer_preview}") elif data.get('answer'): self.log_fail(f"{description}: non-meaningful response: '{data['answer']}'") else: self.log_skip(f"{description}: may not be supported by this model version") # ========================================================================= # TEST: VQA Architecture Capabilities # ========================================================================= def test_vqa_architecture_capabilities(self): """Test architecture-specific capabilities like vision, thinking, detection.""" self.set_category('vqa') print("\n" + "=" * 70) print("TEST: VQA Architecture Capabilities") print("=" * 70) if self.skip_if_critical('vqa', 'vqa capabilities'): return if not self._vqa_models: self._vqa_models = self.get('/sdapi/v1/vqa/models') if 'error' in self._vqa_models or not 
isinstance(self._vqa_models, list): self.log_skip("Cannot get VQA model list") return # Collect all capabilities across models capability_models = {} for model in self._vqa_models: caps = model.get('capabilities', []) for cap in caps: if cap not in capability_models: capability_models[cap] = [] capability_models[cap].append(model['name']) self.log_info(f"Found {len(capability_models)} capabilities: {list(capability_models.keys())}") # Test each capability with one model capability_tests = { 'caption': 'describe the image', 'vqa': 'What is the main subject of this image?', 'detection': '', 'ocr': '', 'thinking': 'Analyze this image step by step', } for capability, test_prompt in capability_tests.items(): if capability not in capability_models: self.log_skip(f"Capability '{capability}': no models available") continue # Use first available model with this capability model_name = capability_models[capability][0] self.log_info(f"Testing '{capability}' with: {model_name}") # Use OCR test image for OCR capability (image with readable text) if capability == 'ocr' and self.ocr_image_b64: test_image = self.ocr_image_b64 else: test_image = self.image_b64 request_data = { 'image': test_image, 'model': model_name, 'question': test_prompt } # Enable thinking mode for thinking capability test if capability == 'thinking': request_data['thinking_mode'] = True t0 = time.time() data = self.post('/sdapi/v1/vqa', request_data) elapsed = time.time() - t0 if 'error' in data: self.log_skip(f"Capability '{capability}': {data.get('reason', 'model not loaded')}") elif self.is_meaningful_answer(data.get('answer')) and not self.is_error_answer(data['answer']): answer = data['answer'] answer_preview = answer[:80] + '...' 
if len(answer) > 80 else answer self.log_pass(f"Capability '{capability}' ({elapsed:.1f}s): {answer_preview}") if elapsed > 60: self.log_info(f"NOTE: {model_name} took {elapsed:.1f}s which is suspiciously slow; may need performance investigation") elif data.get('answer'): self.log_fail(f"Capability '{capability}': non-meaningful response: '{data['answer']}'") else: self.log_fail(f"Capability '{capability}': empty/error response") # ========================================================================= # TEST: OpenCLIP BLIP Architectures # ========================================================================= def test_openclip_blip_architectures(self): """Test all BLIP caption model types.""" self.set_category('openclip') print("\n" + "=" * 70) print("TEST: OpenCLIP BLIP Architectures") print("=" * 70) if self.skip_if_critical('openclip', 'openclip blip'): return # Check if we have OpenCLIP models (needed for caption endpoint) if not self._caption_models: self._caption_models = self.get('/sdapi/v1/openclip') clip_models = [m for m in self._caption_models if '/' in m] if isinstance(self._caption_models, list) else [] if not clip_models: self.log_skip("No OpenCLIP models available for BLIP testing") return model = 'ViT-L-14/openai' if 'ViT-L-14/openai' in clip_models else clip_models[0] for blip_model in self.BLIP_MODELS: t0 = time.time() data = self.post('/sdapi/v1/openclip', { 'image': self.image_b64, 'model': model, 'mode': 'caption', 'blip_model': blip_model }) elapsed = time.time() - t0 if 'error' in data: # Check if it's a model not found error vs other error reason = data.get('reason', '') if 'not found' in str(reason).lower() or data.get('status') == 404: self.log_skip(f"BLIP '{blip_model}': model not downloaded") else: self.log_skip(f"BLIP '{blip_model}': {reason}") elif data.get('caption') and not self.is_error_answer(data['caption']): caption_preview = data['caption'][:70] + '...' 
if len(data['caption']) > 70 else data['caption'] self.log_pass(f"BLIP '{blip_model}' ({elapsed:.1f}s): {caption_preview}") else: self.log_fail(f"BLIP '{blip_model}': empty/error response") # ========================================================================= # TEST: GET /sdapi/v1/tagger/models - Tagger Models List # ========================================================================= def test_tagger_models_list(self): """Test GET /sdapi/v1/tagger/models returns model list.""" self.set_category('tagger') print("\n" + "=" * 70) print("TEST: GET /sdapi/v1/tagger/models") print("=" * 70) data = self.get('/sdapi/v1/tagger/models') if 'error' in data: self.log_fail(f"Request failed: {data.get('reason', data)}") return # Test 1: Returns list if isinstance(data, list) and len(data) > 0: self.log_pass(f"Returns list with {len(data)} models") self._tagger_models = data else: self.log_fail(f"Expected non-empty list, got {type(data)}") return # Test 2: Check model structure model = data[0] if 'name' in model and 'type' in model: self.log_pass("Models have 'name' and 'type' fields") else: self.log_fail("Models missing required fields") # Test 3: Contains deepbooru has_deepbooru = any(m.get('name') == 'deepbooru' or m.get('type') == 'deepbooru' for m in data) if has_deepbooru: self.log_pass("Contains DeepBooru model") else: self.log_fail("Missing DeepBooru model") # Test 4: Contains WaifuDiffusion models wd_models = [m for m in data if m.get('type') == 'waifudiffusion'] if wd_models: self.log_pass(f"Contains {len(wd_models)} WaifuDiffusion models") self.log_info(f"Models: {[m['name'] for m in wd_models[:3]]}") else: self.log_skip("No WaifuDiffusion models found") # ========================================================================= # TEST: POST /sdapi/v1/tagger - Basic # ========================================================================= def test_tagger_basic(self): """Test basic tagging.""" self.set_category('tagger') print("\n" + "=" * 70) print("TEST: 
POST /sdapi/v1/tagger (basic)") print("=" * 70) if self.skip_if_critical('tagger', 'tagger basic'): return t0 = time.time() data = self.post('/sdapi/v1/tagger', { 'image': self.image_b64 }) elapsed = time.time() - t0 if 'error' in data: self.log_skip(f"Tagger: {data.get('reason', 'failed')} (model may not be loaded)") return tags = data.get('tags', '') if tags and not self.is_error_answer(tags): tags_preview = tags[:80] + '...' if len(tags) > 80 else tags tag_count = len(tags.split(', ')) self.log_pass(f"Returns tags ({tag_count} tags, {elapsed:.1f}s)") self.log_info(f"Tags: {tags_preview}") elif self.is_error_answer(tags): self.log_fail(f"Tagger returned error: {tags}") else: self.log_fail("Tagger returned empty tags") # ========================================================================= # TEST: POST /sdapi/v1/tagger - Threshold # ========================================================================= def test_tagger_threshold(self): """Test threshold affects tag count.""" self.set_category('tagger') print("\n" + "=" * 70) print("TEST: POST /sdapi/v1/tagger (threshold)") print("=" * 70) if self.skip_if_critical('tagger', 'tagger threshold'): return data_high = self.post('/sdapi/v1/tagger', { 'image': self.image_b64, 'threshold': 0.9 }) data_low = self.post('/sdapi/v1/tagger', { 'image': self.image_b64, 'threshold': 0.1 }) if 'error' in data_high or 'error' in data_low: self.log_skip("Threshold test: model not loaded") return tags_high = data_high.get('tags', '') tags_low = data_low.get('tags', '') count_high = len(tags_high.split(', ')) if tags_high else 0 count_low = len(tags_low.split(', ')) if tags_low else 0 self.log_info(f"threshold=0.9 ({count_high} tags): {tags_high[:70]}{'...' if len(tags_high) > 70 else ''}") self.log_info(f"threshold=0.1 ({count_low} tags): {tags_low[:70]}{'...' 
if len(tags_low) > 70 else ''}") if count_low > count_high: self.log_pass(f"threshold has effect: 0.9={count_high} tags < 0.1={count_low} tags") elif count_high == 0 and count_low == 0: self.log_skip("No tags returned (model may not be loaded)") else: self.log_fail(f"threshold no effect: 0.9={count_high}, 0.1={count_low}") # ========================================================================= # TEST: POST /sdapi/v1/tagger - Max Tags # ========================================================================= def test_tagger_max_tags(self): """Test max_tags limits output count.""" self.set_category('tagger') print("\n" + "=" * 70) print("TEST: POST /sdapi/v1/tagger (max_tags)") print("=" * 70) if self.skip_if_critical('tagger', 'tagger max_tags'): return data_5 = self.post('/sdapi/v1/tagger', { 'image': self.image_b64, 'max_tags': 5, 'threshold': 0.1 }) data_50 = self.post('/sdapi/v1/tagger', { 'image': self.image_b64, 'max_tags': 50, 'threshold': 0.1 }) if 'error' in data_5 or 'error' in data_50: self.log_skip("max_tags test: model not loaded") return tags_5 = data_5.get('tags', '') tags_50 = data_50.get('tags', '') count_5 = len(tags_5.split(', ')) if tags_5 else 0 count_50 = len(tags_50.split(', ')) if tags_50 else 0 self.log_info(f"max_tags=5 ({count_5} tags): {tags_5}") self.log_info(f"max_tags=50 ({count_50} tags): {tags_50[:80]}{'...' 
if len(tags_50) > 80 else ''}") if count_5 <= 5: self.log_pass(f"max_tags=5 correctly limits to {count_5} tags") else: self.log_fail(f"max_tags=5 returned {count_5} tags (expected <= 5)") if count_50 > count_5: self.log_pass(f"max_tags=50 returns more: {count_5} < {count_50} tags") else: self.log_fail("max_tags=50 should return more than max_tags=5") # ========================================================================= # TEST: POST /sdapi/v1/tagger - Sort Alpha # ========================================================================= def test_tagger_sort_alpha(self): """Test sort_alpha sorts tags alphabetically.""" self.set_category('tagger') print("\n" + "=" * 70) print("TEST: POST /sdapi/v1/tagger (sort_alpha)") print("=" * 70) if self.skip_if_critical('tagger', 'tagger sort_alpha'): return data_conf = self.post('/sdapi/v1/tagger', { 'image': self.image_b64, 'sort_alpha': False, 'max_tags': 20, 'threshold': 0.1 }) data_alpha = self.post('/sdapi/v1/tagger', { 'image': self.image_b64, 'sort_alpha': True, 'max_tags': 20, 'threshold': 0.1 }) if 'error' in data_conf or 'error' in data_alpha: self.log_skip("sort_alpha test: model not loaded") return list_conf = [t.strip() for t in data_conf.get('tags', '').split(',') if t.strip()] list_alpha = [t.strip() for t in data_alpha.get('tags', '').split(',') if t.strip()] if len(list_alpha) < 2: self.log_skip("Not enough tags to test sorting") return self.log_info(f"By confidence: {', '.join(list_conf[:8])}...") self.log_info(f"Alphabetical: {', '.join(list_alpha[:8])}...") is_sorted = list_alpha == sorted(list_alpha, key=str.lower) if is_sorted: self.log_pass("sort_alpha=True returns alphabetically sorted tags") else: self.log_fail("sort_alpha=True did not sort tags alphabetically") # ========================================================================= # TEST: POST /sdapi/v1/tagger - Use Spaces # ========================================================================= def test_tagger_use_spaces(self): """Test 
use_spaces converts underscores to spaces.""" self.set_category('tagger') print("\n" + "=" * 70) print("TEST: POST /sdapi/v1/tagger (use_spaces)") print("=" * 70) if self.skip_if_critical('tagger', 'tagger use_spaces'): return data_under = self.post('/sdapi/v1/tagger', { 'image': self.image_b64, 'use_spaces': False, 'max_tags': 20, 'threshold': 0.1 }) data_space = self.post('/sdapi/v1/tagger', { 'image': self.image_b64, 'use_spaces': True, 'max_tags': 20, 'threshold': 0.1 }) if 'error' in data_under or 'error' in data_space: self.log_skip("use_spaces test: model not loaded") return tags_under = data_under.get('tags', '') tags_space = data_space.get('tags', '') self.log_info(f"use_spaces=False: {tags_under[:60]}...") self.log_info(f"use_spaces=True: {tags_space[:60]}...") # Check if underscores are converted to spaces has_underscore_before = '_' in tags_under has_underscore_after = '_' in tags_space.replace(', ', ',') # ignore comma-space if has_underscore_before and not has_underscore_after: self.log_pass("use_spaces=True converts underscores to spaces") elif not has_underscore_before: self.log_skip("No underscores in tags to convert") else: self.log_fail("use_spaces=True did not convert underscores") # ========================================================================= # TEST: POST /sdapi/v1/tagger - Escape Brackets # ========================================================================= def test_tagger_escape_brackets(self): """Test escape_brackets escapes parentheses.""" self.set_category('tagger') print("\n" + "=" * 70) print("TEST: POST /sdapi/v1/tagger (escape_brackets)") print("=" * 70) if self.skip_if_critical('tagger', 'tagger escape_brackets'): return # Use bracket test image (produces tags with parentheses like "pokemon_(creature)") test_image = self.bracket_image_b64 or self.image_b64 if not self.bracket_image_b64: self.log_info("NOTE: bracket test image not available, using default image (may not produce bracket tags)") data_escaped = 
self.post('/sdapi/v1/tagger', { 'image': test_image, 'escape_brackets': True, 'max_tags': 50, 'threshold': 0.1 }) data_raw = self.post('/sdapi/v1/tagger', { 'image': test_image, 'escape_brackets': False, 'max_tags': 50, 'threshold': 0.1 }) if 'error' in data_escaped or 'error' in data_raw: self.log_skip("escape_brackets test: model not loaded") return tags_escaped = data_escaped.get('tags', '') tags_raw = data_raw.get('tags', '') self.log_info(f"escape=True: {tags_escaped[:70]}...") self.log_info(f"escape=False: {tags_raw[:70]}...") # Check for escaped brackets (\\( or \\)) has_escaped = '\\(' in tags_escaped or '\\)' in tags_escaped has_unescaped = '(' in tags_raw.replace('\\(', '') or ')' in tags_raw.replace('\\)', '') if has_escaped: self.log_pass("escape_brackets=True escapes parentheses") elif has_unescaped: self.log_fail("escape_brackets=True did not escape parentheses") else: self.log_skip("No brackets in tags to escape") # ========================================================================= # TEST: POST /sdapi/v1/tagger - Exclude Tags # ========================================================================= def test_tagger_exclude_tags(self): """Test exclude_tags removes specified tags.""" self.set_category('tagger') print("\n" + "=" * 70) print("TEST: POST /sdapi/v1/tagger (exclude_tags)") print("=" * 70) if self.skip_if_critical('tagger', 'tagger exclude_tags'): return data_all = self.post('/sdapi/v1/tagger', { 'image': self.image_b64, 'max_tags': 50, 'threshold': 0.1, 'exclude_tags': '' }) if 'error' in data_all: self.log_skip("exclude_tags test: model not loaded") return tag_list = [t.strip().replace(' ', '_') for t in data_all.get('tags', '').split(',') if t.strip()] if len(tag_list) < 2: self.log_skip("Not enough tags to test exclusion") return # Exclude the first tag tag_to_exclude = tag_list[0] data_filtered = self.post('/sdapi/v1/tagger', { 'image': self.image_b64, 'max_tags': 50, 'threshold': 0.1, 'exclude_tags': tag_to_exclude }) if 
'error' in data_filtered: self.log_skip("exclude_tags filtered request failed") return self.log_info(f"Excluding tag: '{tag_to_exclude}'") self.log_info(f"Before: {data_all.get('tags', '')[:60]}...") self.log_info(f"After: {data_filtered.get('tags', '')[:60]}...") # Check if the tag was removed filtered_list = [t.strip().replace(' ', '_') for t in data_filtered.get('tags', '').split(',') if t.strip()] tag_space_variant = tag_to_exclude.replace('_', ' ') tag_present = tag_to_exclude in filtered_list or tag_space_variant in [t.strip() for t in data_filtered.get('tags', '').split(',')] if not tag_present: self.log_pass(f"exclude_tags removes '{tag_to_exclude}'") else: self.log_fail(f"exclude_tags did not remove '{tag_to_exclude}'") # ========================================================================= # TEST: POST /sdapi/v1/tagger - Show Scores # ========================================================================= def test_tagger_show_scores(self): """Test show_scores returns scores dict.""" self.set_category('tagger') print("\n" + "=" * 70) print("TEST: POST /sdapi/v1/tagger (show_scores)") print("=" * 70) if self.skip_if_critical('tagger', 'tagger show_scores'): return data_no_scores = self.post('/sdapi/v1/tagger', { 'image': self.image_b64, 'show_scores': False, 'max_tags': 5 }) data_scores = self.post('/sdapi/v1/tagger', { 'image': self.image_b64, 'show_scores': True, 'max_tags': 5 }) if 'error' in data_no_scores or 'error' in data_scores: self.log_skip("show_scores test: model not loaded") return # Show the actual output tags_with_scores = data_scores.get('tags', '') tags_no_scores = data_no_scores.get('tags', '') self.log_info(f"show_scores=True tags: {tags_with_scores}") self.log_info(f"show_scores=False tags: {tags_no_scores}") # Check scores dict is returned if 'scores' in data_scores and isinstance(data_scores['scores'], dict) and len(data_scores['scores']) > 0: scores_dict = data_scores['scores'] # Show first few scores scores_preview = 
dict(list(scores_dict.items())[:3]) self.log_info(f"scores dict (first 3): {scores_preview}") self.log_pass(f"show_scores=True returns scores dict with {len(scores_dict)} entries") # Verify scores are floats 0-1 scores = list(scores_dict.values()) if all(isinstance(s, (int, float)) and 0 <= s <= 1 for s in scores): self.log_pass("All scores are floats in 0-1 range") else: self.log_fail(f"Some scores out of range: {scores}") else: self.log_fail("show_scores=True did not return scores dict") # Check tags contain scores (colon notation) if ':' in tags_with_scores: self.log_pass("Tags string includes score notation (:)") else: self.log_skip("Tags string does not include inline scores") # Check scores absent without flag if data_no_scores.get('scores') is None: self.log_pass("show_scores=False omits scores dict") # ========================================================================= # TEST: POST /sdapi/v1/tagger - Include Rating # ========================================================================= def test_tagger_include_rating(self): """Test include_rating adds rating tags.""" self.set_category('tagger') print("\n" + "=" * 70) print("TEST: POST /sdapi/v1/tagger (include_rating)") print("=" * 70) if self.skip_if_critical('tagger', 'tagger include_rating'): return data_no_rating = self.post('/sdapi/v1/tagger', { 'image': self.image_b64, 'include_rating': False, 'max_tags': 100, 'threshold': 0.01 }) data_rating = self.post('/sdapi/v1/tagger', { 'image': self.image_b64, 'include_rating': True, 'max_tags': 100, 'threshold': 0.01 }) if 'error' in data_no_rating or 'error' in data_rating: self.log_skip("include_rating test: model not loaded") return tags_no_rating = data_no_rating.get('tags', '').lower() tags_rating = data_rating.get('tags', '').lower() self.log_info(f"include_rating=False: {tags_no_rating[:60]}...") self.log_info(f"include_rating=True: {tags_rating[:60]}...") # Rating tags typically are like "safe", "questionable", "explicit", "general", 
"sensitive" rating_keywords = ['rating:', 'safe', 'questionable', 'explicit', 'general', 'sensitive'] has_rating_before = any(kw in tags_no_rating for kw in rating_keywords) has_rating_after = any(kw in tags_rating for kw in rating_keywords) if has_rating_after and not has_rating_before: self.log_pass("include_rating=True adds rating tags") elif has_rating_after and has_rating_before: self.log_skip("Rating tags appear in both (threshold may be very low)") elif not has_rating_after: self.log_skip("No rating tags detected") else: self.log_fail("include_rating did not work as expected") # ========================================================================= # TEST: POST /sdapi/v1/tagger - Character Threshold # ========================================================================= def test_tagger_character_threshold(self): """Test character_threshold for character-specific tags (WaifuDiffusion only).""" self.set_category('tagger') print("\n" + "=" * 70) print("TEST: POST /sdapi/v1/tagger (character_threshold)") print("=" * 70) if self.skip_if_critical('tagger', 'tagger character_threshold'): return # Find a WaifuDiffusion model (character_threshold only applies to WD models) wd_model = None if self._tagger_models: for m in self._tagger_models: if m.get('type') == 'waifudiffusion': wd_model = m['name'] break if not wd_model: self.log_skip("No WaifuDiffusion models available (character_threshold only applies to WD)") return self.log_info(f"Using WaifuDiffusion model: {wd_model}") # Test with low character_threshold (more character tags) t0 = time.time() data_low = self.post('/sdapi/v1/tagger', { 'image': self.image_b64, 'model': wd_model, 'character_threshold': 0.5, 'threshold': 0.1, 'max_tags': 100 }) elapsed_low = time.time() - t0 # Test with high character_threshold (fewer character tags) t0 = time.time() data_high = self.post('/sdapi/v1/tagger', { 'image': self.image_b64, 'model': wd_model, 'character_threshold': 0.99, 'threshold': 0.1, 'max_tags': 100 }) 
elapsed_high = time.time() - t0 if 'error' in data_low: self.log_skip(f"character_threshold=0.5 test: {data_low.get('reason', 'failed')}") elif data_low.get('tags') and not self.is_error_answer(data_low['tags']): self.log_pass(f"character_threshold=0.5 accepted ({elapsed_low:.1f}s)") else: self.log_fail("character_threshold=0.5 returned empty/error") if 'error' in data_high: self.log_skip(f"character_threshold=0.99 test: {data_high.get('reason', 'failed')}") elif data_high.get('tags') and not self.is_error_answer(data_high['tags']): self.log_pass(f"character_threshold=0.99 accepted ({elapsed_high:.1f}s)") # Compare tag counts - higher threshold should have fewer (or same) character tags count_low = len(data_low.get('tags', '').split(', ')) count_high = len(data_high.get('tags', '').split(', ')) self.log_info(f"Tag counts: threshold=0.5→{count_low}, threshold=0.99→{count_high}") if count_low > count_high: self.log_pass(f"character_threshold affects tag filtering: {count_low} > {count_high}") elif count_low == count_high: self.log_info("NOTE: acceptance-only test, tag counts identical; test image likely has no character tags (character_threshold only filters anime character names)") else: self.log_fail(f"character_threshold reversed: low={count_low} < high={count_high}") else: self.log_fail("character_threshold=0.99 returned empty/error") # ========================================================================= # TEST: Tagger Model Types (Architecture Coverage) # ========================================================================= def test_tagger_model_types(self): """Test all tagger model types (deepbooru, waifudiffusion).""" self.set_category('tagger') print("\n" + "=" * 70) print("TEST: Tagger Model Types") print("=" * 70) if self.skip_if_critical('tagger', 'tagger model types'): return if not self._tagger_models: self._tagger_models = self.get('/sdapi/v1/tagger/models') if 'error' in self._tagger_models or not isinstance(self._tagger_models, list): 
self.log_skip("Cannot get tagger model list") return # Group models by type types_found = {} for model in self._tagger_models: model_type, version = self.get_tagger_type(model) type_key = f"{model_type}" + (f"-{version}" if version else "") if type_key not in types_found: types_found[type_key] = model['name'] self.log_info(f"Found {len(types_found)} tagger types: {list(types_found.keys())}") # Test one model from each type for type_key, model_name in types_found.items(): t0 = time.time() data = self.post('/sdapi/v1/tagger', { 'image': self.image_b64, 'model': model_name, 'max_tags': 10, 'threshold': 0.3 }) elapsed = time.time() - t0 if 'error' in data: self.log_skip(f"Type '{type_key}' ({model_name}): {data.get('reason', 'failed')}") elif data.get('tags') and not self.is_error_answer(data['tags']): tags = data['tags'] tag_count = len(tags.split(', ')) tags_preview = tags[:60] + '...' if len(tags) > 60 else tags self.log_pass(f"Type '{type_key}' ({elapsed:.1f}s, {tag_count} tags): {tags_preview}") else: self.log_fail(f"Type '{type_key}' ({model_name}): empty/error response") # ========================================================================= # TEST: Tagger WaifuDiffusion Versions # ========================================================================= def test_tagger_wd_versions(self): """Test WaifuDiffusion version differences (v2 vs v3).""" self.set_category('tagger') print("\n" + "=" * 70) print("TEST: Tagger WaifuDiffusion Versions") print("=" * 70) if self.skip_if_critical('tagger', 'tagger wd_versions'): return if not self._tagger_models: self._tagger_models = self.get('/sdapi/v1/tagger/models') if 'error' in self._tagger_models or not isinstance(self._tagger_models, list): self.log_skip("Cannot get tagger model list") return # Find WD v2 and v3 models wd_v2 = None wd_v3 = None for model in self._tagger_models: if model.get('type') == 'waifudiffusion': name_lower = model['name'].lower() if 'v3' in name_lower and not wd_v3: wd_v3 = model['name'] elif 
'v2' in name_lower and not wd_v2: wd_v2 = model['name'] if not wd_v2 and not wd_v3: self.log_skip("No WaifuDiffusion models available") return results = {} # Test WD v2 if wd_v2: self.log_info(f"Testing WD v2: {wd_v2}") t0 = time.time() data_v2 = self.post('/sdapi/v1/tagger', { 'image': self.image_b64, 'model': wd_v2, 'max_tags': 20, 'threshold': 0.3 }) elapsed = time.time() - t0 if 'error' in data_v2: self.log_skip(f"WD v2: {data_v2.get('reason', 'failed')}") elif data_v2.get('tags') and not self.is_error_answer(data_v2['tags']): tag_count = len(data_v2['tags'].split(', ')) self.log_pass(f"WD v2 ({wd_v2}): {tag_count} tags ({elapsed:.1f}s)") results['v2'] = data_v2['tags'] else: self.log_fail("WD v2: empty/error response") else: self.log_skip("No WD v2 model available") # Test WD v3 if wd_v3: self.log_info(f"Testing WD v3: {wd_v3}") t0 = time.time() data_v3 = self.post('/sdapi/v1/tagger', { 'image': self.image_b64, 'model': wd_v3, 'max_tags': 20, 'threshold': 0.3 }) elapsed = time.time() - t0 if 'error' in data_v3: self.log_skip(f"WD v3: {data_v3.get('reason', 'failed')}") elif data_v3.get('tags') and not self.is_error_answer(data_v3['tags']): tag_count = len(data_v3['tags'].split(', ')) self.log_pass(f"WD v3 ({wd_v3}): {tag_count} tags ({elapsed:.1f}s)") results['v3'] = data_v3['tags'] else: self.log_fail("WD v3: empty/error response") else: self.log_skip("No WD v3 model available") # Compare outputs if both available if 'v2' in results and 'v3' in results: v2_tags = {t.strip() for t in results['v2'].split(',')} v3_tags = {t.strip() for t in results['v3'].split(',')} common = len(v2_tags & v3_tags) v2_only = len(v2_tags - v3_tags) v3_only = len(v3_tags - v2_tags) self.log_info(f"Tag comparison: {common} common, {v2_only} v2-only, {v3_only} v3-only") # ========================================================================= # TEST: POST /sdapi/v1/tagger - Invalid Inputs # ========================================================================= def 
test_tagger_invalid_inputs(self): """Test error handling for invalid inputs.""" self.set_category('tagger') print("\n" + "=" * 70) print("TEST: POST /sdapi/v1/tagger (invalid inputs)") print("=" * 70) # Test missing image (check_critical=False since we expect errors) data = self.post('/sdapi/v1/tagger', { 'image': '' }, check_critical=False) if 'error' in data and data.get('status') == 404: self.log_pass("Missing image returns 404") else: self.log_fail(f"Missing image should return 404, got: {data}") # ========================================================================= # DISPATCH ENDPOINT TESTS # ========================================================================= def test_dispatch_openclip_basic(self): """Test dispatch endpoint routes to OpenCLIP backend.""" self.set_category('dispatch') print("\n" + "=" * 70) print("TEST: POST /sdapi/v1/caption (dispatch: openclip)") print("=" * 70) if self.skip_if_critical('openclip', 'dispatch openclip'): return # Check if we have OpenCLIP models if not self._caption_models: self._caption_models = self.get('/sdapi/v1/openclip') clip_models = [m for m in self._caption_models if '/' in m] if isinstance(self._caption_models, list) else [] if not clip_models: self.log_skip("No OpenCLIP models available") return model = 'ViT-L-14/openai' if 'ViT-L-14/openai' in clip_models else clip_models[0] t0 = time.time() data = self.post('/sdapi/v1/caption', { 'backend': 'openclip', 'image': self.image_b64, 'model': model, 'mode': 'fast' }) elapsed = time.time() - t0 if 'error' in data: self.log_skip(f"Dispatch to openclip: {data.get('reason', 'failed')}") return # Verify backend field if data.get('backend') == 'openclip': self.log_pass("backend='openclip' returned in response") else: self.log_fail(f"Expected backend='openclip', got '{data.get('backend')}'") # Verify caption field if data.get('caption') and not self.is_error_answer(data['caption']): self.log_pass(f"Dispatch to openclip returns caption ({elapsed:.1f}s)") 
self.log_info(f"Caption: {self.truncate(data['caption'], 60)}") else: self.log_fail("Dispatch to openclip returned empty/error caption") def test_dispatch_openclip_modes(self): """Test all OpenCLIP modes via dispatch.""" self.set_category('dispatch') print("\n" + "=" * 70) print("TEST: POST /sdapi/v1/caption (dispatch: openclip modes)") print("=" * 70) if self.skip_if_critical('openclip', 'dispatch openclip modes'): return if not self._caption_models: self._caption_models = self.get('/sdapi/v1/openclip') clip_models = [m for m in self._caption_models if '/' in m] if isinstance(self._caption_models, list) else [] if not clip_models: self.log_skip("No OpenCLIP models available") return model = 'ViT-L-14/openai' if 'ViT-L-14/openai' in clip_models else clip_models[0] modes = ['best', 'fast', 'classic', 'caption', 'negative'] for mode in modes: t0 = time.time() data = self.post('/sdapi/v1/caption', { 'backend': 'openclip', 'image': self.image_b64, 'model': model, 'mode': mode }) elapsed = time.time() - t0 if 'error' in data: self.log_skip(f"dispatch openclip mode='{mode}': {data.get('reason', 'failed')}") elif data.get('caption') and data.get('backend') == 'openclip': self.log_pass(f"dispatch openclip mode='{mode}' ({elapsed:.1f}s)") else: self.log_fail(f"dispatch openclip mode='{mode}' failed") def test_dispatch_openclip_analyze(self): """Test OpenCLIP analyze via dispatch.""" self.set_category('dispatch') print("\n" + "=" * 70) print("TEST: POST /sdapi/v1/caption (dispatch: openclip analyze)") print("=" * 70) if self.skip_if_critical('openclip', 'dispatch openclip analyze'): return if not self._caption_models: self._caption_models = self.get('/sdapi/v1/openclip') clip_models = [m for m in self._caption_models if '/' in m] if isinstance(self._caption_models, list) else [] if not clip_models: self.log_skip("No OpenCLIP models available") return model = 'ViT-L-14/openai' if 'ViT-L-14/openai' in clip_models else clip_models[0] data = self.post('/sdapi/v1/caption', { 
'backend': 'openclip', 'image': self.image_b64, 'model': model, 'analyze': True }) if 'error' in data: self.log_skip(f"dispatch openclip analyze: {data.get('reason', 'failed')}") return analyze_fields = ['medium', 'artist', 'movement', 'trending', 'flavor'] fields_found = sum(1 for f in analyze_fields if data.get(f)) if fields_found > 0: self.log_pass(f"dispatch openclip analyze returns {fields_found}/5 breakdown fields") else: self.log_skip("No breakdown fields returned (may be image-dependent)") def test_dispatch_tagger_basic(self): """Test dispatch endpoint routes to Tagger backend.""" self.set_category('dispatch') print("\n" + "=" * 70) print("TEST: POST /sdapi/v1/caption (dispatch: tagger)") print("=" * 70) if self.skip_if_critical('tagger', 'dispatch tagger'): return t0 = time.time() data = self.post('/sdapi/v1/caption', { 'backend': 'tagger', 'image': self.image_b64, 'threshold': 0.5, 'max_tags': 20 }) elapsed = time.time() - t0 if 'error' in data: self.log_skip(f"Dispatch to tagger: {data.get('reason', 'failed')}") return # Verify backend field if data.get('backend') == 'tagger': self.log_pass("backend='tagger' returned in response") else: self.log_fail(f"Expected backend='tagger', got '{data.get('backend')}'") # Verify tags field if data.get('tags') and not self.is_error_answer(data['tags']): tags = data['tags'] tag_count = len(tags.split(', ')) self.log_pass(f"Dispatch to tagger returns {tag_count} tags ({elapsed:.1f}s)") self.log_info(f"Tags: {self.truncate(tags, 60)}") else: self.log_fail("Dispatch to tagger returned empty/error tags") def test_dispatch_tagger_params(self): """Test tagger parameters via dispatch.""" self.set_category('dispatch') print("\n" + "=" * 70) print("TEST: POST /sdapi/v1/caption (dispatch: tagger params)") print("=" * 70) if self.skip_if_critical('tagger', 'dispatch tagger params'): return # Test with various parameters data = self.post('/sdapi/v1/caption', { 'backend': 'tagger', 'image': self.image_b64, 'threshold': 0.3, 
'max_tags': 10, 'sort_alpha': True, 'use_spaces': True }) if 'error' in data: self.log_skip(f"dispatch tagger params: {data.get('reason', 'failed')}") return if data.get('tags') and data.get('backend') == 'tagger': tags = data['tags'] tag_list = [t.strip() for t in tags.split(',') if t.strip()] if len(tag_list) <= 10: self.log_pass(f"dispatch tagger max_tags=10 respected ({len(tag_list)} tags)") else: self.log_fail(f"dispatch tagger max_tags=10 not respected ({len(tag_list)} tags)") else: self.log_fail("dispatch tagger params failed") def test_dispatch_tagger_scores(self): """Test tagger show_scores via dispatch.""" self.set_category('dispatch') print("\n" + "=" * 70) print("TEST: POST /sdapi/v1/caption (dispatch: tagger scores)") print("=" * 70) if self.skip_if_critical('tagger', 'dispatch tagger scores'): return data = self.post('/sdapi/v1/caption', { 'backend': 'tagger', 'image': self.image_b64, 'show_scores': True, 'max_tags': 5 }) if 'error' in data: self.log_skip(f"dispatch tagger scores: {data.get('reason', 'failed')}") return if data.get('scores') and isinstance(data['scores'], dict): self.log_pass(f"dispatch tagger show_scores returns {len(data['scores'])} scores") self.log_response(data, key_fields=['scores']) else: self.log_fail("dispatch tagger show_scores did not return scores dict") def test_dispatch_vlm_basic(self): """Test dispatch endpoint routes to VLM backend.""" self.set_category('dispatch') print("\n" + "=" * 70) print("TEST: POST /sdapi/v1/caption (dispatch: vlm)") print("=" * 70) if self.skip_if_critical('vqa', 'dispatch vlm'): return t0 = time.time() data = self.post('/sdapi/v1/caption', { 'backend': 'vlm', 'image': self.image_b64, 'question': 'describe the image' }) elapsed = time.time() - t0 if 'error' in data: self.log_skip(f"Dispatch to vlm: {data.get('reason', 'failed')} (model may not be loaded)") return # Verify backend field if data.get('backend') == 'vlm': self.log_pass("backend='vlm' returned in response") else: 
self.log_fail(f"Expected backend='vlm', got '{data.get('backend')}'") # Verify answer field if data.get('answer') and not self.is_error_answer(data['answer']): self.log_pass(f"Dispatch to vlm returns answer ({elapsed:.1f}s)") self.log_info(f"Answer: {self.truncate(data['answer'], 60)}") else: self.log_fail("Dispatch to vlm returned empty/error answer") def test_dispatch_vlm_prompts(self): """Test different VLM prompts via dispatch.""" self.set_category('dispatch') print("\n" + "=" * 70) print("TEST: POST /sdapi/v1/caption (dispatch: vlm prompts)") print("=" * 70) if self.skip_if_critical('vqa', 'dispatch vlm prompts'): return prompts = ['Short Caption', 'Normal Caption', 'Long Caption'] for prompt in prompts: t0 = time.time() data = self.post('/sdapi/v1/caption', { 'backend': 'vlm', 'image': self.image_b64, 'question': prompt }) elapsed = time.time() - t0 if 'error' in data: self.log_skip(f"dispatch vlm prompt='{prompt}': {data.get('reason', 'failed')}") elif data.get('answer') and data.get('backend') == 'vlm': self.log_pass(f"dispatch vlm prompt='{prompt}' ({len(data['answer'])} chars, {elapsed:.1f}s)") else: self.log_fail(f"dispatch vlm prompt='{prompt}' failed") def test_dispatch_vlm_annotated(self): """Test VLM include_annotated via dispatch.""" self.set_category('dispatch') print("\n" + "=" * 70) print("TEST: POST /sdapi/v1/caption (dispatch: vlm annotated)") print("=" * 70) if self.skip_if_critical('vqa', 'dispatch vlm annotated'): return # Find a Florence model for detection florence_model = None if self._vqa_models: for m in self._vqa_models: if 'florence' in m['name'].lower(): florence_model = m['name'] break if not florence_model: florence_model = 'Microsoft Florence 2 Base' data = self.post('/sdapi/v1/caption', { 'backend': 'vlm', 'image': self.image_b64, 'model': florence_model, 'question': '', 'include_annotated': True }) if 'error' in data: self.log_skip(f"dispatch vlm annotated: {data.get('reason', 'failed')}") return if data.get('answer') and 
data.get('backend') == 'vlm': self.log_pass("dispatch vlm with include_annotated returns answer") if data.get('annotated_image'): self.log_pass("dispatch vlm returns annotated_image") else: self.log_skip("No annotated_image (detection may not have found objects)") else: self.log_fail("dispatch vlm annotated failed") def test_dispatch_backend_field(self): """Test backend field is always returned correctly.""" self.set_category('dispatch') print("\n" + "=" * 70) print("TEST: POST /sdapi/v1/caption (dispatch: backend field)") print("=" * 70) backends = ['openclip', 'tagger', 'vlm'] for backend in backends: req = {'backend': backend, 'image': self.image_b64} if backend == 'vlm': req['question'] = 'describe' data = self.post('/sdapi/v1/caption', req) if 'error' in data: self.log_skip(f"backend='{backend}': {data.get('reason', 'failed')}") elif data.get('backend') == backend: self.log_pass(f"backend='{backend}' returned correctly") else: self.log_fail(f"backend='{backend}' not returned, got '{data.get('backend')}'") def test_dispatch_invalid_backend(self): """Test error handling for invalid backend value.""" self.set_category('dispatch') print("\n" + "=" * 70) print("TEST: POST /sdapi/v1/caption (dispatch: invalid backend)") print("=" * 70) # check_critical=False since we expect errors data = self.post('/sdapi/v1/caption', { 'backend': 'invalid_backend', 'image': self.image_b64 }, check_critical=False) if 'error' in data: self.log_pass(f"Invalid backend returns error: {data.get('status', 'error')}") else: self.log_fail("Invalid backend should return error") def test_dispatch_missing_image(self): """Test error handling for missing image.""" self.set_category('dispatch') print("\n" + "=" * 70) print("TEST: POST /sdapi/v1/caption (dispatch: missing image)") print("=" * 70) for backend in ['openclip', 'tagger', 'vlm']: req = {'backend': backend, 'image': ''} if backend == 'vlm': req['question'] = 'describe' # check_critical=False since we expect errors data = 
self.post('/sdapi/v1/caption', req, check_critical=False) if 'error' in data and data.get('status') == 404: self.log_pass(f"dispatch {backend} missing image returns 404") else: self.log_fail(f"dispatch {backend} missing image should return 404, got: {data}") # ========================================================================= # PARITY TESTS: Dispatch vs Direct Endpoints # ========================================================================= def test_parity_openclip(self): """Test dispatch and direct OpenCLIP endpoints return same caption.""" self.set_category('parity') print("\n" + "=" * 70) print("TEST: Parity - OpenCLIP dispatch vs direct") print("=" * 70) if self.skip_if_critical('openclip', 'parity openclip'): return if not self._caption_models: self._caption_models = self.get('/sdapi/v1/openclip') clip_models = [m for m in self._caption_models if '/' in m] if isinstance(self._caption_models, list) else [] if not clip_models: self.log_skip("No OpenCLIP models available") return model = 'ViT-L-14/openai' if 'ViT-L-14/openai' in clip_models else clip_models[0] # Direct endpoint data_direct = self.post('/sdapi/v1/openclip', { 'image': self.image_b64, 'model': model, 'mode': 'caption' }) # Dispatch endpoint data_dispatch = self.post('/sdapi/v1/caption', { 'backend': 'openclip', 'image': self.image_b64, 'model': model, 'mode': 'caption' }) if 'error' in data_direct or 'error' in data_dispatch: self.log_skip("One or both requests failed") return direct_caption = data_direct.get('caption', '') dispatch_caption = data_dispatch.get('caption', '') self.log_info(f"Direct: {self.truncate(direct_caption, 50)}") self.log_info(f"Dispatch: {self.truncate(dispatch_caption, 50)}") if direct_caption == dispatch_caption: self.log_pass("OpenCLIP dispatch and direct return identical captions") elif direct_caption and dispatch_caption: self.log_pass("OpenCLIP dispatch and direct both return captions (may differ due to timing)") else: self.log_fail("OpenCLIP parity test 
failed") def test_parity_tagger(self): """Test dispatch and direct Tagger endpoints return same tags.""" self.set_category('parity') print("\n" + "=" * 70) print("TEST: Parity - Tagger dispatch vs direct") print("=" * 70) if self.skip_if_critical('tagger', 'parity tagger'): return # Direct endpoint data_direct = self.post('/sdapi/v1/tagger', { 'image': self.image_b64, 'threshold': 0.5, 'max_tags': 20 }) # Dispatch endpoint data_dispatch = self.post('/sdapi/v1/caption', { 'backend': 'tagger', 'image': self.image_b64, 'threshold': 0.5, 'max_tags': 20 }) if 'error' in data_direct or 'error' in data_dispatch: self.log_skip("One or both requests failed") return direct_tags = data_direct.get('tags', '') dispatch_tags = data_dispatch.get('tags', '') self.log_info(f"Direct: {self.truncate(direct_tags, 50)}") self.log_info(f"Dispatch: {self.truncate(dispatch_tags, 50)}") if direct_tags == dispatch_tags: self.log_pass("Tagger dispatch and direct return identical tags") elif direct_tags and dispatch_tags: self.log_pass("Tagger dispatch and direct both return tags") else: self.log_fail("Tagger parity test failed") def test_parity_vlm(self): """Test dispatch and direct VLM endpoints return same answer.""" self.set_category('parity') print("\n" + "=" * 70) print("TEST: Parity - VLM dispatch vs direct") print("=" * 70) if self.skip_if_critical('vqa', 'parity vlm'): return # Direct endpoint data_direct = self.post('/sdapi/v1/vqa', { 'image': self.image_b64, 'question': 'Short Caption' }) # Dispatch endpoint data_dispatch = self.post('/sdapi/v1/caption', { 'backend': 'vlm', 'image': self.image_b64, 'question': 'Short Caption' }) if 'error' in data_direct or 'error' in data_dispatch: self.log_skip("One or both requests failed (model may not be loaded)") return direct_answer = data_direct.get('answer', '') dispatch_answer = data_dispatch.get('answer', '') self.log_info(f"Direct: {self.truncate(direct_answer, 50)}") self.log_info(f"Dispatch: {self.truncate(dispatch_answer, 50)}") if 
direct_answer == dispatch_answer: self.log_pass("VLM dispatch and direct return identical answers") elif direct_answer and dispatch_answer: self.log_pass("VLM dispatch and direct both return answers (may differ due to sampling)") else: self.log_fail("VLM parity test failed") # ========================================================================= # Run All Tests # ========================================================================= def run_all_tests(self): """Run all tests.""" if not self.setup(): return False # OpenCLIP direct endpoint tests self.test_openclip_list_models() self.test_openclip_post_modes() self.test_openclip_analyze() self.test_openclip_invalid_inputs() self.test_openclip_clip_blip_models() self.test_openclip_length() self.test_openclip_flavors() self.test_openclip_advanced_settings() self.test_openclip_blip_architectures() # VQA direct endpoint tests self.test_vqa_models_list() self.test_vqa_prompts_list() self.test_vqa_caption_basic() self.test_vqa_different_prompts() self.test_vqa_annotated_image() self.test_vqa_system_prompt() self.test_vqa_invalid_inputs() self.test_vqa_prompt_field() self.test_vqa_generation_params() self.test_vqa_sampling() self.test_vqa_thinking_mode() self.test_vqa_prefill() # VQA Architecture tests self.test_vqa_model_architectures() self.test_vqa_florence_special_prompts() self.test_vqa_moondream_detection() self.test_vqa_architecture_capabilities() # Tagger direct endpoint tests self.test_tagger_models_list() self.test_tagger_basic() self.test_tagger_threshold() self.test_tagger_max_tags() self.test_tagger_sort_alpha() self.test_tagger_use_spaces() self.test_tagger_escape_brackets() self.test_tagger_exclude_tags() self.test_tagger_show_scores() self.test_tagger_include_rating() self.test_tagger_character_threshold() self.test_tagger_model_types() self.test_tagger_wd_versions() self.test_tagger_invalid_inputs() # Dispatch endpoint tests (unified /sdapi/v1/caption) self.test_dispatch_openclip_basic() 
def main():
    """CLI entry point: parse arguments, run the suite, exit 0 on success."""
    parser = argparse.ArgumentParser(
        description='Caption API Test Suite',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Test against local server with default test image
  python cli/test-caption-api.py

  # Test against custom URL with specific image
  python cli/test-caption-api.py --url http://127.0.0.1:7860 --image html/sdnext-robot-2k.jpg

  # Test with authentication
  python cli/test-caption-api.py --username admin --password secret

  # Test with longer timeout for slow models
  python cli/test-caption-api.py --timeout 600
""",
    )
    parser.add_argument('--url', default='http://127.0.0.1:7860', help='Server URL (default: http://127.0.0.1:7860)')
    parser.add_argument('--image', help='Path to test image')
    parser.add_argument('--username', help='HTTP Basic Auth username')
    parser.add_argument('--password', help='HTTP Basic Auth password')
    parser.add_argument('--timeout', type=int, default=300, help='Request timeout in seconds (default: 300)')
    args = parser.parse_args()
    suite = CaptionAPITest(
        base_url=args.url,
        image_path=args.image,
        username=args.username,
        password=args.password,
        timeout=args.timeout,
    )
    # Exit code mirrors overall suite success for CI consumption.
    sys.exit(0 if suite.run_all_tests() else 1)


if __name__ == "__main__":
    main()