Revert "Merge pull request #334 from zanllp/feature/access-control"

This reverts commit bb2e512b48, reversing
changes made to 478409dee8.
pull/339/head
zanllp 2023-07-23 21:11:43 +08:00
parent acf6bee37f
commit 323cdaa866
7 changed files with 96 additions and 191 deletions

View File

@ -10,23 +10,8 @@ IIB_SECRET_KEY=
# If you want to configure the language for the front-end pages, please set it on the extension's global settings page.
IIB_SERVER_LANG=auto
# ---------------------------- ACCESS_CONTROL ----------------------------
# Used to configure whether to enable access control to the file system.
# If enabled, only access to the provided pre-set folders (including those provided by sd-webui and manually \
# added to Quick Move or specified via IIB_ACCESS_CONTROL_ALLOWED_PATHS) will be allowed.
# The available options are 'enable', 'disable', and 'auto'.
# Used for configuring whether to enable access control to the file system.
# If enabled, only access to the provided pre-set folders (including those provided by sd-webui and manually added to Quick Move) will be allowed.
# The optional choices are 'enable', 'disable', and 'auto'.
# The default value is 'auto', which will be determined based on the command-line parameters used to start sd-webui.
IIB_ACCESS_CONTROL=auto
# This variable is used to define a list of allowed paths for the application to access when access control mode is enabled.
# It can be set to a comma-separated string of file paths or directory paths, representing the resources that are allowed to be accessed by the application.
# In addition, if sd_webui_config or sd_webui_dir has been configured, or if you're running this repository as an extension of sd-webui,
# you can use the following shortcuts (txt2img, img2img, extra, save) as values for the ALLOWED_PATHS variable.
# IIB_ACCESS_CONTROL_ALLOWED_PATHS=save,extra,/output ...etc
# This variable is used to control fine-grained access control for different types of requests, but only if access control mode is enabled.
# It can be set to a string value that represents a specific permission or set of permissions, such as "read-only", "write-only", "read-write", or "no-access".
# This variable can be used to restrict access to certain API endpoints or data sources based on the permissions required by the user.
# IIB_ACCESS_CONTROL_PERMISSION=read-write

31
app.py
View File

