'Refactored by Sourcery'
parent
2d95b6fcf9
commit
b43ab281f0
|
|
@ -32,7 +32,7 @@ model_type_dict = {
|
||||||
# width is in number, not string
|
# width is in number, not string
|
||||||
# return: url str
|
# return: url str
|
||||||
def get_full_size_image_url(image_url, width):
|
def get_full_size_image_url(image_url, width):
|
||||||
return re.sub('/width=\d+/', '/width=' + str(width) + '/', image_url)
|
return re.sub('/width=\d+/', f'/width={str(width)}/', image_url)
|
||||||
|
|
||||||
|
|
||||||
# use this sha256 to get model info from civitai
|
# use this sha256 to get model info from civitai
|
||||||
|
|
@ -51,7 +51,7 @@ def get_model_info_by_hash(hash:str):
|
||||||
util.printD("Civitai does not have this model")
|
util.printD("Civitai does not have this model")
|
||||||
return {}
|
return {}
|
||||||
else:
|
else:
|
||||||
util.printD("Get error code: " + str(r.status_code))
|
util.printD(f"Get error code: {r.status_code}")
|
||||||
util.printD(r.text)
|
util.printD(r.text)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
@ -65,11 +65,11 @@ def get_model_info_by_hash(hash:str):
|
||||||
util.printD("response:")
|
util.printD("response:")
|
||||||
util.printD(r.text)
|
util.printD(r.text)
|
||||||
return
|
return
|
||||||
|
|
||||||
if not content:
|
if not content:
|
||||||
util.printD("error, content from civitai is None")
|
util.printD("error, content from civitai is None")
|
||||||
return
|
return
|
||||||
|
|
||||||
return content
|
return content
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -81,14 +81,18 @@ def get_model_info_by_id(id:str) -> dict:
|
||||||
util.printD("id is empty")
|
util.printD("id is empty")
|
||||||
return
|
return
|
||||||
|
|
||||||
r = requests.get(url_dict["modelId"]+str(id), headers=util.def_headers, proxies=util.proxies)
|
r = requests.get(
|
||||||
|
url_dict["modelId"] + id,
|
||||||
|
headers=util.def_headers,
|
||||||
|
proxies=util.proxies,
|
||||||
|
)
|
||||||
if not r.ok:
|
if not r.ok:
|
||||||
if r.status_code == 404:
|
if r.status_code == 404:
|
||||||
# this is not a civitai model
|
# this is not a civitai model
|
||||||
util.printD("Civitai does not have this model")
|
util.printD("Civitai does not have this model")
|
||||||
return {}
|
return {}
|
||||||
else:
|
else:
|
||||||
util.printD("Get error code: " + str(r.status_code))
|
util.printD(f"Get error code: {r.status_code}")
|
||||||
util.printD(r.text)
|
util.printD(r.text)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
@ -102,11 +106,11 @@ def get_model_info_by_id(id:str) -> dict:
|
||||||
util.printD("response:")
|
util.printD("response:")
|
||||||
util.printD(r.text)
|
util.printD(r.text)
|
||||||
return
|
return
|
||||||
|
|
||||||
if not content:
|
if not content:
|
||||||
util.printD("error, content from civitai is None")
|
util.printD("error, content from civitai is None")
|
||||||
return
|
return
|
||||||
|
|
||||||
return content
|
return content
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -117,14 +121,18 @@ def get_version_info_by_version_id(id:str) -> dict:
|
||||||
util.printD("id is empty")
|
util.printD("id is empty")
|
||||||
return
|
return
|
||||||
|
|
||||||
r = requests.get(url_dict["modelVersionId"]+str(id), headers=util.def_headers, proxies=util.proxies)
|
r = requests.get(
|
||||||
|
url_dict["modelVersionId"] + id,
|
||||||
|
headers=util.def_headers,
|
||||||
|
proxies=util.proxies,
|
||||||
|
)
|
||||||
if not r.ok:
|
if not r.ok:
|
||||||
if r.status_code == 404:
|
if r.status_code == 404:
|
||||||
# this is not a civitai model
|
# this is not a civitai model
|
||||||
util.printD("Civitai does not have this model version")
|
util.printD("Civitai does not have this model version")
|
||||||
return {}
|
return {}
|
||||||
else:
|
else:
|
||||||
util.printD("Get error code: " + str(r.status_code))
|
util.printD(f"Get error code: {r.status_code}")
|
||||||
util.printD(r.text)
|
util.printD(r.text)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
@ -138,11 +146,11 @@ def get_version_info_by_version_id(id:str) -> dict:
|
||||||
util.printD("response:")
|
util.printD("response:")
|
||||||
util.printD(r.text)
|
util.printD(r.text)
|
||||||
return
|
return
|
||||||
|
|
||||||
if not content:
|
if not content:
|
||||||
util.printD("error, content from civitai is None")
|
util.printD("error, content from civitai is None")
|
||||||
return
|
return
|
||||||
|
|
||||||
return content
|
return content
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -198,9 +206,9 @@ def get_version_info_by_model_id(id:str) -> dict:
|
||||||
def load_model_info_by_search_term(model_type, search_term):
|
def load_model_info_by_search_term(model_type, search_term):
|
||||||
util.printD(f"Load model info of {search_term} in {model_type}")
|
util.printD(f"Load model info of {search_term} in {model_type}")
|
||||||
if model_type not in model.folders.keys():
|
if model_type not in model.folders.keys():
|
||||||
util.printD("unknow model type: " + model_type)
|
util.printD(f"unknow model type: {model_type}")
|
||||||
return
|
return
|
||||||
|
|
||||||
# search_term = subfolderpath + model name + ext. And it always start with a / even there is no sub folder
|
# search_term = subfolderpath + model name + ext. And it always start with a / even there is no sub folder
|
||||||
base, ext = os.path.splitext(search_term)
|
base, ext = os.path.splitext(search_term)
|
||||||
model_info_base = base
|
model_info_base = base
|
||||||
|
|
@ -223,9 +231,9 @@ def load_model_info_by_search_term(model_type, search_term):
|
||||||
break;
|
break;
|
||||||
|
|
||||||
if not found:
|
if not found:
|
||||||
util.printD("Can not find model info file: " + model_info_filepath)
|
util.printD(f"Can not find model info file: {model_info_filepath}")
|
||||||
return
|
return
|
||||||
|
|
||||||
return model.load_model_info(model_info_filepath)
|
return model.load_model_info(model_info_filepath)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -249,9 +257,9 @@ def get_model_names_by_type_and_filter(model_type:str, filter:dict) -> list:
|
||||||
empty_info_only = False
|
empty_info_only = False
|
||||||
|
|
||||||
if filter:
|
if filter:
|
||||||
if "no_info_only" in filter.keys():
|
if "no_info_only" in filter:
|
||||||
no_info_only = filter["no_info_only"]
|
no_info_only = filter["no_info_only"]
|
||||||
if "empty_info_only" in filter.keys():
|
if "empty_info_only" in filter:
|
||||||
empty_info_only = filter["empty_info_only"]
|
empty_info_only = filter["empty_info_only"]
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -279,10 +287,7 @@ def get_model_names_by_type_and_filter(model_type:str, filter:dict) -> list:
|
||||||
# check model info file
|
# check model info file
|
||||||
info_file = base + suffix + model.info_ext
|
info_file = base + suffix + model.info_ext
|
||||||
if os.path.isfile(info_file):
|
if os.path.isfile(info_file):
|
||||||
# load model info
|
if model_info := model.load_model_info(info_file):
|
||||||
model_info = model.load_model_info(info_file)
|
|
||||||
# check content
|
|
||||||
if model_info:
|
|
||||||
if "id" in model_info.keys():
|
if "id" in model_info.keys():
|
||||||
# find a non-empty model info file
|
# find a non-empty model info file
|
||||||
continue
|
continue
|
||||||
|
|
@ -305,15 +310,12 @@ def get_model_id_from_url(url:str) -> str:
|
||||||
return ""
|
return ""
|
||||||
|
|
||||||
if url.isnumeric():
|
if url.isnumeric():
|
||||||
# is already an id
|
return url
|
||||||
id = str(url)
|
|
||||||
return id
|
|
||||||
|
|
||||||
s = re.sub("\\?.+$", "", url).split("/")
|
s = re.sub("\\?.+$", "", url).split("/")
|
||||||
if len(s) < 2:
|
if len(s) < 2:
|
||||||
util.printD("url is not valid")
|
util.printD("url is not valid")
|
||||||
return ""
|
return ""
|
||||||
|
|
||||||
if s[-2].isnumeric():
|
if s[-2].isnumeric():
|
||||||
id = s[-2]
|
id = s[-2]
|
||||||
elif s[-1].isnumeric():
|
elif s[-1].isnumeric():
|
||||||
|
|
@ -321,7 +323,7 @@ def get_model_id_from_url(url:str) -> str:
|
||||||
else:
|
else:
|
||||||
util.printD("There is no model id in this url")
|
util.printD("There is no model id in this url")
|
||||||
return ""
|
return ""
|
||||||
|
|
||||||
return id
|
return id
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -333,18 +335,18 @@ def get_preview_image_by_model_path(model_path:str, max_size_preview, skip_nsfw_
|
||||||
return
|
return
|
||||||
|
|
||||||
if not os.path.isfile(model_path):
|
if not os.path.isfile(model_path):
|
||||||
util.printD("model_path is not a file: "+model_path)
|
util.printD(f"model_path is not a file: {model_path}")
|
||||||
return
|
return
|
||||||
|
|
||||||
base, ext = os.path.splitext(model_path)
|
base, ext = os.path.splitext(model_path)
|
||||||
first_preview = base+".png"
|
first_preview = f"{base}.png"
|
||||||
sec_preview = base+".preview.png"
|
sec_preview = f"{base}.preview.png"
|
||||||
info_file = base + suffix + model.info_ext
|
info_file = base + suffix + model.info_ext
|
||||||
|
|
||||||
# check preview image
|
# check preview image
|
||||||
if not os.path.isfile(sec_preview):
|
if not os.path.isfile(sec_preview):
|
||||||
# need to download preview image
|
# need to download preview image
|
||||||
util.printD("Checking preview image for model: " + model_path)
|
util.printD(f"Checking preview image for model: {model_path}")
|
||||||
# load model_info file
|
# load model_info file
|
||||||
if os.path.isfile(info_file):
|
if os.path.isfile(info_file):
|
||||||
model_info = model.load_model_info(info_file)
|
model_info = model.load_model_info(info_file)
|
||||||
|
|
@ -361,7 +363,7 @@ def get_preview_image_by_model_path(model_path:str, max_size_preview, skip_nsfw_
|
||||||
if skip_nsfw_preview:
|
if skip_nsfw_preview:
|
||||||
util.printD("Skip NSFW image")
|
util.printD("Skip NSFW image")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if "url" in img_dict.keys():
|
if "url" in img_dict.keys():
|
||||||
img_url = img_dict["url"]
|
img_url = img_dict["url"]
|
||||||
if max_size_preview:
|
if max_size_preview:
|
||||||
|
|
@ -380,8 +382,8 @@ def get_preview_image_by_model_path(model_path:str, max_size_preview, skip_nsfw_
|
||||||
# return - model_info
|
# return - model_info
|
||||||
def search_local_model_info_by_version_id(folder:str, version_id:int) -> dict:
|
def search_local_model_info_by_version_id(folder:str, version_id:int) -> dict:
|
||||||
util.printD("Searching local model by version id")
|
util.printD("Searching local model by version id")
|
||||||
util.printD("folder: " + folder)
|
util.printD(f"folder: {folder}")
|
||||||
util.printD("version_id: " + str(version_id))
|
util.printD(f"version_id: {version_id}")
|
||||||
|
|
||||||
if not folder:
|
if not folder:
|
||||||
util.printD("folder is none")
|
util.printD("folder is none")
|
||||||
|
|
@ -390,11 +392,11 @@ def search_local_model_info_by_version_id(folder:str, version_id:int) -> dict:
|
||||||
if not os.path.isdir(folder):
|
if not os.path.isdir(folder):
|
||||||
util.printD("folder is not a dir")
|
util.printD("folder is not a dir")
|
||||||
return
|
return
|
||||||
|
|
||||||
if not version_id:
|
if not version_id:
|
||||||
util.printD("version_id is none")
|
util.printD("version_id is none")
|
||||||
return
|
return
|
||||||
|
|
||||||
# search civitai model info file
|
# search civitai model info file
|
||||||
for filename in os.listdir(folder):
|
for filename in os.listdir(folder):
|
||||||
# check ext
|
# check ext
|
||||||
|
|
@ -423,7 +425,7 @@ def search_local_model_info_by_version_id(folder:str, version_id:int) -> dict:
|
||||||
if str(id) == str(version_id):
|
if str(id) == str(version_id):
|
||||||
# find the one
|
# find the one
|
||||||
return model_info
|
return model_info
|
||||||
|
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
@ -439,16 +441,16 @@ def check_model_new_version_by_path(model_path:str, delay:float=1) -> tuple:
|
||||||
return
|
return
|
||||||
|
|
||||||
if not os.path.isfile(model_path):
|
if not os.path.isfile(model_path):
|
||||||
util.printD("model_path is not a file: "+model_path)
|
util.printD(f"model_path is not a file: {model_path}")
|
||||||
return
|
return
|
||||||
|
|
||||||
# get model info file name
|
# get model info file name
|
||||||
base, ext = os.path.splitext(model_path)
|
base, ext = os.path.splitext(model_path)
|
||||||
info_file = base + suffix + model.info_ext
|
info_file = base + suffix + model.info_ext
|
||||||
|
|
||||||
if not os.path.isfile(info_file):
|
if not os.path.isfile(info_file):
|
||||||
return
|
return
|
||||||
|
|
||||||
# get model info
|
# get model info
|
||||||
model_info_file = model.load_model_info(info_file)
|
model_info_file = model.load_model_info(info_file)
|
||||||
if not model_info_file:
|
if not model_info_file:
|
||||||
|
|
@ -456,18 +458,18 @@ def check_model_new_version_by_path(model_path:str, delay:float=1) -> tuple:
|
||||||
|
|
||||||
if "id" not in model_info_file.keys():
|
if "id" not in model_info_file.keys():
|
||||||
return
|
return
|
||||||
|
|
||||||
local_version_id = model_info_file["id"]
|
local_version_id = model_info_file["id"]
|
||||||
if not local_version_id:
|
if not local_version_id:
|
||||||
return
|
return
|
||||||
|
|
||||||
if "modelId" not in model_info_file.keys():
|
if "modelId" not in model_info_file.keys():
|
||||||
return
|
return
|
||||||
|
|
||||||
model_id = model_info_file["modelId"]
|
model_id = model_info_file["modelId"]
|
||||||
if not model_id:
|
if not model_id:
|
||||||
return
|
return
|
||||||
|
|
||||||
# get model info by id from civitai
|
# get model info by id from civitai
|
||||||
model_info = get_model_info_by_id(model_id)
|
model_info = get_model_info_by_id(model_id)
|
||||||
# delay before next request, to prevent to be treat as DDoS
|
# delay before next request, to prevent to be treat as DDoS
|
||||||
|
|
@ -476,24 +478,24 @@ def check_model_new_version_by_path(model_path:str, delay:float=1) -> tuple:
|
||||||
|
|
||||||
if not model_info:
|
if not model_info:
|
||||||
return
|
return
|
||||||
|
|
||||||
if "modelVersions" not in model_info.keys():
|
if "modelVersions" not in model_info.keys():
|
||||||
return
|
return
|
||||||
|
|
||||||
modelVersions = model_info["modelVersions"]
|
modelVersions = model_info["modelVersions"]
|
||||||
if not modelVersions:
|
if not modelVersions:
|
||||||
return
|
return
|
||||||
|
|
||||||
if not len(modelVersions):
|
if not len(modelVersions):
|
||||||
return
|
return
|
||||||
|
|
||||||
current_version = modelVersions[0]
|
current_version = modelVersions[0]
|
||||||
if not current_version:
|
if not current_version:
|
||||||
return
|
return
|
||||||
|
|
||||||
if "id" not in current_version.keys():
|
if "id" not in current_version.keys():
|
||||||
return
|
return
|
||||||
|
|
||||||
current_version_id = current_version["id"]
|
current_version_id = current_version["id"]
|
||||||
if not current_version_id:
|
if not current_version_id:
|
||||||
return
|
return
|
||||||
|
|
@ -502,10 +504,7 @@ def check_model_new_version_by_path(model_path:str, delay:float=1) -> tuple:
|
||||||
if current_version_id == local_version_id:
|
if current_version_id == local_version_id:
|
||||||
return
|
return
|
||||||
|
|
||||||
model_name = ""
|
model_name = model_info["name"] if "name" in model_info.keys() else ""
|
||||||
if "name" in model_info.keys():
|
|
||||||
model_name = model_info["name"]
|
|
||||||
|
|
||||||
if not model_name:
|
if not model_name:
|
||||||
model_name = ""
|
model_name = ""
|
||||||
|
|
||||||
|
|
@ -513,21 +512,21 @@ def check_model_new_version_by_path(model_path:str, delay:float=1) -> tuple:
|
||||||
new_version_name = ""
|
new_version_name = ""
|
||||||
if "name" in current_version.keys():
|
if "name" in current_version.keys():
|
||||||
new_version_name = current_version["name"]
|
new_version_name = current_version["name"]
|
||||||
|
|
||||||
if not new_version_name:
|
if not new_version_name:
|
||||||
new_version_name = ""
|
new_version_name = ""
|
||||||
|
|
||||||
description = ""
|
description = ""
|
||||||
if "description" in current_version.keys():
|
if "description" in current_version.keys():
|
||||||
description = current_version["description"]
|
description = current_version["description"]
|
||||||
|
|
||||||
if not description:
|
if not description:
|
||||||
description = ""
|
description = ""
|
||||||
|
|
||||||
downloadUrl = ""
|
downloadUrl = ""
|
||||||
if "downloadUrl" in current_version.keys():
|
if "downloadUrl" in current_version.keys():
|
||||||
downloadUrl = current_version["downloadUrl"]
|
downloadUrl = current_version["downloadUrl"]
|
||||||
|
|
||||||
if not downloadUrl:
|
if not downloadUrl:
|
||||||
downloadUrl = ""
|
downloadUrl = ""
|
||||||
|
|
||||||
|
|
@ -542,7 +541,7 @@ def check_model_new_version_by_path(model_path:str, delay:float=1) -> tuple:
|
||||||
img_url = ""
|
img_url = ""
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
return (model_path, model_id, model_name, current_version_id, new_version_name, description, downloadUrl, img_url)
|
return (model_path, model_id, model_name, current_version_id, new_version_name, description, downloadUrl, img_url)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -578,7 +577,7 @@ def check_models_new_version_by_model_types(model_types:list, delay:float=1) ->
|
||||||
if model_type not in mts:
|
if model_type not in mts:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
util.printD("Scanning path: " + model_folder)
|
util.printD(f"Scanning path: {model_folder}")
|
||||||
for root, dirs, files in os.walk(model_folder, followlinks=True):
|
for root, dirs, files in os.walk(model_folder, followlinks=True):
|
||||||
for filename in files:
|
for filename in files:
|
||||||
# check ext
|
# check ext
|
||||||
|
|
@ -596,21 +595,17 @@ def check_models_new_version_by_model_types(model_types:list, delay:float=1) ->
|
||||||
if not current_version_id:
|
if not current_version_id:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# check this version id in list
|
is_already_in_list = any(
|
||||||
is_already_in_list = False
|
current_version_id == new_version[3]
|
||||||
for new_version in new_versions:
|
for new_version in new_versions
|
||||||
if current_version_id == new_version[3]:
|
)
|
||||||
# already in list
|
|
||||||
is_already_in_list = True
|
|
||||||
break
|
|
||||||
|
|
||||||
if is_already_in_list:
|
if is_already_in_list:
|
||||||
util.printD("New version is already in list")
|
util.printD("New version is already in list")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# search this new version id to check if this model is already downloaded
|
if target_model_info := search_local_model_info_by_version_id(
|
||||||
target_model_info = search_local_model_info_by_version_id(root, current_version_id)
|
root, current_version_id
|
||||||
if target_model_info:
|
):
|
||||||
util.printD("New version is already existed")
|
util.printD("New version is already existed")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -60,7 +60,7 @@ def get_custom_model_folder():
|
||||||
|
|
||||||
# write model info to file
|
# write model info to file
|
||||||
def write_model_info(path, model_info):
|
def write_model_info(path, model_info):
|
||||||
util.printD("Write model info to file: " + path)
|
util.printD(f"Write model info to file: {path}")
|
||||||
with open(os.path.realpath(path), 'w') as f:
|
with open(os.path.realpath(path), 'w') as f:
|
||||||
f.write(json.dumps(model_info, indent=4))
|
f.write(json.dumps(model_info, indent=4))
|
||||||
|
|
||||||
|
|
@ -72,10 +72,10 @@ def load_model_info(path):
|
||||||
try:
|
try:
|
||||||
model_info = json.load(f)
|
model_info = json.load(f)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
util.printD("Selected file is not json: " + path)
|
util.printD(f"Selected file is not json: {path}")
|
||||||
util.printD(e)
|
util.printD(e)
|
||||||
return
|
return
|
||||||
|
|
||||||
return model_info
|
return model_info
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -110,9 +110,9 @@ def get_model_names_by_type(model_type:str) -> list:
|
||||||
def get_model_path_by_type_and_name(model_type:str, model_name:str) -> str:
|
def get_model_path_by_type_and_name(model_type:str, model_name:str) -> str:
|
||||||
util.printD("Run get_model_path_by_type_and_name")
|
util.printD("Run get_model_path_by_type_and_name")
|
||||||
if model_type not in folders.keys():
|
if model_type not in folders.keys():
|
||||||
util.printD("unknown model_type: " + model_type)
|
util.printD(f"unknown model_type: {model_type}")
|
||||||
return
|
return
|
||||||
|
|
||||||
if not model_name:
|
if not model_name:
|
||||||
util.printD("model name can not be empty")
|
util.printD("model name can not be empty")
|
||||||
return
|
return
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue