diff --git a/scripts/api.py b/scripts/api.py deleted file mode 100644 index 70ad16c..0000000 --- a/scripts/api.py +++ /dev/null @@ -1,39 +0,0 @@ -from PIL import Image -import numpy as np -from fastapi import FastAPI, Body -from fastapi.exceptions import HTTPException -from modules.api.models import * -from modules.api import api -from scripts.roop_globals import VERSION_FLAG -import gradio as gr - -def encode_to_base64(image): - if type(image) is str: - return image - elif type(image) is Image.Image: - return api.encode_pil_to_base64(image) - elif type(image) is np.ndarray: - return encode_np_to_base64(image) - else: - return "" - -def encode_np_to_base64(image): - pil = Image.fromarray(image) - return api.encode_pil_to_base64(pil) - - -def roop_api(_: gr.Blocks, app: FastAPI): - @app.get("/roop/version") - async def version(): - return {"version": VERSION_FLAG} - - @app.get("/roop/swap_face") - async def swap_face(): - return {"version": VERSION_FLAG} - -try: - import modules.script_callbacks as script_callbacks - script_callbacks.on_app_started(roop_api) -except: - - pass diff --git a/scripts/faceswap.py b/scripts/faceswap.py index d69dc23..9548c99 100644 --- a/scripts/faceswap.py +++ b/scripts/faceswap.py @@ -1,5 +1,6 @@ import importlib -from scripts.roop_utils.models_utils import get_models, get_face_checkpoints +from scripts.roop_api import roop_api +from scripts.roop_utils.models_utils import get_current_model, get_face_checkpoints from scripts import (roop_globals, roop_logging, faceswap_settings, faceswap_tab, faceswap_unit_ui) from scripts.roop_swapping import swapper @@ -9,6 +10,7 @@ from scripts.roop_postprocessing import upscaling from pprint import pprint import numpy as np import logging +from copy import deepcopy #Reload all the modules when using "apply and restart" #This is mainly done for development purposes @@ -20,6 +22,7 @@ importlib.reload(upscaling) importlib.reload(faceswap_settings) importlib.reload(models_utils) importlib.reload(faceswap_unit_ui) +importlib.reload(roop_api) import base64 import io @@ -41,7 +44,6 @@ from modules.shared import opts from PIL import Image from scripts.roop_utils.imgutils import (pil_to_cv2,convert_to_sd) - from scripts.roop_logging import logger, save_img_debug from scripts.roop_globals import VERSION_FLAG from scripts.roop_postprocessing.postprocessing_options import PostProcessingOptions @@ -55,6 +57,12 @@ EXTENSION_PATH=os.path.join("extensions","sd-webui-roop") # Register the tab, done here to prevent it from being added twice script_callbacks.on_ui_tabs(faceswap_tab.on_ui_tabs) +try: + import modules.script_callbacks as script_callbacks + script_callbacks.on_app_started(roop_api.roop_api) +except: + pass + class FaceSwapScript(scripts.Script): @@ -79,22 +87,18 @@ class FaceSwapScript(scripts.Script): """Return True if any unit is enabled and the state is not interupted""" return any([u.enable for u in self.units]) and not shared.state.interrupted - @property - def model(self) -> str : - model = opts.data.get("roop_model", None) - if model is None : - models = get_models() - model = models[0] if len(models) else None - logger.info("Try to use model : %s", model) - if not os.path.isfile(model): - logger.error("The model %s cannot be found or loaded", model) - raise FileNotFoundError("No faceswap model found. Please add it to the roop directory.") - return model - @property def keep_original_images(self) : return opts.data.get("roop_keep_original", False) + @property + def swap_in_generated_units(self) : + return [u for u in self.units if u.swap_in_generated and u.enable] + + @property + def swap_in_source_units(self) : + return [u for u in self.units if u.swap_in_source and u.enable] + def title(self): return f"roop" @@ -155,138 +159,53 @@ class FaceSwapScript(scripts.Script): #If is instance of img2img, we check if face swapping in source is required. if isinstance(p, StableDiffusionProcessingImg2Img): if self.enabled: - init_images = p.init_images - for i, unit in enumerate(self.units): - if unit.enable and unit.swap_in_source : - blend_config = unit.blend_faces # store blend config - unit.blend_faces = True # force blending - (init_images, result_infos) = self.process_images_unit(unit, init_images, upscaled_swapper=self.upscaled_swapper_in_source) - logger.info(f"unit {i+1}> processed init images: {len(init_images)}, {len(result_infos)}") - unit.blend_faces = blend_config #restore blend config + init_images : List[Tuple[Image.Image, str]] = [(img,None) for img in p.init_images] + new_inits = swapper.process_images_units(get_current_model(), self.swap_in_source_units,images=init_images, upscaled_swapper=self.upscaled_swapper_in_source,force_blend=True) + logger.info(f"processed init images: {len(init_images)}") + p.init_images = [img[0] for img in new_inits] - p.init_images = init_images - - def process_image_unit(self, unit : FaceSwapUnitSettings, image: Image.Image, info = None, upscaled_swapper = False) -> List: - """Process one image and return a List of (image, info) (one if blended, many if not). - - Args: - unit : the current unit - image : the image where to apply swapping - info : The info - - Returns: - List of tuple of (image, info) where image is the image where swapping has been applied and info is the image info with similarity infos. - """ - - results = [] - if unit.enable : - if convert_to_sd(image) : - return [(image, info)] - if not unit.blend_faces : - src_faces = unit.faces - logger.info(f"will generate {len(src_faces)} images") - else : - logger.info("blend all faces together") - src_faces = [unit.blended_faces] - assert(not np.array_equal(unit.reference_face.embedding,src_faces[0].embedding) if len(unit.faces)>1 else True), "Reference face cannot be the same as blended" - - - for i,src_face in enumerate(src_faces): - logger.info(f"Process face {i}") - if unit.reference_face is not None : - reference_face = unit.reference_face - else : - logger.info("Use source face as reference face") - reference_face = src_face - - save_img_debug(image, "Before swap") - result: swapper.ImageResult = swapper.swap_face( - reference_face, - src_face, - image, - faces_index=unit.faces_index, - model=self.model, - same_gender=unit.same_gender, - upscaled_swapper=upscaled_swapper, - compute_similarity=unit.compute_similarity - ) - save_img_debug(result.image, "After swap") - - if result.image is None : - logger.error("Result image is None") - if (not unit.check_similarity) or result.similarity and all([result.similarity.values()!=0]+[x >= unit.min_sim for x in result.similarity.values()]) and all([result.ref_similarity.values()!=0]+[x >= unit.min_ref_sim for x in result.ref_similarity.values()]): - results.append((result.image, f"{info}, similarity = {result.similarity}, ref_similarity = {result.ref_similarity}")) - else: - logger.warning( - f"skip, similarity to low, sim = {result.similarity} (target {unit.min_sim}) ref sim = {result.ref_similarity} (target = {unit.min_ref_sim})" - ) - return results - - def process_images_unit(self, unit : FaceSwapUnitSettings, images : List[Image.Image], infos = None, upscaled_swapper = False) -> Tuple[List[Image.Image], List[str]] : - if unit.enable : - result_images : List[Image.Image] = [] - result_infos : List[str]= [] - if not infos : - # this allows the use of zip afterwards if no infos are present - # we make sure infos size is the same as images size - infos = [None] * len(images) - for i, (img, info) in enumerate(zip(images, infos)): - swapped_images = self.process_image_unit(unit, img, info, upscaled_swapper) - for result_image, result_info in swapped_images : - result_images.append(result_image) - result_infos.append(result_info) - logger.info(f"{len(result_images)} images processed") - return (result_images, result_infos) - return (images, infos) def postprocess(self, p : StableDiffusionProcessing, processed: Processed, *args): if self.enabled : # Get the original images without the grid - orig_images = processed.images[processed.index_of_first_image:] - orig_infotexts = processed.infotexts[processed.index_of_first_image:] + orig_images : List[Image.Image] = processed.images[processed.index_of_first_image:] + orig_infotexts : List[str] = processed.infotexts[processed.index_of_first_image:] + + keep_original = self.keep_original_images # These are were images and infos of swapped images will be stored images = [] infotexts = [] + if (len(self.swap_in_generated_units))>0 : + for i,(img,info) in enumerate(zip(orig_images, orig_infotexts)): + batch_index = i%p.batch_size + swapped_images = swapper.process_images_units(get_current_model(), self.swap_in_generated_units, images=[(img,info)], upscaled_swapper=self.upscaled_swapper_in_generated) + logger.info(f"{len(swapped_images)} images swapped") + + for swp_img, new_info in swapped_images : + img = swp_img # Will only swap the last image in the batch in next units (FIXME : hard to fix properly but not really critical) - for i,(img,info) in enumerate(zip(orig_images, orig_infotexts)): - if any([u.enable for u in self.units]): - for unit_i, unit in enumerate(self.units): - #convert image position to batch index - #this should work (not completely confident) - batch_index = i%p.batch_size - if unit.enable : - if unit.swap_in_generated : - img_to_swap = img - - swapped_images = self.process_image_unit(image=img_to_swap, unit=unit, info=info, upscaled_swapper=self.upscaled_swapper_in_generated) - logger.info(f"{len(swapped_images)} images swapped") - for swp_img, new_info in swapped_images : - logger.info(f"unit {unit_i+1}> processed") - if swp_img is not None : + if swp_img is not None : - save_img_debug(swp_img,"Before apply mask") - swp_img = imgutils.apply_mask(swp_img, p, batch_index) - save_img_debug(swp_img,"After apply mask") + save_img_debug(swp_img,"Before apply mask") + swp_img = imgutils.apply_mask(swp_img, p, batch_index) + save_img_debug(swp_img,"After apply mask") - try : - if self.postprocess_options is not None: - swp_img = enhance_image(swp_img, self.postprocess_options) - except Exception as e: - logger.error("Failed to upscale : %s", e) + try : + if self.postprocess_options is not None: + swp_img = enhance_image(swp_img, self.postprocess_options) + except Exception as e: + logger.error("Failed to upscale : %s", e) - logger.info("Add swp image to processed") - images.append(swp_img) - infotexts.append(new_info) - if p.outpath_samples and opts.samples_save : - save_image(swp_img, p.outpath_samples, "", p.all_seeds[batch_index], p.all_prompts[batch_index], opts.samples_format,info=new_info, p=p, suffix="-swapped") - else : - logger.error("swp image is None") - elif unit.swap_in_source and not self.keep_original_images : - # if images were swapped in source, but we don't keep original - # no images will be showned unless we add it as a swap image : - images.append(img) - infotexts.append(new_info) + logger.info("Add swp image to processed") + images.append(swp_img) + infotexts.append(new_info) + if p.outpath_samples and opts.samples_save : + save_image(swp_img, p.outpath_samples, "", p.all_seeds[batch_index], p.all_prompts[batch_index], opts.samples_format,info=new_info, p=p, suffix="-swapped") + else : + logger.error("swp image is None") + else : + keep_original=True # Generate grid : @@ -299,7 +218,7 @@ class FaceSwapScript(scripts.Script): grid.info["parameters"] = text images.insert(0, grid) - if self.keep_original_images: + if keep_original: # If we want to keep original images, we add all existing (including grid this time) images += processed.images infotexts += processed.infotexts diff --git a/scripts/faceswap_unit_settings.py b/scripts/faceswap_unit_settings.py index a6ba35f..ac51a99 100644 --- a/scripts/faceswap_unit_settings.py +++ b/scripts/faceswap_unit_settings.py @@ -18,7 +18,7 @@ class FaceSwapUnitSettings: # The checkpoint file source_face : str # The batch source images - _batch_files: gr.components.File + _batch_files: Union[gr.components.File,List[Image.Image]] # Will blend faces if True blend_faces: bool # Enable this unit @@ -118,7 +118,11 @@ class FaceSwapUnitSettings: if self.batch_files is not None and not hasattr(self,"_faces") : self._faces = [self.reference_face] if self.reference_face is not None else [] for file in self.batch_files : - img = Image.open(file.name) + if isinstance(file, Image.Image) : + img = file + else : + img = Image.open(file.name) + face = swapper.get_or_default(swapper.get_faces(pil_to_cv2(img)), 0, None) if face is not None : self._faces.append(face) diff --git a/scripts/roop_api/roop_api.py b/scripts/roop_api/roop_api.py new file mode 100644 index 0000000..80d9ef5 --- /dev/null +++ b/scripts/roop_api/roop_api.py @@ -0,0 +1,68 @@ +from PIL import Image +import numpy as np +from fastapi import FastAPI, Body +from fastapi.exceptions import HTTPException +from modules.api.models import * +from modules.api import api +from scripts.roop_api.roop_api_types import FaceSwapUnit, FaceSwapRequest +from scripts.roop_globals import VERSION_FLAG +import gradio as gr +from typing import Dict, List, Set, Tuple, Union, Optional +import json +from scripts.roop_swapping import swapper +from scripts.faceswap_unit_settings import FaceSwapUnitSettings +from scripts.roop_utils.imgutils import (pil_to_cv2,convert_to_sd, base64_to_pil) +from scripts.roop_utils.models_utils import get_current_model + +def encode_to_base64(image): + if type(image) is str: + return image + elif type(image) is Image.Image: + return api.encode_pil_to_base64(image) + elif type(image) is np.ndarray: + return encode_np_to_base64(image) + else: + return "" + +def encode_np_to_base64(image): + pil = Image.fromarray(image) + return api.encode_pil_to_base64(pil) + + +def roop_api(_: gr.Blocks, app: FastAPI): + @app.get("/roop/version") + async def version(): + return {"version": VERSION_FLAG} + + # use post as we consider the method non idempotent (which is debatable) + @app.post("/roop/swap_face") + async def swap_face(request : FaceSwapRequest) -> List[FaceSwapUnit]: + units : List[FaceSwapUnitSettings]= [] + src_image = base64_to_pil(request.image) + for u in request.units: + units.append( + FaceSwapUnitSettings(source_img=base64_to_pil(u.source_img), + source_face = u.source_face, + _batch_files = u.get_batch_images(), + blend_faces= u.blend_faces, + enable = True, + same_gender = u.same_gender, + check_similarity=u.check_similarity, + _compute_similarity=u.compute_similarity, + min_ref_sim= u.min_ref_sim, + min_sim= u.min_sim, + _faces_index = ",".join([str(i) for i in (u.faces_index)]), + swap_in_generated=True, + swap_in_source=False + ) + ) + + + # result_images = [] + # for unit_i, unit in enumerate(units): + # swapped_images = swapper.process_image_unit(get_current_model(), image=, unit=unit, info=info, upscaled_swapper=self.upscaled_swapper_in_generated) + + + + return request.units + diff --git a/scripts/roop_api/roop_api_types.py b/scripts/roop_api/roop_api_types.py new file mode 100644 index 0000000..a3cb7db --- /dev/null +++ b/scripts/roop_api/roop_api_types.py @@ -0,0 +1,72 @@ +from scripts.roop_swapping import swapper +import numpy as np +import base64 +import io +from dataclasses import dataclass, fields +from typing import Dict, List, Set, Tuple, Union, Optional +import dill as pickle +import gradio as gr +from insightface.app.common import Face +from PIL import Image +from scripts.roop_utils.imgutils import (pil_to_cv2,convert_to_sd, base64_to_pil) +from scripts.roop_logging import logger +from pydantic import BaseModel, Field +from scripts.roop_postprocessing.postprocessing_options import InpaintingWhen + + +class FaceSwapUnit(BaseModel) : + + # The image given in reference + source_img: str = Field(description='base64 reference image', examples=["data:image/jpeg;base64,/9j/4AAQSkZJRgABAQECWAJYAAD...."], default=None) + # The checkpoint file + source_face : str = Field(description='face checkpoint (from models/roop/faces)',examples=["my_face.pkl"], default=None) + # base64 batch source images + batch_images: Tuple[str] = Field(description='list of base64 batch source images',examples=["data:image/jpeg;base64,/9j/4AAQSkZJRgABAQECWAJYAAD....", "data:image/jpeg;base64,/9j/4AAQSkZJRgABAQECWAJYAAD...."], default=None) + + # Will blend faces if True + blend_faces: bool = Field(description='Will blend faces if True', default=True) + + # Use same gender filtering + same_gender: bool = Field(description='Use same gender filtering', default=True) + + # If True, discard images with low similarity + check_similarity : bool = Field(description='If True, discard images with low similarity', default=False) + # if True will compute similarity and add it to the image info + compute_similarity : bool = Field(description='If True will compute similarity and add it to the image info', default=False) + + # Minimum similarity against the used face (reference, batch or checkpoint) + min_sim: float = Field(description='Minimum similarity against the used face (reference, batch or checkpoint)', default=0.0) + # Minimum similarity against the reference (reference or checkpoint if checkpoint is given) + min_ref_sim: float = Field(description='Minimum similarity against the reference (reference or checkpoint if checkpoint is given)', default=0.0) + + # The face index to use for swapping + faces_index: Tuple[int] = Field(description='The face index to use for swapping, list of face numbers starting from 0', default=(0,)) + + def get_batch_images(self) -> List[Image.Image] : + images = [] + if self.batch_images : + for img in self.batch_images : + images.append(base64_to_pil(img)) + return images + +class PostProcessingOptions (BaseModel): + face_restorer_name: str = Field(description='face restorer name', default=None) + restorer_visibility: float = Field(description='face restorer visibility', default=1, le=1, ge=0) + codeformer_weight: float = Field(description='face restorer codeformer weight', default=1, le=1, ge=0) + + upscaler_name: str = Field(description='upscaler name', default=None) + scale: float = Field(description='upscaling scale', default=1, le=10, ge=0) + upscale_visibility: float = Field(description='upscaler visibility', default=1, le=1, ge=0) + + inpainting_denoising_strengh : float = Field(description='Inpainting denoising strenght', default=0.02, lt=1, ge=0) + inpainting_prompt : str = Field(description='Inpainting denoising strenght',examples=["Portrait of a [gender]"], default="Portrait of a [gender]") + inpainting_negative_prompt : str = Field(description='Inpainting denoising strenght',examples=["Deformed, blurry, bad anatomy, disfigured, poorly drawn face, mutation"], default="") + inpainting_steps : int = Field(description='Inpainting steps',examples=["Portrait of a [gender]"], ge=1, le=150) + inpainting_sampler : str = Field(description='Inpainting sampler',examples=["Euler"], default="Euler") + inpainting_when : InpaintingWhen = Field(description='When inpainting happens', examples=[e.value for e in InpaintingWhen.__members__.values()], default=InpaintingWhen.BEFORE_UPSCALING) + + +class FaceSwapRequest(BaseModel) : + image : str = Field(description='base64 reference image', examples=["data:image/jpeg;base64,/9j/4AAQSkZJRgABAQECWAJYAAD...."], default=None) + units : List[FaceSwapUnit] + postprocessing : PostProcessingOptions \ No newline at end of file diff --git a/scripts/roop_api_types/units_types.py b/scripts/roop_api_types/units_types.py deleted file mode 100644 index fdfa92b..0000000 --- a/scripts/roop_api_types/units_types.py +++ /dev/null @@ -1,46 +0,0 @@ -from scripts.roop_swapping import swapper -import numpy as np -import base64 -import io -from dataclasses import dataclass, fields -from typing import Dict, List, Set, Tuple, Union, Optional -import dill as pickle -import gradio as gr -from insightface.app.common import Face -from PIL import Image -from scripts.roop_utils.imgutils import (pil_to_cv2,convert_to_sd) -from scripts.roop_logging import logger - - -@dataclass -class FaceSwapUnitDTO : - # The image given in reference - source_img: str - # The checkpoint file - source_face : str - # base64 batch source images - batch_files: List[str] - - # Will blend faces if True - blend_faces: bool - # Enable this unit - enable: bool - # Use same gender filtering - same_gender: bool - - # If True, discard images with low similarity - check_similarity : bool - # if True will compute similarity and add it to the image info - compute_similarity :bool - - # Minimum similarity against the used face (reference, batch or checkpoint) - min_sim: float - # Minimum similarity against the reference (reference or checkpoint if checkpoint is given) - min_ref_sim: float - # The face index to use for swapping - faces_index: int - - # Swap in the source image in img2img (before processing) - swap_in_source: bool - # Swap in the generated image in img2img (always on for txt2img) - swap_in_generated: bool \ No newline at end of file diff --git a/scripts/roop_globals.py b/scripts/roop_globals.py index 91cbaee..f009f59 100644 --- a/scripts/roop_globals.py +++ b/scripts/roop_globals.py @@ -3,7 +3,7 @@ import os MODELS_DIR = os.path.abspath(os.path.join("models","roop")) ANALYZER_DIR = os.path.abspath(os.path.join(MODELS_DIR, "analysers")) -VERSION_FLAG = "v0.3.0" +VERSION_FLAG = "v1.0.0" EXTENSION_PATH=os.path.join("extensions","sd-webui-roop") SD_CONVERT_SCORE = 0.7 diff --git a/scripts/roop_swapping/swapper.py b/scripts/roop_swapping/swapper.py index 2fbbce0..ffb9082 100644 --- a/scripts/roop_swapping/swapper.py +++ b/scripts/roop_swapping/swapper.py @@ -13,11 +13,13 @@ from PIL import Image from sklearn.metrics.pairwise import cosine_similarity from scripts.roop_swapping import upscaled_inswapper -from scripts.roop_utils.imgutils import cv2_to_pil, pil_to_cv2 -from scripts.roop_logging import logger +from scripts.roop_utils.imgutils import cv2_to_pil, pil_to_cv2, convert_to_sd +from scripts.roop_logging import logger, save_img_debug from scripts import roop_globals from modules.shared import opts from functools import lru_cache +from scripts.faceswap_unit_settings import FaceSwapUnitSettings + providers = ["CPUExecutionProvider"] @@ -335,4 +337,82 @@ def swap_face( except Exception as e : logger.error("Conversion failed %s", e) raise e - return return_result \ No newline at end of file + return return_result + + +def process_image_unit(model, unit : FaceSwapUnitSettings, image: Image.Image, info = None, upscaled_swapper = False, force_blend = False) -> List: + """Process one image and return a List of (image, info) (one if blended, many if not). + + Args: + unit : the current unit + image : the image where to apply swapping + info : The info + + Returns: + List of tuple of (image, info) where image is the image where swapping has been applied and info is the image info with similarity infos. + """ + + results = [] + if unit.enable : + if convert_to_sd(image) : + return [(image, info)] + if not unit.blend_faces and not force_blend : + src_faces = unit.faces + logger.info(f"will generate {len(src_faces)} images") + else : + logger.info("blend all faces together") + src_faces = [unit.blended_faces] + assert(not np.array_equal(unit.reference_face.embedding,src_faces[0].embedding) if len(unit.faces)>1 else True), "Reference face cannot be the same as blended" + + + for i,src_face in enumerate(src_faces): + logger.info(f"Process face {i}") + if unit.reference_face is not None : + reference_face = unit.reference_face + else : + logger.info("Use source face as reference face") + reference_face = src_face + + save_img_debug(image, "Before swap") + result: ImageResult = swap_face( + reference_face, + src_face, + image, + faces_index=unit.faces_index, + model=model, + same_gender=unit.same_gender, + upscaled_swapper=upscaled_swapper, + compute_similarity=unit.compute_similarity + ) + save_img_debug(result.image, "After swap") + + if result.image is None : + logger.error("Result image is None") + if (not unit.check_similarity) or result.similarity and all([result.similarity.values()!=0]+[x >= unit.min_sim for x in result.similarity.values()]) and all([result.ref_similarity.values()!=0]+[x >= unit.min_ref_sim for x in result.ref_similarity.values()]): + results.append((result.image, f"{info}, similarity = {result.similarity}, ref_similarity = {result.ref_similarity}")) + else: + logger.warning( + f"skip, similarity to low, sim = {result.similarity} (target {unit.min_sim}) ref sim = {result.ref_similarity} (target = {unit.min_ref_sim})" + ) + logger.debug("process_image_unit : Unit produced %s results", len(results)) + return results + +def process_images_units(model, units : List[FaceSwapUnitSettings], images: List[Tuple[Image.Image, str]], upscaled_swapper = False, force_blend = False) -> List: + if len(units) == 0 : + logger.info("Finished processing image, return %s images", len(images)) + return None + + logger.debug("%s more units", len(units)) + + processed_images = [] + for i,(image, info) in enumerate(images) : + logger.debug("Processing image %s", i) + swapped = process_image_unit(model,units[0],image, info, upscaled_swapper, force_blend) + logger.debug("Image %s -> %s images", i, len(swapped)) + nexts = process_images_units(model,units[1:],swapped, upscaled_swapper,force_blend) + if nexts : + processed_images.extend(nexts) + else : + processed_images.extend(swapped) + + return processed_images \ No newline at end of file diff --git a/scripts/roop_utils/imgutils.py b/scripts/roop_utils/imgutils.py index 43673b2..665bf85 100644 --- a/scripts/roop_utils/imgutils.py +++ b/scripts/roop_utils/imgutils.py @@ -1,3 +1,5 @@ +import io +from typing import Optional from PIL import Image, ImageChops, ImageOps,ImageFilter import cv2 import numpy as np @@ -6,7 +8,7 @@ import torch from ifnude import detect from scripts.roop_globals import SD_CONVERT_SCORE from modules import processing - +import base64 def convert_to_sd(img): shapes = [] @@ -168,4 +170,15 @@ def prepare_mask( #FIXME : Properly fix blur # if getattr(p, "mask_blur", 0) > 0: # mask = mask.filter(ImageFilter.GaussianBlur(p.mask_blur)) - return mask \ No newline at end of file + return mask + +def base64_to_pil(base64str : Optional[str]) : + if base64str is None : + return None + if 'base64,' in base64str: # check if the base64 string has a data URL scheme + base64_data = base64str.split('base64,')[-1] + img_bytes = base64.b64decode(base64_data) + else: + # if no data URL scheme, just decode + img_bytes = base64.b64decode(base64str) + return Image.open(io.BytesIO(img_bytes)) \ No newline at end of file diff --git a/scripts/roop_utils/models_utils.py b/scripts/roop_utils/models_utils.py index 0e917c2..d866934 100644 --- a/scripts/roop_utils/models_utils.py +++ b/scripts/roop_utils/models_utils.py @@ -4,6 +4,8 @@ import os import modules.scripts as scripts from modules import scripts from scripts.roop_globals import EXTENSION_PATH +from modules.shared import opts +from scripts.roop_logging import logger def get_models(): """ @@ -27,6 +29,17 @@ def get_models(): return models +def get_current_model() -> str : + model = opts.data.get("roop_model", None) + if model is None : + models = get_models() + model = models[0] if len(models) else None + logger.info("Try to use model : %s", model) + if not os.path.isfile(model): + logger.error("The model %s cannot be found or loaded", model) + raise FileNotFoundError("No faceswap model found. Please add it to the roop directory.") + return model + def get_face_checkpoints(): """ Retrieve a list of face checkpoint paths.