@ -4,7 +4,7 @@ from fastapi.responses import FileResponse
import uvicorn
import os
from scripts.iib.api import infinite_image_browsing_api, index_html_path
from scripts.iib.tool import get_sd_webui_conf, get_valid_img_dirs, sd_img_dirs, normalize_paths
from scripts.iib.tool import get_sd_webui_conf, get_valid_img_dirs, sd_img_dirs
from scripts.iib.db.datamodel import DataBase, Image
from scripts.iib.db.update_image_data import update_image_data
import argparse
@ -17,6 +17,33 @@ default_port = 8000
default_host = "127.0.0.1"
def normalize_paths(paths: List[str]):
"""
Normalize a list of paths, ensuring that each path is an absolute path with no redundant components.
Args:
paths (List[str]): A list of paths to be normalized.
Returns:
List[str]: A list of normalized paths.
"""
res: List[str] = []
for path in paths:
# Skip empty or blank paths
if not path or len(path.strip()) == 0:
continue
# If the path is already an absolute path, use it as is
if os.path.isabs(path):
abs_path = path
# Otherwise, make the path absolute by joining it with the current working directory
else:
abs_path = os.path.join(os.getcwd(), path)
# If the absolute path exists, add it to the result after normalizing it
if os.path.exists(abs_path):
res.append(os.path.normpath(abs_path))
return res
def sd_webui_paths_check(sd_webui_config: str, relative_to_config: bool):
conf = {}
with open(sd_webui_config, "r") as f:
@ -121,7 +148,7 @@ class AppUtils:
infinite_image_browsing_api(
app,
sd_webui_config=sd_webui_config,
extra_paths_cli=normalize_paths(extra_paths, os.getcwd()),
extra_paths_cli=normalize_paths(extra_paths),
sd_webui_path_relative_to_config=self.sd_webui_path_relative_to_config,
allow_cors=self.allow_cors,
enable_shutdown=self.enable_shutdown,

View File

@ -4,12 +4,14 @@ import pkg_resources
req_file = os.path.join(os.path.dirname(os.path.realpath(__file__)), "requirements.txt")
def dist2package(dist: str):
return ({
"pyfunctional": "functional",
"python-dotenv": "dotenv",
"Pillow": "PIL"
}).get(dist, dist)
def dist2package(dist):
if dist == "python-dotenv":
package = "dotenv"
elif dist == "Pillow":
package = "PIL"
else:
package = dist
return package
# copy from controlnet, thanks
with open(req_file) as file:

View File

@ -2,5 +2,4 @@ fastapi
uvicorn
piexif
python-dotenv
Pillow
pyfunctional
Pillow

View File

@ -18,13 +18,12 @@ from scripts.iib.tool import (
open_folder,
get_img_geninfo_txt_path,
unique_by,
create_zip_file,
normalize_paths,
create_zip_file
)
from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
import asyncio
from typing import List, Optional
from typing import List, Optional, TypedDict
from pydantic import BaseModel
from fastapi.responses import FileResponse
from PIL import Image
@ -38,11 +37,9 @@ from scripts.iib.db.datamodel import (
Floder,
ImageTag,
ExtraPath,
FileInfoDict,
)
from scripts.iib.db.update_image_data import update_image_data
from scripts.iib.logger import logger
from functional import seq
index_html_path = os.path.join(cwd, "vue/dist/index.html") # 在app.py也被使用
@ -54,23 +51,8 @@ secret_key = os.getenv("IIB_SECRET_KEY")
if secret_key:
print("Secret key loaded successfully. ")
WRITEABLE_PERMISSIONS = ["read-write", "write-only"]
is_api_writeable = not (os.getenv("IIB_ACCESS_CONTROL_PERMISSION")) or (
os.getenv("IIB_ACCESS_CONTROL_PERMISSION") in WRITEABLE_PERMISSIONS
)
async def write_permission_required():
if not is_api_writeable:
error_msg = (
"User is not authorized to perform this action. Required permission: "
+ ", ".join(WRITEABLE_PERMISSIONS)
)
raise HTTPException(status_code=403, detail=error_msg)
async def verify_secret(request: Request):
async def get_token(request: Request):
if not secret_key:
return
token = request.cookies.get("IIB_S")
@ -102,32 +84,7 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
pass
def update_all_scanned_paths():
allowed_paths = os.getenv("IIB_ACCESS_CONTROL_ALLOWED_PATHS")
if allowed_paths:
sd_webui_conf = get_sd_webui_conf(**kwargs)
path_config_key_map = {
"save": "outdir_save",
"extra": "outdir_extras_samples",
"txt2img": "outdir_txt2img_samples",
"img2img": "outdir_img2img_samples",
}
def path_map(path: str):
path = path.strip()
if path in path_config_key_map:
return sd_webui_conf.get(path_config_key_map.get(path))
return path
paths = normalize_paths(
seq(allowed_paths.split(","))
.map(path_map)
.filter(lambda x: x)
.to_list()
)
else:
paths = (
img_search_dirs + mem["extra_paths"] + kwargs.get("extra_paths_cli", [])
)
paths = img_search_dirs + mem["extra_paths"] + kwargs.get("extra_paths_cli", [])
mem["all_scanned_paths"] = unique_by(paths)
update_all_scanned_paths()
@ -183,9 +140,6 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
if not is_path_trusted(path):
raise HTTPException(status_code=403)
def filter_allowed_files(files: List[FileInfoDict]):
return [x for x in files if is_path_trusted(x["fullpath"])]
static_dir = f"{cwd}/vue/dist"
if os.path.exists(static_dir):
app.mount(
@ -198,7 +152,7 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
async def greeting():
return "hello"
@app.get(f"{pre}/global_setting", dependencies=[Depends(verify_secret)])
@app.get(f"{pre}/global_setting", dependencies=[Depends(get_token)])
async def global_setting():
all_custom_tags = []
@ -228,10 +182,7 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
class DeleteFilesReq(BaseModel):
file_paths: List[str]
@app.post(
pre + "/delete_files",
dependencies=[Depends(verify_secret), Depends(write_permission_required)],
)
@app.post(pre + "/delete_files", dependencies=[Depends(get_token)])
async def delete_files(req: DeleteFilesReq):
conn = DataBase.get_conn()
for path in req.file_paths:
@ -269,10 +220,7 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
class CreateFoldersReq(BaseModel):
dest_folder: str
@app.post(
pre + "/mkdirs",
dependencies=[Depends(verify_secret), Depends(write_permission_required)],
)
@app.post(pre + "/mkdirs", dependencies=[Depends(get_token)])
async def create_folders(req: CreateFoldersReq):
if enable_access_control:
if not is_path_under_parents(req.dest_folder):
@ -284,10 +232,7 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
dest: str
create_dest_folder: Optional[bool] = False
@app.post(
pre + "/copy_files",
dependencies=[Depends(verify_secret), Depends(write_permission_required)],
)
@app.post(pre + "/copy_files", dependencies=[Depends(get_token)])
async def copy_files(req: MoveFilesReq):
for path in req.file_paths:
check_path_trust(path)
@ -304,10 +249,7 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
)
raise HTTPException(400, detail=error_msg)
@app.post(
pre + "/move_files",
dependencies=[Depends(verify_secret), Depends(write_permission_required)],
)
@app.post(pre + "/move_files", dependencies=[Depends(get_token)])
async def move_files(req: MoveFilesReq):
if req.create_dest_folder:
os.makedirs(req.dest, exist_ok=True)
@ -339,7 +281,16 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
)
raise HTTPException(400, detail=error_msg)
@app.get(pre + "/files", dependencies=[Depends(verify_secret)])
class FileInfoDict(TypedDict):
type: str
date: float
size: int
name: str
bytes: bytes
created_time: float
fullpath: str
@app.get(pre + "/files", dependencies=[Depends(get_token)])
async def get_target_floder_files(folder_path: str):
files: List[FileInfoDict] = []
try:
@ -392,9 +343,9 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
logger.error(e)
raise HTTPException(status_code=400, detail=str(e))
return {"files": filter_allowed_files(files)}
return {"files": [x for x in files if is_path_trusted(x["fullpath"])]}
@app.get(pre + "/image-thumbnail", dependencies=[Depends(verify_secret)])
@app.get(pre + "/image-thumbnail", dependencies=[Depends(get_token)])
async def thumbnail(path: str, t: str, size: str = "256x256"):
check_path_trust(path)
if not temp_path:
@ -427,7 +378,7 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
headers={"Cache-Control": "max-age=31536000", "ETag": hash},
)
@app.get(pre + "/file", dependencies=[Depends(verify_secret)])
@app.get(pre + "/file", dependencies=[Depends(get_token)])
async def get_file(path: str, t: str, disposition: Optional[str] = None):
filename = path
import mimetypes
@ -458,12 +409,12 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
headers=headers,
)
@app.post(pre + "/send_img_path", dependencies=[Depends(verify_secret)])
@app.post(pre + "/send_img_path", dependencies=[Depends(get_token)])
async def api_set_send_img_path(path: str):
send_img_path["value"] = path
# 等待图片信息生成完成
@app.get(pre + "/gen_info_completed", dependencies=[Depends(verify_secret)])
@app.get(pre + "/gen_info_completed", dependencies=[Depends(get_token)])
async def api_set_send_img_path():
for _ in range(30): # timeout 3s
if send_img_path["value"] == "": # 等待setup里面生成完成
@ -473,7 +424,7 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
await asyncio.sleep(0.1)
return send_img_path["value"] == ""
@app.get(pre + "/image_geninfo", dependencies=[Depends(verify_secret)])
@app.get(pre + "/image_geninfo", dependencies=[Depends(get_token)])
async def image_geninfo(path: str):
with Image.open(path) as img:
return read_info_from_image(img, path)
@ -481,27 +432,25 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
class CheckPathExistsReq(BaseModel):
paths: List[str]
@app.post(pre + "/check_path_exists", dependencies=[Depends(verify_secret)])
@app.post(pre + "/check_path_exists", dependencies=[Depends(get_token)])
async def check_path_exists(req: CheckPathExistsReq):
res = {}
for path in req.paths:
res[path] = os.path.exists(path) and is_path_trusted(path)
res[path] = os.path.exists(path)
return res
@app.get(pre)
def index_bd():
return FileResponse(index_html_path)
class PathsReq(BaseModel):
paths: List[str]
class OpenFolderReq(BaseModel):
path: str
@app.post(
pre + "/open_folder",
dependencies=[Depends(verify_secret), Depends(write_permission_required)],
)
@app.post(pre + "/open_folder", dependencies=[Depends(get_token)])
def open_folder_using_explore(req: OpenFolderReq):
if not is_path_trusted(req.path):
raise HTTPException(status_code=403)
@ -514,11 +463,8 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
raise HTTPException(status_code=403, detail="Shutdown is disabled.")
os.kill(os.getpid(), 9)
return {"message": "Application is shutting down."}
@app.post(
pre + "/zip",
dependencies=[Depends(verify_secret), Depends(write_permission_required)],
)
@app.post(pre + "/zip", dependencies=[Depends(get_token)])
def zip_files(req: PathsReq):
now = datetime.now()
timestamp = now.strftime("%Y-%m-%d-%H-%M-%S")
@ -530,7 +476,7 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
db_pre = pre + "/db"
@app.get(db_pre + "/basic_info", dependencies=[Depends(verify_secret)])
@app.get(db_pre + "/basic_info", dependencies=[Depends(get_token)])
async def get_db_basic_info():
conn = DataBase.get_conn()
img_count = DbImg.count(conn)
@ -543,7 +489,7 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
"expired_dirs": expired_dirs,
}
@app.get(db_pre + "/expired_dirs", dependencies=[Depends(verify_secret)])
@app.get(db_pre + "/expired_dirs", dependencies=[Depends(get_token)])
async def get_db_expired():
conn = DataBase.get_conn()
expired_dirs = Floder.get_expired_dirs(conn)
@ -552,10 +498,7 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
"expired_dirs": expired_dirs,
}
@app.post(
db_pre + "/update_image_data",
dependencies=[Depends(verify_secret)],
)
@app.post(db_pre + "/update_image_data", dependencies=[Depends(get_token)])
async def update_image_db_data():
try:
DataBase._initing = True
@ -575,15 +518,17 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
or_tags: List[int]
not_tags: List[int]
@app.post(db_pre + "/match_images_by_tags", dependencies=[Depends(verify_secret)])
@app.post(db_pre + "/match_images_by_tags", dependencies=[Depends(get_token)])
async def match_image_by_tags(req: MatchImagesByTagsReq):
conn = DataBase.get_conn()
imgs = ImageTag.get_images_by_tags(
conn, {"and": req.and_tags, "or": req.or_tags, "not": req.not_tags}
)
return filter_allowed_files([x.to_file_info() for x in imgs])
return [
x.to_file_info()
for x in ImageTag.get_images_by_tags(
conn, {"and": req.and_tags, "or": req.or_tags, "not": req.not_tags}
)
]
@app.get(db_pre + "/img_selected_custom_tag", dependencies=[Depends(verify_secret)])
@app.get(db_pre + "/img_selected_custom_tag", dependencies=[Depends(get_token)])
async def get_img_selected_custom_tag(path: str):
path = os.path.normpath(path)
if not is_valid_image_path(path):
@ -602,7 +547,7 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
# tags = Tag.get_all_custom_tag()
return ImageTag.get_tags_for_image(conn, img.id, type="custom")
@app.post(db_pre + "/get_image_tags", dependencies=[Depends(verify_secret)])
@app.post(db_pre + "/get_image_tags", dependencies=[Depends(get_token)])
async def get_img_tags(req: PathsReq):
conn = DataBase.get_conn()
return ImageTag.batch_get_tags_by_path(conn, req.paths)
@ -611,10 +556,7 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
img_path: str
tag_id: int
@app.post(
db_pre + "/toggle_custom_tag_to_img",
dependencies=[Depends(verify_secret), Depends(write_permission_required)],
)
@app.post(db_pre + "/toggle_custom_tag_to_img", dependencies=[Depends(get_token)])
async def toggle_custom_tag_to_img(req: ToggleCustomTagToImgReq):
conn = DataBase.get_conn()
path = os.path.normpath(req.img_path)
@ -648,10 +590,7 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
class AddCustomTagReq(BaseModel):
tag_name: str
@app.post(
db_pre + "/add_custom_tag",
dependencies=[Depends(verify_secret), Depends(write_permission_required)],
)
@app.post(db_pre + "/add_custom_tag", dependencies=[Depends(get_token)])
async def add_custom_tag(req: AddCustomTagReq):
conn = DataBase.get_conn()
tag = Tag.get_or_create(conn, name=req.tag_name, type="custom")
@ -661,10 +600,7 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
class RemoveCustomTagReq(BaseModel):
tag_id: str
@app.post(
db_pre + "/remove_custom_tag",
dependencies=[Depends(verify_secret), Depends(write_permission_required)],
)
@app.post(db_pre + "/remove_custom_tag", dependencies=[Depends(get_token)])
async def remove_custom_tag(req: RemoveCustomTagReq):
conn = DataBase.get_conn()
ImageTag.remove(conn, tag_id=req.tag_id)
@ -674,27 +610,22 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
img_id: int
tag_id: str
@app.post(
db_pre + "/remove_custom_tag_from_img",
dependencies=[Depends(verify_secret), Depends(write_permission_required)],
)
@app.post(db_pre + "/remove_custom_tag_from_img", dependencies=[Depends(get_token)])
async def remove_custom_tag_from_img(req: RemoveCustomTagFromReq):
conn = DataBase.get_conn()
ImageTag.remove(conn, image_id=req.img_id, tag_id=req.tag_id)
@app.get(db_pre + "/search_by_substr", dependencies=[Depends(verify_secret)])
@app.get(db_pre + "/search_by_substr", dependencies=[Depends(get_token)])
async def search_by_substr(substr: str):
conn = DataBase.get_conn()
imgs = DbImg.find_by_substring(conn=conn, substring=substr)
return filter_allowed_files([x.to_file_info() for x in imgs])
return [x.to_file_info() for x in imgs]
class ScannedPathModel(BaseModel):
path: str
@app.post(
f"{db_pre}/scanned_paths",
status_code=201,
dependencies=[Depends(verify_secret), Depends(write_permission_required)],
f"{db_pre}/scanned_paths", status_code=201, dependencies=[Depends(get_token)]
)
async def create_scanned_path(scanned_path: ScannedPathModel):
if enable_access_control:
@ -710,17 +641,14 @@ def infinite_image_browsing_api(app: FastAPI, **kwargs):
@app.get(
f"{db_pre}/scanned_paths",
response_model=List[ScannedPathModel],
dependencies=[Depends(verify_secret)],
dependencies=[Depends(get_token)],
)
async def read_scanned_paths():
conn = DataBase.get_conn()
paths = ExtraPath.get_extra_paths(conn, "scanned")
return [{"path": path.path} for path in paths]
@app.delete(
f"{db_pre}/scanned_paths",
dependencies=[Depends(verify_secret), Depends(write_permission_required)],
)
@app.delete(f"{db_pre}/scanned_paths", dependencies=[Depends(get_token)])
async def delete_scanned_path(scanned_path: ScannedPathModel):
conn = DataBase.get_conn()
ExtraPath.remove(conn, scanned_path.path)

View File

@ -1,5 +1,5 @@
from sqlite3 import Connection, connect
from typing import Dict, List, Optional, TypedDict
from typing import Dict, List, Optional
from scripts.iib.tool import (
cwd,
get_modified_date,
@ -13,15 +13,7 @@ from contextlib import closing
import os
import threading
class FileInfoDict(TypedDict):
type: str
date: float
size: int
name: str
bytes: bytes
created_time: float
fullpath: str
class DataBase:
local = threading.local()
@ -67,7 +59,7 @@ class Image:
self.size = size
self.date = date
def to_file_info(self) -> FileInfoDict:
def to_file_info(self):
return {
"type": "file",
"id": self.id,

View File

@ -60,32 +60,6 @@ def get_sd_webui_conf(**kwargs):
pass
return {}
def normalize_paths(paths: List[str], base = cwd):
"""
Normalize a list of paths, ensuring that each path is an absolute path with no redundant components.
Args:
paths (List[str]): A list of paths to be normalized.
Returns:
List[str]: A list of normalized paths.
"""
res: List[str] = []
for path in paths:
# Skip empty or blank paths
if not path or len(path.strip()) == 0:
continue
# If the path is already an absolute path, use it as is
if os.path.isabs(path):
abs_path = path
# Otherwise, make the path absolute by joining it with the current working directory
else:
abs_path = os.path.join(base, path)
# If the absolute path exists, add it to the result after normalizing it
if os.path.exists(abs_path):
res.append(os.path.normpath(abs_path))
return res
def get_valid_img_dirs(
conf,
@ -458,5 +432,3 @@ def open_folder(folder_path, file_path=None):
subprocess.run(["open", folder])
elif os.name == "posix":
subprocess.run(["xdg-open", folder])