mirror of https://github.com/vladmandic/automatic
237 lines
9.6 KiB
Python
237 lines
9.6 KiB
Python
import os
|
|
import copy
|
|
from abc import abstractmethod
|
|
import PIL
|
|
from PIL import Image
|
|
import modules.shared
|
|
from modules import modelloader
|
|
|
|
|
|
# Resolve the resampling filters once: use the Image.Resampling namespace when
# this Pillow build provides it, otherwise fall back to the constants that
# live directly on the Image module.
if hasattr(Image, 'Resampling'):
    LANCZOS = Image.Resampling.LANCZOS
    NEAREST = Image.Resampling.NEAREST
else:
    LANCZOS = Image.LANCZOS
    NEAREST = Image.NEAREST

# Upscaler catalogue; stays None until the first Upscaler instance loads it
# from 'html/upscalers.json'.
models = None
|
|
|
|
class Upscaler:
    """Base class for image upscalers.

    Subclasses override `do_upscale` (one upscaling pass) and `load_model`.
    `upscale` drives the passes and the final resize to the requested size.

    NOTE(review): the class does not derive from `abc.ABC`, so the
    `@abstractmethod` markers below are documentation only; nothing prevents
    instantiating a class that leaves them unimplemented.
    """

    name = None
    folder = None
    model_path = None
    model_name = None
    model_url = None
    enable = True
    filter = None
    model = None
    user_path = None  # folder scanned by find_scalers() for .pth/.pt files
    scalers = []  # entries searched by find_model(); subclasses replace it
    tile = True
    can_tile = False  # default; __init__ flips it to True when cv2 imports

    def __init__(self, create_dirs=True):
        """Initialise per-instance settings from `modules.shared`.

        Args:
            create_dirs: when true and a model path is known, the model
                folder is created if it does not exist yet.
        """
        global models # pylint: disable=global-statement
        if models is None:
            # Loaded once and shared by every upscaler instance.
            models = modules.shared.readfile('html/upscalers.json')
        self.mod_pad_h = None
        self.tile_size = modules.shared.opts.upscaler_tile_size
        self.tile_pad = modules.shared.opts.upscaler_tile_overlap
        self.device = modules.shared.device
        self.img = None
        self.output = None
        self.scale = 1
        self.half = not modules.shared.cmd_opts.no_half
        self.pre_pad = 0
        self.mod_scale = None
        self.model_download_path = None
        if self.user_path is not None and len(self.user_path) > 0 and not os.path.exists(self.user_path):
            # NOTE(review): this only logs; the folder itself is not created here.
            modules.shared.log.info(f'Upscaler create: folder="{self.user_path}"')
        if self.model_path is None and self.name:
            self.model_path = os.path.join(modules.shared.models_path, self.name)
        if self.model_path and create_dirs:
            os.makedirs(self.model_path, exist_ok=True)
        try:
            import cv2 # pylint: disable=unused-import
            self.can_tile = True
        except Exception:
            # Best effort: without cv2 the class-level default (False) stays
            # in effect, so `can_tile` is always readable.
            pass

    def find_scalers(self):
        """Collect the `UpscalerData` entries available for this upscaler.

        Entries come from two places: the catalogue entry whose key equals
        `self.name`, and `.pth`/`.pt` files found in `self.user_path` that the
        catalogue did not already account for (those are flagged `custom`).

        Returns:
            list of `UpscalerData`, catalogue entries first.
        """
        scalers = []
        loaded = set()  # paths already represented, for O(1) duplicate checks
        folder = self.user_path
        for entry in (models or {}).get(self.name, []): # from config
            model_path = entry[1]
            if folder is not None:
                # Prefer an already-present local file over the catalogue path.
                local_name = os.path.join(folder, modelloader.friendly_fullname(entry[1]))
                if os.path.exists(local_name):
                    model_path = local_name
            scalers.append(UpscalerData(name=f'{self.name} {entry[0]}', path=model_path, upscaler=self))
            loaded.add(model_path)
        if folder is None or not os.path.isdir(folder):
            # No user folder configured (or it is missing): config entries only.
            return scalers
        for fn in os.listdir(folder): # from folder
            if not fn.endswith(('.pth', '.pt')):
                continue
            file_name = os.path.join(folder, fn)
            if file_name in loaded:
                continue
            model_name = os.path.splitext(fn)[0]
            scaler = UpscalerData(name=f'{self.name} {model_name}', path=file_name, upscaler=self)
            scaler.custom = True
            scalers.append(scaler)
            loaded.add(file_name)
        return scalers

    @abstractmethod
    def do_upscale(self, img: "Image.Image", selected_model: str):
        """Run a single upscaling pass; the base implementation is a no-op."""
        return img

    def upscale(self, img: "Image.Image", scale, selected_model: str = None):
        """Upscale `img` by `scale` and return an image of exactly that size.

        `do_upscale` is applied up to three times, stopping early when a pass
        leaves the size unchanged or the target size is reached. Any remaining
        difference is closed with a LANCZOS resize.

        The shared state is snapshotted before the job starts and is ended and
        restored even when a pass raises, so a failed upscale cannot leave the
        shared state stuck in the 'upscale' job.
        """
        orig_state = copy.deepcopy(modules.shared.state)
        modules.shared.state.begin('upscale')
        try:
            self.scale = scale
            dest_w = int(img.width * scale)
            dest_h = int(img.height * scale)
            for _ in range(3):
                shape = (img.width, img.height)
                img = self.do_upscale(img, selected_model)
                if shape == (img.width, img.height):
                    break
                if img.width >= dest_w and img.height >= dest_h:
                    break
            if img.width != dest_w or img.height != dest_h:
                img = img.resize((dest_w, dest_h), resample=LANCZOS)
        finally:
            modules.shared.state.end()
            modules.shared.state = orig_state
        return img

    @abstractmethod
    def load_model(self, path: str):
        """Load the model stored at `path`; the base implementation does nothing."""
        pass

    def find_models(self, ext_filter=None) -> list: # pylint: disable=unused-argument
        """Return the result of `modelloader.load_models` for this upscaler's paths."""
        return modelloader.load_models(model_path=self.model_path, model_url=self.model_url, command_path=self.user_path)

    def update_status(self, prompt):
        """Log which model this upscaler is working with."""
        modules.shared.log.info(f'Upscaler: type={self.name} model="{prompt}"')

    def find_model(self, path):
        """Look up the scaler entry for `path` and make sure its file is local.

        Args:
            path: value compared against each entry's `data_path`.

        Returns:
            The matching entry, or None (after logging an error) when no entry
            matches or no local file is available for it. Entries whose local
            path starts with "http" are downloaded first.
        """
        info = next((scaler for scaler in self.scalers if scaler.data_path == path), None)
        if info is None:
            modules.shared.log.error(f'Upscaler cannot match model: type={self.name} model="{path}"')
            return None
        # Built-in entries carry no path at all, so guard before string checks.
        if isinstance(info.local_data_path, str) and info.local_data_path.startswith("http"):
            from modules.modelloader import load_file_from_url
            info.local_data_path = load_file_from_url(url=info.data_path, model_dir=self.model_download_path, progress=True)
        if not info.local_data_path or not os.path.isfile(info.local_data_path):
            modules.shared.log.error(f'Upscaler cannot find model: type={self.name} model="{info.local_data_path}"')
            return None
        return info
