stable-diffusion-webui-imag.../scripts/wib/wib_db.py

342 lines
10 KiB
Python

import json
import os
import sqlite3
from modules import scripts
path_recorder_file = os.path.join(scripts.basedir(), "path_recorder.txt")
aes_cache_file = os.path.join(scripts.basedir(), "aes_scores.json")
exif_cache_file = os.path.join(scripts.basedir(), "exif_data.json")
ranking_file = os.path.join(scripts.basedir(), "ranking.json")
archive = os.path.join(scripts.basedir(), "archive")
db_file = os.path.join(scripts.basedir(), "wib.sqlite3")
np = "Negative prompt: "
st = "Steps: "
timeout = 30
def create_db(cursor):
cursor.execute('''
CREATE TABLE IF NOT EXISTS db_data (
key TEXT PRIMARY KEY,
value TEXT
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS path_recorder (
path TEXT PRIMARY KEY,
depth INT,
path_display TEXT,
created TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
cursor.execute('''
CREATE TRIGGER path_recorder_tr
AFTER UPDATE ON path_recorder
BEGIN
UPDATE path_recorder SET updated = CURRENT_TIMESTAMP WHERE path = OLD.path;
END;
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS exif_data (
file TEXT,
key TEXT,
value TEXT,
created TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (file, key)
)
''')
cursor.execute('''
CREATE INDEX IF NOT EXISTS exif_data_key ON exif_data (key)
''')
cursor.execute('''
CREATE TRIGGER exif_data_tr
AFTER UPDATE ON exif_data
BEGIN
UPDATE exif_data SET updated = CURRENT_TIMESTAMP WHERE file = OLD.file AND key = OLD.key;
END;
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS ranking (
file TEXT PRIMARY KEY,
ranking TEXT,
created TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
cursor.execute('''
CREATE TRIGGER ranking_tr
AFTER UPDATE ON ranking
BEGIN
UPDATE ranking SET updated = CURRENT_TIMESTAMP WHERE file = OLD.file;
END;
''')
return
def migrate_path_recorder(cursor):
if os.path.exists(path_recorder_file):
try:
with open(path_recorder_file) as f:
# json-version
path_recorder = json.load(f)
for path, values in path_recorder.items():
depth = values["depth"]
path_display = values["path_display"]
cursor.execute('''
INSERT INTO path_recorder (path, depth, path_display)
VALUES (?, ?, ?)
''', (path, depth, path_display))
except json.JSONDecodeError:
with open(path_recorder_file) as f:
# old txt-version
path = f.readline().rstrip("\n")
while len(path) > 0:
cursor.execute('''
INSERT INTO path_recorder (path, depth, path_display)
VALUES (?, ?, ?)
''', (path, 0, f"{path} [0]"))
path = f.readline().rstrip("\n")
return
def update_exif_data(cursor, file, info):
prompt = "0"
negative_prompt = "0"
key_values = "0: 0"
if info != "0":
info_list = info.split("\n")
prompt = ""
negative_prompt = ""
key_values = ""
for info_item in info_list:
if info_item.startswith(st):
key_values = info_item
elif info_item.startswith(np):
negative_prompt = info_item.replace(np, "")
else:
prompt = info_item
if key_values != "":
key_value_pairs = []
key_value = ""
quote_open = False
for char in key_values + ",":
key_value += char
if char == '"':
quote_open = not quote_open
if char == "," and not quote_open:
try:
k, v = key_value.strip(" ,").split(": ")
except ValueError:
k = key_value.strip(" ,").split(": ")[0]
v = ""
key_value_pairs.append((k, v))
key_value = ""
try:
cursor.execute('''
INSERT INTO exif_data (file, key, value)
VALUES (?, ?, ?)
''', (file, "prompt", prompt))
except sqlite3.IntegrityError:
# Duplicate, delete all "file" entries and try again
cursor.execute('''
DELETE FROM exif_data
WHERE file = ?
''', (file,))
cursor.execute('''
INSERT INTO exif_data (file, key, value)
VALUES (?, ?, ?)
''', (file, "prompt", prompt))
cursor.execute('''
INSERT INTO exif_data (file, key, value)
VALUES (?, ?, ?)
''', (file, "negative_prompt", negative_prompt))
for (key, value) in key_value_pairs:
try:
cursor.execute('''
INSERT INTO exif_data (file, key, value)
VALUES (?, ?, ?)
''', (file, key, value))
except sqlite3.IntegrityError:
pass
return
def migrate_exif_data(cursor):
if os.path.exists(exif_cache_file):
with open(exif_cache_file, 'r') as file:
#with open("c:\\tools\\ai\\stable-diffusion-webui\\extensions\\stable-diffusion-webui-images-browser\\test.json ", 'r') as file:
exif_cache = json.load(file)
for file, info in exif_cache.items():
update_exif_data(cursor, file, info)
return
def migrate_ranking(cursor):
if os.path.exists(ranking_file):
with open(ranking_file, 'r') as file:
ranking = json.load(file)
for file, info in ranking.items():
if info != "None":
cursor.execute('''
INSERT INTO ranking (file, ranking)
VALUES (?, ?)
''', (file, info))
return
def update_db_data(cursor, key, value):
cursor.execute('''
INSERT OR REPLACE
INTO db_data (key, value)
VALUES (?, ?)
''', (key, value))
return
def check():
if not os.path.exists(db_file):
conn, cursor = transaction_begin()
create_db(cursor)
update_db_data(cursor, "version", "1")
migrate_path_recorder(cursor)
migrate_exif_data(cursor)
migrate_ranking(cursor)
transaction_end(conn, cursor)
return
def load_path_recorder():
with sqlite3.connect(db_file, timeout=timeout) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT path, depth, path_display
FROM path_recorder
''')
path_recorder = {path: {"depth": depth, "path_display": path_display} for path, depth, path_display in cursor.fetchall()}
return path_recorder
def select_ranking(filename):
with sqlite3.connect(db_file, timeout=timeout) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT ranking
FROM ranking
WHERE file = ?
''', (filename,))
ranking_value = cursor.fetchone()
if ranking_value is None:
ranking_value = ["0"]
return ranking_value[0]
def update_ranking(file, ranking):
if ranking != "0":
with sqlite3.connect(db_file, timeout=timeout) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE
INTO ranking (file, ranking)
VALUES (?, ?)
''', (file, ranking))
return
def update_path_recorder(path, depth, path_display):
with sqlite3.connect(db_file, timeout=timeout) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE
INTO path_recorder (path, depth, path_display)
VALUES (?, ?, ?)
''', (path, depth, path_display))
return
def delete_path_recorder(path):
with sqlite3.connect(db_file, timeout=timeout) as conn:
cursor = conn.cursor()
cursor.execute('''
DELETE FROM path_recorder
WHERE path = ?
''', (path,))
return
def transaction_begin():
conn = sqlite3.connect(db_file, timeout=timeout)
conn.isolation_level = None
cursor = conn.cursor()
cursor.execute("BEGIN")
return conn, cursor
def transaction_end(conn, cursor):
cursor.execute("COMMIT")
conn.close()
return
def update_aes_data(cursor, file, value):
key = "aesthetic_score"
cursor.execute('''
INSERT OR REPLACE
INTO exif_data (file, key, value)
VALUES (?, ?, ?)
''', (file, key, value))
return
def load_exif_data(exif_cache):
with sqlite3.connect(db_file, timeout=timeout) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT file, group_concat(
case when key = 'prompt' or key = 'negative_prompt' then key || ': ' || value || '\n'
else key || ': ' || value
end, ', ') AS string
FROM (
SELECT *
FROM exif_data
ORDER BY
CASE WHEN key = 'prompt' THEN 0
WHEN key = 'negative_prompt' THEN 1
ELSE 2 END,
key
)
GROUP BY file
''')
rows = cursor.fetchall()
for row in rows:
exif_cache[row[0]] = row[1]
return exif_cache
def load_aes_data(aes_cache):
with sqlite3.connect(db_file, timeout=timeout) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT file, value
FROM exif_data
WHERE key in ("aesthetic_score", "Score")
''')
rows = cursor.fetchall()
for row in rows:
aes_cache[row[0]] = row[1]
return aes_cache