stable-diffusion-webui-imag.../scripts/wib/wib_db.py

537 lines
16 KiB
Python

import json
import os
import sqlite3
from modules import scripts
version = 2
path_recorder_file = os.path.join(scripts.basedir(), "path_recorder.txt")
aes_cache_file = os.path.join(scripts.basedir(), "aes_scores.json")
exif_cache_file = os.path.join(scripts.basedir(), "exif_data.json")
ranking_file = os.path.join(scripts.basedir(), "ranking.json")
archive = os.path.join(scripts.basedir(), "archive")
db_file = os.path.join(scripts.basedir(), "wib.sqlite3")
np = "Negative prompt: "
st = "Steps: "
timeout = 30
def create_db(cursor):
cursor.execute('''
CREATE TABLE IF NOT EXISTS db_data (
key TEXT PRIMARY KEY,
value TEXT
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS path_recorder (
path TEXT PRIMARY KEY,
depth INT,
path_display TEXT,
created TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
cursor.execute('''
CREATE TRIGGER path_recorder_tr
AFTER UPDATE ON path_recorder
BEGIN
UPDATE path_recorder SET updated = CURRENT_TIMESTAMP WHERE path = OLD.path;
END;
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS exif_data (
file TEXT,
key TEXT,
value TEXT,
created TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (file, key)
)
''')
cursor.execute('''
CREATE INDEX IF NOT EXISTS exif_data_key ON exif_data (key)
''')
cursor.execute('''
CREATE TRIGGER exif_data_tr
AFTER UPDATE ON exif_data
BEGIN
UPDATE exif_data SET updated = CURRENT_TIMESTAMP WHERE file = OLD.file AND key = OLD.key;
END;
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS ranking (
file TEXT PRIMARY KEY,
name TEXT,
ranking TEXT,
created TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
cursor.execute('''
CREATE INDEX IF NOT EXISTS ranking_name ON ranking (name)
''')
cursor.execute('''
CREATE TRIGGER ranking_tr
AFTER UPDATE ON ranking
BEGIN
UPDATE ranking SET updated = CURRENT_TIMESTAMP WHERE file = OLD.file;
END;
''')
return
def migrate_path_recorder(cursor):
if os.path.exists(path_recorder_file):
try:
with open(path_recorder_file) as f:
# json-version
path_recorder = json.load(f)
for path, values in path_recorder.items():
path = os.path.abspath(path)
depth = values["depth"]
path_display = f"{path} [{depth}]"
cursor.execute('''
INSERT INTO path_recorder (path, depth, path_display)
VALUES (?, ?, ?)
''', (path, depth, path_display))
except json.JSONDecodeError:
with open(path_recorder_file) as f:
# old txt-version
path = f.readline().rstrip("\n")
while len(path) > 0:
path = os.path.abspath(path)
cursor.execute('''
INSERT INTO path_recorder (path, depth, path_display)
VALUES (?, ?, ?)
''', (path, 0, f"{path} [0]"))
path = f.readline().rstrip("\n")
return
def update_exif_data(cursor, file, info):
prompt = "0"
negative_prompt = "0"
key_values = "0: 0"
if info != "0":
info_list = info.split("\n")
prompt = ""
negative_prompt = ""
key_values = ""
for info_item in info_list:
if info_item.startswith(st):
key_values = info_item
elif info_item.startswith(np):
negative_prompt = info_item.replace(np, "")
else:
if prompt == "":
prompt = info_item
else:
# multiline prompts
prompt = f"{prompt}\n{info_item}"
if key_values != "":
key_value_pairs = []
key_value = ""
quote_open = False
for char in key_values + ",":
key_value += char
if char == '"':
quote_open = not quote_open
if char == "," and not quote_open:
try:
k, v = key_value.strip(" ,").split(": ")
except ValueError:
k = key_value.strip(" ,").split(": ")[0]
v = ""
key_value_pairs.append((k, v))
key_value = ""
try:
cursor.execute('''
INSERT INTO exif_data (file, key, value)
VALUES (?, ?, ?)
''', (file, "prompt", prompt))
except sqlite3.IntegrityError:
# Duplicate, delete all "file" entries and try again
cursor.execute('''
DELETE FROM exif_data
WHERE file = ?
''', (file,))
cursor.execute('''
INSERT INTO exif_data (file, key, value)
VALUES (?, ?, ?)
''', (file, "prompt", prompt))
cursor.execute('''
INSERT INTO exif_data (file, key, value)
VALUES (?, ?, ?)
''', (file, "negative_prompt", negative_prompt))
for (key, value) in key_value_pairs:
try:
cursor.execute('''
INSERT INTO exif_data (file, key, value)
VALUES (?, ?, ?)
''', (file, key, value))
except sqlite3.IntegrityError:
pass
return
def migrate_exif_data(cursor):
if os.path.exists(exif_cache_file):
with open(exif_cache_file, 'r') as file:
exif_cache = json.load(file)
for file, info in exif_cache.items():
file = os.path.abspath(file)
update_exif_data(cursor, file, info)
return
def migrate_ranking(cursor):
if os.path.exists(ranking_file):
with open(ranking_file, 'r') as file:
ranking = json.load(file)
for file, info in ranking.items():
if info != "None":
file = os.path.abspath(file)
name = os.path.basename(file)
cursor.execute('''
INSERT INTO ranking (file, name, ranking)
VALUES (?, ?, ?)
''', (file, name, info))
return
def update_db_data(cursor, key, value):
cursor.execute('''
INSERT OR REPLACE
INTO db_data (key, value)
VALUES (?, ?)
''', (key, value))
return
def get_version():
with sqlite3.connect(db_file, timeout=timeout) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT value
FROM db_data
WHERE key = 'version'
''',)
db_version = cursor.fetchone()
return db_version
def migrate_path_recorder_dirs(cursor):
cursor.execute('''
SELECT path, path_display
FROM path_recorder
''')
for (path, path_display) in cursor.fetchall():
abs_path = os.path.abspath(path)
if path != abs_path:
update_from = path
update_to = abs_path
cursor.execute('''
UPDATE path_recorder
SET path = ?,
path_display = ? || SUBSTR(path_display, LENGTH(?) + 1)
WHERE path = ?
''', (update_to, update_to, update_from, update_from))
return
def migrate_exif_data_dirs(cursor):
cursor.execute('''
SELECT file
FROM exif_data
''')
for (filepath,) in cursor.fetchall():
(path, file) = os.path.split(filepath)
abs_path = os.path.abspath(path)
if path != abs_path:
update_from = filepath
update_to = os.path.join(abs_path, file)
try:
cursor.execute('''
UPDATE exif_data
SET file = ?
WHERE file = ?
''', (update_to, update_from))
except sqlite3.IntegrityError as e:
# these are double keys, because the same file can be in the db with different path notations
(e_msg,) = e.args
if e_msg.startswith("UNIQUE constraint"):
cursor.execute('''
DELETE FROM exif_data
WHERE file = ?
''', (update_from,))
else:
raise
return
def migrate_ranking_dirs(cursor):
cursor.execute('''
ALTER TABLE ranking
ADD COLUMN name TEXT
''')
cursor.execute('''
CREATE INDEX IF NOT EXISTS ranking_name ON ranking (name)
''')
cursor.execute('''
SELECT file, ranking
FROM ranking
''')
for (filepath, ranking) in cursor.fetchall():
if filepath == "" or ranking == "None":
cursor.execute('''
DELETE FROM ranking
WHERE file = ?
''', (filepath,))
else:
(path, file) = os.path.split(filepath)
abs_path = os.path.abspath(path)
name = file
update_from = filepath
update_to = os.path.join(abs_path, file)
cursor.execute('''
UPDATE ranking
SET file = ?,
name = ?
WHERE file = ?
''', (update_to, name, update_from))
return
def check():
if not os.path.exists(db_file):
conn, cursor = transaction_begin()
create_db(cursor)
update_db_data(cursor, "version", version)
migrate_path_recorder(cursor)
migrate_exif_data(cursor)
migrate_ranking(cursor)
transaction_end(conn, cursor)
db_version = get_version()
if db_version[0] == "1":
# version 1 database had mixed path notations, this will change them all to abspath
conn, cursor = transaction_begin()
update_db_data(cursor, "version", version)
migrate_path_recorder_dirs(cursor)
migrate_exif_data_dirs(cursor)
migrate_ranking_dirs(cursor)
transaction_end(conn, cursor)
return version
def load_path_recorder():
with sqlite3.connect(db_file, timeout=timeout) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT path, depth, path_display
FROM path_recorder
''')
path_recorder = {path: {"depth": depth, "path_display": path_display} for path, depth, path_display in cursor.fetchall()}
return path_recorder
def select_ranking(filename):
conn, cursor = transaction_begin()
cursor.execute('''
SELECT ranking
FROM ranking
WHERE file = ?
''', (filename,))
ranking_value = cursor.fetchone()
# if ranking not found search again, without path (moved?)
if ranking_value is None:
name = os.path.basename(filename)
cursor.execute('''
SELECT ranking
FROM ranking
WHERE name = ?
''', (name,))
ranking_value = cursor.fetchone()
# and insert with current filepath
if ranking_value is not None:
(insert_ranking,) = ranking_value
cursor.execute('''
INSERT INTO ranking (file, name, ranking)
VALUES (?, ?, ?)
''', (filename, name, insert_ranking))
if ranking_value is None:
return_ranking = "None"
else:
(return_ranking,) = ranking_value
transaction_end(conn, cursor)
return return_ranking
def update_ranking(file, ranking):
name = os.path.basename(file)
with sqlite3.connect(db_file, timeout=timeout) as conn:
cursor = conn.cursor()
if ranking == "None":
cursor.execute('''
DELETE FROM ranking
WHERE name = ?
''', (name,))
else:
cursor.execute('''
INSERT OR REPLACE
INTO ranking (file, name, ranking)
VALUES (?, ?, ?)
''', (file, name, ranking))
return
def update_path_recorder(path, depth, path_display):
with sqlite3.connect(db_file, timeout=timeout) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE
INTO path_recorder (path, depth, path_display)
VALUES (?, ?, ?)
''', (path, depth, path_display))
return
def delete_path_recorder(path):
with sqlite3.connect(db_file, timeout=timeout) as conn:
cursor = conn.cursor()
cursor.execute('''
DELETE FROM path_recorder
WHERE path = ?
''', (path,))
return
def update_path_recorder_mult(cursor, update_from, update_to):
cursor.execute('''
UPDATE path_recorder
SET path = ?,
path_display = ? || SUBSTR(path_display, LENGTH(?) + 1)
WHERE path = ?
''', (update_to, update_to, update_from, update_from))
return
def update_exif_data_mult(cursor, update_from, update_to):
update_from = update_from + os.path.sep
update_to = update_to + os.path.sep
cursor.execute('''
UPDATE exif_data
SET file = ? || SUBSTR(file, LENGTH(?) + 1)
WHERE file like ? || '%'
''', (update_to, update_from, update_from))
return
def update_ranking_mult(cursor, update_from, update_to):
update_from = update_from + os.path.sep
update_to = update_to + os.path.sep
cursor.execute('''
UPDATE ranking
SET file = ? || SUBSTR(file, LENGTH(?) + 1)
WHERE file like ? || '%'
''', (update_to, update_from, update_from))
return
def transaction_begin():
conn = sqlite3.connect(db_file, timeout=timeout)
conn.isolation_level = None
cursor = conn.cursor()
cursor.execute("BEGIN")
return conn, cursor
def transaction_end(conn, cursor):
cursor.execute("COMMIT")
conn.close()
return
def update_aes_data(cursor, file, value):
key = "aesthetic_score"
cursor.execute('''
INSERT OR REPLACE
INTO exif_data (file, key, value)
VALUES (?, ?, ?)
''', (file, key, value))
return
def load_exif_data(exif_cache):
with sqlite3.connect(db_file, timeout=timeout) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT file, group_concat(
case when key = 'prompt' or key = 'negative_prompt' then key || ': ' || value || '\n'
else key || ': ' || value
end, ', ') AS string
FROM (
SELECT *
FROM exif_data
ORDER BY
CASE WHEN key = 'prompt' THEN 0
WHEN key = 'negative_prompt' THEN 1
ELSE 2 END,
key
)
GROUP BY file
''')
rows = cursor.fetchall()
for row in rows:
exif_cache[row[0]] = row[1]
return exif_cache
def load_aes_data(aes_cache):
with sqlite3.connect(db_file, timeout=timeout) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT file, value
FROM exif_data
WHERE key in ("aesthetic_score", "Score")
''')
rows = cursor.fetchall()
for row in rows:
aes_cache[row[0]] = row[1]
return aes_cache
def get_exif_dirs():
with sqlite3.connect(db_file, timeout=timeout) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT file
FROM exif_data
''')
rows = cursor.fetchall()
dirs = {}
for row in rows:
dir = os.path.dirname(row[0])
dirs[dir] = dir
return dirs