mirror of https://github.com/vladmandic/automatic
190 lines
6.6 KiB
Python
190 lines
6.6 KiB
Python
import io
|
|
import re
|
|
import json
|
|
import piexif
|
|
from PIL import Image, ExifTags
|
|
from modules import shared, errors, sd_samplers
|
|
from modules.image.watermark import get_watermark
|
|
|
|
|
|
def safe_decode_string(s: bytes):
    """Decode a raw metadata byte string into cleaned text.

    Each candidate encoding is tried in order with strict error handling. Before
    every attempt a leading `UNICODE` marker, a leading `ASCII` marker and one
    leading NUL byte are stripped (in that order) from the working buffer; the
    stripped buffer is carried over to the next attempt. The first successful
    decode has characters in the range 0x00-0x09 removed and surrounding
    whitespace trimmed.

    Returns the cleaned string, or None if the result is empty or no attempt
    succeeds (including when `s` is not a bytes-like object).
    """
    markers = (b'UNICODE', b'ASCII', b'\x00')
    candidates = ('utf_16_be', 'utf-8', 'utf-16', 'ascii', 'latin_1', 'cp1252', 'cp437')
    for codec in candidates:  # try different encodings
        try:
            for marker in markers:
                if s.startswith(marker):
                    s = s[len(marker):]
            text = s.decode(codec, errors="strict")
            text = re.sub(r'[\x00-\x09]', '', text).strip()  # remove remaining special characters
        except Exception:
            continue  # any failure simply moves on to the next encoding
        return text or None  # empty strings are reported as None
    return None
|
|
|
|
|
|
def parse_comfy_metadata(data: dict):
    """Build a one-line ComfyUI summary from image metadata items.

    Reads two JSON-encoded entries from `data`:
      - `workflow`: contributes the frontend version and the number of nodes
      - `prompt`: contributes one model/class fragment per node whose inputs
        contain a non-empty string `model`

    Any error while reading either entry is swallowed; fragments collected
    before the error are kept. When at least one fragment exists the summary is
    logged through `shared.log.info` and returned, otherwise '' is returned.
    """
    # workflow entry: frontend version + node count
    workflow_info = ''
    try:
        graph = json.loads(data.get('workflow', {}))
        node_count = len(graph.get('nodes', []))
        frontend = graph.get('extra', {}).get('frontendVersion', 'unknown')
        if frontend is not None:
            workflow_info = f" | Version: {frontend} | Nodes: {node_count}"
    except Exception:
        pass

    # prompt entry: every node that references a model by name
    fragments = []
    try:
        prompt_graph = json.loads(data.get('prompt', {}))
        for node in prompt_graph.values():
            inputs = node.get('inputs', {})
            if 'model' not in inputs:
                continue
            model = inputs.get('model', None)
            if isinstance(model, str) and model:
                fragments.append(f" | Model: {model} | Class: {node.get('class_type', '')}")
    except Exception:
        pass
    prompt_info = ''.join(fragments)

    if not (workflow_info or prompt_info):
        return ''
    parsed = f'App: ComfyUI{workflow_info}{prompt_info}'
    shared.log.info(f'Image metadata: {parsed}')
    return parsed
|
|
|
|
|
|
def parse_invoke_metadata(data: dict):
    """Build a one-line InvokeAI summary from image metadata items.

    Parses the JSON-encoded `invokeai_metadata` entry of `data` and picks up
    `app_version` when it is a non-empty string. Any error while reading the
    entry is swallowed. When a version was found the summary is logged through
    `shared.log.info` and returned, otherwise '' is returned.
    """
    version_info = ''
    try:
        meta = json.loads(data.get('invokeai_metadata', {}))
        if 'app_version' in meta:
            app_version = meta['app_version']
            if isinstance(app_version, str) and app_version:
                version_info = f" | Version: {app_version}"
    except Exception:
        pass

    if not version_info:
        return ''
    parsed = f'App: InvokeAI{version_info}'
    shared.log.info(f'Image metadata: {parsed}')
    return parsed
|
|
|
|
|
|
def parse_novelai_metadata(data: dict):
    """Convert NovelAI metadata items into a generation-info string.

    Only acts when `data['Software']` equals "NovelAI". The JSON-encoded
    `Comment` entry supplies the negative prompt, steps, sampler, scale and
    seed; the sampler name is looked up in `sd_samplers.samplers_map` with
    "Euler a" as fallback. Returns '' when the software tag does not match or
    when anything goes wrong while building the string.
    """
    if data.get("Software", None) != "NovelAI":
        return ''
    try:
        comment = json.loads(data["Comment"])
        sampler_name = sd_samplers.samplers_map.get(comment["sampler"], "Euler a")
        return f'{data["Description"]} Negative prompt: {comment["uc"]} Steps: {comment["steps"]}, Sampler: {sampler_name}, CFG scale: {comment["scale"]}, Seed: {comment["seed"]}, Clip skip: 2, ENSD: 31337'
    except Exception:
        return ''
