Merge pull request #1 from TotoB12/main

Sourcey refactor ala TotoB12
pull/232/head
aleph23 2023-07-28 11:23:04 -07:00 committed by GitHub
commit 7f1770ec74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 201 additions and 210 deletions

View File

@ -32,7 +32,7 @@ model_type_dict = {
# width is in number, not string # width is in number, not string
# return: url str # return: url str
def get_full_size_image_url(image_url, width): def get_full_size_image_url(image_url, width):
return re.sub('/width=\d+/', '/width=' + str(width) + '/', image_url) return re.sub('/width=\d+/', f'/width={str(width)}/', image_url)
# use this sha256 to get model info from civitai # use this sha256 to get model info from civitai
@ -51,7 +51,7 @@ def get_model_info_by_hash(hash:str):
util.printD("Civitai does not have this model") util.printD("Civitai does not have this model")
return {} return {}
else: else:
util.printD("Get error code: " + str(r.status_code)) util.printD(f"Get error code: {r.status_code}")
util.printD(r.text) util.printD(r.text)
return return
@ -65,11 +65,11 @@ def get_model_info_by_hash(hash:str):
util.printD("response:") util.printD("response:")
util.printD(r.text) util.printD(r.text)
return return
if not content: if not content:
util.printD("error, content from civitai is None") util.printD("error, content from civitai is None")
return return
return content return content
@ -81,14 +81,18 @@ def get_model_info_by_id(id:str) -> dict:
util.printD("id is empty") util.printD("id is empty")
return return
r = requests.get(url_dict["modelId"]+str(id), headers=util.def_headers, proxies=util.proxies) r = requests.get(
url_dict["modelId"] + id,
headers=util.def_headers,
proxies=util.proxies,
)
if not r.ok: if not r.ok:
if r.status_code == 404: if r.status_code == 404:
# this is not a civitai model # this is not a civitai model
util.printD("Civitai does not have this model") util.printD("Civitai does not have this model")
return {} return {}
else: else:
util.printD("Get error code: " + str(r.status_code)) util.printD(f"Get error code: {r.status_code}")
util.printD(r.text) util.printD(r.text)
return return
@ -102,11 +106,11 @@ def get_model_info_by_id(id:str) -> dict:
util.printD("response:") util.printD("response:")
util.printD(r.text) util.printD(r.text)
return return
if not content: if not content:
util.printD("error, content from civitai is None") util.printD("error, content from civitai is None")
return return
return content return content
@ -117,14 +121,18 @@ def get_version_info_by_version_id(id:str) -> dict:
util.printD("id is empty") util.printD("id is empty")
return return
r = requests.get(url_dict["modelVersionId"]+str(id), headers=util.def_headers, proxies=util.proxies) r = requests.get(
url_dict["modelVersionId"] + id,
headers=util.def_headers,
proxies=util.proxies,
)
if not r.ok: if not r.ok:
if r.status_code == 404: if r.status_code == 404:
# this is not a civitai model # this is not a civitai model
util.printD("Civitai does not have this model version") util.printD("Civitai does not have this model version")
return {} return {}
else: else:
util.printD("Get error code: " + str(r.status_code)) util.printD(f"Get error code: {r.status_code}")
util.printD(r.text) util.printD(r.text)
return return
@ -138,11 +146,11 @@ def get_version_info_by_version_id(id:str) -> dict:
util.printD("response:") util.printD("response:")
util.printD(r.text) util.printD(r.text)
return return
if not content: if not content:
util.printD("error, content from civitai is None") util.printD("error, content from civitai is None")
return return
return content return content
@ -198,9 +206,9 @@ def get_version_info_by_model_id(id:str) -> dict:
def load_model_info_by_search_term(model_type, search_term): def load_model_info_by_search_term(model_type, search_term):
util.printD(f"Load model info of {search_term} in {model_type}") util.printD(f"Load model info of {search_term} in {model_type}")
if model_type not in model.folders.keys(): if model_type not in model.folders.keys():
util.printD("unknow model type: " + model_type) util.printD(f"unknow model type: {model_type}")
return return
# search_term = subfolderpath + model name + ext. And it always start with a / even there is no sub folder # search_term = subfolderpath + model name + ext. And it always start with a / even there is no sub folder
base, ext = os.path.splitext(search_term) base, ext = os.path.splitext(search_term)
model_info_base = base model_info_base = base
@ -212,9 +220,9 @@ def load_model_info_by_search_term(model_type, search_term):
model_info_filepath = os.path.join(model_folder, model_info_filename) model_info_filepath = os.path.join(model_folder, model_info_filename)
if not os.path.isfile(model_info_filepath): if not os.path.isfile(model_info_filepath):
util.printD("Can not find model info file: " + model_info_filepath) util.printD(f"Can not find model info file: {model_info_filepath}")
return return
return model.load_model_info(model_info_filepath) return model.load_model_info(model_info_filepath)
@ -235,9 +243,9 @@ def get_model_names_by_type_and_filter(model_type:str, filter:dict) -> list:
empty_info_only = False empty_info_only = False
if filter: if filter:
if "no_info_only" in filter.keys(): if "no_info_only" in filter:
no_info_only = filter["no_info_only"] no_info_only = filter["no_info_only"]
if "empty_info_only" in filter.keys(): if "empty_info_only" in filter:
empty_info_only = filter["empty_info_only"] empty_info_only = filter["empty_info_only"]
@ -264,10 +272,7 @@ def get_model_names_by_type_and_filter(model_type:str, filter:dict) -> list:
# check model info file # check model info file
info_file = base + suffix + model.info_ext info_file = base + suffix + model.info_ext
if os.path.isfile(info_file): if os.path.isfile(info_file):
# load model info if model_info := model.load_model_info(info_file):
model_info = model.load_model_info(info_file)
# check content
if model_info:
if "id" in model_info.keys(): if "id" in model_info.keys():
# find a non-empty model info file # find a non-empty model info file
continue continue
@ -291,15 +296,12 @@ def get_model_id_from_url(url:str) -> str:
return "" return ""
if url.isnumeric(): if url.isnumeric():
# is already an id return url
id = str(url)
return id
s = re.sub("\\?.+$", "", url).split("/") s = re.sub("\\?.+$", "", url).split("/")
if len(s) < 2: if len(s) < 2:
util.printD("url is not valid") util.printD("url is not valid")
return "" return ""
if s[-2].isnumeric(): if s[-2].isnumeric():
id = s[-2] id = s[-2]
elif s[-1].isnumeric(): elif s[-1].isnumeric():
@ -307,7 +309,7 @@ def get_model_id_from_url(url:str) -> str:
else: else:
util.printD("There is no model id in this url") util.printD("There is no model id in this url")
return "" return ""
return id return id
@ -319,18 +321,18 @@ def get_preview_image_by_model_path(model_path:str, max_size_preview, skip_nsfw_
return return
if not os.path.isfile(model_path): if not os.path.isfile(model_path):
util.printD("model_path is not a file: "+model_path) util.printD(f"model_path is not a file: {model_path}")
return return
base, ext = os.path.splitext(model_path) base, ext = os.path.splitext(model_path)
first_preview = base+".png" first_preview = f"{base}.png"
sec_preview = base+".preview.png" sec_preview = f"{base}.preview.png"
info_file = base + suffix + model.info_ext info_file = base + suffix + model.info_ext
# check preview image # check preview image
if not os.path.isfile(sec_preview): if not os.path.isfile(sec_preview):
# need to download preview image # need to download preview image
util.printD("Checking preview image for model: " + model_path) util.printD(f"Checking preview image for model: {model_path}")
# load model_info file # load model_info file
if os.path.isfile(info_file): if os.path.isfile(info_file):
model_info = model.load_model_info(info_file) model_info = model.load_model_info(info_file)
@ -347,7 +349,7 @@ def get_preview_image_by_model_path(model_path:str, max_size_preview, skip_nsfw_
if skip_nsfw_preview: if skip_nsfw_preview:
util.printD("Skip NSFW image") util.printD("Skip NSFW image")
continue continue
if "url" in img_dict.keys(): if "url" in img_dict.keys():
img_url = img_dict["url"] img_url = img_dict["url"]
if max_size_preview: if max_size_preview:
@ -366,8 +368,8 @@ def get_preview_image_by_model_path(model_path:str, max_size_preview, skip_nsfw_
# return - model_info # return - model_info
def search_local_model_info_by_version_id(folder:str, version_id:int) -> dict: def search_local_model_info_by_version_id(folder:str, version_id:int) -> dict:
util.printD("Searching local model by version id") util.printD("Searching local model by version id")
util.printD("folder: " + folder) util.printD(f"folder: {folder}")
util.printD("version_id: " + str(version_id)) util.printD(f"version_id: {version_id}")
if not folder: if not folder:
util.printD("folder is none") util.printD("folder is none")
@ -376,11 +378,11 @@ def search_local_model_info_by_version_id(folder:str, version_id:int) -> dict:
if not os.path.isdir(folder): if not os.path.isdir(folder):
util.printD("folder is not a dir") util.printD("folder is not a dir")
return return
if not version_id: if not version_id:
util.printD("version_id is none") util.printD("version_id is none")
return return
# search civitai model info file # search civitai model info file
for filename in os.listdir(folder): for filename in os.listdir(folder):
# check ext # check ext
@ -409,7 +411,7 @@ def search_local_model_info_by_version_id(folder:str, version_id:int) -> dict:
if str(id) == str(version_id): if str(id) == str(version_id):
# find the one # find the one
return model_info return model_info
return return
@ -425,16 +427,16 @@ def check_model_new_version_by_path(model_path:str, delay:float=1) -> tuple:
return return
if not os.path.isfile(model_path): if not os.path.isfile(model_path):
util.printD("model_path is not a file: "+model_path) util.printD(f"model_path is not a file: {model_path}")
return return
# get model info file name # get model info file name
base, ext = os.path.splitext(model_path) base, ext = os.path.splitext(model_path)
info_file = base + suffix + model.info_ext info_file = base + suffix + model.info_ext
if not os.path.isfile(info_file): if not os.path.isfile(info_file):
return return
# get model info # get model info
model_info_file = model.load_model_info(info_file) model_info_file = model.load_model_info(info_file)
if not model_info_file: if not model_info_file:
@ -442,18 +444,18 @@ def check_model_new_version_by_path(model_path:str, delay:float=1) -> tuple:
if "id" not in model_info_file.keys(): if "id" not in model_info_file.keys():
return return
local_version_id = model_info_file["id"] local_version_id = model_info_file["id"]
if not local_version_id: if not local_version_id:
return return
if "modelId" not in model_info_file.keys(): if "modelId" not in model_info_file.keys():
return return
model_id = model_info_file["modelId"] model_id = model_info_file["modelId"]
if not model_id: if not model_id:
return return
# get model info by id from civitai # get model info by id from civitai
model_info = get_model_info_by_id(model_id) model_info = get_model_info_by_id(model_id)
# delay before next request, to prevent to be treat as DDoS # delay before next request, to prevent to be treat as DDoS
@ -462,24 +464,24 @@ def check_model_new_version_by_path(model_path:str, delay:float=1) -> tuple:
if not model_info: if not model_info:
return return
if "modelVersions" not in model_info.keys(): if "modelVersions" not in model_info.keys():
return return
modelVersions = model_info["modelVersions"] modelVersions = model_info["modelVersions"]
if not modelVersions: if not modelVersions:
return return
if not len(modelVersions): if not len(modelVersions):
return return
current_version = modelVersions[0] current_version = modelVersions[0]
if not current_version: if not current_version:
return return
if "id" not in current_version.keys(): if "id" not in current_version.keys():
return return
current_version_id = current_version["id"] current_version_id = current_version["id"]
if not current_version_id: if not current_version_id:
return return
@ -488,10 +490,7 @@ def check_model_new_version_by_path(model_path:str, delay:float=1) -> tuple:
if current_version_id == local_version_id: if current_version_id == local_version_id:
return return
model_name = "" model_name = model_info["name"] if "name" in model_info.keys() else ""
if "name" in model_info.keys():
model_name = model_info["name"]
if not model_name: if not model_name:
model_name = "" model_name = ""
@ -499,21 +498,21 @@ def check_model_new_version_by_path(model_path:str, delay:float=1) -> tuple:
new_version_name = "" new_version_name = ""
if "name" in current_version.keys(): if "name" in current_version.keys():
new_version_name = current_version["name"] new_version_name = current_version["name"]
if not new_version_name: if not new_version_name:
new_version_name = "" new_version_name = ""
description = "" description = ""
if "description" in current_version.keys(): if "description" in current_version.keys():
description = current_version["description"] description = current_version["description"]
if not description: if not description:
description = "" description = ""
downloadUrl = "" downloadUrl = ""
if "downloadUrl" in current_version.keys(): if "downloadUrl" in current_version.keys():
downloadUrl = current_version["downloadUrl"] downloadUrl = current_version["downloadUrl"]
if not downloadUrl: if not downloadUrl:
downloadUrl = "" downloadUrl = ""
@ -528,7 +527,7 @@ def check_model_new_version_by_path(model_path:str, delay:float=1) -> tuple:
img_url = "" img_url = ""
return (model_path, model_id, model_name, current_version_id, new_version_name, description, downloadUrl, img_url) return (model_path, model_id, model_name, current_version_id, new_version_name, description, downloadUrl, img_url)
@ -564,7 +563,7 @@ def check_models_new_version_by_model_types(model_types:list, delay:float=1) ->
if model_type not in mts: if model_type not in mts:
continue continue
util.printD("Scanning path: " + model_folder) util.printD(f"Scanning path: {model_folder}")
for root, dirs, files in os.walk(model_folder, followlinks=True): for root, dirs, files in os.walk(model_folder, followlinks=True):
for filename in files: for filename in files:
# check ext # check ext
@ -582,21 +581,17 @@ def check_models_new_version_by_model_types(model_types:list, delay:float=1) ->
if not current_version_id: if not current_version_id:
continue continue
# check this version id in list is_already_in_list = any(
is_already_in_list = False current_version_id == new_version[3]
for new_version in new_versions: for new_version in new_versions
if current_version_id == new_version[3]: )
# already in list
is_already_in_list = True
break
if is_already_in_list: if is_already_in_list:
util.printD("New version is already in list") util.printD("New version is already in list")
continue continue
# search this new version id to check if this model is already downloaded if target_model_info := search_local_model_info_by_version_id(
target_model_info = search_local_model_info_by_version_id(root, current_version_id) root, current_version_id
if target_model_info: ):
util.printD("New version is already existed") util.printD("New version is already existed")
continue continue

View File

@ -12,7 +12,7 @@ requests.packages.urllib3.disable_warnings()
# output is downloaded file path # output is downloaded file path
def dl(url, folder, filename, filepath): def dl(url, folder, filename, filepath):
util.printD("Start downloading from: " + url) util.printD(f"Start downloading from: {url}")
# get file_path # get file_path
file_path = "" file_path = ""
if filepath: if filepath:
@ -22,11 +22,11 @@ def dl(url, folder, filename, filepath):
if not folder: if not folder:
util.printD("folder is none") util.printD("folder is none")
return return
if not os.path.isdir(folder): if not os.path.isdir(folder):
util.printD("folder does not exist: "+folder) util.printD(f"folder does not exist: {folder}")
return return
if filename: if filename:
file_path = os.path.join(folder, filename) file_path = os.path.join(folder, filename)
@ -40,25 +40,25 @@ def dl(url, folder, filename, filepath):
# if file_path is empty, need to get file name from download url's header # if file_path is empty, need to get file name from download url's header
if not file_path: if not file_path:
filename = "" filename = ""
if "Content-Disposition" in rh.headers.keys(): if "Content-Disposition" in rh.headers:
cd = rh.headers["Content-Disposition"] cd = rh.headers["Content-Disposition"]
# Extract the filename from the header # Extract the filename from the header
# content of a CD: "attachment;filename=FileName.txt" # content of a CD: "attachment;filename=FileName.txt"
# in case "" is in CD filename's start and end, need to strip them out # in case "" is in CD filename's start and end, need to strip them out
filename = cd.split("=")[1].strip('"') filename = cd.split("=")[1].strip('"')
if not filename: if not filename:
util.printD("Fail to get file name from Content-Disposition: " + cd) util.printD(f"Fail to get file name from Content-Disposition: {cd}")
return return
if not filename: if not filename:
util.printD("Can not get file name from download url's header") util.printD("Can not get file name from download url's header")
return return
# with folder and filename, now we have the full file path # with folder and filename, now we have the full file path
file_path = os.path.join(folder, filename) file_path = os.path.join(folder, filename)
util.printD("Target file path: " + file_path) util.printD(f"Target file path: {file_path}")
base, ext = os.path.splitext(file_path) base, ext = os.path.splitext(file_path)
# check if file is already exist # check if file is already exist
@ -67,7 +67,7 @@ def dl(url, folder, filename, filepath):
while os.path.isfile(file_path): while os.path.isfile(file_path):
util.printD("Target file already exist.") util.printD("Target file already exist.")
# re-name # re-name
new_base = base + "_" + str(count) new_base = f"{base}_{str(count)}"
file_path = new_base + ext file_path = new_base + ext
count += 1 count += 1

View File

@ -23,7 +23,7 @@ def open_model_url(msg, open_url_with_js):
if not result: if not result:
util.printD("Parsing js ms failed") util.printD("Parsing js ms failed")
return return
model_type = result["model_type"] model_type = result["model_type"]
search_term = result["search_term"] search_term = result["search_term"]
@ -50,7 +50,7 @@ def open_model_url(msg, open_url_with_js):
} }
if not open_url_with_js: if not open_url_with_js:
util.printD("Open Url: " + url) util.printD(f"Open Url: {url}")
# open url # open url
webbrowser.open_new_tab(url) webbrowser.open_new_tab(url)
else: else:
@ -73,7 +73,7 @@ def add_trigger_words(msg):
if not result: if not result:
util.printD("Parsing js ms failed") util.printD("Parsing js ms failed")
return return
model_type = result["model_type"] model_type = result["model_type"]
search_term = result["search_term"] search_term = result["search_term"]
prompt = result["prompt"] prompt = result["prompt"]
@ -83,29 +83,29 @@ def add_trigger_words(msg):
if not model_info: if not model_info:
util.printD(f"Failed to get model info for {model_type} {search_term}") util.printD(f"Failed to get model info for {model_type} {search_term}")
return [prompt, prompt] return [prompt, prompt]
if "trainedWords" not in model_info.keys(): if "trainedWords" not in model_info.keys():
util.printD(f"Failed to get trainedWords from info file for {model_type} {search_term}") util.printD(f"Failed to get trainedWords from info file for {model_type} {search_term}")
return [prompt, prompt] return [prompt, prompt]
trainedWords = model_info["trainedWords"] trainedWords = model_info["trainedWords"]
if not trainedWords: if not trainedWords:
util.printD(f"No trainedWords from info file for {model_type} {search_term}") util.printD(f"No trainedWords from info file for {model_type} {search_term}")
return [prompt, prompt] return [prompt, prompt]
if len(trainedWords) == 0: if len(trainedWords) == 0:
util.printD(f"trainedWords from info file for {model_type} {search_term} is empty") util.printD(f"trainedWords from info file for {model_type} {search_term} is empty")
return [prompt, prompt] return [prompt, prompt]
# get ful trigger words # get ful trigger words
trigger_words = "" trigger_words = ""
for word in trainedWords: for word in trainedWords:
trigger_words = trigger_words + word + ", " trigger_words = trigger_words + word + ", "
new_prompt = prompt + " " + trigger_words new_prompt = f"{prompt} {trigger_words}"
util.printD("trigger_words: " + trigger_words) util.printD(f"trigger_words: {trigger_words}")
util.printD("prompt: " + prompt) util.printD(f"prompt: {prompt}")
util.printD("new_prompt: " + new_prompt) util.printD(f"new_prompt: {new_prompt}")
util.printD("End add_trigger_words") util.printD("End add_trigger_words")
@ -188,14 +188,14 @@ def dl_model_new_version(msg, max_size_preview, skip_nsfw_preview):
output = "Parsing js ms failed" output = "Parsing js ms failed"
util.printD(output) util.printD(output)
return output return output
model_path = result["model_path"] model_path = result["model_path"]
version_id = result["version_id"] version_id = result["version_id"]
download_url = result["download_url"] download_url = result["download_url"]
util.printD("model_path: " + model_path) util.printD(f"model_path: {model_path}")
util.printD("version_id: " + str(version_id)) util.printD(f"version_id: {str(version_id)}")
util.printD("download_url: " + download_url) util.printD(f"download_url: {download_url}")
# check data # check data
if not model_path: if not model_path:
@ -207,14 +207,14 @@ def dl_model_new_version(msg, max_size_preview, skip_nsfw_preview):
output = "version_id is empty" output = "version_id is empty"
util.printD(output) util.printD(output)
return output return output
if not download_url: if not download_url:
output = "download_url is empty" output = "download_url is empty"
util.printD(output) util.printD(output)
return output return output
if not os.path.isfile(model_path): if not os.path.isfile(model_path):
output = "model_path is not a file: "+ model_path output = f"model_path is not a file: {model_path}"
util.printD(output) util.printD(output)
return output return output
@ -232,14 +232,14 @@ def dl_model_new_version(msg, max_size_preview, skip_nsfw_preview):
# download file # download file
new_model_path = downloader.dl(download_url, model_folder, None, None) new_model_path = downloader.dl(download_url, model_folder, None, None)
if not new_model_path: if not new_model_path:
output = "Download failed, check console log for detail. Download url: " + download_url output = f"Download failed, check console log for detail. Download url: {download_url}"
util.printD(output) util.printD(output)
return output return output
# get version info # get version info
version_info = civitai.get_version_info_by_version_id(version_id) version_info = civitai.get_version_info_by_version_id(version_id)
if not version_info: if not version_info:
output = "Model downloaded, but failed to get version info, check console log for detail. Model saved to: " + new_model_path output = f"Model downloaded, but failed to get version info, check console log for detail. Model saved to: {new_model_path}"
util.printD(output) util.printD(output)
return output return output
@ -250,7 +250,7 @@ def dl_model_new_version(msg, max_size_preview, skip_nsfw_preview):
# then, get preview image # then, get preview image
civitai.get_preview_image_by_model_path(new_model_path, max_size_preview, skip_nsfw_preview) civitai.get_preview_image_by_model_path(new_model_path, max_size_preview, skip_nsfw_preview)
output = "Done. Model downloaded to: " + new_model_path output = f"Done. Model downloaded to: {new_model_path}"
util.printD(output) util.printD(output)
return output return output

