Fixed wrong negative prompt acquisition.

pull/933/head
osirigunso 2026-03-23 21:59:04 +09:00
parent a4e503f214
commit e74293806b
1 changed files with 159 additions and 156 deletions

View File

@ -488,6 +488,106 @@ def is_img_created_by_comfyui_with_webui_gen_info(img: Image):
return is_img_created_by_comfyui(img) and img.info.get('parameters') return is_img_created_by_comfyui(img) and img.info.get('parameters')
def extract_comfyui_prompt_with_wildcard_support(data: Dict, KSampler_entry: Dict):
    """
    Enhanced prompt extraction for ComfyUI workflows using ImpactWildcardProcessor.

    Walks the node graph backwards from the KSampler's "positive"/"negative"
    inputs, handling:
    - ImpactWildcardProcessor: prefers populated_text (the expanded result)
      over wildcard_text (the raw template).
    - Recursive resolution of upstream prompt nodes (link refs are
      ``[node_id, slot]`` lists).
    - Intermediate conditioning nodes such as FluxGuidance.

    Args:
        data: the workflow prompt graph, mapping node id -> node dict
            (each node has "class_type" and "inputs").
        KSampler_entry: the "inputs" dict of the KSampler node, holding the
            "positive" / "negative" link references.

    Returns:
        Tuple of (positive_prompt, negative_prompt); either may be "".
        Never raises: any unexpected error is printed and ("", "") returned.
    """
    def get_node(node_id):
        # Node ids appear both as ints and strings in refs; try both forms.
        return data.get(str(node_id)) or data.get(node_id)

    def normalize_text(value):
        # Return a stripped string, or None when value is not a string.
        if not isinstance(value, str):
            return None
        return value.strip()

    def extract_direct_text(node):
        """Return (text, is_terminal) for a single node.

        text is None when the node carries no literal text itself;
        is_terminal=True means traversal must stop at this node even if no
        text was found (it is a known text-producing node type).
        """
        if not isinstance(node, dict):
            return None, False
        class_type = node.get("class_type", "")
        inputs = node.get("inputs", {}) or {}
        if class_type == "ImpactWildcardProcessor":
            # Always use the expanded result; never fall back to the
            # unexpanded wildcard_text template.
            populated = normalize_text(inputs.get("populated_text"))
            return (populated or ""), True
        if "CLIPTextEncode" in class_type:
            for key in ("text", "t5xxl"):
                if key in inputs:
                    value = inputs.get(key)
                    if isinstance(value, str):
                        return value.strip(), True
                    if isinstance(value, list) and len(value) >= 1:
                        # Text comes from an upstream node; keep traversing.
                        return None, False
            # A text encoder with no recognizable text input: stop here.
            return "", True
        # Generic nodes: accept the first literal string under a known key.
        for key in ("text", "t5xxl", "prompt", "string", "value"):
            if key in inputs and isinstance(inputs.get(key), str):
                return inputs.get(key).strip(), True
        return None, False

    def resolve_text_from_ref(ref, visited=None):
        """Follow a link ref (or bare node id) upstream until text is found.

        visited guards against cycles in malformed graphs.
        """
        if visited is None:
            visited = set()
        node_id = ref[0] if isinstance(ref, list) and len(ref) >= 1 else ref
        node_key = str(node_id)
        if not node_key or node_key in visited:
            return ""
        visited.add(node_key)
        node = get_node(node_id)
        if not isinstance(node, dict):
            return ""
        direct_text, is_terminal = extract_direct_text(node)
        if direct_text is not None:
            return direct_text
        if is_terminal:
            return ""
        inputs = node.get("inputs", {}) or {}
        class_type = node.get("class_type", "")
        if class_type == "FluxGuidance":
            # FluxGuidance wraps conditioning; follow only that input.
            conditioning = inputs.get("conditioning")
            if isinstance(conditioning, list) and len(conditioning) >= 1:
                return resolve_text_from_ref(conditioning, visited)
            return ""
        for key in ("text", "t5xxl", "conditioning", "positive", "negative",
                    "prompt", "string", "value"):
            value = inputs.get(key)
            if isinstance(value, list) and len(value) >= 1:
                # Stop at the first linked input, even if it resolves to "":
                # continuing could wrongly cross into a sibling branch
                # (e.g. pick up "negative" while resolving the positive side).
                return resolve_text_from_ref(value, visited)
        return ""

    try:
        positive_ref = KSampler_entry.get("positive")
        negative_ref = KSampler_entry.get("negative")
        positive_prompt = resolve_text_from_ref(positive_ref) if positive_ref else ""
        negative_prompt = resolve_text_from_ref(negative_ref) if negative_ref else ""
        return positive_prompt or "", negative_prompt or ""
    except Exception as e:
        # Best-effort extraction: metadata parsing must never break callers.
        print(e)
        return "", ""
def get_comfyui_exif_data(img: Image): def get_comfyui_exif_data(img: Image):
prompt = None prompt = None
if img.format == "PNG": if img.format == "PNG":
@ -498,13 +598,11 @@ def get_comfyui_exif_data(img: Image):
split = [x.decode("utf-8", errors="ignore") for x in exif.split(b"\x00")] split = [x.decode("utf-8", errors="ignore") for x in exif.split(b"\x00")]
prompt_str = find(split, lambda x: x.lower().startswith("prompt:")) prompt_str = find(split, lambda x: x.lower().startswith("prompt:"))
if prompt_str: if prompt_str:
prompt = prompt_str.split(":", 1)[1] prompt = prompt_str.split(":", 1)[1] if prompt_str else None
if not prompt: if not prompt:
return {} return {}
data: Dict[str, Any] = json.loads(prompt) data: Dict[str, Any] = json.loads(prompt)
meta_key = '3' meta_key = '3'
for i in data.keys(): for i in data.keys():
try: try:
@ -530,157 +628,42 @@ def get_comfyui_exif_data(img: Image):
meta["Model"] = None meta["Model"] = None
meta["Source Identifier"] = "ComfyUI" meta["Source Identifier"] = "ComfyUI"
text_key_priority = [ def get_text_from_clip(idx: str):
"populated_text", # ImpactWildcardProcessor の最終展開結果
"text",
"prompt",
"positive",
"negative",
"string",
"value",
"t5xxl",
]
text_key_blacklist = {
"wildcard_text", # 展開前テンプレート
"Select to add Wildcard", # UI 用
"select_to_add_wildcard",
"template",
"pattern",
}
wildcard_patterns = [
re.compile(r"__[^_\n]+__"),
re.compile(r"\{[^{}\n]*\|[^{}\n]*\}"),
]
def normalize_text(value):
if not isinstance(value, str):
return None
value = value.strip()
return value if value else None
def looks_unexpanded_wildcard(text: str) -> bool:
return any(p.search(text) for p in wildcard_patterns)
def get_node(node_id):
key = str(node_id)
return data.get(key) or data.get(node_id)
def get_best_text_from_inputs(inputs: dict):
if not isinstance(inputs, dict):
return ""
clean_candidates = []
wildcard_candidates = []
def add_candidate(value):
text = normalize_text(value)
if not text:
return
if looks_unexpanded_wildcard(text):
wildcard_candidates.append(text)
else:
clean_candidates.append(text)
for key in text_key_priority:
if key in text_key_blacklist:
continue
add_candidate(inputs.get(key))
if clean_candidates:
return clean_candidates[0]
if wildcard_candidates:
return wildcard_candidates[0]
for key, value in inputs.items():
if key in text_key_blacklist:
continue
add_candidate(value)
if clean_candidates:
return clean_candidates[0]
if wildcard_candidates:
return wildcard_candidates[0]
return ""
def extract_text_from_node(node: dict):
if not isinstance(node, dict):
return ""
class_type = node.get("class_type", "")
inputs = node.get("inputs", {}) or {}
# 明示的な特例: ImpactWildcardProcessor は populated_text を最優先
if class_type == "ImpactWildcardProcessor":
populated = normalize_text(inputs.get("populated_text"))
if populated:
return populated
# wildcard_text は意図的に返さない
return ""
return get_best_text_from_inputs(inputs)
def resolve_text_from_ref(ref, visited=None):
if visited is None:
visited = set()
node_id = ref[0] if isinstance(ref, list) and len(ref) >= 1 else ref
node_key = str(node_id)
if node_key in visited:
return ""
visited.add(node_key)
node = get_node(node_id)
if not isinstance(node, dict):
return ""
direct_text = extract_text_from_node(node)
if direct_text:
return direct_text
inputs = node.get("inputs", {}) or {}
# FluxGuidance の場合は conditioning を優先して辿る
if node.get("class_type") == "FluxGuidance":
conditioning = inputs.get("conditioning")
if isinstance(conditioning, list) and len(conditioning) >= 1:
resolved = resolve_text_from_ref(conditioning, visited)
if resolved:
return resolved
# よく使う接続キーを優先
preferred_link_keys = [
"text",
"conditioning",
"positive",
"negative",
"prompt",
"string",
"value",
]
for key in preferred_link_keys:
value = inputs.get(key)
if isinstance(value, list) and len(value) >= 1:
resolved = resolve_text_from_ref(value, visited)
if resolved:
return resolved
# fallback: 全 input を走査
for value in inputs.values():
if isinstance(value, list) and len(value) >= 1:
resolved = resolve_text_from_ref(value, visited)
if resolved:
return resolved
return ""
def get_text_from_clip(idx):
try: try:
return resolve_text_from_ref(idx) inputs = data[idx]["inputs"]
if "text" in inputs:
text = inputs["text"]
elif "t5xxl" in inputs:
text = inputs["t5xxl"]
else:
return ""
if isinstance(text, list): # type:CLIPTextEncode (NSP) mode:Wildcards
text = data[text[0]]["inputs"]["text"]
return text.strip()
except Exception as e: except Exception as e:
print(e) print(e)
return "" return ""
has_impact_wildcard = any(
node_data.get("class_type") == "ImpactWildcardProcessor"
for node_data in data.values()
if isinstance(node_data, dict)
)
# Detection Point 1: Check if workflow contains ImpactWildcardProcessor
# If yes, immediately use the enhanced extraction and return
if has_impact_wildcard:
pos_prompt, neg_prompt = extract_comfyui_prompt_with_wildcard_support(
data, KSampler_entry
)
pos_prompt_arr = unique_by(parse_prompt(pos_prompt)["pos_prompt"])
return {
"meta": meta,
"pos_prompt": pos_prompt_arr,
"pos_prompt_raw": pos_prompt,
"neg_prompt_raw": neg_prompt
}
extract_all_prompts = os.getenv("IIB_COMFYUI_EXTRACT_ALL_PROMPTS", "false").lower() == "true" extract_all_prompts = os.getenv("IIB_COMFYUI_EXTRACT_ALL_PROMPTS", "false").lower() == "true"
if extract_all_prompts: if extract_all_prompts:
@ -690,25 +673,45 @@ def get_comfyui_exif_data(img: Image):
for node_id, node_data in data.items(): for node_id, node_data in data.items():
try: try:
class_type = node_data.get("class_type", "") class_type = node_data.get("class_type", "")
if "CLIPTextEncode" in class_type or class_type == "ImpactWildcardProcessor": inputs = node_data.get("inputs", {})
text = resolve_text_from_ref(node_id)
if "CLIPTextEncode" in class_type:
text = inputs.get("text", "")
if isinstance(text, list):
text = data[text[0]]["inputs"].get("text", "")
if text: if text:
all_prompts.append(text.strip()) all_prompts.append(text.strip())
except Exception as e: except Exception as e:
print(e) print(e)
pass pass
all_prompts_str = "\nBREAK\n".join(unique_by(all_prompts)) if all_prompts else "" all_prompts_str = "\nBREAK\n".join(all_prompts) if all_prompts else ""
pos_prompt = all_prompts_str pos_prompt = all_prompts_str
neg_prompt = "" neg_prompt = ""
else: else:
positive_ref = KSampler_entry.get("positive") in_node = data[str(KSampler_entry["positive"][0])]
negative_ref = KSampler_entry.get("negative") if in_node["class_type"] != "FluxGuidance":
pos_prompt = get_text_from_clip(KSampler_entry["positive"][0])
else:
pos_prompt = get_text_from_clip(in_node["inputs"]["conditioning"][0])
pos_prompt = get_text_from_clip(positive_ref) if positive_ref else "" neg_prompt = get_text_from_clip(KSampler_entry["negative"][0])
neg_prompt = get_text_from_clip(negative_ref) if negative_ref else ""
pos_prompt_arr = unique_by(parse_prompt(pos_prompt)["pos_prompt"]) pos_prompt_arr = unique_by(parse_prompt(pos_prompt)["pos_prompt"])
# Detection Point 2: Fallback if no prompts were extracted
# If standard extraction failed, try the enhanced method
if has_impact_wildcard and (not pos_prompt_arr or not pos_prompt.strip()):
pos_prompt_fallback, neg_prompt_fallback = extract_comfyui_prompt_with_wildcard_support(
data, KSampler_entry
)
if pos_prompt_fallback:
pos_prompt = pos_prompt_fallback
pos_prompt_arr = unique_by(parse_prompt(pos_prompt_fallback)["pos_prompt"])
if neg_prompt_fallback:
neg_prompt = neg_prompt_fallback
return { return {
"meta": meta, "meta": meta,
"pos_prompt": pos_prompt_arr, "pos_prompt": pos_prompt_arr,