|
|
|
|
|
|
def read_info_from_image(image: Image.Image, watermark: bool = False) -> tuple[str, dict]:
    """Extract the generation-info string and metadata items from an image.

    Args:
        image: an image object exposing `.info`, or a file path string which is
            opened and fully loaded here.
        watermark: when True, `get_watermark(image)` is called and a non-empty
            result is stored under the `watermark` key.

    Returns:
        A `(geninfo, items)` tuple. `geninfo` starts from the `parameters` or
        `UserComment` entry of `image.info`, is replaced by an EXIF UserComment
        if one is found, and has the ComfyUI / InvokeAI / NovelAI summaries
        appended. `items` holds the remaining metadata plus decoded EXIF tags,
        with a fixed list of technical keys removed and `width`, `height` and
        `mode` added. Returns `('', {})` when `image` is None or the path
        cannot be opened.
    """
    if image is None:
        return '', {}
    if isinstance(image, str):
        try:
            image = Image.open(image)
            image.load()
        except Exception:
            return '', {}
    # NOTE: when image.info is non-empty, `items` is that same dict object, so the
    # pops and assignments below modify image.info in place
    items = image.info or {}
    # prefer 'parameters'; 'UserComment' is only popped when 'parameters' is missing or falsy
    geninfo = items.pop('parameters', None) or items.pop('UserComment', None) or ''
    if isinstance(geninfo, dict):
        if 'UserComment' in geninfo:
            geninfo = geninfo['UserComment'] # Info was nested
        else:
            geninfo = '' # Unknown format. Ignore contents
        items['UserComment'] = geninfo

    if "exif" in items:
        try:
            exif = piexif.load(items["exif"])
        except Exception as e:
            shared.log.error(f'Error loading EXIF data: {e}')
            exif = {} # continue without EXIF tags
        for _key, subkey in exif.items():
            if isinstance(subkey, dict): # only dict-valued entries are scanned for tags
                for key, val in subkey.items():
                    if isinstance(val, bytes): # decode bytestring
                        val = safe_decode_string(val)
                    if isinstance(val, tuple) and isinstance(val[0], int) and isinstance(val[1], int) and val[1] > 0: # convert camera ratios
                        val = round(val[0] / val[1], 2)
                    if val is not None and key in ExifTags.TAGS: # add known tags
                        if ExifTags.TAGS[key] == 'UserComment': # add geninfo from UserComment
                            # EXIF UserComment overrides whatever was taken from image.info above
                            geninfo = str(val)
                            items['parameters'] = val
                        else:
                            items[ExifTags.TAGS[key]] = val
                    elif val is not None and key in ExifTags.GPSTAGS: # ids not in TAGS fall back to GPS tag names
                        items[ExifTags.GPSTAGS[key]] = val
    if watermark:
        wm = get_watermark(image)
        if wm != '':
            # geninfo += f' Watermark: {wm}'
            items['watermark'] = wm

    # only existing keys are reassigned here, so the dict size does not change while iterating
    for key, val in items.items():
        if isinstance(val, bytes): # decode bytestring
            items[key] = safe_decode_string(val)

    # each parser returns '' when its metadata is absent, so these are no-ops for other sources
    geninfo += parse_comfy_metadata(items)
    geninfo += parse_invoke_metadata(items)
    geninfo += parse_novelai_metadata(items)

    for key in ['exif', 'ExifOffset', 'JpegIFOffset', 'JpegIFByteCount', 'ExifVersion', 'icc_profile', 'jfif', 'jfif_version', 'jfif_unit', 'jfif_density', 'adobe', 'photoshop', 'loop', 'duration', 'dpi']: # remove unwanted tags
        items.pop(key, None)

    # best effort: a failure reading any attribute skips the remaining assignments
    try:
        items['width'] = image.width
        items['height'] = image.height
        items['mode'] = image.mode
    except Exception:
        pass

    return geninfo, items
|
|
|
|
|
|
def image_data(data):
    """Decode an uploaded binary payload into metadata text.

    The payload is first treated as an image and passed to
    `read_info_from_image`; if that fails it is treated as UTF-8 text, provided
    it is no longer than 10 KiB. Returns `(text, None)` on success and
    `(gr.update(), None)` when `data` is None, too long, or undecodable.
    """
    import gradio as gr

    def unchanged():
        # result used for every "nothing decoded" outcome
        return gr.update(), None

    if data is None:
        return unchanged()

    image_error = None
    text_error = None

    # attempt 1: payload is an image carrying metadata
    try:
        decoded = Image.open(io.BytesIO(data))
        decoded.load()
        metadata, _items = read_info_from_image(decoded)
        errors.log.debug(f'Decoded object: image={decoded} metadata={metadata}')
        return metadata, None
    except Exception as exc:
        image_error = exc

    # attempt 2: payload is plain utf8 text of limited size
    try:
        if len(data) > 1024 * 10:
            errors.log.warning(f'Error decoding object: data too long: {len(data)}')
            return unchanged()
        metadata = data.decode('utf8')
        errors.log.debug(f'Decoded object: data={len(data)} metadata={metadata}')
        return metadata, None
    except Exception as exc:
        text_error = exc

    # both attempts failed; the image error takes precedence in the message
    errors.log.error(f'Error decoding object: {image_error or text_error}')
    return unchanged()
|