Add support for passing extra paths through CLI
parent
a58b078ab4
commit
2e2e301a20
67
app.py
67
app.py
|
|
@ -1,3 +1,4 @@
|
||||||
|
from typing import List
|
||||||
from fastapi import FastAPI
|
from fastapi import FastAPI
|
||||||
from fastapi.responses import FileResponse
|
from fastapi.responses import FileResponse
|
||||||
import uvicorn
|
import uvicorn
|
||||||
|
|
@ -9,12 +10,46 @@ from scripts.iib.db.update_image_data import update_image_data
|
||||||
import argparse
|
import argparse
|
||||||
|
|
||||||
tag = "\033[31m[warn]\033[0m"
|
tag = "\033[31m[warn]\033[0m"
|
||||||
def paths_check(sd_webui_config: str):
|
|
||||||
|
|
||||||
|
def normalize_paths(paths: List[str]):
|
||||||
|
"""
|
||||||
|
Normalize a list of paths, ensuring that each path is an absolute path with no redundant components.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
paths (List[str]): A list of paths to be normalized.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List[str]: A list of normalized paths.
|
||||||
|
"""
|
||||||
|
res: List[str] = []
|
||||||
|
for path in paths:
|
||||||
|
# Skip empty or blank paths
|
||||||
|
if not path or len(path.strip()) == 0:
|
||||||
|
continue
|
||||||
|
# If the path is already an absolute path, use it as is
|
||||||
|
if os.path.isabs(path):
|
||||||
|
abs_path = path
|
||||||
|
# Otherwise, make the path absolute by joining it with the current working directory
|
||||||
|
else:
|
||||||
|
abs_path = os.path.join(os.getcwd(), path)
|
||||||
|
# If the absolute path exists, add it to the result after normalizing it
|
||||||
|
if os.path.exists(abs_path):
|
||||||
|
res.append(os.path.normpath(abs_path))
|
||||||
|
return res
|
||||||
|
|
||||||
|
|
||||||
|
def sd_webui_paths_check(sd_webui_config: str):
|
||||||
import json
|
import json
|
||||||
|
|
||||||
conf = {}
|
conf = {}
|
||||||
with open(sd_webui_config, "r") as f:
|
with open(sd_webui_config, "r") as f:
|
||||||
conf = json.loads(f.read())
|
conf = json.loads(f.read())
|
||||||
paths = [conf.get(key) for key in sd_img_dirs]
|
paths = [conf.get(key) for key in sd_img_dirs]
|
||||||
|
paths_check(paths)
|
||||||
|
|
||||||
|
|
||||||
|
def paths_check(paths):
|
||||||
for path in paths:
|
for path in paths:
|
||||||
if not path or len(path.strip()) == 0:
|
if not path or len(path.strip()) == 0:
|
||||||
continue
|
continue
|
||||||
|
|
@ -25,6 +60,7 @@ def paths_check(sd_webui_config: str):
|
||||||
if not os.path.exists(abs_path):
|
if not os.path.exists(abs_path):
|
||||||
print(f"{tag} The path '{abs_path}' will be ignored (value: {path}).")
|
print(f"{tag} The path '{abs_path}' will be ignored (value: {path}).")
|
||||||
|
|
||||||
|
|
||||||
def update_image_index(sd_webui_config: str):
|
def update_image_index(sd_webui_config: str):
|
||||||
dirs = get_valid_img_dirs(get_sd_webui_conf(sd_webui_config=sd_webui_config))
|
dirs = get_valid_img_dirs(get_sd_webui_conf(sd_webui_config=sd_webui_config))
|
||||||
if not len(dirs):
|
if not len(dirs):
|
||||||
|
|
@ -35,20 +71,35 @@ def update_image_index(sd_webui_config: str):
|
||||||
return print(f"{tag} it appears that there is some issue")
|
return print(f"{tag} it appears that there is some issue")
|
||||||
print("update image index completed. ✨")
|
print("update image index completed. ✨")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
parser = argparse.ArgumentParser(description='Process some integers.')
|
parser = argparse.ArgumentParser(description="Process some integers.")
|
||||||
parser.add_argument('--port', type=int, help='the port to use', default=8000)
|
parser.add_argument("--port", type=int, help="the port to use", default=8000)
|
||||||
parser.add_argument('--sd_webui_config', help='the path to the config file')
|
parser.add_argument("--sd_webui_config", help="the path to the config file")
|
||||||
parser.add_argument('--update_image_index', action='store_true', help='update the image index')
|
parser.add_argument(
|
||||||
|
"--update_image_index", action="store_true", help="update the image index"
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--extra_paths", nargs="+", help="extra paths to use, will be added to Quick Move.", default=[]
|
||||||
|
)
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
sd_webui_config = args.sd_webui_config
|
sd_webui_config = args.sd_webui_config
|
||||||
if sd_webui_config:
|
if sd_webui_config:
|
||||||
paths_check(sd_webui_config)
|
sd_webui_paths_check(sd_webui_config)
|
||||||
if args.update_image_index:
|
if args.update_image_index:
|
||||||
update_image_index(sd_webui_config)
|
update_image_index(sd_webui_config)
|
||||||
|
paths_check(args.extra_paths)
|
||||||
|
|
||||||
app = FastAPI()
|
app = FastAPI()
|
||||||
|
|
||||||
@app.get("/")
|
@app.get("/")
|
||||||
def index():
|
def index():
|
||||||
return FileResponse(os.path.join(cwd, "vue/dist/index.html"))
|
return FileResponse(os.path.join(cwd, "vue/dist/index.html"))
|
||||||
infinite_image_browsing_api(None, app, sd_webui_config = sd_webui_config)
|
|
||||||
|
infinite_image_browsing_api(
|
||||||
|
None,
|
||||||
|
app,
|
||||||
|
sd_webui_config=sd_webui_config,
|
||||||
|
extra_paths_cli=normalize_paths(args.extra_paths),
|
||||||
|
)
|
||||||
uvicorn.run(app, host="127.0.0.1", port=args.port)
|
uvicorn.run(app, host="127.0.0.1", port=args.port)
|
||||||
|
|
|
||||||
|
|
@ -75,7 +75,8 @@ def infinite_image_browsing_api(_: Any, app: FastAPI, **kwargs):
|
||||||
r = ExtraPath.get_extra_paths(conn, "scanned")
|
r = ExtraPath.get_extra_paths(conn, "scanned")
|
||||||
mem["EXTRA_PATHS"] = [x.path for x in r]
|
mem["EXTRA_PATHS"] = [x.path for x in r]
|
||||||
|
|
||||||
def is_path_under_parents(path, parent_paths = img_search_dirs + mem["EXTRA_PATHS"]):
|
|
||||||
|
def is_path_under_parents(path, parent_paths = img_search_dirs + mem["EXTRA_PATHS"] + kwargs.get("extra_paths_cli", [])):
|
||||||
"""
|
"""
|
||||||
Check if the given path is under one of the specified parent paths.
|
Check if the given path is under one of the specified parent paths.
|
||||||
:param path: The path to check.
|
:param path: The path to check.
|
||||||
|
|
@ -98,7 +99,7 @@ def infinite_image_browsing_api(_: Any, app: FastAPI, **kwargs):
|
||||||
if not enable_access_control:
|
if not enable_access_control:
|
||||||
return True
|
return True
|
||||||
try:
|
try:
|
||||||
parent_paths: List[str] = img_search_dirs + mem["EXTRA_PATHS"]
|
parent_paths: List[str] = img_search_dirs + mem["EXTRA_PATHS"] + kwargs.get("extra_paths_cli", [])
|
||||||
path = os.path.normpath(path)
|
path = os.path.normpath(path)
|
||||||
for parent_path in parent_paths:
|
for parent_path in parent_paths:
|
||||||
if (len(path) <= len(parent_path)):
|
if (len(path) <= len(parent_path)):
|
||||||
|
|
@ -133,7 +134,7 @@ def infinite_image_browsing_api(_: Any, app: FastAPI, **kwargs):
|
||||||
try:
|
try:
|
||||||
conn = DataBase.get_conn()
|
conn = DataBase.get_conn()
|
||||||
all_custom_tags = Tag.get_all_custom_tag(conn)
|
all_custom_tags = Tag.get_all_custom_tag(conn)
|
||||||
extra_paths = ExtraPath.get_extra_paths(conn)
|
extra_paths = ExtraPath.get_extra_paths(conn) + [ExtraPath(path, "cli_access_only") for path in kwargs.get("extra_paths_cli", [])]
|
||||||
update_extra_paths(conn)
|
update_extra_paths(conn)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(e)
|
print(e)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue