refactor monolithic shared.py and separate legacy options with logging

Signed-off-by: Vladimir Mandic <mandic00@live.com>
pull/4058/head
Vladimir Mandic 2025-07-12 13:35:41 -04:00
parent 4cec2db9a3
commit 4e449ca9b2
13 changed files with 599 additions and 572 deletions

View File

@ -36,6 +36,7 @@ ignore-paths=/usr/lib/.*$,
modules/teacache, modules/teacache,
modules/todo, modules/todo,
pipelines/flex2, pipelines/flex2,
pipelines/f_lite,
pipelines/hidream, pipelines/hidream,
pipelines/meissonic, pipelines/meissonic,
pipelines/omnigen2, pipelines/omnigen2,

View File

@ -74,6 +74,7 @@ Although upgrades and existing installations are tested and should work fine!
see [Styles docs](https://vladmandic.github.io/sdnext-docs/Styles/) for details see [Styles docs](https://vladmandic.github.io/sdnext-docs/Styles/) for details
- **TAESD** is now default preview type since its the only one that supports most new models - **TAESD** is now default preview type since its the only one that supports most new models
- SD.Next now starts with *locked* state preventing model loading until startup is complete - SD.Next now starts with *locked* state preventing model loading until startup is complete
- warn when modifying legacy settings that are no longer supported, but available for compatibilty
- **API** - **API**
- add `/sdapi/v1/lock-checkpoint` endpoint that can be used to lock/unlock model changes - add `/sdapi/v1/lock-checkpoint` endpoint that can be used to lock/unlock model changes
if model is locked, it cannot be changed using normal load or unload methods if model is locked, it cannot be changed using normal load or unload methods
@ -110,6 +111,7 @@ Although upgrades and existing installations are tested and should work fine!
- remove legacy lora support: `/extensions-builtin/Lora` - remove legacy lora support: `/extensions-builtin/Lora`
- remove legacy clip/blip interrogate module - remove legacy clip/blip interrogate module
- remove modern-ui remove `only-original` vs `only-diffusers` code paths - remove modern-ui remove `only-original` vs `only-diffusers` code paths
- split monolithic `shared.py`
- cleanup `/modules`: move pipeline loaders to `/pipelines` root - cleanup `/modules`: move pipeline loaders to `/pipelines` root
- cleanup `/modules`: move code folders used by pipelines to `/pipelines/<pipeline>` folder - cleanup `/modules`: move code folders used by pipelines to `/pipelines/<pipeline>` folder
- cleanup `/modules`: move code folders used by scripts to `/scripts/<script>` folder - cleanup `/modules`: move code folders used by scripts to `/scripts/<script>` folder

View File

@ -39,7 +39,7 @@ Main ToDo list can be found at [GitHub projects](https://github.com/users/vladma
### Future Considerations ### Future Considerations
- [TensorRT](https://github.com/huggingface/diffusers/pull/11173) - [TensorRT](https://github.com/huggingface/diffusers/pull/11173)
- [Modular guiders](https://github.com/huggingface/diffusers/pull/11311) - [Modular pipelines and guiders](https://github.com/huggingface/diffusers/issues/11915)
### New models ### New models

View File

@ -554,7 +554,7 @@ def check_diffusers():
t_start = time.time() t_start = time.time()
if args.skip_all or args.skip_git: if args.skip_all or args.skip_git:
return return
sha = '7a935a0bbe5f30847e7b27bda4df1d6d7b4b0aee' # diffusers commit hash sha = '3c8b67b3711b668a6e7867e08b54280e51454eb5' # diffusers commit hash
pkg = pkg_resources.working_set.by_key.get('diffusers', None) pkg = pkg_resources.working_set.by_key.get('diffusers', None)
minor = int(pkg.version.split('.')[1] if pkg is not None else -1) minor = int(pkg.version.split('.')[1] if pkg is not None else -1)
cur = opts.get('diffusers_version', '') if minor > -1 else '' cur = opts.get('diffusers_version', '') if minor > -1 else ''
@ -580,7 +580,7 @@ def check_transformers():
target = '4.52.4' target = '4.52.4'
else: else:
target = '4.53.2' target = '4.53.2'
if (pkg is None) or ((pkg.version != target) and not (args.experimental)): if (pkg is None) or ((pkg.version != target) and (not args.experimental)):
if pkg is None: if pkg is None:
log.info(f'Transformers install: version={target}') log.info(f'Transformers install: version={target}')
else: else:

View File

@ -4,10 +4,18 @@ import argparse
from modules.paths import data_path, models_path from modules.paths import data_path, models_path
parsed = None
parser = argparse.ArgumentParser(description="SD.Next", conflict_handler='resolve', epilog='For other options see UI Settings page', prog='', add_help=True, formatter_class=lambda prog: argparse.HelpFormatter(prog, max_help_position=55, indent_increment=2, width=200)) parser = argparse.ArgumentParser(description="SD.Next", conflict_handler='resolve', epilog='For other options see UI Settings page', prog='', add_help=True, formatter_class=lambda prog: argparse.HelpFormatter(prog, max_help_position=55, indent_increment=2, width=200))
parser._optionals = parser.add_argument_group('Other options') # pylint: disable=protected-access parser._optionals = parser.add_argument_group('Other options') # pylint: disable=protected-access
def parse_args():
global parsed # pylint: disable=global-statement
if parsed is None:
parsed, _ = parser.parse_known_args()
return parsed
def main_args(): def main_args():
# main server args # main server args
group_config = parser.add_argument_group('Configuration') group_config = parser.add_argument_group('Configuration')
@ -152,7 +160,6 @@ def settings_args(opts, args):
opts.data['clip_skip'] = 1 opts.data['clip_skip'] = 1
opts.onchange("lora_dir", lambda: setattr(args, "lora_dir", opts.lora_dir)) opts.onchange("lora_dir", lambda: setattr(args, "lora_dir", opts.lora_dir))
opts.onchange("lyco_dir", lambda: setattr(args, "lyco_dir", opts.lyco_dir))
if "USED_VSCODE_COMMAND_PICKARGS" in os.environ: if "USED_VSCODE_COMMAND_PICKARGS" in os.environ:
import shlex import shlex

View File

@ -19,6 +19,76 @@ def active():
return [x for x in extensions if x.enabled] return [x for x in extensions if x.enabled]
def temp_disable_extensions():
disable_safe = [
'sd-webui-controlnet',
'multidiffusion-upscaler-for-automatic1111',
'a1111-sd-webui-lycoris',
'sd-webui-agent-scheduler',
'clip-interrogator-ext',
'stable-diffusion-webui-images-browser',
]
disable_diffusers = [
'sd-webui-controlnet',
'multidiffusion-upscaler-for-automatic1111',
'a1111-sd-webui-lycoris',
'sd-webui-animatediff',
]
disable_themes = [
'sd-webui-lobe-theme',
'cozy-nest',
'sdnext-modernui',
]
disabled = []
if shared.cmd_opts.theme is not None:
theme_name = shared.cmd_opts.theme
else:
theme_name = f'{shared.opts.theme_type.lower()}/{shared.opts.gradio_theme}'
if theme_name == 'lobe':
disable_themes.remove('sd-webui-lobe-theme')
elif theme_name == 'cozy-nest' or theme_name == 'cozy':
disable_themes.remove('cozy-nest')
elif '/' not in theme_name: # set default themes per type
if theme_name == 'standard' or theme_name == 'default':
theme_name = 'standard/black-teal'
if theme_name == 'modern':
theme_name = 'modern/Default'
if theme_name == 'gradio':
theme_name = 'gradio/default'
if theme_name == 'huggingface':
theme_name = 'huggingface/blaaa'
if theme_name.lower().startswith('standard') or theme_name.lower().startswith('default'):
shared.opts.data['theme_type'] = 'Standard'
shared.opts.data['gradio_theme'] = theme_name[9:]
elif theme_name.lower().startswith('modern'):
shared.opts.data['theme_type'] = 'Modern'
shared.opts.data['gradio_theme'] = theme_name[7:]
disable_themes.remove('sdnext-modernui')
elif theme_name.lower().startswith('huggingface') or theme_name.lower().startswith('gradio') or theme_name.lower().startswith('none'):
shared.opts.data['theme_type'] = 'None'
shared.opts.data['gradio_theme'] = theme_name
else:
shared.log.error(f'UI theme invalid: theme="{theme_name}" available={["standard/*", "modern/*", "none/*"]} fallback="standard/black-teal"')
shared.opts.data['theme_type'] = 'Standard'
shared.opts.data['gradio_theme'] = 'black-teal'
for ext in disable_themes:
if ext.lower() not in shared.opts.disabled_extensions:
disabled.append(ext)
if shared.cmd_opts.safe:
for ext in disable_safe:
if ext.lower() not in shared.opts.disabled_extensions:
disabled.append(ext)
for ext in disable_diffusers:
if ext.lower() not in shared.opts.disabled_extensions:
disabled.append(ext)
disabled.append('Lora')
shared.cmd_opts.controlnet_loglevel = 'WARNING'
return disabled
class Extension: class Extension:
def __init__(self, name, path, enabled=True, is_builtin=False): def __init__(self, name, path, enabled=True, is_builtin=False):
self.name = name self.name = name
@ -152,7 +222,7 @@ def list_extensions():
extension_paths.append((extension_dirname, path, dirname == extensions_builtin_dir)) extension_paths.append((extension_dirname, path, dirname == extensions_builtin_dir))
if shared.opts.theme_type == 'Modern' and 'sdnext-modernui' in shared.opts.disabled_extensions: if shared.opts.theme_type == 'Modern' and 'sdnext-modernui' in shared.opts.disabled_extensions:
shared.opts.disabled_extensions.remove('sdnext-modernui') shared.opts.disabled_extensions.remove('sdnext-modernui')
disabled_extensions = [e.lower() for e in shared.opts.disabled_extensions + shared.temp_disable_extensions()] disabled_extensions = [e.lower() for e in shared.opts.disabled_extensions + temp_disable_extensions()]
for dirname, path, is_builtin in extension_paths: for dirname, path, is_builtin in extension_paths:
enabled = dirname.lower() not in disabled_extensions enabled = dirname.lower() not in disabled_extensions
extension = Extension(name=dirname, path=path, enabled=enabled, is_builtin=is_builtin) extension = Extension(name=dirname, path=path, enabled=enabled, is_builtin=is_builtin)

116
modules/json_helpers.py Normal file
View File

@ -0,0 +1,116 @@
import os
import sys
import time
import json
import fasteners
import orjson
from installer import log
locking_available = True # used by file read/write locking
def readfile(filename, silent=False, lock=False):
global locking_available # pylint: disable=global-statement
data = {}
lock_file = None
locked = False
if lock and locking_available:
try:
lock_file = fasteners.InterProcessReaderWriterLock(f"{filename}.lock")
lock_file.logger.disabled = True
locked = lock_file.acquire_read_lock(blocking=True, timeout=3)
except Exception as err:
lock_file = None
locking_available = False
log.error(f'File read lock: file="{filename}" {err}')
locked = False
try:
# if not os.path.exists(filename):
# return {}
t0 = time.time()
with open(filename, "rb") as file:
b = file.read()
data = orjson.loads(b) # pylint: disable=no-member
# if type(data) is str:
# data = json.loads(data)
t1 = time.time()
if not silent:
fn = f'{sys._getframe(2).f_code.co_name}:{sys._getframe(1).f_code.co_name}' # pylint: disable=protected-access
log.debug(f'Read: file="{filename}" json={len(data)} bytes={os.path.getsize(filename)} time={t1-t0:.3f} fn={fn}')
except FileNotFoundError as err:
if not silent:
log.debug(f'Reading failed: {filename} {err}')
except Exception as err:
if not silent:
log.error(f'Reading failed: {filename} {err}')
try:
if locking_available and lock_file is not None:
lock_file.release_read_lock()
if locked and os.path.exists(f"{filename}.lock"):
os.remove(f"{filename}.lock")
except Exception:
locking_available = False
return data
def writefile(data, filename, mode='w', silent=False, atomic=False):
import tempfile
global locking_available # pylint: disable=global-statement
lock_file = None
locked = False
def default(obj):
log.error(f'Save: file="{filename}" not a valid object: {obj}')
return str(obj)
try:
t0 = time.time()
# skipkeys=True, ensure_ascii=True, check_circular=True, allow_nan=True
if type(data) == dict:
output = json.dumps(data, indent=2, default=default)
elif type(data) == list:
output = json.dumps(data, indent=2, default=default)
elif isinstance(data, object):
simple = {}
for k in data.__dict__:
if data.__dict__[k] is not None:
simple[k] = data.__dict__[k]
output = json.dumps(simple, indent=2, default=default)
else:
raise ValueError('not a valid object')
except Exception as err:
log.error(f'Save failed: file="{filename}" {err}')
return
try:
if locking_available:
lock_file = fasteners.InterProcessReaderWriterLock(f"{filename}.lock") if locking_available else None
lock_file.logger.disabled = True
locked = lock_file.acquire_write_lock(blocking=True, timeout=3) if lock_file is not None else False
except Exception as err:
locking_available = False
lock_file = None
log.error(f'File write lock: file="{filename}" {err}')
locked = False
try:
if atomic:
with tempfile.NamedTemporaryFile(mode=mode, encoding="utf8", delete=False, dir=os.path.dirname(filename)) as f:
f.write(output)
f.flush()
os.fsync(f.fileno())
os.replace(f.name, filename)
else:
with open(filename, mode=mode, encoding="utf8") as file:
file.write(output)
t1 = time.time()
if not silent:
log.debug(f'Save: file="{filename}" json={len(data)} bytes={len(output)} time={t1-t0:.3f}')
except Exception as err:
log.error(f'Save failed: file="{filename}" {err}')
try:
if locking_available and lock_file is not None:
lock_file.release_write_lock()
if locked and os.path.exists(f"{filename}.lock"):
os.remove(f"{filename}.lock")
except Exception:
locking_available = False

View File

@ -2,6 +2,12 @@ from dataclasses import dataclass
from installer import log from installer import log
def options_section(section_identifier, options_dict):
for v in options_dict.values():
v.section = section_identifier
return options_dict
class OptionInfo: class OptionInfo:
def __init__( def __init__(
self, self,

222
modules/options_handler.py Normal file
View File

@ -0,0 +1,222 @@
import os
import json
import threading
from typing import TYPE_CHECKING
from modules import cmd_args, errors
from modules.json_helpers import readfile, writefile
from modules.shared_legacy import LegacyOption
from installer import log
if TYPE_CHECKING:
from modules.options import OptionInfo
cmd_opts = cmd_args.parse_args()
compatibility_opts = ['clip_skip', 'uni_pc_lower_order_final', 'uni_pc_order']
class Options():
data = None
data_labels = None
filename = None
typemap = {int: float}
debug = os.environ.get('SD_CONFIG_DEBUG', None) is not None
def __init__(self, options_templates:dict={}, restricted_opts:dict={}):
self.data_labels = options_templates
self.restricted_opts = restricted_opts
self.data = {k: v.default for k, v in self.data_labels.items()}
self.legacy = [k for k, v in self.data_labels.items() if isinstance(v, LegacyOption)]
def __setattr__(self, key, value): # pylint: disable=inconsistent-return-statements
if self.data is not None:
if key in self.data or key in self.data_labels:
if cmd_opts.freeze:
log.warning(f'Settings are frozen: {key}')
return
if cmd_opts.hide_ui_dir_config and key in self.restricted_opts:
log.warning(f'Settings key is restricted: {key}')
return
if self.debug:
log.trace(f'Settings set: {key}={value}')
if key in self.legacy:
log.warning(f'Settings set: {key}={value} legacy')
self.data[key] = value
return
return super(Options, self).__setattr__(key, value) # pylint: disable=super-with-arguments
def get(self, item):
if self.data is not None:
if item in self.data:
return self.data[item]
if item in self.data_labels:
return self.data_labels[item].default
return super(Options, self).__getattribute__(item) # pylint: disable=super-with-arguments
def __getattr__(self, item):
if self.data is not None:
if item in self.data:
return self.data[item]
if item in self.data_labels:
return self.data_labels[item].default
return super(Options, self).__getattribute__(item) # pylint: disable=super-with-arguments
def set(self, key, value):
"""sets an option and calls its onchange callback, returning True if the option changed and False otherwise"""
oldval = self.data.get(key, None)
if oldval is None:
oldval = self.data_labels[key].default
if oldval == value:
return False
try:
setattr(self, key, value)
except RuntimeError:
return False
if self.data_labels[key].onchange is not None:
try:
self.data_labels[key].onchange()
except Exception as err:
log.error(f'Error in onchange callback: {key} {value} {err}')
errors.display(err, 'Error in onchange callback')
setattr(self, key, oldval)
return False
return True
def get_default(self, key):
"""returns the default value for the key"""
data_label = self.data_labels.get(key)
return data_label.default if data_label is not None else None
def list(self):
"""list all visible options"""
components = [k for k, v in self.data_labels.items() if v.visible]
return components
def save_atomic(self, filename=None, silent=False):
if self.filename is None:
self.filename = cmd_opts.config
if filename is None:
filename = self.filename
filename = os.path.abspath(filename)
if cmd_opts.freeze:
log.warning(f'Setting: fn="{filename}" save disabled')
return
try:
diff = {}
unused_settings = []
# if self.debug:
# log.debug('Settings: user')
# for k, v in self.data.items():
# log.trace(f' Config: item={k} value={v} default={self.data_labels[k].default if k in self.data_labels else None}')
if self.debug:
log.debug(f'Settings: total={len(self.data.keys())} known={len(self.data_labels.keys())}')
for k, v in self.data.items():
if k in self.data_labels:
default = self.data_labels[k].default
if isinstance(v, list):
if (len(default) != len(v) or set(default) != set(v)): # list order is non-deterministic
diff[k] = v
if self.debug:
log.trace(f'Settings changed: {k}={v} default={default}')
elif self.data_labels[k].default != v:
diff[k] = v
if self.debug:
log.trace(f'Settings changed: {k}={v} default={default}')
else:
if k not in compatibility_opts:
diff[k] = v
if not k.startswith('uiux_'):
unused_settings.append(k)
if self.debug:
log.trace(f'Settings unknown: {k}={v}')
writefile(diff, filename, silent=silent)
if self.debug:
log.trace(f'Settings save: count={len(diff.keys())} {diff}')
if len(unused_settings) > 0:
log.debug(f"Settings: unused={unused_settings}")
except Exception as err:
log.error(f'Settings: fn="{filename}" {err}')
def save(self, filename=None, silent=False):
threading.Thread(target=self.save_atomic, args=(filename, silent)).start()
def same_type(self, x, y):
if x is None or y is None:
return True
type_x = self.typemap.get(type(x), type(x))
type_y = self.typemap.get(type(y), type(y))
return type_x == type_y
def load(self, filename=None):
if filename is None:
filename = self.filename
filename = os.path.abspath(filename)
if not os.path.isfile(filename):
log.debug(f'Settings: fn="{filename}" created')
self.save(filename)
return
self.data = readfile(filename, lock=True)
if self.data.get('quicksettings') is not None and self.data.get('quicksettings_list') is None:
self.data['quicksettings_list'] = [i.strip() for i in self.data.get('quicksettings').split(',')]
unknown_settings = []
for k, v in self.data.items():
info: OptionInfo = self.data_labels.get(k, None)
if info is not None:
if not info.validate(k, v):
self.data[k] = info.default
if info is not None and not self.same_type(info.default, v):
log.warning(f"Setting validation: {k}={v} ({type(v).__name__} expected={type(info.default).__name__})")
self.data[k] = info.default
if info is None and k not in compatibility_opts and not k.startswith('uiux_'):
unknown_settings.append(k)
if len(unknown_settings) > 0:
log.warning(f"Setting validation: unknown={unknown_settings}")
def onchange(self, key, func, call=True):
item = self.data_labels.get(key)
item.onchange = func
if call:
func()
def dumpjson(self):
d = {k: self.data.get(k, self.data_labels.get(k).default) for k in self.data_labels.keys()}
metadata = {
k: {
"is_stored": k in self.data and self.data[k] != self.data_labels[k].default, # pylint: disable=unnecessary-dict-index-lookup
"tab_name": v.section[0]
} for k, v in self.data_labels.items()
}
return json.dumps({"values": d, "metadata": metadata})
def add_option(self, key, info):
self.data_labels[key] = info
def reorder(self):
"""reorder settings so that all items related to section always go together"""
section_ids = {}
settings_items = self.data_labels.items()
for _k, item in settings_items:
if item.section not in section_ids:
section_ids[item.section] = len(section_ids)
self.data_labels = dict(sorted(settings_items, key=lambda x: section_ids[x[1].section]))
def cast_value(self, key, value):
"""casts an arbitrary to the same type as this setting's value with key
Example: cast_value("eta_noise_seed_delta", "12") -> returns 12 (an int rather than str)
"""
if value is None:
return None
default_value = self.data_labels[key].default
if default_value is None:
default_value = getattr(self, key, None)
if default_value is None:
return None
expected_type = type(default_value)
if expected_type == bool and value == "False":
value = False
elif expected_type == type(value):
pass
else:
value = expected_type(value)
return value

View File

@ -1,18 +1,13 @@
from functools import lru_cache
import io import io
import os import os
import sys import sys
import time import time
import json
import threading
import contextlib import contextlib
from types import SimpleNamespace
from enum import Enum from enum import Enum
import requests
import gradio as gr import gradio as gr
import fasteners
import orjson
import diffusers import diffusers
from modules.json_helpers import readfile, writefile # pylint: disable=W0611
from modules.shared_helpers import listdir, walk_files, html_path, html, req, total_tqdm # pylint: disable=W0611
from modules import errors, devices, shared_items, shared_state, cmd_args, theme, history, files_cache from modules import errors, devices, shared_items, shared_state, cmd_args, theme, history, files_cache
from modules.paths import models_path, script_path, data_path, sd_configs_path, sd_default_config, sd_model_file, default_sd_model_file, extensions_dir, extensions_builtin_dir # pylint: disable=W0611 from modules.paths import models_path, script_path, data_path, sd_configs_path, sd_default_config, sd_model_file, default_sd_model_file, extensions_dir, extensions_builtin_dir # pylint: disable=W0611
from modules.dml import memory_providers, default_memory_provider, directml_do_hijack from modules.dml import memory_providers, default_memory_provider, directml_do_hijack
@ -21,23 +16,23 @@ from modules.memstats import memory_stats, ram_stats # pylint: disable=unused-im
from modules.interrogate.openclip import caption_models, caption_types, get_clip_models, refresh_clip_models from modules.interrogate.openclip import caption_models, caption_types, get_clip_models, refresh_clip_models
from modules.interrogate.vqa import vlm_models, vlm_prompts, vlm_system from modules.interrogate.vqa import vlm_models, vlm_prompts, vlm_system
from modules.ui_components import DropdownEditable from modules.ui_components import DropdownEditable
from modules.options import OptionInfo from modules.options import OptionInfo, options_section
import modules.memmon import modules.memmon
import modules.styles import modules.styles
import modules.paths as paths import modules.paths as paths
from installer import log, print_dict, console # pylint: disable=unused-import from installer import log, print_dict, console, get_version # pylint: disable=unused-import
errors.install([gr]) errors.install([gr])
demo: gr.Blocks = None demo: gr.Blocks = None
api = None api = None
progress_print_out = sys.stdout
parser = cmd_args.parser
url = 'https://github.com/vladmandic/sdnext' url = 'https://github.com/vladmandic/sdnext'
cmd_opts, _ = parser.parse_known_args() cmd_opts = cmd_args.parse_args()
parser = cmd_args.parser
hide_dirs = {"visible": not cmd_opts.hide_ui_dir_config} hide_dirs = {"visible": not cmd_opts.hide_ui_dir_config}
listfiles = listdir
xformers_available = False xformers_available = False
locking_available = True # used by file read/write locking compiled_model_state = None
sd_upscalers = [] sd_upscalers = []
detailers = [] detailers = []
face_restorers = [] face_restorers = []
@ -61,12 +56,10 @@ restricted_opts = {
"outdir_init_images" "outdir_init_images"
} }
resize_modes = ["None", "Fixed", "Crop", "Fill", "Outpaint", "Context aware"] resize_modes = ["None", "Fixed", "Crop", "Fill", "Outpaint", "Context aware"]
compatibility_opts = ['clip_skip', 'uni_pc_lower_order_final', 'uni_pc_order']
dir_timestamps = {}
dir_cache = {}
max_workers = 8 max_workers = 8
default_hfcache_dir = os.environ.get("SD_HFCACHEDIR", None) or os.path.join(os.path.expanduser('~'), '.cache', 'huggingface', 'hub') default_hfcache_dir = os.environ.get("SD_HFCACHEDIR", None) or os.path.join(os.path.expanduser('~'), '.cache', 'huggingface', 'hub')
sdnq_quant_modes = ["int8", "float8_e4m3fn", "int7", "int6", "int5", "uint4", "uint3", "uint2", "float8_e5m2", "float8_e4m3fnuz", "float8_e5m2fnuz", "uint8", "uint7", "uint6", "uint5", "int4", "int3", "int2", "uint1"] sdnq_quant_modes = ["int8", "float8_e4m3fn", "int7", "int6", "int5", "uint4", "uint3", "uint2", "float8_e5m2", "float8_e4m3fnuz", "float8_e5m2fnuz", "uint8", "uint7", "uint6", "uint5", "int4", "int3", "int2", "uint1"]
state = shared_state.State()
class Backend(Enum): class Backend(Enum):
@ -74,119 +67,10 @@ class Backend(Enum):
DIFFUSERS = 2 DIFFUSERS = 2
state = shared_state.State()
if not hasattr(cmd_opts, "use_openvino"):
cmd_opts.use_openvino = False
def readfile(filename, silent=False, lock=False):
global locking_available # pylint: disable=global-statement
data = {}
lock_file = None
locked = False
if lock and locking_available:
try:
lock_file = fasteners.InterProcessReaderWriterLock(f"{filename}.lock")
lock_file.logger.disabled = True
locked = lock_file.acquire_read_lock(blocking=True, timeout=3)
except Exception as err:
lock_file = None
locking_available = False
log.error(f'File read lock: file="{filename}" {err}')
locked = False
try:
# if not os.path.exists(filename):
# return {}
t0 = time.time()
with open(filename, "rb") as file:
b = file.read()
data = orjson.loads(b) # pylint: disable=no-member
# if type(data) is str:
# data = json.loads(data)
t1 = time.time()
if not silent:
fn = f'{sys._getframe(2).f_code.co_name}:{sys._getframe(1).f_code.co_name}' # pylint: disable=protected-access
log.debug(f'Read: file="{filename}" json={len(data)} bytes={os.path.getsize(filename)} time={t1-t0:.3f} fn={fn}')
except FileNotFoundError as err:
if not silent:
log.debug(f'Reading failed: {filename} {err}')
except Exception as err:
if not silent:
log.error(f'Reading failed: {filename} {err}')
try:
if locking_available and lock_file is not None:
lock_file.release_read_lock()
if locked and os.path.exists(f"{filename}.lock"):
os.remove(f"{filename}.lock")
except Exception:
locking_available = False
return data
def writefile(data, filename, mode='w', silent=False, atomic=False):
import tempfile
global locking_available # pylint: disable=global-statement
lock_file = None
locked = False
def default(obj):
log.error(f'Save: file="{filename}" not a valid object: {obj}')
return str(obj)
try:
t0 = time.time()
# skipkeys=True, ensure_ascii=True, check_circular=True, allow_nan=True
if type(data) == dict:
output = json.dumps(data, indent=2, default=default)
elif type(data) == list:
output = json.dumps(data, indent=2, default=default)
elif isinstance(data, object):
simple = {}
for k in data.__dict__:
if data.__dict__[k] is not None:
simple[k] = data.__dict__[k]
output = json.dumps(simple, indent=2, default=default)
else:
raise ValueError('not a valid object')
except Exception as err:
log.error(f'Save failed: file="{filename}" {err}')
return
try:
if locking_available:
lock_file = fasteners.InterProcessReaderWriterLock(f"{filename}.lock") if locking_available else None
lock_file.logger.disabled = True
locked = lock_file.acquire_write_lock(blocking=True, timeout=3) if lock_file is not None else False
except Exception as err:
locking_available = False
lock_file = None
log.error(f'File write lock: file="{filename}" {err}')
locked = False
try:
if atomic:
with tempfile.NamedTemporaryFile(mode=mode, encoding="utf8", delete=False, dir=os.path.dirname(filename)) as f:
f.write(output)
f.flush()
os.fsync(f.fileno())
os.replace(f.name, filename)
else:
with open(filename, mode=mode, encoding="utf8") as file:
file.write(output)
t1 = time.time()
if not silent:
log.debug(f'Save: file="{filename}" json={len(data)} bytes={len(output)} time={t1-t0:.3f}')
except Exception as err:
log.error(f'Save failed: file="{filename}" {err}')
try:
if locking_available and lock_file is not None:
lock_file.release_write_lock()
if locked and os.path.exists(f"{filename}.lock"):
os.remove(f"{filename}.lock")
except Exception:
locking_available = False
# early select backend # early select backend
backend = Backend.DIFFUSERS backend = Backend.DIFFUSERS
if not hasattr(cmd_opts, "use_openvino"):
cmd_opts.use_openvino = False
if cmd_opts.use_openvino: # override for openvino if cmd_opts.use_openvino: # override for openvino
from modules.intel.openvino import get_device_list as get_openvino_device_list # pylint: disable=ungrouped-imports from modules.intel.openvino import get_device_list as get_openvino_device_list # pylint: disable=ungrouped-imports
elif cmd_opts.use_ipex or devices.has_xpu(): elif cmd_opts.use_ipex or devices.has_xpu():
@ -209,12 +93,6 @@ if not files_cache.do_cache_folders:
log.warning('File cache disabled: ') log.warning('File cache disabled: ')
def options_section(section_identifier, options_dict):
for v in options_dict.values():
v.section = section_identifier
return options_dict
def list_checkpoint_titles(): def list_checkpoint_titles():
import modules.sd_models # pylint: disable=W0621 import modules.sd_models # pylint: disable=W0621
return modules.sd_models.checkpoint_titles() return modules.sd_models.checkpoint_titles()
@ -251,76 +129,6 @@ def list_samplers():
return modules.sd_samplers.all_samplers return modules.sd_samplers.all_samplers
def temp_disable_extensions():
disable_safe = [
'sd-webui-controlnet',
'multidiffusion-upscaler-for-automatic1111',
'a1111-sd-webui-lycoris',
'sd-webui-agent-scheduler',
'clip-interrogator-ext',
'stable-diffusion-webui-images-browser',
]
disable_diffusers = [
'sd-webui-controlnet',
'multidiffusion-upscaler-for-automatic1111',
'a1111-sd-webui-lycoris',
'sd-webui-animatediff',
]
disable_themes = [
'sd-webui-lobe-theme',
'cozy-nest',
'sdnext-modernui',
]
disabled = []
if cmd_opts.theme is not None:
theme_name = cmd_opts.theme
else:
theme_name = f'{opts.theme_type.lower()}/{opts.gradio_theme}'
if theme_name == 'lobe':
disable_themes.remove('sd-webui-lobe-theme')
elif theme_name == 'cozy-nest' or theme_name == 'cozy':
disable_themes.remove('cozy-nest')
elif '/' not in theme_name: # set default themes per type
if theme_name == 'standard' or theme_name == 'default':
theme_name = 'standard/black-teal'
if theme_name == 'modern':
theme_name = 'modern/Default'
if theme_name == 'gradio':
theme_name = 'gradio/default'
if theme_name == 'huggingface':
theme_name = 'huggingface/blaaa'
if theme_name.lower().startswith('standard') or theme_name.lower().startswith('default'):
modules.shared.opts.data['theme_type'] = 'Standard'
modules.shared.opts.data['gradio_theme'] = theme_name[9:]
elif theme_name.lower().startswith('modern'):
modules.shared.opts.data['theme_type'] = 'Modern'
modules.shared.opts.data['gradio_theme'] = theme_name[7:]
disable_themes.remove('sdnext-modernui')
elif theme_name.lower().startswith('huggingface') or theme_name.lower().startswith('gradio') or theme_name.lower().startswith('none'):
modules.shared.opts.data['theme_type'] = 'None'
modules.shared.opts.data['gradio_theme'] = theme_name
else:
modules.shared.log.error(f'UI theme invalid: theme="{theme_name}" available={["standard/*", "modern/*", "none/*"]} fallback="standard/black-teal"')
modules.shared.opts.data['theme_type'] = 'Standard'
modules.shared.opts.data['gradio_theme'] = 'black-teal'
for ext in disable_themes:
if ext.lower() not in opts.disabled_extensions:
disabled.append(ext)
if cmd_opts.safe:
for ext in disable_safe:
if ext.lower() not in opts.disabled_extensions:
disabled.append(ext)
for ext in disable_diffusers:
if ext.lower() not in opts.disabled_extensions:
disabled.append(ext)
disabled.append('Lora')
cmd_opts.controlnet_loglevel = 'WARNING'
return disabled
def get_default_modes(): def get_default_modes():
default_offload_mode = "none" default_offload_mode = "none"
default_diffusers_offload_min_gpu_memory = 0.2 default_diffusers_offload_min_gpu_memory = 0.2
@ -939,279 +747,11 @@ options_templates.update(options_section((None, "Hidden options"), {
"tooltips": OptionInfo("UI Tooltips", "UI tooltips", gr.Radio, {"choices": ["None", "Browser default", "UI tooltips"], "visible": False}), "tooltips": OptionInfo("UI Tooltips", "UI tooltips", gr.Radio, {"choices": ["None", "Browser default", "UI tooltips"], "visible": False}),
})) }))
options_templates.update(options_section((None, "Legacy options"), {
"interrogate_clip_skip_categories": OptionInfo(["artists", "movements", "flavors"], "CLiP: skip categories", gr.CheckboxGroup, lambda: {"choices": []}, visible=False),
"lora_legacy": OptionInfo(False, "LoRA load using legacy method", gr.Checkbox, {"visible": False}),
"lora_preferred_name": OptionInfo("filename", "LoRA preferred name", gr.Radio, {"choices": ["filename", "alias"], "visible": False}),
"img2img_extra_noise": OptionInfo(0.0, "Extra noise multiplier for img2img", gr.Slider, {"minimum": 0.0, "maximum": 1.0, "step": 0.01, "visible": False}),
"disable_weights_auto_swap": OptionInfo(True, "Do not change selected model when reading generation parameters", gr.Checkbox, {"visible": False}),
"sub_quad_q_chunk_size": OptionInfo(512, "Attention query chunk size", gr.Slider, {"minimum": 16, "maximum": 8192, "step": 8, "visible": False}),
"sub_quad_kv_chunk_size": OptionInfo(512, "Attention kv chunk size", gr.Slider, {"minimum": 0, "maximum": 8192, "step": 8, "visible": False}),
"sub_quad_chunk_threshold": OptionInfo(80, "Attention chunking threshold", gr.Slider, {"minimum": 0, "maximum": 100, "step": 1, "visible": False}),
"upcast_attn": OptionInfo(False, "Upcast attention layer", gr.Checkbox, {"visible": False}),
"cuda_cast_unet": OptionInfo(False, "Fixed UNet precision", gr.Checkbox, {"visible": False}),
"comma_padding_backtrack": OptionInfo(20, "Prompt padding", gr.Slider, {"minimum": 0, "maximum": 74, "step": 1, "visible": False}),
"sd_textencoder_cache": OptionInfo(True, "Cache text encoder results", gr.Checkbox, {"visible": False}),
"rollback_vae": OptionInfo(False, "Attempt VAE roll back for NaN values", gr.Checkbox, {"visible": False}),
"sd_vae_sliced_encode": OptionInfo(False, "VAE sliced encode", gr.Checkbox, {"visible": False}),
"nan_skip": OptionInfo(False, "Skip Generation if NaN found in latents", gr.Checkbox, {"visible": False}),
"sd_model_dict": OptionInfo('None', "Use separate base dict", gr.Dropdown, lambda: {"choices": ['None'] + list_checkpoint_titles(), "visible": False}),
"diffusers_move_base": OptionInfo(False, "Move base model to CPU when using refiner", gr.Checkbox, {"visible": False }),
"diffusers_move_unet": OptionInfo(False, "Move base model to CPU when using VAE", gr.Checkbox, {"visible": False }),
"diffusers_move_refiner": OptionInfo(False, "Move refiner model to CPU when not in use", gr.Checkbox, {"visible": False }),
"diffusers_extract_ema": OptionInfo(False, "Use model EMA weights when possible", gr.Checkbox, {"visible": False }),
"batch_cond_uncond": OptionInfo(True, "Do conditional and unconditional denoising in one batch", gr.Checkbox, {"visible": False}),
"CLIP_stop_at_last_layers": OptionInfo(1, "Clip skip", gr.Slider, {"minimum": 1, "maximum": 8, "step": 1, "visible": False}),
"dataset_filename_join_string": OptionInfo(" ", "Filename join string", gr.Textbox, { "visible": False }),
"dataset_filename_word_regex": OptionInfo("", "Filename word regex", gr.Textbox, { "visible": False }),
"diffusers_force_zeros": OptionInfo(False, "Force zeros for prompts when empty", gr.Checkbox, {"visible": False}),
"disable_all_extensions": OptionInfo("none", "Disable all extensions (preserves the list of disabled extensions)", gr.Radio, {"choices": ["none", "user", "all"]}),
"disable_nan_check": OptionInfo(True, "Disable NaN check", gr.Checkbox, {"visible": False}),
"embeddings_templates_dir": OptionInfo("", "Embeddings train templates directory", gr.Textbox, { "visible": False }),
"extra_networks_card_fit": OptionInfo("cover", "UI image contain method", gr.Radio, {"choices": ["contain", "cover", "fill"], "visible": False}),
"grid_extended_filename": OptionInfo(True, "Add extended info to filename when saving grid", gr.Checkbox, {"visible": False}),
"grid_save_to_dirs": OptionInfo(False, "Save grids to a subdirectory", gr.Checkbox, {"visible": False}),
"hypernetwork_enabled": OptionInfo(False, "Enable Hypernetwork support", gr.Checkbox, {"visible": False}),
"img2img_fix_steps": OptionInfo(False, "For image processing do exact number of steps as specified", gr.Checkbox, { "visible": False }),
"interrogate_clip_dict_limit": OptionInfo(2048, "CLIP: maximum number of lines in text file", gr.Slider, { "visible": False }),
"keyedit_delimiters": OptionInfo(r".,\/!?%^*;:{}=`~()", "Ctrl+up/down word delimiters", gr.Textbox, { "visible": False }),
"keyedit_precision_attention": OptionInfo(0.1, "Ctrl+up/down precision when editing (attention:1.1)", gr.Slider, {"minimum": 0.01, "maximum": 0.2, "step": 0.001, "visible": False}),
"keyedit_precision_extra": OptionInfo(0.05, "Ctrl+up/down precision when editing <extra networks:0.9>", gr.Slider, {"minimum": 0.01, "maximum": 0.2, "step": 0.001, "visible": False}),
"live_preview_content": OptionInfo("Combined", "Live preview subject", gr.Radio, {"choices": ["Combined", "Prompt", "Negative prompt"], "visible": False}),
"live_previews_enable": OptionInfo(True, "Show live previews", gr.Checkbox, {"visible": False}),
"lora_functional": OptionInfo(False, "Use Kohya method for handling multiple LoRA", gr.Checkbox, { "visible": False }),
"lyco_dir": OptionInfo(os.path.join(paths.models_path, 'LyCORIS'), "Folder with LyCORIS network(s)", gr.Text, {"visible": False}),
"model_reuse_dict": OptionInfo(False, "Reuse loaded model dictionary", gr.Checkbox, {"visible": False}),
"pad_cond_uncond": OptionInfo(True, "Pad prompt and negative prompt to be same length", gr.Checkbox, {"visible": False}),
"pin_memory": OptionInfo(True, "Pin training dataset to memory", gr.Checkbox, { "visible": False }),
"save_optimizer_state": OptionInfo(False, "Save resumable optimizer state when training", gr.Checkbox, { "visible": False }),
"save_training_settings_to_txt": OptionInfo(True, "Save training settings to a text file", gr.Checkbox, { "visible": False }),
"sd_disable_ckpt": OptionInfo(False, "Disallow models in ckpt format", gr.Checkbox, {"visible": False}),
"sd_lora": OptionInfo("", "Add LoRA to prompt", gr.Textbox, {"visible": False}),
"sd_vae_checkpoint_cache": OptionInfo(0, "Cached VAEs", gr.Slider, {"minimum": 0, "maximum": 10, "step": 1, "visible": False}),
"show_progress_grid": OptionInfo(True, "Show previews as a grid", gr.Checkbox, {"visible": False}),
"show_progressbar": OptionInfo(True, "Show progressbar", gr.Checkbox, {"visible": False}),
"training_enable_tensorboard": OptionInfo(False, "Enable tensorboard logging", gr.Checkbox, { "visible": False }),
"training_image_repeats_per_epoch": OptionInfo(1, "Image repeats per epoch", gr.Slider, {"minimum": 1, "maximum": 100, "step": 1, "visible": False }),
"training_tensorboard_flush_every": OptionInfo(120, "Tensorboard flush period", gr.Number, { "visible": False }),
"training_tensorboard_save_images": OptionInfo(False, "Save generated images within tensorboard", gr.Checkbox, { "visible": False }),
"training_write_csv_every": OptionInfo(0, "Save loss CSV file every n steps", gr.Number, { "visible": False }),
"ui_scripts_reorder": OptionInfo("", "UI scripts order", gr.Textbox, { "visible": False }),
"unload_models_when_training": OptionInfo(False, "Move VAE and CLIP to RAM when training", gr.Checkbox, { "visible": False }),
"upscaler_for_img2img": OptionInfo("None", "Default upscaler for image resize operations", gr.Dropdown, lambda: {"choices": [x.name for x in sd_upscalers], "visible": False}, refresh=refresh_upscalers),
"use_save_to_dirs_for_ui": OptionInfo(False, "Save images to a subdirectory when using Save button", gr.Checkbox, {"visible": False}),
"use_upscaler_name_as_suffix": OptionInfo(True, "Use upscaler as suffix", gr.Checkbox, {"visible": False}),
}))
options_templates.update() from modules.shared_legacy import get_legacy_options
options_templates.update(get_legacy_options())
from modules.options_handler import Options
class Options: opts = Options(options_templates, restricted_opts)
data = None
data_labels = options_templates
filename = None
typemap = {int: float}
debug = os.environ.get('SD_CONFIG_DEBUG', None) is not None
def __init__(self):
self.data = {k: v.default for k, v in self.data_labels.items()}
def __setattr__(self, key, value): # pylint: disable=inconsistent-return-statements
if self.data is not None:
if key in self.data or key in self.data_labels:
if cmd_opts.freeze:
log.warning(f'Settings are frozen: {key}')
return
if cmd_opts.hide_ui_dir_config and key in restricted_opts:
log.warning(f'Settings key is restricted: {key}')
return
if self.debug:
log.trace(f'Settings set: {key}={value}')
self.data[key] = value
return
return super(Options, self).__setattr__(key, value) # pylint: disable=super-with-arguments
def get(self, item):
if self.data is not None:
if item in self.data:
return self.data[item]
if item in self.data_labels:
return self.data_labels[item].default
return super(Options, self).__getattribute__(item) # pylint: disable=super-with-arguments
def __getattr__(self, item):
if self.data is not None:
if item in self.data:
return self.data[item]
if item in self.data_labels:
return self.data_labels[item].default
return super(Options, self).__getattribute__(item) # pylint: disable=super-with-arguments
def set(self, key, value):
"""sets an option and calls its onchange callback, returning True if the option changed and False otherwise"""
oldval = self.data.get(key, None)
if oldval is None:
oldval = self.data_labels[key].default
if oldval == value:
return False
try:
setattr(self, key, value)
except RuntimeError:
return False
if self.data_labels[key].onchange is not None:
try:
self.data_labels[key].onchange()
except Exception as err:
log.error(f'Error in onchange callback: {key} {value} {err}')
errors.display(err, 'Error in onchange callback')
setattr(self, key, oldval)
return False
return True
def get_default(self, key):
"""returns the default value for the key"""
data_label = self.data_labels.get(key)
return data_label.default if data_label is not None else None
def list(self):
"""list all visible options"""
components = [k for k, v in self.data_labels.items() if v.visible]
return components
def save_atomic(self, filename=None, silent=False):
if self.filename is None:
self.filename = config_filename
if filename is None:
filename = self.filename
filename = os.path.abspath(filename)
if cmd_opts.freeze:
log.warning(f'Setting: fn="{filename}" save disabled')
return
try:
diff = {}
unused_settings = []
# if self.debug:
# log.debug('Settings: user')
# for k, v in self.data.items():
# log.trace(f' Config: item={k} value={v} default={self.data_labels[k].default if k in self.data_labels else None}')
if self.debug:
log.debug(f'Settings: total={len(self.data.keys())} known={len(self.data_labels.keys())}')
for k, v in self.data.items():
if k in self.data_labels:
default = self.data_labels[k].default
if isinstance(v, list):
if (len(default) != len(v) or set(default) != set(v)): # list order is non-deterministic
diff[k] = v
if self.debug:
log.trace(f'Settings changed: {k}={v} default={default}')
elif self.data_labels[k].default != v:
diff[k] = v
if self.debug:
log.trace(f'Settings changed: {k}={v} default={default}')
else:
if k not in compatibility_opts:
diff[k] = v
if not k.startswith('uiux_'):
unused_settings.append(k)
if self.debug:
log.trace(f'Settings unknown: {k}={v}')
writefile(diff, filename, silent=silent)
if self.debug:
log.trace(f'Settings save: count={len(diff.keys())} {diff}')
if len(unused_settings) > 0:
log.debug(f"Settings: unused={unused_settings}")
except Exception as err:
log.error(f'Settings: fn="{filename}" {err}')
def save(self, filename=None, silent=False):
threading.Thread(target=self.save_atomic, args=(filename, silent)).start()
def same_type(self, x, y):
if x is None or y is None:
return True
type_x = self.typemap.get(type(x), type(x))
type_y = self.typemap.get(type(y), type(y))
return type_x == type_y
def load(self, filename=None):
if filename is None:
filename = self.filename
filename = os.path.abspath(filename)
if not os.path.isfile(filename):
log.debug(f'Settings: fn="{filename}" created')
self.save(filename)
return
self.data = readfile(filename, lock=True)
if self.data.get('quicksettings') is not None and self.data.get('quicksettings_list') is None:
self.data['quicksettings_list'] = [i.strip() for i in self.data.get('quicksettings').split(',')]
unknown_settings = []
for k, v in self.data.items():
info: OptionInfo = self.data_labels.get(k, None)
if info is not None:
if not info.validate(k, v):
self.data[k] = info.default
if info is not None and not self.same_type(info.default, v):
log.warning(f"Setting validation: {k}={v} ({type(v).__name__} expected={type(info.default).__name__})")
self.data[k] = info.default
if info is None and k not in compatibility_opts and not k.startswith('uiux_'):
unknown_settings.append(k)
if len(unknown_settings) > 0:
log.warning(f"Setting validation: unknown={unknown_settings}")
def onchange(self, key, func, call=True):
item = self.data_labels.get(key)
item.onchange = func
if call:
func()
def dumpjson(self):
d = {k: self.data.get(k, self.data_labels.get(k).default) for k in self.data_labels.keys()}
metadata = {
k: {
"is_stored": k in self.data and self.data[k] != self.data_labels[k].default, # pylint: disable=unnecessary-dict-index-lookup
"tab_name": v.section[0]
} for k, v in self.data_labels.items()
}
return json.dumps({"values": d, "metadata": metadata})
def add_option(self, key, info):
self.data_labels[key] = info
def reorder(self):
"""reorder settings so that all items related to section always go together"""
section_ids = {}
settings_items = self.data_labels.items()
for _k, item in settings_items:
if item.section not in section_ids:
section_ids[item.section] = len(section_ids)
self.data_labels = dict(sorted(settings_items, key=lambda x: section_ids[x[1].section]))
def cast_value(self, key, value):
"""casts an arbitrary to the same type as this setting's value with key
Example: cast_value("eta_noise_seed_delta", "12") -> returns 12 (an int rather than str)
"""
if value is None:
return None
default_value = self.data_labels[key].default
if default_value is None:
default_value = getattr(self, key, None)
if default_value is None:
return None
expected_type = type(default_value)
if expected_type == bool and value == "False":
value = False
elif expected_type == type(value):
pass
else:
value = expected_type(value)
return value
profiler = None
opts = Options()
config_filename = cmd_opts.config config_filename = cmd_opts.config
opts.load(config_filename) opts.load(config_filename)
cmd_opts = cmd_args.settings_args(opts, cmd_opts) cmd_opts = cmd_args.settings_args(opts, cmd_opts)
@ -1223,6 +763,7 @@ opts.data['uni_pc_lower_order_final'] = opts.schedulers_use_loworder # compatibi
opts.data['uni_pc_order'] = max(2, opts.schedulers_solver_order) # compatibility opts.data['uni_pc_order'] = max(2, opts.schedulers_solver_order) # compatibility
log.info(f'Engine: backend={backend} compute={devices.backend} device={devices.get_optimal_device_name()} attention="{opts.cross_attention_optimization}" mode={devices.inference_context.__name__}') log.info(f'Engine: backend={backend} compute={devices.backend} device={devices.get_optimal_device_name()} attention="{opts.cross_attention_optimization}" mode={devices.inference_context.__name__}')
profiler = None
prompt_styles = modules.styles.StyleDatabase(opts) prompt_styles = modules.styles.StyleDatabase(opts)
reference_models = readfile(os.path.join('html', 'reference.json')) if opts.extra_network_reference_enable else {} reference_models = readfile(os.path.join('html', 'reference.json')) if opts.extra_network_reference_enable else {}
cmd_opts.disable_extension_access = (cmd_opts.share or cmd_opts.listen or (cmd_opts.server_name or False)) and not cmd_opts.insecure cmd_opts.disable_extension_access = (cmd_opts.share or cmd_opts.listen or (cmd_opts.server_name or False)) and not cmd_opts.insecure
@ -1251,21 +792,6 @@ except Exception as ex:
log.error(f'Device: {ex}') log.error(f'Device: {ex}')
class TotalTQDM: # compatibility with previous global-tqdm
# import tqdm
def __init__(self):
pass
def reset(self):
pass
def update(self):
pass
def updateTotal(self, new_total):
pass
def clear(self):
pass
total_tqdm = TotalTQDM()
def restart_server(restart=True): def restart_server(restart=True):
if demo is None: if demo is None:
return return
@ -1297,87 +823,12 @@ def restore_defaults(restart=True):
restart_server(restart) restart_server(restart)
def listdir(path): # startup def of shared.sd_model before its redefined in modeldata
if not os.path.exists(path):
return []
mtime = os.path.getmtime(path)
if path in dir_timestamps and mtime == dir_timestamps[path]:
return dir_cache[path]
else:
dir_cache[path] = [os.path.join(path, f) for f in os.listdir(path)]
dir_timestamps[path] = mtime
return dir_cache[path]
def walk_files(path, allowed_extensions=None):
if not os.path.exists(path):
return
if allowed_extensions is not None:
allowed_extensions = set(allowed_extensions)
for root, _dirs, files in os.walk(path, followlinks=True):
for filename in files:
if allowed_extensions is not None:
_, ext = os.path.splitext(filename)
if ext not in allowed_extensions:
continue
yield os.path.join(root, filename)
def html_path(filename):
return os.path.join(paths.script_path, "html", filename)
def html(filename):
path = html_path(filename)
if os.path.exists(path):
with open(path, encoding="utf8") as file:
return file.read()
return ""
@lru_cache(maxsize=1)
def get_version():
version = None
if version is None:
try:
import subprocess
res = subprocess.run('git log --pretty=format:"%h %ad" -1 --date=short', stdout = subprocess.PIPE, stderr = subprocess.PIPE, shell=True, check=True)
ver = res.stdout.decode(encoding = 'utf8', errors='ignore') if len(res.stdout) > 0 else ' '
githash, updated = ver.split(' ')
res = subprocess.run('git remote get-url origin', stdout = subprocess.PIPE, stderr = subprocess.PIPE, shell=True, check=True)
origin = res.stdout.decode(encoding = 'utf8', errors='ignore') if len(res.stdout) > 0 else ''
res = subprocess.run('git rev-parse --abbrev-ref HEAD', stdout = subprocess.PIPE, stderr = subprocess.PIPE, shell=True, check=True)
branch = res.stdout.decode(encoding = 'utf8', errors='ignore') if len(res.stdout) > 0 else ''
version = {
'app': 'sd.next',
'updated': updated,
'hash': githash,
'url': origin.replace('\n', '') + '/tree/' + branch.replace('\n', '')
}
except Exception:
version = { 'app': 'sd.next' }
return version
def req(url_addr, headers = None, **kwargs):
if headers is None:
headers = { 'Content-type': 'application/json' }
try:
res = requests.get(url_addr, timeout=30, headers=headers, verify=False, allow_redirects=True, **kwargs)
except Exception as err:
log.error(f'HTTP request error: url={url_addr} {err}')
res = { 'status_code': 500, 'text': f'HTTP request error: url={url_addr} {err}' }
res = SimpleNamespace(**res)
return res
sd_model: diffusers.DiffusionPipeline = None # dummy and overwritten by class sd_model: diffusers.DiffusionPipeline = None # dummy and overwritten by class
sd_refiner: diffusers.DiffusionPipeline = None # dummy and overwritten by class sd_refiner: diffusers.DiffusionPipeline = None # dummy and overwritten by class
sd_model_type: str = '' # dummy and overwritten by class sd_model_type: str = '' # dummy and overwritten by class
sd_refiner_type: str = '' # dummy and overwritten by class sd_refiner_type: str = '' # dummy and overwritten by class
sd_loaded: bool = False # dummy and overwritten by class sd_loaded: bool = False # dummy and overwritten by class
compiled_model_state = None
listfiles = listdir
from modules.modeldata import Shared # pylint: disable=ungrouped-imports from modules.modeldata import Shared # pylint: disable=ungrouped-imports
sys.modules[__name__].__class__ = Shared sys.modules[__name__].__class__ = Shared

74
modules/shared_helpers.py Normal file
View File

@ -0,0 +1,74 @@
import os
from types import SimpleNamespace
from modules import paths
from installer import log
dir_timestamps = {}
dir_cache = {}
def listdir(path):
if not os.path.exists(path):
return []
mtime = os.path.getmtime(path)
if path in dir_timestamps and mtime == dir_timestamps[path]:
return dir_cache[path]
else:
dir_cache[path] = [os.path.join(path, f) for f in os.listdir(path)]
dir_timestamps[path] = mtime
return dir_cache[path]
def walk_files(path, allowed_extensions=None):
if not os.path.exists(path):
return
if allowed_extensions is not None:
allowed_extensions = set(allowed_extensions)
for root, _dirs, files in os.walk(path, followlinks=True):
for filename in files:
if allowed_extensions is not None:
_, ext = os.path.splitext(filename)
if ext not in allowed_extensions:
continue
yield os.path.join(root, filename)
def html_path(filename):
return os.path.join(paths.script_path, "html", filename)
def html(filename):
path = html_path(filename)
if os.path.exists(path):
with open(path, encoding="utf8") as file:
return file.read()
return ""
def req(url_addr, headers = None, **kwargs):
import requests
if headers is None:
headers = { 'Content-type': 'application/json' }
try:
res = requests.get(url_addr, timeout=30, headers=headers, verify=False, allow_redirects=True, **kwargs)
except Exception as err:
log.error(f'HTTP request error: url={url_addr} {err}')
res = { 'status_code': 500, 'text': f'HTTP request error: url={url_addr} {err}' }
res = SimpleNamespace(**res)
return res
class TotalTQDM: # compatibility with previous global-tqdm
# import tqdm
def __init__(self):
pass
def reset(self):
pass
def update(self):
pass
def updateTotal(self, new_total):
pass
def clear(self):
pass
total_tqdm = TotalTQDM()

78
modules/shared_legacy.py Normal file
View File

@ -0,0 +1,78 @@
import os
import gradio as gr
from modules import paths
from modules.options import OptionInfo, options_section
class LegacyOption(OptionInfo):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
legacy_options = options_section((None, "Legacy options"), {
"interrogate_clip_skip_categories": LegacyOption(["artists", "movements", "flavors"], "CLiP: skip categories", gr.CheckboxGroup, lambda: {"choices": []}, visible=False),
"lora_legacy": LegacyOption(False, "LoRA load using legacy method", gr.Checkbox, {"visible": False}),
"lora_preferred_name": LegacyOption("filename", "LoRA preferred name", gr.Radio, {"choices": ["filename", "alias"], "visible": False}),
"img2img_extra_noise": LegacyOption(0.0, "Extra noise multiplier for img2img", gr.Slider, {"minimum": 0.0, "maximum": 1.0, "step": 0.01, "visible": False}),
"disable_weights_auto_swap": LegacyOption(True, "Do not change selected model when reading generation parameters", gr.Checkbox, {"visible": False}),
"sub_quad_q_chunk_size": LegacyOption(512, "Attention query chunk size", gr.Slider, {"minimum": 16, "maximum": 8192, "step": 8, "visible": False}),
"sub_quad_kv_chunk_size": LegacyOption(512, "Attention kv chunk size", gr.Slider, {"minimum": 0, "maximum": 8192, "step": 8, "visible": False}),
"sub_quad_chunk_threshold": LegacyOption(80, "Attention chunking threshold", gr.Slider, {"minimum": 0, "maximum": 100, "step": 1, "visible": False}),
"upcast_attn": LegacyOption(False, "Upcast attention layer", gr.Checkbox, {"visible": False}),
"cuda_cast_unet": LegacyOption(False, "Fixed UNet precision", gr.Checkbox, {"visible": False}),
"comma_padding_backtrack": LegacyOption(20, "Prompt padding", gr.Slider, {"minimum": 0, "maximum": 74, "step": 1, "visible": False}),
"sd_textencoder_cache": LegacyOption(True, "Cache text encoder results", gr.Checkbox, {"visible": False}),
"rollback_vae": LegacyOption(False, "Attempt VAE roll back for NaN values", gr.Checkbox, {"visible": False}),
"sd_vae_sliced_encode": LegacyOption(False, "VAE sliced encode", gr.Checkbox, {"visible": False}),
"nan_skip": LegacyOption(False, "Skip Generation if NaN found in latents", gr.Checkbox, {"visible": False}),
"sd_model_dict": LegacyOption('None', "Use separate base dict", gr.Dropdown, lambda: {"choices": ['None'], "visible": False}),
"diffusers_move_base": LegacyOption(False, "Move base model to CPU when using refiner", gr.Checkbox, {"visible": False }),
"diffusers_move_unet": LegacyOption(False, "Move base model to CPU when using VAE", gr.Checkbox, {"visible": False }),
"diffusers_move_refiner": LegacyOption(False, "Move refiner model to CPU when not in use", gr.Checkbox, {"visible": False }),
"diffusers_extract_ema": LegacyOption(False, "Use model EMA weights when possible", gr.Checkbox, {"visible": False }),
"batch_cond_uncond": LegacyOption(True, "Do conditional and unconditional denoising in one batch", gr.Checkbox, {"visible": False}),
"CLIP_stop_at_last_layers": LegacyOption(1, "Clip skip", gr.Slider, {"minimum": 1, "maximum": 8, "step": 1, "visible": False}),
"dataset_filename_join_string": LegacyOption(" ", "Filename join string", gr.Textbox, { "visible": False }),
"dataset_filename_word_regex": LegacyOption("", "Filename word regex", gr.Textbox, { "visible": False }),
"diffusers_force_zeros": LegacyOption(False, "Force zeros for prompts when empty", gr.Checkbox, {"visible": False}),
"disable_all_extensions": LegacyOption("none", "Disable all extensions (preserves the list of disabled extensions)", gr.Radio, {"choices": ["none", "user", "all"]}),
"disable_nan_check": LegacyOption(True, "Disable NaN check", gr.Checkbox, {"visible": False}),
"embeddings_templates_dir": LegacyOption("", "Embeddings train templates directory", gr.Textbox, { "visible": False }),
"extra_networks_card_fit": LegacyOption("cover", "UI image contain method", gr.Radio, {"choices": ["contain", "cover", "fill"], "visible": False}),
"grid_extended_filename": LegacyOption(True, "Add extended info to filename when saving grid", gr.Checkbox, {"visible": False}),
"grid_save_to_dirs": LegacyOption(False, "Save grids to a subdirectory", gr.Checkbox, {"visible": False}),
"hypernetwork_enabled": LegacyOption(False, "Enable Hypernetwork support", gr.Checkbox, {"visible": False}),
"img2img_fix_steps": LegacyOption(False, "For image processing do exact number of steps as specified", gr.Checkbox, { "visible": False }),
"interrogate_clip_dict_limit": LegacyOption(2048, "CLIP: maximum number of lines in text file", gr.Slider, { "visible": False }),
"keyedit_delimiters": LegacyOption(r".,\/!?%^*;:{}=`~()", "Ctrl+up/down word delimiters", gr.Textbox, { "visible": False }),
"keyedit_precision_attention": LegacyOption(0.1, "Ctrl+up/down precision when editing (attention:1.1)", gr.Slider, {"minimum": 0.01, "maximum": 0.2, "step": 0.001, "visible": False}),
"keyedit_precision_extra": LegacyOption(0.05, "Ctrl+up/down precision when editing <extra networks:0.9>", gr.Slider, {"minimum": 0.01, "maximum": 0.2, "step": 0.001, "visible": False}),
"live_preview_content": LegacyOption("Combined", "Live preview subject", gr.Radio, {"choices": ["Combined", "Prompt", "Negative prompt"], "visible": False}),
"live_previews_enable": LegacyOption(True, "Show live previews", gr.Checkbox, {"visible": False}),
"lora_functional": LegacyOption(False, "Use Kohya method for handling multiple LoRA", gr.Checkbox, { "visible": False }),
"lyco_dir": LegacyOption(os.path.join(paths.models_path, 'LyCORIS'), "Folder with LyCORIS network(s)", gr.Text, {"visible": False}),
"model_reuse_dict": LegacyOption(False, "Reuse loaded model dictionary", gr.Checkbox, {"visible": False}),
"pad_cond_uncond": LegacyOption(True, "Pad prompt and negative prompt to be same length", gr.Checkbox, {"visible": False}),
"pin_memory": LegacyOption(True, "Pin training dataset to memory", gr.Checkbox, { "visible": False }),
"save_optimizer_state": LegacyOption(False, "Save resumable optimizer state when training", gr.Checkbox, { "visible": False }),
"save_training_settings_to_txt": LegacyOption(True, "Save training settings to a text file", gr.Checkbox, { "visible": False }),
"sd_disable_ckpt": LegacyOption(False, "Disallow models in ckpt format", gr.Checkbox, {"visible": False}),
"sd_lora": LegacyOption("", "Add LoRA to prompt", gr.Textbox, {"visible": False}),
"sd_vae_checkpoint_cache": LegacyOption(0, "Cached VAEs", gr.Slider, {"minimum": 0, "maximum": 10, "step": 1, "visible": False}),
"show_progress_grid": LegacyOption(True, "Show previews as a grid", gr.Checkbox, {"visible": False}),
"show_progressbar": LegacyOption(True, "Show progressbar", gr.Checkbox, {"visible": False}),
"training_enable_tensorboard": LegacyOption(False, "Enable tensorboard logging", gr.Checkbox, { "visible": False }),
"training_image_repeats_per_epoch": LegacyOption(1, "Image repeats per epoch", gr.Slider, {"minimum": 1, "maximum": 100, "step": 1, "visible": False }),
"training_tensorboard_flush_every": LegacyOption(120, "Tensorboard flush period", gr.Number, { "visible": False }),
"training_tensorboard_save_images": LegacyOption(False, "Save generated images within tensorboard", gr.Checkbox, { "visible": False }),
"training_write_csv_every": LegacyOption(0, "Save loss CSV file every n steps", gr.Number, { "visible": False }),
"ui_scripts_reorder": LegacyOption("", "UI scripts order", gr.Textbox, { "visible": False }),
"unload_models_when_training": LegacyOption(False, "Move VAE and CLIP to RAM when training", gr.Checkbox, { "visible": False }),
"upscaler_for_img2img": LegacyOption("None", "Default upscaler for image resize operations", gr.Dropdown, lambda: {"choices": [], "visible": False}),
"use_save_to_dirs_for_ui": LegacyOption(False, "Save images to a subdirectory when using Save button", gr.Checkbox, {"visible": False}),
"use_upscaler_name_as_suffix": LegacyOption(True, "Use upscaler as suffix", gr.Checkbox, {"visible": False}),
})
def get_legacy_options():
return legacy_options

View File

@ -22,7 +22,7 @@
"eslint": "eslint . javascript/ extensions-builtin/sdnext-modernui/javascript/", "eslint": "eslint . javascript/ extensions-builtin/sdnext-modernui/javascript/",
"ruff": ". venv/bin/activate && ruff check", "ruff": ". venv/bin/activate && ruff check",
"pylint": ". venv/bin/activate && pylint *.py modules/ pipelines/ scripts/ extensions-builtin/ | grep -v '^*'", "pylint": ". venv/bin/activate && pylint *.py modules/ pipelines/ scripts/ extensions-builtin/ | grep -v '^*'",
"format": ". venv/bin/activate && pre-commit run -a", "format": ". venv/bin/activate && pre-commit run --all-files",
"lint": "npm run eslint && npm run format && npm run ruff && npm run pylint | grep -v TODO", "lint": "npm run eslint && npm run format && npm run ruff && npm run pylint | grep -v TODO",
"todo": "npm run pylint | grep W0511 | awk -F'TODO ' '{print \"- \"$NF}' | sed 's/ (fixme)//g' | sort", "todo": "npm run pylint | grep W0511 | awk -F'TODO ' '{print \"- \"$NF}' | sed 's/ (fixme)//g' | sort",
"test": ". venv/bin/activate; python launch.py --debug --test" "test": ". venv/bin/activate; python launch.py --debug --test"