View File

@ -48,7 +48,7 @@ def get_custom_model_folder():
# write model info to file # write model info to file
def write_model_info(path, model_info): def write_model_info(path, model_info):
util.printD("Write model info to file: " + path) util.printD(f"Write model info to file: {path}")
with open(os.path.realpath(path), 'w') as f: with open(os.path.realpath(path), 'w') as f:
f.write(json.dumps(model_info, indent=4)) f.write(json.dumps(model_info, indent=4))
@ -60,10 +60,10 @@ def load_model_info(path):
try: try:
model_info = json.load(f) model_info = json.load(f)
except Exception as e: except Exception as e:
util.printD("Selected file is not json: " + path) util.printD(f"Selected file is not json: {path}")
util.printD(e) util.printD(e)
return return
return model_info return model_info
@ -94,13 +94,13 @@ def get_model_names_by_type(model_type:str) -> list:
def get_model_path_by_type_and_name(model_type:str, model_name:str) -> str: def get_model_path_by_type_and_name(model_type:str, model_name:str) -> str:
util.printD("Run get_model_path_by_type_and_name") util.printD("Run get_model_path_by_type_and_name")
if model_type not in folders.keys(): if model_type not in folders.keys():
util.printD("unknown model_type: " + model_type) util.printD(f"unknown model_type: {model_type}")
return return
if not model_name: if not model_name:
util.printD("model name can not be empty") util.printD("model name can not be empty")
return return
folder = folders[model_type] folder = folders[model_type]
# model could be in subfolder, need to walk. # model could be in subfolder, need to walk.