|
|
|
|
|
|
class UpscalerData:
    """Describes one selectable upscaler model.

    Holds the display name, the path the entry was registered with, the path
    currently used for loading, and the `Upscaler` instance that handles it.
    """

    custom: bool = False  # set to True by Upscaler.find_scalers for folder-discovered files
    name = None
    data_path = None
    local_data_path = None
    scale: int = 4
    scaler: "Upscaler" = None
    model = None  # was an annotation-only `model: None`, which defined no attribute

    def __init__(self, name: str, path: str, upscaler: "Upscaler" = None, scale: int = 4, model=None):
        """Store the entry's metadata.

        Args:
            name: display name of the entry.
            path: model location; recorded as both `data_path` and the
                initial `local_data_path`.
            upscaler: the `Upscaler` instance that owns this entry.
            scale: scale factor associated with the entry.
            model: optional model object kept alongside the entry.
        """
        self.name = name
        self.data_path = path
        self.local_data_path = path
        self.scaler = upscaler
        self.scale = scale
        self.model = model
|
|
|
|
|
|
class UpscalerNone(Upscaler):
    """Pass-through upscaler whose `do_upscale` returns the image untouched."""

    name = "None"
    scalers = []

    def __init__(self, dirname=None): # pylint: disable=unused-argument
        """Register the single "None" entry; no model folder is created."""
        super().__init__(create_dirs=False)
        passthrough = UpscalerData("None", None, self)
        self.scalers = [passthrough]

    def load_model(self, path):
        """There is no model behind this upscaler, so nothing is loaded."""
        return None

    def do_upscale(self, img, selected_model=None):
        """Return `img` as-is."""
        return img
|
|
|
|
|
|
class UpscalerLanczos(Upscaler):
    """Model-free upscaler that resizes with the LANCZOS filter."""

    scalers = []

    def __init__(self, dirname=None): # pylint: disable=unused-argument
        """Register the single "Lanczos" entry; no model folder is created."""
        super().__init__(create_dirs=False)
        # Assigned after the base constructor on purpose: the base derives
        # `model_path` from `name` only when `name` is already set.
        self.name = "Lanczos"
        entry = UpscalerData("Lanczos", None, self)
        self.scalers = [entry]

    def load_model(self, _):
        """Nothing to load for a plain resampling filter."""
        return None

    def do_upscale(self, img, selected_model=None):
        """Resize `img` by `self.scale` in one step."""
        target_w = int(img.width * self.scale)
        target_h = int(img.height * self.scale)
        return img.resize((target_w, target_h), resample=LANCZOS)
