Merge branch 'main' into main

pull/21/head
AlUlkesh 2023-02-08 02:38:59 +01:00 committed by GitHub
commit bcab30019b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 450 additions and 112 deletions

1
.gitignore vendored
View File

@ -2,3 +2,4 @@
path_recorder.txt
__pycache__
*.json
*.sqlite3

View File

@ -14,6 +14,7 @@ from modules import shared, scripts, images
from modules.shared import opts, cmd_opts
from modules.ui_common import plaintext_to_html
from modules.ui_components import ToolButton
from scripts.wib import wib_db
from PIL import Image
from PIL.ExifTags import TAGS
from PIL.JpegImagePlugin import JpegImageFile
@ -22,25 +23,23 @@ from pathlib import Path
from send2trash import send2trash
from typing import List, Tuple
yappi_do = False
favorite_tab_name = "Favorites"
tabs_list = ["txt2img", "img2img", "instruct-pix2pix", "txt2img-grids", "img2img-grids", "Extras", favorite_tab_name, "Others"] #txt2img-grids and img2img-grids added by HaylockGrant
tabs_list = ["txt2img", "img2img", "txt2img-grids", "img2img-grids", "Extras", favorite_tab_name, "Others"] #txt2img-grids and img2img-grids added by HaylockGrant
num_of_imgs_per_page = 0
loads_files_num = 0
path_recorder_filename = os.path.join(scripts.basedir(), "path_recorder.txt")
path_recorder_filename_tmp = f"{path_recorder_filename}.tmp"
aes_cache_file = os.path.join(scripts.basedir(), "aes_scores.json")
exif_cache_file = os.path.join(scripts.basedir(), "exif_data.json")
ranking_file = os.path.join(scripts.basedir(), "ranking.json")
image_ext_list = [".png", ".jpg", ".jpeg", ".bmp", ".gif", ".webp"]
cur_ranking_value="0"
finfo_aes = {}
exif_cache = {}
finfo_exif = {}
aes_cache = {}
none_select = "Nothing selected"
refresh_symbol = '\U0001f504' # 🔄
up_symbol = '\U000025b2' # ▲
down_symbol = '\U000025bc' # ▼
#warning_permission = "You have no permission to visit {}. If you want to visit all directories, add command line argument option '--administrator', <a style='color:#990' href='https://github.com/AUTOMATIC1111/stable-diffusion-webui/wiki/Command-Line-Arguments-and-Settings' target='_blank' rel='noopener noreferrer'>More details here</a>"
current_depth = 0
init = True
logger = logging.getLogger(__name__)
logger_mode = logging.ERROR
@ -73,19 +72,16 @@ def img_path_subdirs_get(img_path):
def img_path_add_remove(img_dir, path_recorder, add_remove, img_path_depth):
if add_remove == "add" or (add_remove == "remove" and img_dir in path_recorder):
if os.path.exists(path_recorder_filename_tmp):
os.remove(path_recorder_filename_tmp)
if add_remove == "add":
path_recorder[img_dir] = {
"depth": int(img_path_depth),
"path_display": f"{img_dir} [{int(img_path_depth)}]"
}
wib_db.update_path_recorder(img_dir, path_recorder[img_dir]["depth"], path_recorder[img_dir]["path_display"])
else:
del path_recorder[img_dir]
wib_db.delete_path_recorder(img_dir)
path_recorder = {key: value for key, value in sorted(path_recorder.items(), key=lambda x: x[0].lower())}
with open(path_recorder_filename_tmp, "w") as f:
json.dump(path_recorder, f, indent=4)
os.replace(path_recorder_filename_tmp, path_recorder_filename)
path_recorder_formatted = [value.get("path_display") for key, value in path_recorder.items()]
if add_remove == "remove":
selected = None
@ -101,19 +97,7 @@ def sort_order_flip(turn_page_switch, sort_order):
return 1, -turn_page_switch, sort_order
def read_path_recorder(path_recorder, path_recorder_formatted):
if os.path.exists(path_recorder_filename):
try:
with open(path_recorder_filename) as f:
path_recorder = json.load(f)
except json.JSONDecodeError:
with open(path_recorder_filename) as f:
path = f.readline().rstrip("\n")
while len(path) > 0:
path_recorder[path] = {
"depth": 0,
"path_display": f"{path} [0]"
}
path = f.readline().rstrip("\n")
path_recorder = wib_db.load_path_recorder()
path_recorder_formatted = [value.get("path_display") for key, value in path_recorder.items()]
path_recorder_formatted = sorted(path_recorder_formatted, key=lambda x: x.lower())
return path_recorder, path_recorder_formatted
@ -170,20 +154,6 @@ def save_image(file_name):
else:
return "<div style='color:#999'>Image not found (may have been already moved)</div>"
def create_ranked_file(filename, ranking):
if os.path.isfile(filename):
if not os.path.isfile(ranking_file):
data = {}
else:
with open(ranking_file, 'r') as file:
data = json.load(file)
data[filename] = ranking
with open(ranking_file, 'w') as file:
json.dump(data, file, indent=0)
def delete_image(delete_num, name, filenames, image_index, visible_num):
if name == "":
return filenames, delete_num
@ -232,30 +202,29 @@ def traverse_all_files(curr_path, image_list, tabname_box, img_path_depth) -> Li
return image_list
def cache_exif(fileinfos):
exif_cache = {}
aes_cache = {}
if os.path.isfile(exif_cache_file):
with open(exif_cache_file, 'r') as file:
exif_cache = json.load(file)
if os.path.isfile(aes_cache_file):
with open(aes_cache_file, 'r') as file:
aes_cache = json.load(file)
global finfo_exif, exif_cache, finfo_aes, aes_cache
if yappi_do:
import yappi
import pandas as pd
yappi.set_clock_type("wall")
yappi.start()
cache_exif_start = time.time()
new_exif = 0
new_aes = 0
conn, cursor = wib_db.transaction_begin()
for fi_info in fileinfos:
found_exif = False
found_aes = False
if fi_info[0] in exif_cache:
finfo_exif[fi_info[0]] = exif_cache[fi_info[0]]
found_exif = True
if fi_info[0] in aes_cache:
finfo_aes[fi_info[0]] = aes_cache[fi_info[0]]
found_aes = True
if not found_exif or not found_aes:
try:
if any(fi_info[0].endswith(ext) for ext in image_ext_list):
found_exif = False
found_aes = False
if fi_info[0] in exif_cache:
finfo_exif[fi_info[0]] = exif_cache[fi_info[0]]
found_exif = True
if fi_info[0] in aes_cache:
finfo_aes[fi_info[0]] = aes_cache[fi_info[0]]
found_aes = True
if not found_exif or not found_aes:
finfo_exif[fi_info[0]] = "0"
exif_cache[fi_info[0]] = "0"
finfo_aes[fi_info[0]] = "0"
@ -268,12 +237,17 @@ def cache_exif(fileinfos):
if allExif:
finfo_exif[fi_info[0]] = allExif
exif_cache[fi_info[0]] = allExif
wib_db.update_exif_data(conn, fi_info[0], allExif)
new_exif = new_exif + 1
m = re.search("(?:aesthetic_score:|Score:) (\d+.\d+)", allExif)
if m:
finfo_aes[fi_info[0]] = m.group(1)
aes_cache[fi_info[0]] = m.group(1)
new_aes = new_aes + 1
aes_value = m.group(1)
else:
aes_value = "0"
finfo_aes[fi_info[0]] = aes_value
aes_cache[fi_info[0]] = aes_value
wib_db.update_aes_data(conn, fi_info[0], aes_value)
new_aes = new_aes + 1
else:
try:
filename = os.path.splitext(fi_info[0])[0] + ".txt"
@ -283,22 +257,41 @@ def cache_exif(fileinfos):
geninfo += line
finfo_exif[fi_info[0]] = geninfo
exif_cache[fi_info[0]] = geninfo
wib_db.update_exif_data(conn, fi_info[0], geninfo)
new_exif = new_exif + 1
m = re.search("(?:aesthetic_score:|Score:) (\d+.\d+)", geninfo)
if m:
finfo_aes[fi_info[0]] = m.group(1)
aes_cache[fi_info[0]] = m.group(1)
new_aes = new_aes + 1
aes_value = m.group(1)
else:
aes_value = "0"
finfo_aes[fi_info[0]] = aes_value
aes_cache[fi_info[0]] = aes_value
wib_db.update_aes_data(conn, fi_info[0], aes_value)
new_aes = new_aes + 1
except Exception:
logger.warning(f"No EXIF in PNG/JPG or txt file for {fi_info[0]}")
except SyntaxError:
logger.warning(f"Non-PNG/JPG file in directory when doing EXIF check: {fi_info[0]}")
# Saved with defaults to not scan it again next time
finfo_exif[fi_info[0]] = "0"
exif_cache[fi_info[0]] = "0"
allExif = "0"
wib_db.update_exif_data(conn, fi_info[0], allExif)
new_exif = new_exif + 1
aes_value = "0"
finfo_aes[fi_info[0]] = aes_value
aes_cache[fi_info[0]] = aes_value
wib_db.update_aes_data(conn, fi_info[0], aes_value)
new_aes = new_aes + 1
wib_db.transaction_end(conn, cursor)
with open(exif_cache_file, 'w') as file:
json.dump(exif_cache, file, indent=0)
with open(aes_cache_file, 'w') as file:
json.dump(aes_cache, file, indent=0)
if yappi_do:
yappi.stop()
pd.set_option('display.float_format', lambda x: '%.6f' % x)
yappi_stats = yappi.get_func_stats().strip_dirs()
data = [(s.name, s.ncall, s.tsub, s.ttot, s.ttot/s.ncall) for s in yappi_stats]
df = pd.DataFrame(data, columns=['name', 'ncall', 'tsub', 'ttot', 'tavg'])
print(df.to_string(index=False))
yappi.get_thread_stats().print_all()
cache_exif_end = time.time()
logger.debug(f"cache_exif: {new_exif}/{len(fileinfos)} cache_aes: {new_aes}/{len(fileinfos)} {round(cache_exif_end - cache_exif_start, 1)} seconds")
@ -320,7 +313,7 @@ def natural_keys(text):
return [ atof(c) for c in re.split(r'[+-]?([0-9]+(?:[.][0-9]*)?|[.][0-9]+)', text) ]
def get_all_images(dir_name, sort_by, sort_order, keyword, tabname_box, img_path_depth, ranking_filter, aes_filter, exif_keyword):
def get_all_images(dir_name, sort_by, sort_order, keyword, tabname_box, img_path_depth, ranking_filter, aes_filter, exif_keyword, negative_prompt_search):
global current_depth
current_depth = 0
fileinfos = traverse_all_files(dir_name, [], tabname_box, img_path_depth)
@ -333,7 +326,26 @@ def get_all_images(dir_name, sort_by, sort_order, keyword, tabname_box, img_path
fileinfos = [x for x in fileinfos if keyword.lower() in x[0].lower()]
filenames = [finfo[0] for finfo in fileinfos]
if len(exif_keyword) != 0:
fileinfos = [x for x in fileinfos if exif_keyword.lower() in finfo_exif[x[0]].lower()]
if negative_prompt_search == "Yes":
fileinfos = [x for x in fileinfos if exif_keyword.lower() in finfo_exif[x[0]].lower()]
else:
result = []
for file_info in fileinfos:
file_name = file_info[0]
file_exif = finfo_exif[file_name].lower()
start_index = file_exif.find("negative prompt: ")
end_index = file_exif.find("\n", start_index)
if negative_prompt_search == "Only":
sub_string = file_exif[start_index:end_index].split("negative prompt: ")[-1].strip()
if exif_keyword.lower() in sub_string:
result.append(file_info)
else:
sub_string = file_exif[start_index:end_index].strip()
file_exif = file_exif.replace(sub_string, "")
if exif_keyword.lower() in file_exif:
result.append(file_info)
fileinfos = result
filenames = [finfo[0] for finfo in fileinfos]
if len(aes_filter) != 0:
fileinfos = [x for x in fileinfos if finfo_aes[x[0]] >= aes_filter]
@ -393,16 +405,13 @@ def get_all_images(dir_name, sort_by, sort_order, keyword, tabname_box, img_path
filenames = [finfo for finfo in fileinfos]
return filenames
def get_image_page(img_path, page_index, filenames, keyword, sort_by, sort_order, tabname_box, img_path_depth, ranking_filter, aes_filter, exif_keyword):
def get_image_page(img_path, page_index, filenames, keyword, sort_by, sort_order, tabname_box, img_path_depth, ranking_filter, aes_filter, exif_keyword, negative_prompt_search):
if img_path == "":
return [], page_index, [], "", "", "", 0, ""
img_path, _ = pure_path(img_path)
#if not cmd_opts.administrator:
# head = os.path.abspath(".")
# abs_path = os.path.abspath(img_path)
# if len(abs_path) < len(head) or abs_path[:len(head)] != head:
# warning = warning_permission.format(img_path)
# return None, 0, None, "", "", "", None, None, warning
if page_index == 1 or page_index == 0 or len(filenames) == 0:
filenames = get_all_images(img_path, sort_by, sort_order, keyword, tabname_box, img_path_depth, ranking_filter, aes_filter, exif_keyword)
filenames = get_all_images(img_path, sort_by, sort_order, keyword, tabname_box, img_path_depth, ranking_filter, aes_filter, exif_keyword, negative_prompt_search)
page_index = int(page_index)
length = len(filenames)
max_page_index = length // num_of_imgs_per_page + 1
@ -444,14 +453,6 @@ def change_dir(img_dir, path_recorder, load_switch, img_path_history, img_path_d
return warning, gr.update(visible=False), img_path_history, path_recorder, load_switch, img_path, img_path_depth
else:
img_dir, img_path_depth = pure_path(img_dir)
#try:
# if not cmd_opts.administrator:
# head = os.path.abspath(".")
# abs_path = os.path.abspath(img_dir)
# if len(abs_path) < len(head) or abs_path[:len(head)] != head:
# warning = warning_permission.format(img_dir)
#except:
# pass
if warning is None:
try:
if os.path.exists(img_dir):
@ -472,21 +473,21 @@ def update_move_text(unused):
return f'{"Move" if not opts.images_copy_image else "Copy"} to favorites'
def get_ranking(filename):
ranking_value = "None"
if os.path.isfile(ranking_file):
with open(ranking_file, 'r') as file:
data = json.load(file)
if filename in data:
ranking_value = data[filename]
ranking_value = wib_db.select_ranking(filename)
return ranking_value
def create_tab(tabname):
global init, exif_cache, aes_cache
custom_dir = False
path_recorder = {}
path_recorder_formatted = []
if init:
wib_db.check()
exif_cache = wib_db.load_exif_data(exif_cache)
aes_cache = wib_db.load_aes_data(aes_cache)
init = False
if tabname == "txt2img":
dir_name = opts.outdir_txt2img_samples
elif tabname == "img2img":
@ -545,7 +546,7 @@ def create_tab(tabname):
next_page = gr.Button('Next Page')
end_page = gr.Button('End Page')
with gr.Column(scale=10):
ranking = gr.Radio(value="None", choices=["1", "2", "3", "4", "5", "None"], label="ranking", elem_id=f"{tabname}_images_ranking", interactive=True)
ranking = gr.Radio(value="None", choices=["1", "2", "3", "4", "5", "None"], label="ranking", interactive=True, visible=False)
auto_next = gr.Checkbox(label="Next Image After Ranking (To be implemented)", interactive=False, visible=False)
history_gallery = gr.Gallery(show_label=False, elem_id=tabname + "_images_history_gallery").style(grid=opts.images_history_page_columns)
with gr.Row() as delete_panel:
@ -556,12 +557,13 @@ def create_tab(tabname):
with gr.Column(scale=1):
with gr.Row(scale=0.5):
sort_by = gr.Dropdown(value="date", choices=["path name", "date", "aesthetic_score", "random", "cfg scale", "steps", "seed", "sampler", "size", "model", "model hash", "ranking"], label="sort by")
sort_order = ToolButton(value=down_symbol)
sort_by = gr.Dropdown(value="date", choices=["path name", "date", "aesthetic_score", "random", "cfg scale", "steps", "seed", "sampler", "size", "model", "model hash", "ranking"], label="sort by")
sort_order = ToolButton(value=down_symbol)
with gr.Row():
keyword = gr.Textbox(value="", label="filename keyword")
exif_keyword = gr.Textbox(value="", label="exif keyword")
with gr.Row():
exif_keyword = gr.Textbox(value="", label="exif keyword")
negative_prompt_search = gr.Radio(value="No", choices=["No", "Yes", "Only"], label="Include negative prompt in exif search", interactive=True)
with gr.Column():
ranking_filter = gr.Radio(value="All", choices=["All", "1", "2", "3", "4", "5", "None"], label="ranking filter", interactive=True)
with gr.Row():
@ -629,11 +631,11 @@ def create_tab(tabname):
turn_page_switch.change(
fn=get_image_page,
inputs=[img_path, page_index, filenames, keyword, sort_by, sort_order, tabname_box, img_path_depth, ranking_filter, aes_filter, exif_keyword],
inputs=[img_path, page_index, filenames, keyword, sort_by, sort_order, tabname_box, img_path_depth, ranking_filter, aes_filter, exif_keyword, negative_prompt_search],
outputs=[filenames, page_index, history_gallery, img_file_name, img_file_time, img_file_info, visible_img_num, warning_box]
)
turn_page_switch.change(fn=None, inputs=[tabname_box], outputs=None, _js="images_history_turnpage")
turn_page_switch.change(fn=lambda:(gr.update(visible=False), gr.update(visible=False)), inputs=None, outputs=[delete_panel, button_panel])
turn_page_switch.change(fn=lambda:(gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)), inputs=None, outputs=[delete_panel, button_panel, ranking])
sort_order.click(
fn=sort_order_flip,
@ -665,7 +667,7 @@ def create_tab(tabname):
# other functions
set_index.click(show_image_info, _js="images_history_get_current_img", inputs=[tabname_box, image_index, page_index, filenames], outputs=[img_file_name, img_file_time, image_index, hidden])
set_index.click(fn=lambda:(gr.update(visible=True), gr.update(visible=True)), inputs=None, outputs=[delete_panel, button_panel])
set_index.click(fn=lambda:(gr.update(visible=True), gr.update(visible=True), gr.update(visible=True)), inputs=None, outputs=[delete_panel, button_panel, ranking])
img_file_name.change(fn=lambda : "", inputs=None, outputs=[collected_warning])
img_file_name.change(get_ranking, inputs=img_file_name, outputs=ranking)
@ -673,10 +675,9 @@ def create_tab(tabname):
hidden.change(fn=run_pnginfo, inputs=[hidden, img_path, img_file_name], outputs=[info1, img_file_info, info2])
#ranking
ranking.change(create_ranked_file, inputs=[img_file_name, ranking])
ranking.change(wib_db.update_ranking, inputs=[img_file_name, ranking])
#ranking.change(show_next_image_info, _js="images_history_get_current_img", inputs=[tabname_box, image_index, page_index, auto_next], outputs=[img_file_name, img_file_time, image_index, hidden])
try:
modules.generation_parameters_copypaste.bind_buttons(send_to_buttons, hidden, img_file_info)
except:
@ -742,7 +743,3 @@ def on_ui_settings():
script_callbacks.on_ui_settings(on_ui_settings)
script_callbacks.on_ui_tabs(on_ui_tabs)
#TODO:
#move by arrow key
#generate info in txt

340
scripts/wib/wib_db.py Normal file
View File

@ -0,0 +1,340 @@
import json
import os
import sqlite3
from modules import scripts
path_recorder_file = os.path.join(scripts.basedir(), "path_recorder.txt")
aes_cache_file = os.path.join(scripts.basedir(), "aes_scores.json")
exif_cache_file = os.path.join(scripts.basedir(), "exif_data.json")
ranking_file = os.path.join(scripts.basedir(), "ranking.json")
archive = os.path.join(scripts.basedir(), "archive")
db_file = os.path.join(scripts.basedir(), "wib.sqlite3")
np = "Negative prompt: "
st = "Steps: "
def create_db(cursor):
cursor.execute('''
CREATE TABLE IF NOT EXISTS db_data (
key TEXT PRIMARY KEY,
value TEXT
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS path_recorder (
path TEXT PRIMARY KEY,
depth INT,
path_display TEXT,
created TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
cursor.execute('''
CREATE TRIGGER path_recorder_tr
AFTER UPDATE ON path_recorder
BEGIN
UPDATE path_recorder SET updated = CURRENT_TIMESTAMP WHERE path = OLD.path;
END;
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS exif_data (
file TEXT,
key TEXT,
value TEXT,
created TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (file, key)
)
''')
cursor.execute('''
CREATE INDEX IF NOT EXISTS exif_data_key ON exif_data (key)
''')
cursor.execute('''
CREATE TRIGGER exif_data_tr
AFTER UPDATE ON exif_data
BEGIN
UPDATE exif_data SET updated = CURRENT_TIMESTAMP WHERE file = OLD.file AND key = OLD.key;
END;
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS ranking (
file TEXT PRIMARY KEY,
ranking TEXT,
created TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
cursor.execute('''
CREATE TRIGGER ranking_tr
AFTER UPDATE ON ranking
BEGIN
UPDATE ranking SET updated = CURRENT_TIMESTAMP WHERE file = OLD.file;
END;
''')
return
def migrate_path_recorder(cursor):
if os.path.exists(path_recorder_file):
try:
with open(path_recorder_file) as f:
# json-version
path_recorder = json.load(f)
for path, values in path_recorder.items():
depth = values["depth"]
path_display = values["path_display"]
cursor.execute('''
INSERT INTO path_recorder (path, depth, path_display)
VALUES (?, ?, ?)
''', (path, depth, path_display))
except json.JSONDecodeError:
with open(path_recorder_file) as f:
# old txt-version
path = f.readline().rstrip("\n")
while len(path) > 0:
cursor.execute('''
INSERT INTO path_recorder (path, depth, path_display)
VALUES (?, ?, ?)
''', (path, 0, f"{path} [0]"))
path = f.readline().rstrip("\n")
return
def update_exif_data(cursor, file, info):
prompt = "0"
negative_prompt = "0"
key_values = "0: 0"
if info != "0":
info_list = info.split("\n")
prompt = ""
negative_prompt = ""
key_values = ""
for info_item in info_list:
if info_item.startswith(st):
key_values = info_item
elif info_item.startswith(np):
negative_prompt = info_item.replace(np, "")
else:
prompt = info_item
if key_values != "":
key_value_pairs = []
key_value = ""
quote_open = False
for char in key_values + ",":
key_value += char
if char == '"':
quote_open = not quote_open
if char == "," and not quote_open:
try:
k, v = key_value.strip(" ,").split(": ")
except ValueError:
k = key_value.strip(" ,").split(": ")[0]
v = ""
key_value_pairs.append((k, v))
key_value = ""
try:
cursor.execute('''
INSERT INTO exif_data (file, key, value)
VALUES (?, ?, ?)
''', (file, "prompt", prompt))
except sqlite3.IntegrityError:
# Duplicate, delete all "file" entries and try again
cursor.execute('''
DELETE FROM exif_data
WHERE file = ?
''', (file,))
cursor.execute('''
INSERT INTO exif_data (file, key, value)
VALUES (?, ?, ?)
''', (file, "prompt", prompt))
cursor.execute('''
INSERT INTO exif_data (file, key, value)
VALUES (?, ?, ?)
''', (file, "negative_prompt", negative_prompt))
for (key, value) in key_value_pairs:
try:
cursor.execute('''
INSERT INTO exif_data (file, key, value)
VALUES (?, ?, ?)
''', (file, key, value))
except sqlite3.IntegrityError:
pass
return
def migrate_exif_data(cursor):
if os.path.exists(exif_cache_file):
with open(exif_cache_file, 'r') as file:
#with open("c:\\tools\\ai\\stable-diffusion-webui\\extensions\\stable-diffusion-webui-images-browser\\test.json ", 'r') as file:
exif_cache = json.load(file)
for file, info in exif_cache.items():
update_exif_data(cursor, file, info)
return
def migrate_ranking(cursor):
if os.path.exists(ranking_file):
with open(ranking_file, 'r') as file:
ranking = json.load(file)
for file, info in ranking.items():
if info != "None":
cursor.execute('''
INSERT INTO ranking (file, ranking)
VALUES (?, ?)
''', (file, info))
return
def update_db_data(cursor, key, value):
cursor.execute('''
INSERT OR REPLACE
INTO db_data (key, value)
VALUES (?, ?)
''', (key, value))
return
def check():
if not os.path.exists(db_file):
conn, cursor = transaction_begin()
create_db(cursor)
update_db_data(cursor, "version", "1")
migrate_path_recorder(cursor)
migrate_exif_data(cursor)
migrate_ranking(cursor)
transaction_end(conn, cursor)
return
def load_path_recorder():
with sqlite3.connect(db_file) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT path, depth, path_display
FROM path_recorder
''')
path_recorder = {path: {"depth": depth, "path_display": path_display} for path, depth, path_display in cursor.fetchall()}
return path_recorder
def select_ranking(filename):
with sqlite3.connect(db_file) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT ranking
FROM ranking
WHERE file = ?
''', (filename,))
ranking_value = cursor.fetchone()
if ranking_value is None:
ranking_value = ["0"]
return ranking_value[0]
def update_ranking(file, ranking):
if ranking != "0":
with sqlite3.connect(db_file) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE
INTO ranking (file, ranking)
VALUES (?, ?)
''', (file, ranking))
return
def update_path_recorder(path, depth, path_display):
with sqlite3.connect(db_file) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE
INTO path_recorder (path, depth, path_display)
VALUES (?, ?, ?)
''', (path, depth, path_display))
return
def delete_path_recorder(path):
with sqlite3.connect(db_file) as conn:
cursor = conn.cursor()
cursor.execute('''
DELETE FROM path_recorder
WHERE path = ?
''', (path,))
return
def transaction_begin():
conn = sqlite3.connect(db_file)
conn.isolation_level = None
cursor = conn.cursor()
cursor.execute("BEGIN")
return conn, cursor
def transaction_end(conn, cursor):
cursor.execute("COMMIT")
conn.close()
return
def update_aes_data(cursor, file, value):
key = "aesthetic_score"
cursor.execute('''
INSERT OR REPLACE
INTO exif_data (file, key, value)
VALUES (?, ?, ?)
''', (file, key, value))
return
def load_exif_data(exif_cache):
with sqlite3.connect(db_file) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT file, group_concat(
case when key = 'prompt' or key = 'negative_prompt' then key || ': ' || value || '\n'
else key || ': ' || value
end, ', ') AS string
FROM (
SELECT *
FROM exif_data
ORDER BY
CASE WHEN key = 'prompt' THEN 0
WHEN key = 'negative_prompt' THEN 1
ELSE 2 END,
key
)
GROUP BY file
''')
rows = cursor.fetchall()
for row in rows:
exif_cache[row[0]] = row[1]
return exif_cache
def load_aes_data(aes_cache):
with sqlite3.connect(db_file) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT file, value
FROM exif_data
WHERE key in ("aesthetic_score", "Score")
''')
rows = cursor.fetchall()
for row in rows:
aes_cache[row[0]] = row[1]
return aes_cache