View File

@ -19,14 +19,14 @@ def scan_model(scan_model_types, max_size_preview, skip_nsfw_preview):
output = "Model Types is None, can not scan." output = "Model Types is None, can not scan."
util.printD(output) util.printD(output)
return output return output
model_types = [] model_types = []
# check type if it is a string # check type if it is a string
if type(scan_model_types) == str: if type(scan_model_types) == str:
model_types.append(scan_model_types) model_types.append(scan_model_types)
else: else:
model_types = scan_model_types model_types = scan_model_types
model_count = 0 model_count = 0
image_count = 0 image_count = 0
# scan_log = "" # scan_log = ""
@ -34,7 +34,7 @@ def scan_model(scan_model_types, max_size_preview, skip_nsfw_preview):
if model_type not in model_types: if model_type not in model_types:
continue continue
util.printD("Scanning path: " + model_folder) util.printD(f"Scanning path: {model_folder}")
for root, dirs, files in os.walk(model_folder, followlinks=True): for root, dirs, files in os.walk(model_folder, followlinks=True):
for filename in files: for filename in files:
# check ext # check ext
@ -45,7 +45,7 @@ def scan_model(scan_model_types, max_size_preview, skip_nsfw_preview):
if len(base) > 4: if len(base) > 4:
if base[-4:] == model.vae_suffix: if base[-4:] == model.vae_suffix:
# find .vae # find .vae
util.printD("This is a vae file: " + filename) util.printD(f"This is a vae file: {filename}")
continue continue
# find a model # find a model
@ -53,15 +53,15 @@ def scan_model(scan_model_types, max_size_preview, skip_nsfw_preview):
info_file = base + civitai.suffix + model.info_ext info_file = base + civitai.suffix + model.info_ext
# check info file # check info file
if not os.path.isfile(info_file): if not os.path.isfile(info_file):
util.printD("Creating model info for: " + filename) util.printD(f"Creating model info for: {filename}")
# get model's sha256 # get model's sha256
hash = util.gen_file_sha256(item) hash = util.gen_file_sha256(item)
if not hash: if not hash:
output = "failed generating SHA256 for model:" + filename output = f"failed generating SHA256 for model:{filename}"
util.printD(output) util.printD(output)
return output return output
# use this sha256 to get model info from civitai # use this sha256 to get model info from civitai
model_info = civitai.get_model_info_by_hash(hash) model_info = civitai.get_model_info_by_hash(hash)
# delay 1 second for ti # delay 1 second for ti
@ -72,7 +72,7 @@ def scan_model(scan_model_types, max_size_preview, skip_nsfw_preview):
if model_info is None: if model_info is None:
output = "Connect to Civitai API service failed. Wait a while and try again" output = "Connect to Civitai API service failed. Wait a while and try again"
util.printD(output) util.printD(output)
return output+", check console log for detail" return f"{output}, check console log for detail"
# write model info to file # write model info to file
model.write_model_info(info_file, model_info) model.write_model_info(info_file, model_info)
@ -102,7 +102,7 @@ def get_model_info_by_input(model_type, model_name, model_url_or_id, max_size_pr
# parse model id # parse model id
model_id = civitai.get_model_id_from_url(model_url_or_id) model_id = civitai.get_model_id_from_url(model_url_or_id)
if not model_id: if not model_id:
output = "failed to parse model id from url: " + model_url_or_id output = f"failed to parse model id from url: {model_url_or_id}"
util.printD(output) util.printD(output)
return output return output
@ -113,13 +113,13 @@ def get_model_info_by_input(model_type, model_name, model_url_or_id, max_size_pr
output = "failed to get model file path" output = "failed to get model file path"
util.printD(output) util.printD(output)
return output return output
model_root, model_path = result model_root, model_path = result
if not model_path: if not model_path:
output = "model path is empty" output = "model path is empty"
util.printD(output) util.printD(output)
return output return output
# get info file path # get info file path
base, ext = os.path.splitext(model_path) base, ext = os.path.splitext(model_path)
info_file = base + civitai.suffix + model.info_ext info_file = base + civitai.suffix + model.info_ext
@ -129,19 +129,19 @@ def get_model_info_by_input(model_type, model_name, model_url_or_id, max_size_pr
model_info = civitai.get_version_info_by_model_id(model_id) model_info = civitai.get_version_info_by_model_id(model_id)
if not model_info: if not model_info:
output = "failed to get model info from url: " + model_url_or_id output = f"failed to get model info from url: {model_url_or_id}"
util.printD(output) util.printD(output)
return output return output
# write model info to file # write model info to file
model.write_model_info(info_file, model_info) model.write_model_info(info_file, model_info)
util.printD("Saved model info to: "+ info_file) util.printD(f"Saved model info to: {info_file}")
# check preview image # check preview image
civitai.get_preview_image_by_model_path(model_path, max_size_preview, skip_nsfw_preview) civitai.get_preview_image_by_model_path(model_path, max_size_preview, skip_nsfw_preview)
output = "Model Info saved to: " + info_file output = f"Model Info saved to: {info_file}"
return output return output
@ -166,28 +166,28 @@ def check_models_new_version_to_md(model_types:list) -> str:
url = civitai.url_dict["modelPage"]+str(model_id) url = civitai.url_dict["modelPage"]+str(model_id)
part = f'<div style="font-size:20px;margin:6px 0px;"><b>Model: <a href="{url}" target="_blank"><u>{model_name}</u></a></b></div>' part = f'<div style="font-size:20px;margin:6px 0px;"><b>Model: <a href="{url}" target="_blank"><u>{model_name}</u></a></b></div>'
part = part + f'<div style="font-size:16px">File: {model_path}</div>' part = f'{part}<div style="font-size:16px">File: {model_path}</div>'
if download_url: if download_url:
# replace "\" to "/" in model_path for windows # replace "\" to "/" in model_path for windows
model_path = model_path.replace('\\', '\\\\') model_path = model_path.replace('\\', '\\\\')
part = part + f'<div style="font-size:16px;margin:6px 0px;">New Version: <u><a href="{download_url}" target="_blank" style="margin:0px 10px;">{new_version_name}</a></u>' part = f'{part}<div style="font-size:16px;margin:6px 0px;">New Version: <u><a href="{download_url}" target="_blank" style="margin:0px 10px;">{new_version_name}</a></u>'
# add js function to download new version into SD webui by python # add js function to download new version into SD webui by python
part = part + " " part = f"{part} "
# in embed HTML, onclick= will also follow a ", never a ', so have to write it as following # in embed HTML, onclick= will also follow a ", never a ', so have to write it as following
part = part + f"<u><a href='#' style='margin:0px 10px;' onclick=\"ch_dl_model_new_version(event, '{model_path}', '{new_verion_id}', '{download_url}')\">[Download into SD]</a></u>" part = f"""{part}<u><a href='#' style='margin:0px 10px;' onclick=\"ch_dl_model_new_version(event, '{model_path}', '{new_verion_id}', '{download_url}')\">[Download into SD]</a></u>"""
else: else:
part = part + f'<div style="font-size:16px;margin:6px 0px;">New Version: {new_version_name}' part = f'{part}<div style="font-size:16px;margin:6px 0px;">New Version: {new_version_name}'
part = part + '</div>' part = f'{part}</div>'
# description # description
if description: if description:
part = part + '<blockquote style="font-size:16px;margin:6px 0px;">'+ description + '</blockquote><br>' part = f'{part}<blockquote style="font-size:16px;margin:6px 0px;">{description}</blockquote><br>'
# preview image # preview image
if img_url: if img_url:
part = part + f"<img src='{img_url}'><br>" part = f"{part}<img src='{img_url}'><br>"
output = output + part output = output + part
@ -198,7 +198,7 @@ def check_models_new_version_to_md(model_types:list) -> str:
# get model info by url # get model info by url
def get_model_info_by_url(model_url_or_id:str) -> tuple: def get_model_info_by_url(model_url_or_id:str) -> tuple:
util.printD("Getting model info by: " + model_url_or_id) util.printD(f"Getting model info by: {model_url_or_id}")
# parse model id # parse model id
model_id = civitai.get_model_id_from_url(model_url_or_id) model_id = civitai.get_model_id_from_url(model_url_or_id)
@ -210,22 +210,22 @@ def get_model_info_by_url(model_url_or_id:str) -> tuple:
if model_info is None: if model_info is None:
util.printD("Connect to Civitai API service failed. Wait a while and try again") util.printD("Connect to Civitai API service failed. Wait a while and try again")
return return
if not model_info: if not model_info:
util.printD("failed to get model info from url or id") util.printD("failed to get model info from url or id")
return return
# parse model type, model name, subfolder, version from this model info # parse model type, model name, subfolder, version from this model info
# get model type # get model type
if "type" not in model_info.keys(): if "type" not in model_info.keys():
util.printD("model type is not in model_info") util.printD("model type is not in model_info")
return return
civitai_model_type = model_info["type"] civitai_model_type = model_info["type"]
if civitai_model_type not in civitai.model_type_dict.keys(): if civitai_model_type not in civitai.model_type_dict.keys():
util.printD("This model type is not supported:"+civitai_model_type) util.printD(f"This model type is not supported:{civitai_model_type}")
return return
model_type = civitai.model_type_dict[civitai_model_type] model_type = civitai.model_type_dict[civitai_model_type]
# get model type # get model type
@ -242,20 +242,15 @@ def get_model_info_by_url(model_url_or_id:str) -> tuple:
if "modelVersions" not in model_info.keys(): if "modelVersions" not in model_info.keys():
util.printD("modelVersions is not in model_info") util.printD("modelVersions is not in model_info")
return return
modelVersions = model_info["modelVersions"] modelVersions = model_info["modelVersions"]
if not modelVersions: if not modelVersions:
util.printD("modelVersions is Empty") util.printD("modelVersions is Empty")
return return
version_strs = []
for version in modelVersions:
# version name can not be used as id
# version id is not readable
# so , we use name_id as version string
version_str = version["name"]+"_"+str(version["id"])
version_strs.append(version_str)
version_strs = [
version["name"] + "_" + str(version["id"]) for version in modelVersions
]
# get folder by model type # get folder by model type
folder = model.folders[model_type] folder = model.folders[model_type]
# get subfolders # get subfolders
@ -279,11 +274,11 @@ def get_ver_info_by_ver_str(version_str:str, model_info:dict) -> dict:
if not version_str: if not version_str:
util.printD("version_str is empty") util.printD("version_str is empty")
return return
if not model_info: if not model_info:
util.printD("model_info is None") util.printD("model_info is None")
return return
# get version list # get version list
if "modelVersions" not in model_info.keys(): if "modelVersions" not in model_info.keys():
util.printD("modelVersions is not in model_info") util.printD("modelVersions is not in model_info")
@ -293,7 +288,7 @@ def get_ver_info_by_ver_str(version_str:str, model_info:dict) -> dict:
if not modelVersions: if not modelVersions:
util.printD("modelVersions is Empty") util.printD("modelVersions is Empty")
return return
# find version by version_str # find version by version_str
version = None version = None
for ver in modelVersions: for ver in modelVersions:
@ -306,14 +301,14 @@ def get_ver_info_by_ver_str(version_str:str, model_info:dict) -> dict:
version = ver version = ver
if not version: if not version:
util.printD("can not find version by version string: " + version_str) util.printD(f"can not find version by version string: {version_str}")
return return
# get version id # get version id
if "id" not in version.keys(): if "id" not in version.keys():
util.printD("this version has no id") util.printD("this version has no id")
return return
return version return version
@ -323,11 +318,11 @@ def get_id_and_dl_url_by_version_str(version_str:str, model_info:dict) -> tuple:
if not version_str: if not version_str:
util.printD("version_str is empty") util.printD("version_str is empty")
return return
if not model_info: if not model_info:
util.printD("model_info is None") util.printD("model_info is None")
return return
# get version list # get version list
if "modelVersions" not in model_info.keys(): if "modelVersions" not in model_info.keys():
util.printD("modelVersions is not in model_info") util.printD("modelVersions is not in model_info")
@ -337,7 +332,7 @@ def get_id_and_dl_url_by_version_str(version_str:str, model_info:dict) -> tuple:
if not modelVersions: if not modelVersions:
util.printD("modelVersions is Empty") util.printD("modelVersions is Empty")
return return
# find version by version_str # find version by version_str
version = None version = None
for ver in modelVersions: for ver in modelVersions:
@ -350,14 +345,14 @@ def get_id_and_dl_url_by_version_str(version_str:str, model_info:dict) -> tuple:
version = ver version = ver
if not version: if not version:
util.printD("can not find version by version string: " + version_str) util.printD(f"can not find version by version string: {version_str}")
return return
# get version id # get version id
if "id" not in version.keys(): if "id" not in version.keys():
util.printD("this version has no id") util.printD("this version has no id")
return return
version_id = version["id"] version_id = version["id"]
if not version_id: if not version_id:
util.printD("version id is Empty") util.printD("version id is Empty")
@ -367,14 +362,14 @@ def get_id_and_dl_url_by_version_str(version_str:str, model_info:dict) -> tuple:
if "downloadUrl" not in version.keys(): if "downloadUrl" not in version.keys():
util.printD("downloadUrl is not in this version") util.printD("downloadUrl is not in this version")
return return
downloadUrl = version["downloadUrl"] downloadUrl = version["downloadUrl"]
if not downloadUrl: if not downloadUrl:
util.printD("downloadUrl is Empty") util.printD("downloadUrl is Empty")
return return
util.printD("Get Download Url: " + downloadUrl) util.printD(f"Get Download Url: {downloadUrl}")
return (version_id, downloadUrl) return (version_id, downloadUrl)
@ -388,36 +383,36 @@ def dl_model_by_input(model_info:dict, model_type:str, subfolder_str:str, versio
output = "model_info is None" output = "model_info is None"
util.printD(output) util.printD(output)
return output return output
if not model_type: if not model_type:
output = "model_type is None" output = "model_type is None"
util.printD(output) util.printD(output)
return output return output
if not subfolder_str: if not subfolder_str:
output = "subfolder string is None" output = "subfolder string is None"
util.printD(output) util.printD(output)
return output return output
if not version_str: if not version_str:
output = "version_str is None" output = "version_str is None"
util.printD(output) util.printD(output)
return output return output
# get model root folder # get model root folder
if model_type not in model.folders.keys(): if model_type not in model.folders.keys():
output = "Unknow model type: "+model_type output = f"Unknow model type: {model_type}"
util.printD(output) util.printD(output)
return output return output
model_root_folder = model.folders[model_type] model_root_folder = model.folders[model_type]
# get subfolder # get subfolder
subfolder = "" subfolder = ""
if subfolder_str == "/" or subfolder_str == "\\": if subfolder_str in {"/", "\\"}:
subfolder = "" subfolder = ""
elif subfolder_str[:1] == "/" or subfolder_str[:1] == "\\": elif subfolder_str[:1] in ["/", "\\"]:
subfolder = subfolder_str[1:] subfolder = subfolder_str[1:]
else: else:
subfolder = subfolder_str subfolder = subfolder_str
@ -425,17 +420,17 @@ def dl_model_by_input(model_info:dict, model_type:str, subfolder_str:str, versio
# get model folder for downloading # get model folder for downloading
model_folder = os.path.join(model_root_folder, subfolder) model_folder = os.path.join(model_root_folder, subfolder)
if not os.path.isdir(model_folder): if not os.path.isdir(model_folder):
output = "Model folder is not a dir: "+ model_folder output = f"Model folder is not a dir: {model_folder}"
util.printD(output) util.printD(output)
return output return output
# get version info # get version info
ver_info = get_ver_info_by_ver_str(version_str, model_info) ver_info = get_ver_info_by_ver_str(version_str, model_info)
if not ver_info: if not ver_info:
output = "Fail to get version info, check console log for detail" output = "Fail to get version info, check console log for detail"
util.printD(output) util.printD(output)
return output return output
version_id = ver_info["id"] version_id = ver_info["id"]
@ -444,22 +439,23 @@ def dl_model_by_input(model_info:dict, model_type:str, subfolder_str:str, versio
# some model versions have multiple files # some model versions have multiple files
download_urls = [] download_urls = []
if "files" in ver_info.keys(): if "files" in ver_info.keys():
for file_info in ver_info["files"]: download_urls.extend(
if "downloadUrl" in file_info.keys(): file_info["downloadUrl"]
download_urls.append(file_info["downloadUrl"]) for file_info in ver_info["files"]
if "downloadUrl" in file_info.keys()
)
if not len(download_urls): if not len(download_urls):
if "downloadUrl" in ver_info.keys(): if "downloadUrl" in ver_info.keys():
download_urls.append(ver_info["downloadUrl"]) download_urls.append(ver_info["downloadUrl"])
# check if this model is already existed if r := civitai.search_local_model_info_by_version_id(
r = civitai.search_local_model_info_by_version_id(model_folder, version_id) model_folder, version_id
if r: ):
output = "This model version is already existed" output = "This model version is already existed"
util.printD(output) util.printD(output)
return output return output
# download # download
filepath = "" filepath = ""
for url in download_urls: for url in download_urls:
@ -468,7 +464,7 @@ def dl_model_by_input(model_info:dict, model_type:str, subfolder_str:str, versio
output = "Downloading failed, check console log for detail" output = "Downloading failed, check console log for detail"
util.printD(output) util.printD(output)
return output return output
if url == ver_info["downloadUrl"]: if url == ver_info["downloadUrl"]:
filepath = model_filepath filepath = model_filepath
else: else:
@ -479,7 +475,7 @@ def dl_model_by_input(model_info:dict, model_type:str, subfolder_str:str, versio
output = "Fail to get download url, check console log for detail" output = "Fail to get download url, check console log for detail"
util.printD(output) util.printD(output)
return output return output
# download # download
filepath = downloader.dl(url, model_folder, None, None) filepath = downloader.dl(url, model_folder, None, None)
if not filepath: if not filepath:
@ -490,11 +486,11 @@ def dl_model_by_input(model_info:dict, model_type:str, subfolder_str:str, versio
if not filepath: if not filepath:
filepath = model_filepath filepath = model_filepath
# get version info # get version info
version_info = civitai.get_version_info_by_version_id(version_id) version_info = civitai.get_version_info_by_version_id(version_id)
if not version_info: if not version_info:
output = "Model downloaded, but failed to get version info, check console log for detail. Model saved to: " + filepath output = f"Model downloaded, but failed to get version info, check console log for detail. Model saved to: {filepath}"
util.printD(output) util.printD(output)
return output return output
@ -505,7 +501,7 @@ def dl_model_by_input(model_info:dict, model_type:str, subfolder_str:str, versio
# then, get preview image # then, get preview image
civitai.get_preview_image_by_model_path(filepath, max_size_preview, skip_nsfw_preview) civitai.get_preview_image_by_model_path(filepath, max_size_preview, skip_nsfw_preview)
output = "Done. Model downloaded to: " + filepath output = f"Done. Model downloaded to: {filepath}"
util.printD(output) util.printD(output)
return output return output

View File

@ -29,7 +29,7 @@ def parse_js_msg(msg):
return return
if action not in js_actions: if action not in js_actions:
util.printD("Unknow action: " + action) util.printD(f"Unknow action: {action}")
return return
util.printD("End parse js msg") util.printD("End parse js msg")
@ -45,13 +45,13 @@ def build_py_msg(action:str, content:dict):
if not content: if not content:
util.printD("Content is None") util.printD("Content is None")
return return
if not action: if not action:
util.printD("Action is None") util.printD("Action is None")
return return
if action not in py_actions: if action not in py_actions:
util.printD("Unknow action: " + action) util.printD(f"Unknow action: {action}")
return return
msg = { msg = {

View File

@ -29,7 +29,7 @@ data = {
# save setting # save setting
# return output msg for log # return output msg for log
def save(): def save():
print("Saving setting to: " + path) print(f"Saving setting to: {path}")
json_data = json.dumps(data, indent=4) json_data = json.dumps(data, indent=4)
@ -40,12 +40,12 @@ def save():
with open(path, 'w') as f: with open(path, 'w') as f:
f.write(json_data) f.write(json_data)
except Exception as e: except Exception as e:
util.printD("Error when writing file:"+path) util.printD(f"Error when writing file:{path}")
output = str(e) output = str(e)
util.printD(str(e)) util.printD(str(e))
return output return output
output = "Setting saved to: " + path output = f"Setting saved to: {path}"
util.printD(output) util.printD(output)
return output return output
@ -56,7 +56,7 @@ def load():
# load data into globel data # load data into globel data
global data global data
util.printD("Load setting from: " + path) util.printD(f"Load setting from: {path}")
if not os.path.isfile(path): if not os.path.isfile(path):
util.printD("No setting file, use default") util.printD("No setting file, use default")

View File

@ -22,10 +22,10 @@ def printD(msg):
def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE): def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE):
"""Yield pieces of data from a file-like object until EOF.""" """Yield pieces of data from a file-like object until EOF."""
while True: while True:
chunk = file.read(size) if chunk := file.read(size):
if not chunk: yield chunk
else:
break break
yield chunk
# Now, hashing use the same way as pip's source code. # Now, hashing use the same way as pip's source code.
def gen_file_sha256(filname): def gen_file_sha256(filname):
@ -39,40 +39,40 @@ def gen_file_sha256(filname):
h.update(block) h.update(block)
hash_value = h.hexdigest() hash_value = h.hexdigest()
printD("sha256: " + hash_value) printD(f"sha256: {hash_value}")
printD("length: " + str(length)) printD(f"length: {str(length)}")
return hash_value return hash_value
# get preview image # get preview image
def download_file(url, path): def download_file(url, path):
printD("Downloading file from: " + url) printD(f"Downloading file from: {url}")
# get file # get file
r = requests.get(url, stream=True, headers=def_headers, proxies=proxies) r = requests.get(url, stream=True, headers=def_headers, proxies=proxies)
if not r.ok: if not r.ok:
printD("Get error code: " + str(r.status_code)) printD(f"Get error code: {r.status_code}")
printD(r.text) printD(r.text)
return return
# write to file # write to file
with open(os.path.realpath(path), 'wb') as f: with open(os.path.realpath(path), 'wb') as f:
r.raw.decode_content = True r.raw.decode_content = True
shutil.copyfileobj(r.raw, f) shutil.copyfileobj(r.raw, f)
printD("File downloaded to: " + path) printD(f"File downloaded to: {path}")
# get subfolder list # get subfolder list
def get_subfolders(folder:str) -> list: def get_subfolders(folder:str) -> list:
printD("Get subfolder for: " + folder) printD(f"Get subfolder for: {folder}")
if not folder: if not folder:
printD("folder can not be None") printD("folder can not be None")
return return
if not os.path.isdir(folder): if not os.path.isdir(folder):
printD("path is not a folder") printD("path is not a folder")
return return
prefix_len = len(folder) prefix_len = len(folder)
subfolders = [] subfolders = []
for root, dirs, files in os.walk(folder, followlinks=True): for root, dirs, files in os.walk(folder, followlinks=True):
@ -98,7 +98,7 @@ def get_relative_path(item_path:str, parent_path:str) -> str:
return item_path return item_path
relative = item_path[len(parent_path):] relative = item_path[len(parent_path):]
if relative[:1] == "/" or relative[:1] == "\\": if relative[:1] in ["/", "\\"]:
relative = relative[1:] relative = relative[1:]
# printD("relative:"+relative) # printD("relative:"+relative)