Merge pull request #933 from osirigunso/fix/comfyui-populated-text

Fix ComfyUI prompt extraction for ImpactWildcardProcessor
fix/non-standard-exif-support
zanllp 2026-03-24 00:22:49 +08:00 committed by GitHub
commit 5694044a09
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 157 additions and 12 deletions

View File

@ -487,37 +487,148 @@ def is_img_created_by_comfyui(img: Image):
def is_img_created_by_comfyui_with_webui_gen_info(img: Image): def is_img_created_by_comfyui_with_webui_gen_info(img: Image):
return is_img_created_by_comfyui(img) and img.info.get('parameters') return is_img_created_by_comfyui(img) and img.info.get('parameters')
def extract_comfyui_prompt_with_wildcard_support(data: Dict, KSampler_entry: Dict):
"""
Enhanced prompt extraction for workflows using ImpactWildcardProcessor.
This function handles:
- populated_text over wildcard_text
- Recursive resolution of upstream prompt nodes
- Intermediate conditioning nodes such as FluxGuidance
Returns:
tuple of (positive_prompt, negative_prompt)
"""
def get_node(node_id):
return data.get(str(node_id)) or data.get(node_id)
def normalize_text(value):
if not isinstance(value, str):
return None
return value.strip()
def extract_direct_text(node):
if not isinstance(node, dict):
return None, False
class_type = node.get("class_type", "")
inputs = node.get("inputs", {}) or {}
if class_type == "ImpactWildcardProcessor":
populated = normalize_text(inputs.get("populated_text"))
return (populated or ""), True
if "CLIPTextEncode" in class_type:
for key in ("text", "t5xxl"):
if key in inputs:
value = inputs.get(key)
if isinstance(value, str):
return value.strip(), True
if isinstance(value, list) and len(value) >= 1:
return None, False
return "", True
for key in ("text", "t5xxl", "prompt", "string", "value"):
if key in inputs and isinstance(inputs.get(key), str):
return inputs.get(key).strip(), True
return None, False
def resolve_text_from_ref(ref, visited=None):
if visited is None:
visited = set()
node_id = ref[0] if isinstance(ref, list) and len(ref) >= 1 else ref
node_key = str(node_id)
if not node_key or node_key in visited:
return ""
visited.add(node_key)
node = get_node(node_id)
if not isinstance(node, dict):
return ""
direct_text, is_terminal = extract_direct_text(node)
if direct_text is not None:
return direct_text
if is_terminal:
return ""
inputs = node.get("inputs", {}) or {}
class_type = node.get("class_type", "")
if class_type == "FluxGuidance":
conditioning = inputs.get("conditioning")
if isinstance(conditioning, list) and len(conditioning) >= 1:
return resolve_text_from_ref(conditioning, visited)
return ""
for key in ("text", "t5xxl", "conditioning", "positive", "negative", "prompt", "string", "value"):
value = inputs.get(key)
if isinstance(value, list) and len(value) >= 1:
resolved = resolve_text_from_ref(value, visited)
if resolved or resolved == "":
return resolved
return ""
try:
positive_ref = KSampler_entry.get("positive")
negative_ref = KSampler_entry.get("negative")
positive_prompt = resolve_text_from_ref(positive_ref) if positive_ref else ""
negative_prompt = resolve_text_from_ref(negative_ref) if negative_ref else ""
return positive_prompt or "", negative_prompt or ""
except Exception as e:
print(e)
return "", ""
def get_comfyui_exif_data(img: Image): def get_comfyui_exif_data(img: Image):
prompt = None
if img.format == "PNG": if img.format == "PNG":
prompt = img.info.get('prompt') prompt = img.info.get('prompt')
elif img.format == "WEBP": elif img.format == "WEBP":
exif = img.info.get("exif") exif = img.info.get("exif")
split = [x.decode("utf-8", errors="ignore") for x in exif.split(b"\x00")] if exif:
prompt_str = find(split, lambda x: x.lower().startswith("prompt:")) split = [x.decode("utf-8", errors="ignore") for x in exif.split(b"\x00")]
if prompt_str: prompt_str = find(split, lambda x: x.lower().startswith("prompt:"))
prompt = prompt_str.split(":", 1)[1] if prompt_str else None if prompt_str:
prompt = prompt_str.split(":", 1)[1] if prompt_str else None
if not prompt: if not prompt:
return {} return {}
data: Dict[str, Any] = json.loads(prompt)
meta_key = '3' meta_key = '3'
data: Dict[str, any] = json.loads(prompt)
for i in data.keys(): for i in data.keys():
try: try:
if data[i]["class_type"].startswith("KSampler"): if data[i]["class_type"].startswith("KSampler"):
meta_key = i meta_key = i
break break
except: except Exception:
pass pass
if meta_key not in data:
return {}
meta = {} meta = {}
KSampler_entry = data[meta_key]["inputs"] KSampler_entry = data[meta_key]["inputs"]
#print(KSampler_entry) # for testing
# As a workaround to bypass parsing errors in the parser. # As a workaround to bypass parsing errors in the parser.
# https://github.com/jiw0220/stable-diffusion-image-metadata/blob/00b8d42d4d1a536862bba0b07c332bdebb2a0ce5/src/index.ts#L130 # https://github.com/jiw0220/stable-diffusion-image-metadata/blob/00b8d42d4d1a536862bba0b07c332bdebb2a0ce5/src/index.ts#L130
meta["Steps"] = KSampler_entry.get("steps", "Unknown") meta["Steps"] = KSampler_entry.get("steps", "Unknown")
meta["Sampler"] = KSampler_entry["sampler_name"] meta["Sampler"] = KSampler_entry.get("sampler_name", "Unknown")
meta["Model"] = data[KSampler_entry["model"][0]]["inputs"].get("ckpt_name") try:
meta["Model"] = data[KSampler_entry["model"][0]]["inputs"].get("ckpt_name")
except Exception:
meta["Model"] = None
meta["Source Identifier"] = "ComfyUI" meta["Source Identifier"] = "ComfyUI"
def get_text_from_clip(idx: str) :
def get_text_from_clip(idx: str):
try: try:
inputs = data[idx]["inputs"] inputs = data[idx]["inputs"]
if "text" in inputs: if "text" in inputs:
@ -526,13 +637,33 @@ def get_comfyui_exif_data(img: Image):
text = inputs["t5xxl"] text = inputs["t5xxl"]
else: else:
return "" return ""
if isinstance(text, list): # type:CLIPTextEncode (NSP) mode:Wildcards if isinstance(text, list): # type:CLIPTextEncode (NSP) mode:Wildcards
text = data[text[0]]["inputs"]["text"] text = data[text[0]]["inputs"]["text"]
return text.strip() return text.strip()
except Exception as e: except Exception as e:
print(e) print(e)
return "" return ""
has_impact_wildcard = any(
node_data.get("class_type") == "ImpactWildcardProcessor"
for node_data in data.values()
if isinstance(node_data, dict)
)
# Detection Point 1: Check if workflow contains ImpactWildcardProcessor
# If yes, immediately use the enhanced extraction and return
if has_impact_wildcard:
pos_prompt, neg_prompt = extract_comfyui_prompt_with_wildcard_support(
data, KSampler_entry
)
pos_prompt_arr = unique_by(parse_prompt(pos_prompt)["pos_prompt"])
return {
"meta": meta,
"pos_prompt": pos_prompt_arr,
"pos_prompt_raw": pos_prompt,
"neg_prompt_raw": neg_prompt
}
extract_all_prompts = os.getenv("IIB_COMFYUI_EXTRACT_ALL_PROMPTS", "false").lower() == "true" extract_all_prompts = os.getenv("IIB_COMFYUI_EXTRACT_ALL_PROMPTS", "false").lower() == "true"
if extract_all_prompts: if extract_all_prompts:
@ -567,11 +698,25 @@ def get_comfyui_exif_data(img: Image):
neg_prompt = get_text_from_clip(KSampler_entry["negative"][0]) neg_prompt = get_text_from_clip(KSampler_entry["negative"][0])
pos_prompt_arr = unique_by(parse_prompt(pos_prompt)["pos_prompt"]) pos_prompt_arr = unique_by(parse_prompt(pos_prompt)["pos_prompt"])
# Detection Point 2: Fallback if no prompts were extracted
# If standard extraction failed, try the enhanced method
if has_impact_wildcard and (not pos_prompt_arr or not pos_prompt.strip()):
pos_prompt_fallback, neg_prompt_fallback = extract_comfyui_prompt_with_wildcard_support(
data, KSampler_entry
)
if pos_prompt_fallback:
pos_prompt = pos_prompt_fallback
pos_prompt_arr = unique_by(parse_prompt(pos_prompt_fallback)["pos_prompt"])
if neg_prompt_fallback:
neg_prompt = neg_prompt_fallback
return { return {
"meta": meta, "meta": meta,
"pos_prompt": pos_prompt_arr, "pos_prompt": pos_prompt_arr,
"pos_prompt_raw": pos_prompt, "pos_prompt_raw": pos_prompt,
"neg_prompt_raw" : neg_prompt "neg_prompt_raw": neg_prompt
} }
def comfyui_exif_data_to_str(data): def comfyui_exif_data_to_str(data):