|
|
|
|
|
|
class UpscalerNearest(Upscaler):
    """Model-free upscaler that resizes with the NEAREST filter."""

    scalers = []

    def __init__(self, dirname=None): # pylint: disable=unused-argument
        """Register the single "Nearest" entry; no model folder is created."""
        super().__init__(create_dirs=False)
        # Assigned after the base constructor on purpose: the base derives
        # `model_path` from `name` only when `name` is already set.
        self.name = "Nearest"
        entry = UpscalerData("Nearest", None, self)
        self.scalers = [entry]

    def load_model(self, _):
        """Nothing to load for a plain resampling filter."""
        return None

    def do_upscale(self, img, selected_model=None):
        """Resize `img` by `self.scale` in one step."""
        target_w = int(img.width * self.scale)
        target_h = int(img.height * self.scale)
        return img.resize((target_w, target_h), resample=NEAREST)
|
|
|
|
def compile_upscaler(model, name=""):
    """Optionally optimise/compile an upscaler model according to shared options.

    Two independent, best-effort stages run in order:

    1. IPEX optimisation, when `opts.ipex_optimize_upscaler` is set.
    2. `torch.compile`, when `opts.cuda_compile_upscaler` is set and the
       configured backend is not 'none'.

    A failure in either stage is logged as a warning and the model obtained
    so far is kept; this function does not raise for those failures.

    Args:
        model: the model to optimise.
        name: label used only in the "compiling" log line.

    Returns:
        The (possibly optimised and/or compiled) model.
    """
    try:
        if modules.shared.opts.ipex_optimize_upscaler:
            import intel_extension_for_pytorch as ipex # pylint: disable=import-error, unused-import
            from modules.devices import dtype as devices_dtype
            model.training = False
            model = ipex.optimize(model, dtype=devices_dtype, inplace=True, weights_prepack=False) # pylint: disable=attribute-defined-outside-init
            modules.shared.log.info("Applied Upscaler IPEX Optimize.")
    except Exception as err:
        modules.shared.log.warning(f"Upscaler IPEX Optimize not supported: {err}")
    try:
        if modules.shared.opts.cuda_compile_upscaler and modules.shared.opts.cuda_compile_backend != 'none':
            modules.shared.log.info(f"Upscaler Compiling: {name} mode={modules.shared.opts.cuda_compile_backend}")
            import logging
            import torch._dynamo # pylint: disable=unused-import,redefined-outer-name
            use_old_compiled_model_state = False
            old_compiled_model_state = None

            if modules.shared.opts.cuda_compile_backend == "openvino_fx":
                from modules.intel.openvino import openvino_fx, openvino_clear_caches # pylint: disable=unused-import
                from modules.sd_models import CompiledModelState

                openvino_clear_caches()
                torch._dynamo.eval_frame.check_if_dynamo_supported = lambda: True # pylint: disable=protected-access

                # Swap in a fresh compiled-model state for this compile and
                # remember any existing one so it can be put back afterwards.
                if modules.shared.compiled_model_state is not None:
                    use_old_compiled_model_state = True
                    old_compiled_model_state = modules.shared.compiled_model_state
                modules.shared.compiled_model_state = CompiledModelState()

            try:
                log_level = logging.WARNING if modules.shared.opts.cuda_compile_verbose else logging.CRITICAL
                if hasattr(torch, '_logging'):
                    torch._logging.set_logs(dynamo=log_level, aot=log_level, inductor=log_level) # pylint: disable=protected-access

                torch._dynamo.config.verbose = modules.shared.opts.cuda_compile_verbose # pylint: disable=protected-access
                torch._dynamo.config.suppress_errors = modules.shared.opts.cuda_compile_errors # pylint: disable=protected-access
                model = torch.compile(model, mode=modules.shared.opts.cuda_compile_mode, backend=modules.shared.opts.cuda_compile_backend, fullgraph=modules.shared.opts.cuda_compile_fullgraph) # pylint: disable=attribute-defined-outside-init
            finally:
                # Restore the previous state even when compilation fails, so a
                # failed upscaler compile cannot discard the saved state.
                if use_old_compiled_model_state:
                    modules.shared.compiled_model_state = old_compiled_model_state
            modules.shared.log.info("Upscaler: Complilation done.")
    except Exception as err:
        modules.shared.log.warning(f"Model compile not supported: {err}")
    return model
|