Add support for specialized access control paths and permissions
parent
478409dee8
commit
a0814fc500
21
.env.example
21
.env.example
|
|
@ -10,8 +10,23 @@ IIB_SECRET_KEY=
|
|||
# If you want to configure the language for the front-end pages, please set it on the extension's global settings page.
|
||||
IIB_SERVER_LANG=auto
|
||||
|
||||
# Used for configuring whether to enable access control to the file system.
|
||||
# If enabled, only access to the provided pre-set folders (including those provided by sd-webui and manually added to Quick Move) will be allowed.
|
||||
# The optional choices are 'enable', 'disable', and 'auto'.
|
||||
|
||||
# ---------------------------- ACCESS_CONTROL ----------------------------
|
||||
|
||||
# Used to configure whether to enable access control to the file system.
|
||||
# If enabled, only access to the provided pre-set folders (including those provided by sd-webui and manually \
|
||||
# added to Quick Move or specified via IIB_ACCESS_CONTROL_ALLOWED_PATHS) will be allowed.
|
||||
# The available options are 'enable', 'disable', and 'auto'.
|
||||
# The default value is 'auto', which will be determined based on the command-line parameters used to start sd-webui.
|
||||
IIB_ACCESS_CONTROL=auto
|
||||
|
||||
# This variable is used to define a list of allowed paths for the application to access when access control mode is enabled.
|
||||
# It can be set to a comma-separated string of file paths or directory paths, representing the resources that are allowed to be accessed by the application.
|
||||
# In addition, if sd_webui_config or sd_webui_dir has been configured, or if you're running this repository as an extension of sd-webui,
|
||||
# you can use the following shortcuts (txt2img, img2img, extra, save) as values for the ALLOWED_PATHS variable.
|
||||
# IIB_ACCESS_CONTROL_ALLOWED_PATHS=save,extra,/output ...etc
|
||||
|
||||
# This variable is used to control fine-grained access control for different types of requests, but only if access control mode is enabled.
|
||||
# It can be set to a string value that represents a specific permission or set of permissions, such as "read-only", "write-only", "read-write", or "no-access".
|
||||
# This variable can be used to restrict access to certain API endpoints or data sources based on the permissions required by the user.
|
||||
# IIB_ACCESS_CONTROL_PERMISSION=read-write
|
||||
31
app.py
31
app.py
|
|
@ -4,7 +4,7 @@ from fastapi.responses import FileResponse
|
|||
import uvicorn
|
||||
import os
|
||||
from scripts.iib.api import infinite_image_browsing_api, index_html_path
|
||||
from scripts.iib.tool import get_sd_webui_conf, get_valid_img_dirs, sd_img_dirs
|
||||
from scripts.iib.tool import get_sd_webui_conf, get_valid_img_dirs, sd_img_dirs, normalize_paths
|
||||
from scripts.iib.db.datamodel import DataBase, Image
|
||||
from scripts.iib.db.update_image_data import update_image_data
|
||||
import argparse
|
||||
|
|
@ -17,33 +17,6 @@ default_port = 8000
|
|||
default_host = "127.0.0.1"
|
||||
|
||||
|
||||
def normalize_paths(paths: List[str]):
|
||||
"""
|
||||
Normalize a list of paths, ensuring that each path is an absolute path with no redundant components.
|
||||
|
||||
Args:
|
||||
paths (List[str]): A list of paths to be normalized.
|
||||
|
||||
Returns:
|
||||
List[str]: A list of normalized paths.
|
||||
"""
|
||||
res: List[str] = []
|
||||
for path in paths:
|
||||
# Skip empty or blank paths
|
||||
if not path or len(path.strip()) == 0:
|
||||
continue
|
||||
# If the path is already an absolute path, use it as is
|
||||
if os.path.isabs(path):
|
||||
abs_path = path
|
||||
# Otherwise, make the path absolute by joining it with the current working directory
|
||||
else:
|
||||
abs_path = os.path.join(os.getcwd(), path)
|
||||
# If the absolute path exists, add it to the result after normalizing it
|
||||
if os.path.exists(abs_path):
|
||||
res.append(os.path.normpath(abs_path))
|
||||
return res
|
||||
|
||||
|
||||
def sd_webui_paths_check(sd_webui_config: str, relative_to_config: bool):
|
||||
conf = {}
|
||||
with open(sd_webui_config, "r") as f:
|
||||
|
|
@ -148,7 +121,7 @@ class AppUtils:
|
|||
infinite_image_browsing_api(
|
||||
app,
|
||||
sd_webui_config=sd_webui_config,
|
||||
extra_paths_cli=normalize_paths(extra_paths),
|
||||
extra_paths_cli=normalize_paths(extra_paths, os.getcwd()),
|
||||
sd_webui_path_relative_to_config=self.sd_webui_path_relative_to_config,
|
||||
allow_cors=self.allow_cors,
|
||||
enable_shutdown=self.enable_shutdown,
|
||||
|
|
|
|||
14
install.py
14
install.py
|
|
@ -4,14 +4,12 @@ import pkg_resources
|
|||
|
||||
req_file = os.path.join(os.path.dirname(os.path.realpath(__file__)), "requirements.txt")
|
||||
|
||||
def dist2package(dist):
|
||||
if dist == "python-dotenv":
|
||||
package = "dotenv"
|
||||
elif dist == "Pillow":
|
||||
package = "PIL"
|
||||
else:
|
||||
package = dist
|
||||
return package
|
||||
def dist2package(dist: str):
|
||||
return ({
|
||||
"pyfunctional": "functional",
|
||||
"python-dotenv": "dotenv",
|
||||
"Pillow": "PIL"
|
||||
}).get(dist, dist)
|
||||
|
||||
# copy from controlnet, thanks
|
||||
with open(req_file) as file:
|
||||
|
|
|
|||
|
|
@ -2,4 +2,5 @@ fastapi
|
|||
uvicorn
|
||||
piexif
|
||||
python-dotenv
|
||||
Pillow
|
||||
Pillow
|
||||
pyfunctional
|
||||
|
|
@ -18,12 +18,13 @@ from scripts.iib.tool import (
|
|||
open_folder,
|
||||
get_img_geninfo_txt_path,
|
||||
unique_by,
|
||||
create_zip_file
|
||||
create_zip_file,
|
||||
normalize_paths,
|
||||
)
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
import asyncio
|
||||
from typing import List, Optional, TypedDict
|
||||
from typing import List, Optional
|
||||
from pydantic import BaseModel
|
||||
from fastapi.responses import FileResponse
|
||||
from PIL import Image
|
||||
|
|
@ -37,9 +38,11 @@ from scripts.iib.db.datamodel import (
|
|||
Floder,
|
||||
ImageTag,
|
||||
ExtraPath,
|
||||
FileInfoDict,
|
||||
)
|
||||
from scripts.iib.db.update_image_data import update_image_data
|
||||
from scripts.iib.logger import logger
|
||||
from functional import seq
|
||||
|
||||
|
||||
index_html_path = os.path.join(cwd, "vue/dist/index.html") # 在app.py也被使用
|
||||
|
|
@ -51,8 +54,23 @@ secret_key = os.getenv("IIB_SECRET_KEY")
|
|||
if secret_key:
|
||||
print("Secret key loaded successfully. ")
|
||||
|
||||
WRITEABLE_PERMISSIONS = ["read-write", "write-only"]
|
||||
|
||||
async def get_token(request: Request):
|
||||
is_api_writeable = not (os.getenv("IIB_ACCESS_CONTROL_PERMISSION")) or (
|
||||
os.getenv("IIB_ACCESS_CONTROL_PERMISSION") in WRITEABLE_PERMISSIONS
|
||||
)
|
||||
|
||||
|
||||
async def write_permission_required():
|
||||
if not is_api_writeable:
|
||||
error_msg = (
|
||||
"User is not authorized to perform this action. Required permission: "
|
||||
+ ", ".join(WRITEABLE_PERMISSIONS)
|
||||
)
|
||||
raise HTTPException(status_code=403, detail=error_msg)
|
||||
|
||||
|
||||
async def verify_secret(request: Request):
|
||||
if not secret_key:
|
||||
return
|
||||
token = request.cookies.get("IIB_S")
|
||||
|
|
@ -84,7 +102,32 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
|
|||
pass
|
||||
|
||||
def update_all_scanned_paths():
|
||||
paths = img_search_dirs + mem["extra_paths"] + kwargs.get("extra_paths_cli", [])
|
||||
allowed_paths = os.getenv("IIB_ACCESS_CONTROL_ALLOWED_PATHS")
|
||||
if allowed_paths:
|
||||
sd_webui_conf = get_sd_webui_conf(**kwargs)
|
||||
path_config_key_map = {
|
||||
"save": "outdir_save",
|
||||
"extra": "outdir_extras_samples",
|
||||
"txt2img": "outdir_txt2img_samples",
|
||||
"img2img": "outdir_img2img_samples",
|
||||
}
|
||||
|
||||
def path_map(path: str):
|
||||
path = path.strip()
|
||||
if path in path_config_key_map:
|
||||
return sd_webui_conf.get(path_config_key_map.get(path))
|
||||
return path
|
||||
|
||||
paths = normalize_paths(
|
||||
seq(allowed_paths.split(","))
|
||||
.map(path_map)
|
||||
.filter(lambda x: x)
|
||||
.to_list()
|
||||
)
|
||||
else:
|
||||
paths = (
|
||||
img_search_dirs + mem["extra_paths"] + kwargs.get("extra_paths_cli", [])
|
||||
)
|
||||
mem["all_scanned_paths"] = unique_by(paths)
|
||||
|
||||
update_all_scanned_paths()
|
||||
|
|
@ -140,6 +183,9 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
|
|||
if not is_path_trusted(path):
|
||||
raise HTTPException(status_code=403)
|
||||
|
||||
def filter_allowed_files(files: List[FileInfoDict]):
|
||||
return [x for x in files if is_path_trusted(x["fullpath"])]
|
||||
|
||||
static_dir = f"{cwd}/vue/dist"
|
||||
if os.path.exists(static_dir):
|
||||
app.mount(
|
||||
|
|
@ -152,7 +198,7 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
|
|||
async def greeting():
|
||||
return "hello"
|
||||
|
||||
@app.get(f"{pre}/global_setting", dependencies=[Depends(get_token)])
|
||||
@app.get(f"{pre}/global_setting", dependencies=[Depends(verify_secret)])
|
||||
async def global_setting():
|
||||
all_custom_tags = []
|
||||
|
||||
|
|
@ -182,7 +228,10 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
|
|||
class DeleteFilesReq(BaseModel):
|
||||
file_paths: List[str]
|
||||
|
||||
@app.post(pre + "/delete_files", dependencies=[Depends(get_token)])
|
||||
@app.post(
|
||||
pre + "/delete_files",
|
||||
dependencies=[Depends(verify_secret), Depends(write_permission_required)],
|
||||
)
|
||||
async def delete_files(req: DeleteFilesReq):
|
||||
conn = DataBase.get_conn()
|
||||
for path in req.file_paths:
|
||||
|
|
@ -220,7 +269,10 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
|
|||
class CreateFoldersReq(BaseModel):
|
||||
dest_folder: str
|
||||
|
||||
@app.post(pre + "/mkdirs", dependencies=[Depends(get_token)])
|
||||
@app.post(
|
||||
pre + "/mkdirs",
|
||||
dependencies=[Depends(verify_secret), Depends(write_permission_required)],
|
||||
)
|
||||
async def create_folders(req: CreateFoldersReq):
|
||||
if enable_access_control:
|
||||
if not is_path_under_parents(req.dest_folder):
|
||||
|
|
@ -232,7 +284,10 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
|
|||
dest: str
|
||||
create_dest_folder: Optional[bool] = False
|
||||
|
||||
@app.post(pre + "/copy_files", dependencies=[Depends(get_token)])
|
||||
@app.post(
|
||||
pre + "/copy_files",
|
||||
dependencies=[Depends(verify_secret), Depends(write_permission_required)],
|
||||
)
|
||||
async def copy_files(req: MoveFilesReq):
|
||||
for path in req.file_paths:
|
||||
check_path_trust(path)
|
||||
|
|
@ -249,7 +304,10 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
|
|||
)
|
||||
raise HTTPException(400, detail=error_msg)
|
||||
|
||||
@app.post(pre + "/move_files", dependencies=[Depends(get_token)])
|
||||
@app.post(
|
||||
pre + "/move_files",
|
||||
dependencies=[Depends(verify_secret), Depends(write_permission_required)],
|
||||
)
|
||||
async def move_files(req: MoveFilesReq):
|
||||
if req.create_dest_folder:
|
||||
os.makedirs(req.dest, exist_ok=True)
|
||||
|
|
@ -281,16 +339,7 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
|
|||
)
|
||||
raise HTTPException(400, detail=error_msg)
|
||||
|
||||
class FileInfoDict(TypedDict):
|
||||
type: str
|
||||
date: float
|
||||
size: int
|
||||
name: str
|
||||
bytes: bytes
|
||||
created_time: float
|
||||
fullpath: str
|
||||
|
||||
@app.get(pre + "/files", dependencies=[Depends(get_token)])
|
||||
@app.get(pre + "/files", dependencies=[Depends(verify_secret)])
|
||||
async def get_target_floder_files(folder_path: str):
|
||||
files: List[FileInfoDict] = []
|
||||
try:
|
||||
|
|
@ -343,9 +392,9 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
|
|||
logger.error(e)
|
||||
raise HTTPException(status_code=400, detail=str(e))
|
||||
|
||||
return {"files": [x for x in files if is_path_trusted(x["fullpath"])]}
|
||||
return {"files": filter_allowed_files(files)}
|
||||
|
||||
@app.get(pre + "/image-thumbnail", dependencies=[Depends(get_token)])
|
||||
@app.get(pre + "/image-thumbnail", dependencies=[Depends(verify_secret)])
|
||||
async def thumbnail(path: str, t: str, size: str = "256x256"):
|
||||
check_path_trust(path)
|
||||
if not temp_path:
|
||||
|
|
@ -378,7 +427,7 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
|
|||
headers={"Cache-Control": "max-age=31536000", "ETag": hash},
|
||||
)
|
||||
|
||||
@app.get(pre + "/file", dependencies=[Depends(get_token)])
|
||||
@app.get(pre + "/file", dependencies=[Depends(verify_secret)])
|
||||
async def get_file(path: str, t: str, disposition: Optional[str] = None):
|
||||
filename = path
|
||||
import mimetypes
|
||||
|
|
@ -409,12 +458,12 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
|
|||
headers=headers,
|
||||
)
|
||||
|
||||
@app.post(pre + "/send_img_path", dependencies=[Depends(get_token)])
|
||||
@app.post(pre + "/send_img_path", dependencies=[Depends(verify_secret)])
|
||||
async def api_set_send_img_path(path: str):
|
||||
send_img_path["value"] = path
|
||||
|
||||
# 等待图片信息生成完成
|
||||
@app.get(pre + "/gen_info_completed", dependencies=[Depends(get_token)])
|
||||
@app.get(pre + "/gen_info_completed", dependencies=[Depends(verify_secret)])
|
||||
async def api_set_send_img_path():
|
||||
for _ in range(30): # timeout 3s
|
||||
if send_img_path["value"] == "": # 等待setup里面生成完成
|
||||
|
|
@ -424,7 +473,7 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
|
|||
await asyncio.sleep(0.1)
|
||||
return send_img_path["value"] == ""
|
||||
|
||||
@app.get(pre + "/image_geninfo", dependencies=[Depends(get_token)])
|
||||
@app.get(pre + "/image_geninfo", dependencies=[Depends(verify_secret)])
|
||||
async def image_geninfo(path: str):
|
||||
with Image.open(path) as img:
|
||||
return read_info_from_image(img, path)
|
||||
|
|
@ -432,25 +481,27 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
|
|||
class CheckPathExistsReq(BaseModel):
|
||||
paths: List[str]
|
||||
|
||||
@app.post(pre + "/check_path_exists", dependencies=[Depends(get_token)])
|
||||
@app.post(pre + "/check_path_exists", dependencies=[Depends(verify_secret)])
|
||||
async def check_path_exists(req: CheckPathExistsReq):
|
||||
res = {}
|
||||
for path in req.paths:
|
||||
res[path] = os.path.exists(path)
|
||||
res[path] = os.path.exists(path) and is_path_trusted(path)
|
||||
return res
|
||||
|
||||
@app.get(pre)
|
||||
def index_bd():
|
||||
return FileResponse(index_html_path)
|
||||
|
||||
|
||||
class PathsReq(BaseModel):
|
||||
paths: List[str]
|
||||
|
||||
class OpenFolderReq(BaseModel):
|
||||
path: str
|
||||
|
||||
@app.post(pre + "/open_folder", dependencies=[Depends(get_token)])
|
||||
@app.post(
|
||||
pre + "/open_folder",
|
||||
dependencies=[Depends(verify_secret), Depends(write_permission_required)],
|
||||
)
|
||||
def open_folder_using_explore(req: OpenFolderReq):
|
||||
if not is_path_trusted(req.path):
|
||||
raise HTTPException(status_code=403)
|
||||
|
|
@ -463,8 +514,11 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
|
|||
raise HTTPException(status_code=403, detail="Shutdown is disabled.")
|
||||
os.kill(os.getpid(), 9)
|
||||
return {"message": "Application is shutting down."}
|
||||
|
||||
@app.post(pre + "/zip", dependencies=[Depends(get_token)])
|
||||
|
||||
@app.post(
|
||||
pre + "/zip",
|
||||
dependencies=[Depends(verify_secret), Depends(write_permission_required)],
|
||||
)
|
||||
def zip_files(req: PathsReq):
|
||||
now = datetime.now()
|
||||
timestamp = now.strftime("%Y-%m-%d-%H-%M-%S")
|
||||
|
|
@ -476,7 +530,7 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
|
|||
|
||||
db_pre = pre + "/db"
|
||||
|
||||
@app.get(db_pre + "/basic_info", dependencies=[Depends(get_token)])
|
||||
@app.get(db_pre + "/basic_info", dependencies=[Depends(verify_secret)])
|
||||
async def get_db_basic_info():
|
||||
conn = DataBase.get_conn()
|
||||
img_count = DbImg.count(conn)
|
||||
|
|
@ -489,7 +543,7 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
|
|||
"expired_dirs": expired_dirs,
|
||||
}
|
||||
|
||||
@app.get(db_pre + "/expired_dirs", dependencies=[Depends(get_token)])
|
||||
@app.get(db_pre + "/expired_dirs", dependencies=[Depends(verify_secret)])
|
||||
async def get_db_expired():
|
||||
conn = DataBase.get_conn()
|
||||
expired_dirs = Floder.get_expired_dirs(conn)
|
||||
|
|
@ -498,7 +552,10 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
|
|||
"expired_dirs": expired_dirs,
|
||||
}
|
||||
|
||||
@app.post(db_pre + "/update_image_data", dependencies=[Depends(get_token)])
|
||||
@app.post(
|
||||
db_pre + "/update_image_data",
|
||||
dependencies=[Depends(verify_secret)],
|
||||
)
|
||||
async def update_image_db_data():
|
||||
try:
|
||||
DataBase._initing = True
|
||||
|
|
@ -518,17 +575,15 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
|
|||
or_tags: List[int]
|
||||
not_tags: List[int]
|
||||
|
||||
@app.post(db_pre + "/match_images_by_tags", dependencies=[Depends(get_token)])
|
||||
@app.post(db_pre + "/match_images_by_tags", dependencies=[Depends(verify_secret)])
|
||||
async def match_image_by_tags(req: MatchImagesByTagsReq):
|
||||
conn = DataBase.get_conn()
|
||||
return [
|
||||
x.to_file_info()
|
||||
for x in ImageTag.get_images_by_tags(
|
||||
conn, {"and": req.and_tags, "or": req.or_tags, "not": req.not_tags}
|
||||
)
|
||||
]
|
||||
imgs = ImageTag.get_images_by_tags(
|
||||
conn, {"and": req.and_tags, "or": req.or_tags, "not": req.not_tags}
|
||||
)
|
||||
return filter_allowed_files([x.to_file_info() for x in imgs])
|
||||
|
||||
@app.get(db_pre + "/img_selected_custom_tag", dependencies=[Depends(get_token)])
|
||||
@app.get(db_pre + "/img_selected_custom_tag", dependencies=[Depends(verify_secret)])
|
||||
async def get_img_selected_custom_tag(path: str):
|
||||
path = os.path.normpath(path)
|
||||
if not is_valid_image_path(path):
|
||||
|
|
@ -547,7 +602,7 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
|
|||
# tags = Tag.get_all_custom_tag()
|
||||
return ImageTag.get_tags_for_image(conn, img.id, type="custom")
|
||||
|
||||
@app.post(db_pre + "/get_image_tags", dependencies=[Depends(get_token)])
|
||||
@app.post(db_pre + "/get_image_tags", dependencies=[Depends(verify_secret)])
|
||||
async def get_img_tags(req: PathsReq):
|
||||
conn = DataBase.get_conn()
|
||||
return ImageTag.batch_get_tags_by_path(conn, req.paths)
|
||||
|
|
@ -556,7 +611,10 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
|
|||
img_path: str
|
||||
tag_id: int
|
||||
|
||||
@app.post(db_pre + "/toggle_custom_tag_to_img", dependencies=[Depends(get_token)])
|
||||
@app.post(
|
||||
db_pre + "/toggle_custom_tag_to_img",
|
||||
dependencies=[Depends(verify_secret), Depends(write_permission_required)],
|
||||
)
|
||||
async def toggle_custom_tag_to_img(req: ToggleCustomTagToImgReq):
|
||||
conn = DataBase.get_conn()
|
||||
path = os.path.normpath(req.img_path)
|
||||
|
|
@ -590,7 +648,10 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
|
|||
class AddCustomTagReq(BaseModel):
|
||||
tag_name: str
|
||||
|
||||
@app.post(db_pre + "/add_custom_tag", dependencies=[Depends(get_token)])
|
||||
@app.post(
|
||||
db_pre + "/add_custom_tag",
|
||||
dependencies=[Depends(verify_secret), Depends(write_permission_required)],
|
||||
)
|
||||
async def add_custom_tag(req: AddCustomTagReq):
|
||||
conn = DataBase.get_conn()
|
||||
tag = Tag.get_or_create(conn, name=req.tag_name, type="custom")
|
||||
|
|
@ -600,7 +661,10 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
|
|||
class RemoveCustomTagReq(BaseModel):
|
||||
tag_id: str
|
||||
|
||||
@app.post(db_pre + "/remove_custom_tag", dependencies=[Depends(get_token)])
|
||||
@app.post(
|
||||
db_pre + "/remove_custom_tag",
|
||||
dependencies=[Depends(verify_secret), Depends(write_permission_required)],
|
||||
)
|
||||
async def remove_custom_tag(req: RemoveCustomTagReq):
|
||||
conn = DataBase.get_conn()
|
||||
ImageTag.remove(conn, tag_id=req.tag_id)
|
||||
|
|
@ -610,22 +674,27 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
|
|||
img_id: int
|
||||
tag_id: str
|
||||
|
||||
@app.post(db_pre + "/remove_custom_tag_from_img", dependencies=[Depends(get_token)])
|
||||
@app.post(
|
||||
db_pre + "/remove_custom_tag_from_img",
|
||||
dependencies=[Depends(verify_secret), Depends(write_permission_required)],
|
||||
)
|
||||
async def remove_custom_tag_from_img(req: RemoveCustomTagFromReq):
|
||||
conn = DataBase.get_conn()
|
||||
ImageTag.remove(conn, image_id=req.img_id, tag_id=req.tag_id)
|
||||
|
||||
@app.get(db_pre + "/search_by_substr", dependencies=[Depends(get_token)])
|
||||
@app.get(db_pre + "/search_by_substr", dependencies=[Depends(verify_secret)])
|
||||
async def search_by_substr(substr: str):
|
||||
conn = DataBase.get_conn()
|
||||
imgs = DbImg.find_by_substring(conn=conn, substring=substr)
|
||||
return [x.to_file_info() for x in imgs]
|
||||
return filter_allowed_files([x.to_file_info() for x in imgs])
|
||||
|
||||
class ScannedPathModel(BaseModel):
|
||||
path: str
|
||||
|
||||
@app.post(
|
||||
f"{db_pre}/scanned_paths", status_code=201, dependencies=[Depends(get_token)]
|
||||
f"{db_pre}/scanned_paths",
|
||||
status_code=201,
|
||||
dependencies=[Depends(verify_secret), Depends(write_permission_required)],
|
||||
)
|
||||
async def create_scanned_path(scanned_path: ScannedPathModel):
|
||||
if enable_access_control:
|
||||
|
|
@ -641,14 +710,17 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
|
|||
@app.get(
|
||||
f"{db_pre}/scanned_paths",
|
||||
response_model=List[ScannedPathModel],
|
||||
dependencies=[Depends(get_token)],
|
||||
dependencies=[Depends(verify_secret)],
|
||||
)
|
||||
async def read_scanned_paths():
|
||||
conn = DataBase.get_conn()
|
||||
paths = ExtraPath.get_extra_paths(conn, "scanned")
|
||||
return [{"path": path.path} for path in paths]
|
||||
|
||||
@app.delete(f"{db_pre}/scanned_paths", dependencies=[Depends(get_token)])
|
||||
@app.delete(
|
||||
f"{db_pre}/scanned_paths",
|
||||
dependencies=[Depends(verify_secret), Depends(write_permission_required)],
|
||||
)
|
||||
async def delete_scanned_path(scanned_path: ScannedPathModel):
|
||||
conn = DataBase.get_conn()
|
||||
ExtraPath.remove(conn, scanned_path.path)
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
from sqlite3 import Connection, connect
|
||||
from typing import Dict, List, Optional
|
||||
from typing import Dict, List, Optional, TypedDict
|
||||
from scripts.iib.tool import (
|
||||
cwd,
|
||||
get_modified_date,
|
||||
|
|
@ -13,7 +13,15 @@ from contextlib import closing
|
|||
import os
|
||||
import threading
|
||||
|
||||
|
||||
class FileInfoDict(TypedDict):
|
||||
type: str
|
||||
date: float
|
||||
size: int
|
||||
name: str
|
||||
bytes: bytes
|
||||
created_time: float
|
||||
fullpath: str
|
||||
|
||||
class DataBase:
|
||||
local = threading.local()
|
||||
|
||||
|
|
@ -59,7 +67,7 @@ class Image:
|
|||
self.size = size
|
||||
self.date = date
|
||||
|
||||
def to_file_info(self):
|
||||
def to_file_info(self) -> FileInfoDict:
|
||||
return {
|
||||
"type": "file",
|
||||
"id": self.id,
|
||||
|
|
|
|||
|
|
@ -60,6 +60,32 @@ def get_sd_webui_conf(**kwargs):
|
|||
pass
|
||||
return {}
|
||||
|
||||
def normalize_paths(paths: List[str], base = cwd):
|
||||
"""
|
||||
Normalize a list of paths, ensuring that each path is an absolute path with no redundant components.
|
||||
|
||||
Args:
|
||||
paths (List[str]): A list of paths to be normalized.
|
||||
|
||||
Returns:
|
||||
List[str]: A list of normalized paths.
|
||||
"""
|
||||
res: List[str] = []
|
||||
for path in paths:
|
||||
# Skip empty or blank paths
|
||||
if not path or len(path.strip()) == 0:
|
||||
continue
|
||||
# If the path is already an absolute path, use it as is
|
||||
if os.path.isabs(path):
|
||||
abs_path = path
|
||||
# Otherwise, make the path absolute by joining it with the current working directory
|
||||
else:
|
||||
abs_path = os.path.join(base, path)
|
||||
# If the absolute path exists, add it to the result after normalizing it
|
||||
if os.path.exists(abs_path):
|
||||
res.append(os.path.normpath(abs_path))
|
||||
return res
|
||||
|
||||
|
||||
def get_valid_img_dirs(
|
||||
conf,
|
||||
|
|
@ -432,3 +458,5 @@ def open_folder(folder_path, file_path=None):
|
|||
subprocess.run(["open", folder])
|
||||
elif os.name == "posix":
|
||||
subprocess.run(["xdg-open", folder])
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue