diff --git a/.gitignore b/.gitignore index 369c321..d7d931d 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ path_recorder.txt __pycache__ *.json +*.sqlite3 diff --git a/scripts/images_history.py b/scripts/images_history.py index 02bb60a..10c37c3 100644 --- a/scripts/images_history.py +++ b/scripts/images_history.py @@ -14,6 +14,7 @@ from modules import shared, scripts, images from modules.shared import opts, cmd_opts from modules.ui_common import plaintext_to_html from modules.ui_components import ToolButton +from scripts.wib import wib_db from PIL import Image from PIL.ExifTags import TAGS from PIL.JpegImagePlugin import JpegImageFile @@ -22,25 +23,23 @@ from pathlib import Path from send2trash import send2trash from typing import List, Tuple +yappi_do = False favorite_tab_name = "Favorites" -tabs_list = ["txt2img", "img2img", "instruct-pix2pix", "txt2img-grids", "img2img-grids", "Extras", favorite_tab_name, "Others"] #txt2img-grids and img2img-grids added by HaylockGrant +tabs_list = ["txt2img", "img2img", "txt2img-grids", "img2img-grids", "Extras", favorite_tab_name, "Others"] #txt2img-grids and img2img-grids added by HaylockGrant num_of_imgs_per_page = 0 loads_files_num = 0 -path_recorder_filename = os.path.join(scripts.basedir(), "path_recorder.txt") -path_recorder_filename_tmp = f"{path_recorder_filename}.tmp" -aes_cache_file = os.path.join(scripts.basedir(), "aes_scores.json") -exif_cache_file = os.path.join(scripts.basedir(), "exif_data.json") -ranking_file = os.path.join(scripts.basedir(), "ranking.json") image_ext_list = [".png", ".jpg", ".jpeg", ".bmp", ".gif", ".webp"] cur_ranking_value="0" finfo_aes = {} +exif_cache = {} finfo_exif = {} +aes_cache = {} none_select = "Nothing selected" refresh_symbol = '\U0001f504' # 🔄 up_symbol = '\U000025b2' # ▲ down_symbol = '\U000025bc' # ▼ -#warning_permission = "You have no permission to visit {}. If you want to visit all directories, add command line argument option '--administrator', More details here" current_depth = 0 +init = True logger = logging.getLogger(__name__) logger_mode = logging.ERROR @@ -73,19 +72,16 @@ def img_path_subdirs_get(img_path): def img_path_add_remove(img_dir, path_recorder, add_remove, img_path_depth): if add_remove == "add" or (add_remove == "remove" and img_dir in path_recorder): - if os.path.exists(path_recorder_filename_tmp): - os.remove(path_recorder_filename_tmp) if add_remove == "add": path_recorder[img_dir] = { "depth": int(img_path_depth), "path_display": f"{img_dir} [{int(img_path_depth)}]" } + wib_db.update_path_recorder(img_dir, path_recorder[img_dir]["depth"], path_recorder[img_dir]["path_display"]) else: del path_recorder[img_dir] + wib_db.delete_path_recorder(img_dir) path_recorder = {key: value for key, value in sorted(path_recorder.items(), key=lambda x: x[0].lower())} - with open(path_recorder_filename_tmp, "w") as f: - json.dump(path_recorder, f, indent=4) - os.replace(path_recorder_filename_tmp, path_recorder_filename) path_recorder_formatted = [value.get("path_display") for key, value in path_recorder.items()] if add_remove == "remove": selected = None @@ -101,19 +97,7 @@ def sort_order_flip(turn_page_switch, sort_order): return 1, -turn_page_switch, sort_order def read_path_recorder(path_recorder, path_recorder_formatted): - if os.path.exists(path_recorder_filename): - try: - with open(path_recorder_filename) as f: - path_recorder = json.load(f) - except json.JSONDecodeError: - with open(path_recorder_filename) as f: - path = f.readline().rstrip("\n") - while len(path) > 0: - path_recorder[path] = { - "depth": 0, - "path_display": f"{path} [0]" - } - path = f.readline().rstrip("\n") + path_recorder = wib_db.load_path_recorder() path_recorder_formatted = [value.get("path_display") for key, value in path_recorder.items()] path_recorder_formatted = sorted(path_recorder_formatted, key=lambda x: x.lower()) return path_recorder, path_recorder_formatted @@ -170,20 +154,6 @@ def save_image(file_name): else: return "
Image not found (may have been already moved)
" -def create_ranked_file(filename, ranking): - if os.path.isfile(filename): - if not os.path.isfile(ranking_file): - data = {} - - else: - with open(ranking_file, 'r') as file: - data = json.load(file) - - data[filename] = ranking - - with open(ranking_file, 'w') as file: - json.dump(data, file, indent=0) - def delete_image(delete_num, name, filenames, image_index, visible_num): if name == "": return filenames, delete_num @@ -232,30 +202,29 @@ def traverse_all_files(curr_path, image_list, tabname_box, img_path_depth) -> Li return image_list def cache_exif(fileinfos): - exif_cache = {} - aes_cache = {} - - if os.path.isfile(exif_cache_file): - with open(exif_cache_file, 'r') as file: - exif_cache = json.load(file) - if os.path.isfile(aes_cache_file): - with open(aes_cache_file, 'r') as file: - aes_cache = json.load(file) + global finfo_exif, exif_cache, finfo_aes, aes_cache + if yappi_do: + import yappi + import pandas as pd + yappi.set_clock_type("wall") + yappi.start() + cache_exif_start = time.time() new_exif = 0 new_aes = 0 + conn, cursor = wib_db.transaction_begin() for fi_info in fileinfos: - found_exif = False - found_aes = False - if fi_info[0] in exif_cache: - finfo_exif[fi_info[0]] = exif_cache[fi_info[0]] - found_exif = True - if fi_info[0] in aes_cache: - finfo_aes[fi_info[0]] = aes_cache[fi_info[0]] - found_aes = True - if not found_exif or not found_aes: - try: + if any(fi_info[0].endswith(ext) for ext in image_ext_list): + found_exif = False + found_aes = False + if fi_info[0] in exif_cache: + finfo_exif[fi_info[0]] = exif_cache[fi_info[0]] + found_exif = True + if fi_info[0] in aes_cache: + finfo_aes[fi_info[0]] = aes_cache[fi_info[0]] + found_aes = True + if not found_exif or not found_aes: finfo_exif[fi_info[0]] = "0" exif_cache[fi_info[0]] = "0" finfo_aes[fi_info[0]] = "0" @@ -268,12 +237,17 @@ def cache_exif(fileinfos): if allExif: finfo_exif[fi_info[0]] = allExif exif_cache[fi_info[0]] = allExif + wib_db.update_exif_data(conn, fi_info[0], allExif) new_exif = new_exif + 1 m = re.search("(?:aesthetic_score:|Score:) (\d+.\d+)", allExif) if m: - finfo_aes[fi_info[0]] = m.group(1) - aes_cache[fi_info[0]] = m.group(1) - new_aes = new_aes + 1 + aes_value = m.group(1) + else: + aes_value = "0" + finfo_aes[fi_info[0]] = aes_value + aes_cache[fi_info[0]] = aes_value + wib_db.update_aes_data(conn, fi_info[0], aes_value) + new_aes = new_aes + 1 else: try: filename = os.path.splitext(fi_info[0])[0] + ".txt" @@ -283,22 +257,41 @@ def cache_exif(fileinfos): geninfo += line finfo_exif[fi_info[0]] = geninfo exif_cache[fi_info[0]] = geninfo + wib_db.update_exif_data(conn, fi_info[0], geninfo) new_exif = new_exif + 1 m = re.search("(?:aesthetic_score:|Score:) (\d+.\d+)", geninfo) if m: - finfo_aes[fi_info[0]] = m.group(1) - aes_cache[fi_info[0]] = m.group(1) - new_aes = new_aes + 1 + aes_value = m.group(1) + else: + aes_value = "0" + finfo_aes[fi_info[0]] = aes_value + aes_cache[fi_info[0]] = aes_value + wib_db.update_aes_data(conn, fi_info[0], aes_value) + new_aes = new_aes + 1 except Exception: logger.warning(f"No EXIF in PNG/JPG or txt file for {fi_info[0]}") - except SyntaxError: - logger.warning(f"Non-PNG/JPG file in directory when doing EXIF check: {fi_info[0]}") + # Saved with defaults to not scan it again next time + finfo_exif[fi_info[0]] = "0" + exif_cache[fi_info[0]] = "0" + allExif = "0" + wib_db.update_exif_data(conn, fi_info[0], allExif) + new_exif = new_exif + 1 + aes_value = "0" + finfo_aes[fi_info[0]] = aes_value + aes_cache[fi_info[0]] = aes_value + wib_db.update_aes_data(conn, fi_info[0], aes_value) + new_aes = new_aes + 1 + + wib_db.transaction_end(conn, cursor) - with open(exif_cache_file, 'w') as file: - json.dump(exif_cache, file, indent=0) - - with open(aes_cache_file, 'w') as file: - json.dump(aes_cache, file, indent=0) + if yappi_do: + yappi.stop() + pd.set_option('display.float_format', lambda x: '%.6f' % x) + yappi_stats = yappi.get_func_stats().strip_dirs() + data = [(s.name, s.ncall, s.tsub, s.ttot, s.ttot/s.ncall) for s in yappi_stats] + df = pd.DataFrame(data, columns=['name', 'ncall', 'tsub', 'ttot', 'tavg']) + print(df.to_string(index=False)) + yappi.get_thread_stats().print_all() cache_exif_end = time.time() logger.debug(f"cache_exif: {new_exif}/{len(fileinfos)} cache_aes: {new_aes}/{len(fileinfos)} {round(cache_exif_end - cache_exif_start, 1)} seconds") @@ -320,7 +313,7 @@ def natural_keys(text): return [ atof(c) for c in re.split(r'[+-]?([0-9]+(?:[.][0-9]*)?|[.][0-9]+)', text) ] -def get_all_images(dir_name, sort_by, sort_order, keyword, tabname_box, img_path_depth, ranking_filter, aes_filter, exif_keyword): +def get_all_images(dir_name, sort_by, sort_order, keyword, tabname_box, img_path_depth, ranking_filter, aes_filter, exif_keyword, negative_prompt_search): global current_depth current_depth = 0 fileinfos = traverse_all_files(dir_name, [], tabname_box, img_path_depth) @@ -333,7 +326,26 @@ def get_all_images(dir_name, sort_by, sort_order, keyword, tabname_box, img_path fileinfos = [x for x in fileinfos if keyword.lower() in x[0].lower()] filenames = [finfo[0] for finfo in fileinfos] if len(exif_keyword) != 0: - fileinfos = [x for x in fileinfos if exif_keyword.lower() in finfo_exif[x[0]].lower()] + if negative_prompt_search == "Yes": + fileinfos = [x for x in fileinfos if exif_keyword.lower() in finfo_exif[x[0]].lower()] + else: + result = [] + for file_info in fileinfos: + file_name = file_info[0] + file_exif = finfo_exif[file_name].lower() + start_index = file_exif.find("negative prompt: ") + end_index = file_exif.find("\n", start_index) + if negative_prompt_search == "Only": + sub_string = file_exif[start_index:end_index].split("negative prompt: ")[-1].strip() + if exif_keyword.lower() in sub_string: + result.append(file_info) + else: + sub_string = file_exif[start_index:end_index].strip() + file_exif = file_exif.replace(sub_string, "") + + if exif_keyword.lower() in file_exif: + result.append(file_info) + fileinfos = result filenames = [finfo[0] for finfo in fileinfos] if len(aes_filter) != 0: fileinfos = [x for x in fileinfos if finfo_aes[x[0]] >= aes_filter] @@ -393,16 +405,13 @@ def get_all_images(dir_name, sort_by, sort_order, keyword, tabname_box, img_path filenames = [finfo for finfo in fileinfos] return filenames -def get_image_page(img_path, page_index, filenames, keyword, sort_by, sort_order, tabname_box, img_path_depth, ranking_filter, aes_filter, exif_keyword): +def get_image_page(img_path, page_index, filenames, keyword, sort_by, sort_order, tabname_box, img_path_depth, ranking_filter, aes_filter, exif_keyword, negative_prompt_search): + if img_path == "": + return [], page_index, [], "", "", "", 0, "" + img_path, _ = pure_path(img_path) - #if not cmd_opts.administrator: - # head = os.path.abspath(".") - # abs_path = os.path.abspath(img_path) - # if len(abs_path) < len(head) or abs_path[:len(head)] != head: - # warning = warning_permission.format(img_path) - # return None, 0, None, "", "", "", None, None, warning if page_index == 1 or page_index == 0 or len(filenames) == 0: - filenames = get_all_images(img_path, sort_by, sort_order, keyword, tabname_box, img_path_depth, ranking_filter, aes_filter, exif_keyword) + filenames = get_all_images(img_path, sort_by, sort_order, keyword, tabname_box, img_path_depth, ranking_filter, aes_filter, exif_keyword, negative_prompt_search) page_index = int(page_index) length = len(filenames) max_page_index = length // num_of_imgs_per_page + 1 @@ -444,14 +453,6 @@ def change_dir(img_dir, path_recorder, load_switch, img_path_history, img_path_d return warning, gr.update(visible=False), img_path_history, path_recorder, load_switch, img_path, img_path_depth else: img_dir, img_path_depth = pure_path(img_dir) - #try: - # if not cmd_opts.administrator: - # head = os.path.abspath(".") - # abs_path = os.path.abspath(img_dir) - # if len(abs_path) < len(head) or abs_path[:len(head)] != head: - # warning = warning_permission.format(img_dir) - #except: - # pass if warning is None: try: if os.path.exists(img_dir): @@ -472,21 +473,21 @@ def update_move_text(unused): return f'{"Move" if not opts.images_copy_image else "Copy"} to favorites' def get_ranking(filename): - ranking_value = "None" - if os.path.isfile(ranking_file): - with open(ranking_file, 'r') as file: - data = json.load(file) - if filename in data: - ranking_value = data[filename] - + ranking_value = wib_db.select_ranking(filename) return ranking_value def create_tab(tabname): + global init, exif_cache, aes_cache custom_dir = False path_recorder = {} path_recorder_formatted = [] - + if init: + wib_db.check() + exif_cache = wib_db.load_exif_data(exif_cache) + aes_cache = wib_db.load_aes_data(aes_cache) + init = False + if tabname == "txt2img": dir_name = opts.outdir_txt2img_samples elif tabname == "img2img": @@ -545,7 +546,7 @@ def create_tab(tabname): next_page = gr.Button('Next Page') end_page = gr.Button('End Page') with gr.Column(scale=10): - ranking = gr.Radio(value="None", choices=["1", "2", "3", "4", "5", "None"], label="ranking", elem_id=f"{tabname}_images_ranking", interactive=True) + ranking = gr.Radio(value="None", choices=["1", "2", "3", "4", "5", "None"], label="ranking", interactive=True, visible=False) auto_next = gr.Checkbox(label="Next Image After Ranking (To be implemented)", interactive=False, visible=False) history_gallery = gr.Gallery(show_label=False, elem_id=tabname + "_images_history_gallery").style(grid=opts.images_history_page_columns) with gr.Row() as delete_panel: @@ -556,12 +557,13 @@ def create_tab(tabname): with gr.Column(scale=1): with gr.Row(scale=0.5): - sort_by = gr.Dropdown(value="date", choices=["path name", "date", "aesthetic_score", "random", "cfg scale", "steps", "seed", "sampler", "size", "model", "model hash", "ranking"], label="sort by") - sort_order = ToolButton(value=down_symbol) + sort_by = gr.Dropdown(value="date", choices=["path name", "date", "aesthetic_score", "random", "cfg scale", "steps", "seed", "sampler", "size", "model", "model hash", "ranking"], label="sort by") + sort_order = ToolButton(value=down_symbol) with gr.Row(): keyword = gr.Textbox(value="", label="filename keyword") - exif_keyword = gr.Textbox(value="", label="exif keyword") - + with gr.Row(): + exif_keyword = gr.Textbox(value="", label="exif keyword") + negative_prompt_search = gr.Radio(value="No", choices=["No", "Yes", "Only"], label="Include negative prompt in exif search", interactive=True) with gr.Column(): ranking_filter = gr.Radio(value="All", choices=["All", "1", "2", "3", "4", "5", "None"], label="ranking filter", interactive=True) with gr.Row(): @@ -629,11 +631,11 @@ def create_tab(tabname): turn_page_switch.change( fn=get_image_page, - inputs=[img_path, page_index, filenames, keyword, sort_by, sort_order, tabname_box, img_path_depth, ranking_filter, aes_filter, exif_keyword], + inputs=[img_path, page_index, filenames, keyword, sort_by, sort_order, tabname_box, img_path_depth, ranking_filter, aes_filter, exif_keyword, negative_prompt_search], outputs=[filenames, page_index, history_gallery, img_file_name, img_file_time, img_file_info, visible_img_num, warning_box] ) turn_page_switch.change(fn=None, inputs=[tabname_box], outputs=None, _js="images_history_turnpage") - turn_page_switch.change(fn=lambda:(gr.update(visible=False), gr.update(visible=False)), inputs=None, outputs=[delete_panel, button_panel]) + turn_page_switch.change(fn=lambda:(gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)), inputs=None, outputs=[delete_panel, button_panel, ranking]) sort_order.click( fn=sort_order_flip, @@ -665,7 +667,7 @@ def create_tab(tabname): # other functions set_index.click(show_image_info, _js="images_history_get_current_img", inputs=[tabname_box, image_index, page_index, filenames], outputs=[img_file_name, img_file_time, image_index, hidden]) - set_index.click(fn=lambda:(gr.update(visible=True), gr.update(visible=True)), inputs=None, outputs=[delete_panel, button_panel]) + set_index.click(fn=lambda:(gr.update(visible=True), gr.update(visible=True), gr.update(visible=True)), inputs=None, outputs=[delete_panel, button_panel, ranking]) img_file_name.change(fn=lambda : "", inputs=None, outputs=[collected_warning]) img_file_name.change(get_ranking, inputs=img_file_name, outputs=ranking) @@ -673,10 +675,9 @@ def create_tab(tabname): hidden.change(fn=run_pnginfo, inputs=[hidden, img_path, img_file_name], outputs=[info1, img_file_info, info2]) #ranking - ranking.change(create_ranked_file, inputs=[img_file_name, ranking]) + ranking.change(wib_db.update_ranking, inputs=[img_file_name, ranking]) #ranking.change(show_next_image_info, _js="images_history_get_current_img", inputs=[tabname_box, image_index, page_index, auto_next], outputs=[img_file_name, img_file_time, image_index, hidden]) - - + try: modules.generation_parameters_copypaste.bind_buttons(send_to_buttons, hidden, img_file_info) except: @@ -742,7 +743,3 @@ def on_ui_settings(): script_callbacks.on_ui_settings(on_ui_settings) script_callbacks.on_ui_tabs(on_ui_tabs) - -#TODO: -#move by arrow key -#generate info in txt diff --git a/scripts/wib/wib_db.py b/scripts/wib/wib_db.py new file mode 100644 index 0000000..3ad97c3 --- /dev/null +++ b/scripts/wib/wib_db.py @@ -0,0 +1,340 @@ +import json +import os +import sqlite3 +from modules import scripts + +path_recorder_file = os.path.join(scripts.basedir(), "path_recorder.txt") +aes_cache_file = os.path.join(scripts.basedir(), "aes_scores.json") +exif_cache_file = os.path.join(scripts.basedir(), "exif_data.json") +ranking_file = os.path.join(scripts.basedir(), "ranking.json") +archive = os.path.join(scripts.basedir(), "archive") +db_file = os.path.join(scripts.basedir(), "wib.sqlite3") +np = "Negative prompt: " +st = "Steps: " + +def create_db(cursor): + cursor.execute(''' + CREATE TABLE IF NOT EXISTS db_data ( + key TEXT PRIMARY KEY, + value TEXT + ) + ''') + + cursor.execute(''' + CREATE TABLE IF NOT EXISTS path_recorder ( + path TEXT PRIMARY KEY, + depth INT, + path_display TEXT, + created TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + ''') + + cursor.execute(''' + CREATE TRIGGER path_recorder_tr + AFTER UPDATE ON path_recorder + BEGIN + UPDATE path_recorder SET updated = CURRENT_TIMESTAMP WHERE path = OLD.path; + END; + ''') + + cursor.execute(''' + CREATE TABLE IF NOT EXISTS exif_data ( + file TEXT, + key TEXT, + value TEXT, + created TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (file, key) + ) + ''') + + cursor.execute(''' + CREATE INDEX IF NOT EXISTS exif_data_key ON exif_data (key) + ''') + + cursor.execute(''' + CREATE TRIGGER exif_data_tr + AFTER UPDATE ON exif_data + BEGIN + UPDATE exif_data SET updated = CURRENT_TIMESTAMP WHERE file = OLD.file AND key = OLD.key; + END; + ''') + + cursor.execute(''' + CREATE TABLE IF NOT EXISTS ranking ( + file TEXT PRIMARY KEY, + ranking TEXT, + created TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + ''') + + cursor.execute(''' + CREATE TRIGGER ranking_tr + AFTER UPDATE ON ranking + BEGIN + UPDATE ranking SET updated = CURRENT_TIMESTAMP WHERE file = OLD.file; + END; + ''') + + return + +def migrate_path_recorder(cursor): + if os.path.exists(path_recorder_file): + try: + with open(path_recorder_file) as f: + # json-version + path_recorder = json.load(f) + for path, values in path_recorder.items(): + depth = values["depth"] + path_display = values["path_display"] + cursor.execute(''' + INSERT INTO path_recorder (path, depth, path_display) + VALUES (?, ?, ?) + ''', (path, depth, path_display)) + except json.JSONDecodeError: + with open(path_recorder_file) as f: + # old txt-version + path = f.readline().rstrip("\n") + while len(path) > 0: + cursor.execute(''' + INSERT INTO path_recorder (path, depth, path_display) + VALUES (?, ?, ?) + ''', (path, 0, f"{path} [0]")) + path = f.readline().rstrip("\n") + + return + +def update_exif_data(cursor, file, info): + prompt = "0" + negative_prompt = "0" + key_values = "0: 0" + if info != "0": + info_list = info.split("\n") + prompt = "" + negative_prompt = "" + key_values = "" + for info_item in info_list: + if info_item.startswith(st): + key_values = info_item + elif info_item.startswith(np): + negative_prompt = info_item.replace(np, "") + else: + prompt = info_item + if key_values != "": + key_value_pairs = [] + key_value = "" + quote_open = False + for char in key_values + ",": + key_value += char + if char == '"': + quote_open = not quote_open + if char == "," and not quote_open: + try: + k, v = key_value.strip(" ,").split(": ") + except ValueError: + k = key_value.strip(" ,").split(": ")[0] + v = "" + key_value_pairs.append((k, v)) + key_value = "" + + try: + cursor.execute(''' + INSERT INTO exif_data (file, key, value) + VALUES (?, ?, ?) + ''', (file, "prompt", prompt)) + except sqlite3.IntegrityError: + # Duplicate, delete all "file" entries and try again + cursor.execute(''' + DELETE FROM exif_data + WHERE file = ? + ''', (file,)) + + cursor.execute(''' + INSERT INTO exif_data (file, key, value) + VALUES (?, ?, ?) + ''', (file, "prompt", prompt)) + + cursor.execute(''' + INSERT INTO exif_data (file, key, value) + VALUES (?, ?, ?) + ''', (file, "negative_prompt", negative_prompt)) + + for (key, value) in key_value_pairs: + try: + cursor.execute(''' + INSERT INTO exif_data (file, key, value) + VALUES (?, ?, ?) + ''', (file, key, value)) + except sqlite3.IntegrityError: + pass + + return + +def migrate_exif_data(cursor): + if os.path.exists(exif_cache_file): + with open(exif_cache_file, 'r') as file: + #with open("c:\\tools\\ai\\stable-diffusion-webui\\extensions\\stable-diffusion-webui-images-browser\\test.json ", 'r') as file: + exif_cache = json.load(file) + + for file, info in exif_cache.items(): + update_exif_data(cursor, file, info) + + return + +def migrate_ranking(cursor): + if os.path.exists(ranking_file): + with open(ranking_file, 'r') as file: + ranking = json.load(file) + for file, info in ranking.items(): + if info != "None": + cursor.execute(''' + INSERT INTO ranking (file, ranking) + VALUES (?, ?) + ''', (file, info)) + + return + +def update_db_data(cursor, key, value): + cursor.execute(''' + INSERT OR REPLACE + INTO db_data (key, value) + VALUES (?, ?) + ''', (key, value)) + + return + +def check(): + if not os.path.exists(db_file): + conn, cursor = transaction_begin() + create_db(cursor) + update_db_data(cursor, "version", "1") + migrate_path_recorder(cursor) + migrate_exif_data(cursor) + migrate_ranking(cursor) + transaction_end(conn, cursor) + + return + +def load_path_recorder(): + with sqlite3.connect(db_file) as conn: + cursor = conn.cursor() + cursor.execute(''' + SELECT path, depth, path_display + FROM path_recorder + ''') + path_recorder = {path: {"depth": depth, "path_display": path_display} for path, depth, path_display in cursor.fetchall()} + + return path_recorder + +def select_ranking(filename): + with sqlite3.connect(db_file) as conn: + cursor = conn.cursor() + cursor.execute(''' + SELECT ranking + FROM ranking + WHERE file = ? + ''', (filename,)) + ranking_value = cursor.fetchone() + + if ranking_value is None: + ranking_value = ["0"] + return ranking_value[0] + +def update_ranking(file, ranking): + if ranking != "0": + with sqlite3.connect(db_file) as conn: + cursor = conn.cursor() + cursor.execute(''' + INSERT OR REPLACE + INTO ranking (file, ranking) + VALUES (?, ?) + ''', (file, ranking)) + + return + +def update_path_recorder(path, depth, path_display): + with sqlite3.connect(db_file) as conn: + cursor = conn.cursor() + cursor.execute(''' + INSERT OR REPLACE + INTO path_recorder (path, depth, path_display) + VALUES (?, ?, ?) + ''', (path, depth, path_display)) + + return + +def delete_path_recorder(path): + with sqlite3.connect(db_file) as conn: + cursor = conn.cursor() + cursor.execute(''' + DELETE FROM path_recorder + WHERE path = ? + ''', (path,)) + + return + +def transaction_begin(): + conn = sqlite3.connect(db_file) + conn.isolation_level = None + cursor = conn.cursor() + cursor.execute("BEGIN") + return conn, cursor + +def transaction_end(conn, cursor): + cursor.execute("COMMIT") + conn.close() + return + +def update_aes_data(cursor, file, value): + key = "aesthetic_score" + + cursor.execute(''' + INSERT OR REPLACE + INTO exif_data (file, key, value) + VALUES (?, ?, ?) + ''', (file, key, value)) + + return + +def load_exif_data(exif_cache): + with sqlite3.connect(db_file) as conn: + cursor = conn.cursor() + cursor.execute(''' + SELECT file, group_concat( + case when key = 'prompt' or key = 'negative_prompt' then key || ': ' || value || '\n' + else key || ': ' || value + end, ', ') AS string + FROM ( + SELECT * + FROM exif_data + ORDER BY + CASE WHEN key = 'prompt' THEN 0 + WHEN key = 'negative_prompt' THEN 1 + ELSE 2 END, + key + ) + GROUP BY file + ''') + + rows = cursor.fetchall() + for row in rows: + exif_cache[row[0]] = row[1] + + return exif_cache + +def load_aes_data(aes_cache): + with sqlite3.connect(db_file) as conn: + cursor = conn.cursor() + cursor.execute(''' + SELECT file, value + FROM exif_data + WHERE key in ("aesthetic_score", "Score") + ''') + + rows = cursor.fetchall() + for row in rows: + aes_cache[row[0]] = row[1] + + return aes_cache