stable-diffusion-webui-imag.../scripts/wib/wib_db.py

622 lines
19 KiB
Python

import datetime
import json
import os
import sqlite3
from modules import scripts
# Schema version. check() stores it in the db_data table and uses the stored
# value to decide which migrations to run.
version = 4
# Legacy flat-file stores located in the extension's base directory
# (scripts.basedir()). path_recorder_file, exif_cache_file and ranking_file are
# read by the migrate_* functions below when a new database is created.
path_recorder_file = os.path.join(scripts.basedir(), "path_recorder.txt")
# NOTE(review): not referenced in the visible part of this module; presumably
# the legacy aesthetic-score cache - confirm.
aes_cache_file = os.path.join(scripts.basedir(), "aes_scores.json")
exif_cache_file = os.path.join(scripts.basedir(), "exif_data.json")
ranking_file = os.path.join(scripts.basedir(), "ranking.json")
# NOTE(review): not referenced in the visible part of this module - confirm usage.
archive = os.path.join(scripts.basedir(), "archive")
# SQLite database file the functions below connect to.
db_file = os.path.join(scripts.basedir(), "wib.sqlite3")
# Line prefixes update_exif_data() uses to recognise the negative-prompt line
# and the "Steps: ..." parameter line of an info text.
np = "Negative prompt: "
st = "Steps: "
# Passed as the `timeout` argument to the sqlite3.connect() calls below.
timeout = 30
def create_db(cursor):
    """Create the tables, indexes and triggers used by this module.

    Every statement uses ``IF NOT EXISTS`` (the triggers previously did not),
    so running this against a database that already holds part or all of the
    schema no longer raises ``sqlite3.OperationalError``.

    Each ``*_tr`` trigger refreshes the ``updated`` column of the row that was
    just updated.

    Args:
        cursor: An open ``sqlite3`` cursor. Nothing is committed here; the
            caller owns the transaction.
    """
    schema_statements = (
        # Generic key/value store (check() keeps the schema version here).
        '''
        CREATE TABLE IF NOT EXISTS db_data (
            key TEXT PRIMARY KEY,
            value TEXT
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS path_recorder (
            path TEXT PRIMARY KEY,
            depth INT,
            path_display TEXT,
            created TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        '''
        CREATE TRIGGER IF NOT EXISTS path_recorder_tr
        AFTER UPDATE ON path_recorder
        BEGIN
            UPDATE path_recorder SET updated = CURRENT_TIMESTAMP WHERE path = OLD.path;
        END;
        ''',
        # One row per (file, key) pair.
        '''
        CREATE TABLE IF NOT EXISTS exif_data (
            file TEXT,
            key TEXT,
            value TEXT,
            created TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (file, key)
        )
        ''',
        '''
        CREATE INDEX IF NOT EXISTS exif_data_key ON exif_data (key)
        ''',
        '''
        CREATE TRIGGER IF NOT EXISTS exif_data_tr
        AFTER UPDATE ON exif_data
        BEGIN
            UPDATE exif_data SET updated = CURRENT_TIMESTAMP WHERE file = OLD.file AND key = OLD.key;
        END;
        ''',
        '''
        CREATE TABLE IF NOT EXISTS ranking (
            file TEXT PRIMARY KEY,
            name TEXT,
            ranking TEXT,
            created TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        '''
        CREATE INDEX IF NOT EXISTS ranking_name ON ranking (name)
        ''',
        '''
        CREATE TRIGGER IF NOT EXISTS ranking_tr
        AFTER UPDATE ON ranking
        BEGIN
            UPDATE ranking SET updated = CURRENT_TIMESTAMP WHERE file = OLD.file;
        END;
        ''',
    )
    for statement in schema_statements:
        cursor.execute(statement)
    return
def migrate_path_recorder(cursor):
    """Import the legacy ``path_recorder`` file into the ``path_recorder`` table.

    Does nothing when ``path_recorder_file`` does not exist. The file is first
    read as JSON (mapping of path -> dict with a ``"depth"`` entry); if it is
    not valid JSON it is read as the old text format with one path per line,
    stopping at the first empty line, and every path gets depth 0.

    Paths are normalised with ``os.path.realpath`` before insertion. Different
    spellings of the same directory (or repeated lines) therefore collapse to
    the same primary key; ``INSERT OR REPLACE`` lets the last entry win instead
    of aborting the migration with ``sqlite3.IntegrityError``.

    Args:
        cursor: An open ``sqlite3`` cursor; the caller owns the transaction.
    """
    if not os.path.exists(path_recorder_file):
        return

    insert_sql = '''
        INSERT OR REPLACE
        INTO path_recorder (path, depth, path_display)
        VALUES (?, ?, ?)
    '''
    try:
        with open(path_recorder_file) as f:
            # json-version
            path_recorder = json.load(f)
    except json.JSONDecodeError:
        with open(path_recorder_file) as f:
            # old txt-version
            for line in f:
                path = line.rstrip("\n")
                if not path:
                    # an empty line ends the list, as in the original reader
                    break
                path = os.path.realpath(path)
                cursor.execute(insert_sql, (path, 0, f"{path} [0]"))
    else:
        for path, values in path_recorder.items():
            path = os.path.realpath(path)
            depth = values["depth"]
            path_display = f"{path} [{depth}]"
            cursor.execute(insert_sql, (path, depth, path_display))
    return
def update_exif_data(cursor, file, info):
    """Parse an info text and store it as rows of ``exif_data`` for ``file``.

    ``info`` is split into lines: the line starting with ``st`` ("Steps: ")
    is treated as the comma-separated ``key: value`` parameter list, the line
    starting with ``np`` ("Negative prompt: ") is the negative prompt, and all
    remaining lines form the (possibly multi-line) prompt. The literal string
    ``"0"`` stands for "no info" and is stored as prompt ``"0"``, negative
    prompt ``"0"`` and the pair ``"0": "0"``.

    If a ``prompt`` row already exists for ``file``, all rows of that file are
    deleted and re-inserted.

    Fixes over the previous version:
      * an info text without a "Steps: " line no longer raises ``NameError``;
      * values that themselves contain ``": "`` (e.g. quoted hash lists) are
        kept instead of being stored as an empty string;
      * only the leading "Negative prompt: " marker is removed, not every
        occurrence of that text inside the negative prompt.

    Args:
        cursor: An open ``sqlite3`` cursor; the caller owns the transaction.
        file: File path used as the ``file`` column value.
        info: Info text, or ``"0"`` when there is none.
    """
    prompt = "0"
    negative_prompt = "0"
    key_values = "0: 0"
    if info != "0":
        prompt = ""
        negative_prompt = ""
        key_values = ""
        for info_item in info.split("\n"):
            if info_item.startswith(st):
                key_values = info_item
            elif info_item.startswith(np):
                negative_prompt = info_item[len(np):]
            elif prompt == "":
                prompt = info_item
            else:
                # multiline prompts
                prompt = f"{prompt}\n{info_item}"

    # Always defined, so the insert loop below is safe without a "Steps: " line.
    key_value_pairs = []
    if key_values != "":
        key_value = ""
        quote_open = False
        # The appended "," flushes the last pair.
        for char in key_values + ",":
            key_value += char
            if char == '"':
                quote_open = not quote_open
            # Commas inside double quotes belong to the value.
            if char == "," and not quote_open:
                # Split on the first ": " only; without one, v is "".
                k, _, v = key_value.strip(" ,").partition(": ")
                key_value_pairs.append((k, v))
                key_value = ""

    insert_sql = '''
        INSERT INTO exif_data (file, key, value)
        VALUES (?, ?, ?)
    '''
    try:
        cursor.execute(insert_sql, (file, "prompt", prompt))
    except sqlite3.IntegrityError:
        # Duplicate, delete all "file" entries and try again
        cursor.execute('''
            DELETE FROM exif_data
            WHERE file = ?
        ''', (file,))
        cursor.execute(insert_sql, (file, "prompt", prompt))
    cursor.execute(insert_sql, (file, "negative_prompt", negative_prompt))
    for (key, value) in key_value_pairs:
        try:
            cursor.execute(insert_sql, (file, key, value))
        except sqlite3.IntegrityError:
            # Deliberate best effort: a key repeated within one info text
            # keeps its first value.
            pass
    return
def migrate_exif_data(cursor):
    """Import the legacy exif cache JSON into the ``exif_data`` table.

    Does nothing when ``exif_cache_file`` does not exist. Otherwise every
    ``path -> info`` entry of the JSON mapping is passed to
    ``update_exif_data`` with the path resolved via ``os.path.realpath``.

    Args:
        cursor: An open ``sqlite3`` cursor; the caller owns the transaction.
    """
    if not os.path.exists(exif_cache_file):
        return
    with open(exif_cache_file, 'r') as cache_handle:
        cached_entries = json.load(cache_handle)
    for cached_path, info in cached_entries.items():
        resolved_path = os.path.realpath(cached_path)
        update_exif_data(cursor, resolved_path, info)
    return
def migrate_ranking(cursor):
    """Import the legacy ranking JSON into the ``ranking`` table.

    Does nothing when ``ranking_file`` does not exist. Entries whose value is
    the string ``"None"`` are skipped. Paths are resolved with
    ``os.path.realpath`` and the base name is stored in ``name``.

    Because different spellings of one file resolve to the same primary key,
    ``INSERT OR REPLACE`` is used so the last entry wins instead of the whole
    migration failing with ``sqlite3.IntegrityError``.

    Args:
        cursor: An open ``sqlite3`` cursor; the caller owns the transaction.
    """
    if not os.path.exists(ranking_file):
        return
    with open(ranking_file, 'r') as ranking_handle:
        ranking = json.load(ranking_handle)
    for stored_path, info in ranking.items():
        if info == "None":
            continue
        file = os.path.realpath(stored_path)
        name = os.path.basename(file)
        cursor.execute('''
            INSERT OR REPLACE
            INTO ranking (file, name, ranking)
            VALUES (?, ?, ?)
        ''', (file, name, info))
    return
def update_db_data(cursor, key, value):
    """Insert or overwrite one ``key``/``value`` row of the ``db_data`` table.

    Args:
        cursor: An open ``sqlite3`` cursor; the caller owns the transaction.
        key: Primary-key value of the row.
        value: Value to store for ``key``.
    """
    upsert_sql = '''
        INSERT OR REPLACE
        INTO db_data (key, value)
        VALUES (?, ?)
    '''
    row = (key, value)
    cursor.execute(upsert_sql, row)
def get_version():
    """Return the stored schema version row from ``db_data``.

    Returns:
        The ``cursor.fetchone()`` result for key ``'version'``: a 1-tuple
        holding the stored value, or ``None`` when no such row exists.
    """
    conn = sqlite3.connect(db_file, timeout=timeout)
    try:
        # "with conn" only commits/rolls back; it does not close the
        # connection, hence the explicit close in "finally".
        with conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT value
                FROM db_data
                WHERE key = 'version'
            ''',)
            db_version = cursor.fetchone()
    finally:
        conn.close()
    return db_version
def migrate_path_recorder_dirs(cursor):
    """Rewrite every ``path_recorder.path`` to its ``os.path.realpath`` form.

    For a row whose stored path differs from its real path, ``path`` is
    replaced and the leading part of ``path_display`` (as many characters as
    the old path is long) is swapped for the new path. If the real path is
    already present as another row, the old row is deleted instead.

    Args:
        cursor: An open ``sqlite3`` cursor; the caller owns the transaction.

    Raises:
        sqlite3.IntegrityError: Re-raised when the failure is not a UNIQUE
            constraint violation.
    """
    cursor.execute('''
        SELECT path, path_display
        FROM path_recorder
    ''')
    for stored_path, _stored_display in cursor.fetchall():
        resolved_path = os.path.realpath(stored_path)
        if stored_path == resolved_path:
            continue
        try:
            cursor.execute('''
                UPDATE path_recorder
                SET path = ?,
                path_display = ? || SUBSTR(path_display, LENGTH(?) + 1)
                WHERE path = ?
            ''', (resolved_path, resolved_path, stored_path, stored_path))
        except sqlite3.IntegrityError as e:
            # these are double keys, because the same file can be in the db with different path notations
            (e_msg,) = e.args
            if not e_msg.startswith("UNIQUE constraint"):
                raise
            cursor.execute('''
                DELETE FROM path_recorder
                WHERE path = ?
            ''', (stored_path,))
    return
def migrate_exif_data_dirs(cursor):
    """Rewrite the directory part of every ``exif_data.file`` to its real path.

    The directory of each stored file is resolved with ``os.path.realpath``;
    when it differs, all rows of that file are moved to the resolved path. If
    the move collides with rows already stored under the resolved path, the
    rows under the old spelling are deleted instead.

    ``exif_data`` holds one row per (file, key), so files are selected with
    ``DISTINCT``: one UPDATE already moves every key of a file, and repeating
    it once per key was redundant work.

    Args:
        cursor: An open ``sqlite3`` cursor; the caller owns the transaction.

    Raises:
        sqlite3.IntegrityError: Re-raised when the failure is not a UNIQUE
            constraint violation.
    """
    cursor.execute('''
        SELECT DISTINCT file
        FROM exif_data
    ''')
    for (filepath,) in cursor.fetchall():
        (path, file) = os.path.split(filepath)
        real_path = os.path.realpath(path)
        if path == real_path:
            continue
        update_from = filepath
        update_to = os.path.join(real_path, file)
        try:
            cursor.execute('''
                UPDATE exif_data
                SET file = ?
                WHERE file = ?
            ''', (update_to, update_from))
        except sqlite3.IntegrityError as e:
            # these are double keys, because the same file can be in the db with different path notations
            (e_msg,) = e.args
            if e_msg.startswith("UNIQUE constraint"):
                cursor.execute('''
                    DELETE FROM exif_data
                    WHERE file = ?
                ''', (update_from,))
            else:
                raise
    return
def migrate_ranking_dirs(cursor, db_version):
    """Normalise ``ranking.file`` paths and fill in the ``name`` column.

    When ``db_version`` is ``"1"`` the ``name`` column and its index are added
    first. Then, for every row: rows with an empty file or the ranking
    ``"None"`` are deleted; all others get their directory resolved with
    ``os.path.realpath`` and ``name`` set to the file's base name. If the
    resolved path already exists as another row, the old row is deleted.

    Args:
        cursor: An open ``sqlite3`` cursor; the caller owns the transaction.
        db_version: Stored schema version as a string.

    Raises:
        sqlite3.IntegrityError: Re-raised when the failure is not a UNIQUE
            constraint violation.
    """
    needs_name_column = db_version == "1"
    if needs_name_column:
        cursor.execute('''
            ALTER TABLE ranking
            ADD COLUMN name TEXT
        ''')
        cursor.execute('''
            CREATE INDEX IF NOT EXISTS ranking_name ON ranking (name)
        ''')

    cursor.execute('''
        SELECT file, ranking
        FROM ranking
    ''')
    for stored_file, stored_ranking in cursor.fetchall():
        if stored_file == "" or stored_ranking == "None":
            cursor.execute('''
                DELETE FROM ranking
                WHERE file = ?
            ''', (stored_file,))
            continue

        directory, base_name = os.path.split(stored_file)
        resolved_file = os.path.join(os.path.realpath(directory), base_name)
        try:
            cursor.execute('''
                UPDATE ranking
                SET file = ?,
                name = ?
                WHERE file = ?
            ''', (resolved_file, base_name, stored_file))
        except sqlite3.IntegrityError as e:
            # these are double keys, because the same file can be in the db with different path notations
            (e_msg,) = e.args
            if not e_msg.startswith("UNIQUE constraint"):
                raise
            cursor.execute('''
                DELETE FROM ranking
                WHERE file = ?
            ''', (stored_file,))
    return
def get_c_time(file):
    """Return ``os.path.getctime(file)`` formatted as ``YYYY-MM-DD HH:MM:SS``.

    The timestamp is converted with ``datetime.datetime.fromtimestamp`` (local
    time, no timezone information).

    Args:
        file: Path of an existing file.
    """
    raw_timestamp = os.path.getctime(file)
    moment = datetime.datetime.fromtimestamp(raw_timestamp)
    return moment.strftime('%Y-%m-%d %H:%M:%S')
def migrate_ranking_c_time(cursor):
    """Set ``ranking.created`` to the file's ctime for every file that exists.

    Rows whose file is no longer present on disk are left unchanged.

    Args:
        cursor: An open ``sqlite3`` cursor; the caller owns the transaction.
    """
    cursor.execute('''
        SELECT file
        FROM ranking
    ''')
    ranked_files = [row[0] for row in cursor.fetchall()]
    for ranked_file in ranked_files:
        if not os.path.exists(ranked_file):
            continue
        created_at = get_c_time(ranked_file)
        cursor.execute('''
            UPDATE ranking
            SET created = ?
            WHERE file = ?
        ''', (created_at, ranked_file))
    return
def check():
    """Create the database if it is missing, upgrade it if it is outdated.

    When ``db_file`` does not exist, the schema is created, the current
    ``version`` is stored and the legacy flat files are imported, all in one
    transaction. Afterwards the stored version is read back and the
    migrations required for older versions are applied.

    The stored version is compared numerically. It was previously compared as
    a string, which would order ``"10"`` before ``"2"`` and re-run old
    migrations once the schema version reaches two digits.

    Note: a database without a ``version`` row is not handled; subscripting
    the ``None`` returned by ``get_version()`` raises ``TypeError``.

    Returns:
        The module-level schema ``version``.
    """
    if not os.path.exists(db_file):
        conn, cursor = transaction_begin()
        print("Image Browser: Creating database")
        create_db(cursor)
        update_db_data(cursor, "version", version)
        migrate_path_recorder(cursor)
        migrate_exif_data(cursor)
        migrate_ranking(cursor)
        migrate_ranking_c_time(cursor)
        transaction_end(conn, cursor)
        print("Image Browser: Database created")

    db_version = get_version()
    # db_data.value is TEXT, so the version comes back as a string.
    stored_version = int(db_version[0])

    conn, cursor = transaction_begin()
    if stored_version <= 2:
        # version 1 database had mixed path notations, changed them all to abspath
        # version 2 database still had mixed path notations, because of windows short name, changed them all to realpath
        print(f"Image Browser: Upgrading database from version {db_version[0]} to version {version}")
        migrate_path_recorder_dirs(cursor)
        migrate_exif_data_dirs(cursor)
        # migrate_ranking_dirs compares against the string "1"
        migrate_ranking_dirs(cursor, db_version[0])
    if stored_version <= 3:
        migrate_ranking_c_time(cursor)
        update_db_data(cursor, "version", version)
        print(f"Image Browser: Database upgraded from version {db_version[0]} to version {version}")
    transaction_end(conn, cursor)
    return version
def load_path_recorder():
    """Load the whole ``path_recorder`` table.

    Returns:
        A dict mapping each path to
        ``{"depth": depth, "path_display": path_display}``.
    """
    conn = sqlite3.connect(db_file, timeout=timeout)
    try:
        # "with conn" only commits/rolls back; it does not close the
        # connection, hence the explicit close in "finally".
        with conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT path, depth, path_display
                FROM path_recorder
            ''')
            path_recorder = {
                path: {"depth": depth, "path_display": path_display}
                for path, depth, path_display in cursor.fetchall()
            }
    finally:
        conn.close()
    return path_recorder
def select_ranking(file):
    """Return the ranking stored for ``file``, or the string ``"None"``.

    The ranking is looked up by full path first. If that fails, it is looked
    up by base name plus the file's ctime (the file may have been moved); a
    match found that way is re-inserted under the current path.

    The connection is now closed in a ``finally`` clause, so an error during
    the lookup (for example ``get_c_time`` failing on a missing file) no
    longer leaves the connection and its open transaction dangling. Closing
    without a COMMIT discards the uncommitted changes.

    Args:
        file: Full path of the image file.

    Returns:
        The stored ranking value, or ``"None"`` when no ranking was found.
    """
    conn, cursor = transaction_begin()
    try:
        cursor.execute('''
            SELECT ranking
            FROM ranking
            WHERE file = ?
        ''', (file,))
        ranking_value = cursor.fetchone()

        # if ranking not found search again, without path (moved?)
        if ranking_value is None:
            c_time = get_c_time(file)
            name = os.path.basename(file)
            cursor.execute('''
                SELECT ranking
                FROM ranking
                WHERE name = ?
                AND created = ?
            ''', (name, c_time))
            ranking_value = cursor.fetchone()
            # and insert with current filepath
            if ranking_value is not None:
                cursor.execute('''
                    DELETE
                    FROM ranking
                    WHERE name = ?
                    AND created = ?
                ''', (name, c_time))
                (insert_ranking,) = ranking_value
                cursor.execute('''
                    INSERT INTO ranking (file, name, ranking, created)
                    VALUES (?, ?, ?, ?)
                ''', (file, name, insert_ranking, c_time))

        if ranking_value is None:
            return_ranking = "None"
        else:
            (return_ranking,) = ranking_value

        transaction_end(conn, cursor)
    finally:
        # transaction_end() already closed the connection on success;
        # closing it a second time is harmless.
        conn.close()
    return return_ranking
def update_ranking(file, ranking):
    """Store, replace or remove the ranking of ``file``.

    With ``ranking == "None"`` the rows matching the file's base name and
    ctime are deleted; otherwise the row for ``file`` is inserted or replaced.

    Args:
        file: Full path of an existing image file (its ctime is read).
        ranking: Ranking value to store, or ``"None"`` to remove it.
    """
    name = os.path.basename(file)
    conn = sqlite3.connect(db_file, timeout=timeout)
    try:
        # "with conn" commits on success and rolls back on error, but does
        # not close the connection, hence the explicit close in "finally".
        with conn:
            cursor = conn.cursor()
            c_time = get_c_time(file)
            if ranking == "None":
                cursor.execute('''
                    DELETE FROM ranking
                    WHERE name = ?
                    AND created = ?
                ''', (name, c_time))
            else:
                cursor.execute('''
                    INSERT OR REPLACE
                    INTO ranking (file, name, ranking, created)
                    VALUES (?, ?, ?, ?)
                ''', (file, name, ranking, c_time))
    finally:
        conn.close()
    return
def update_path_recorder(path, depth, path_display):
    """Insert or replace one row of the ``path_recorder`` table.

    Args:
        path: Directory path (primary key).
        depth: Value for the ``depth`` column.
        path_display: Value for the ``path_display`` column.
    """
    conn = sqlite3.connect(db_file, timeout=timeout)
    try:
        # "with conn" commits on success and rolls back on error, but does
        # not close the connection, hence the explicit close in "finally".
        with conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT OR REPLACE
                INTO path_recorder (path, depth, path_display)
                VALUES (?, ?, ?)
            ''', (path, depth, path_display))
    finally:
        conn.close()
    return
def delete_path_recorder(path):
    """Delete the ``path_recorder`` row whose ``path`` equals ``path``.

    Args:
        path: Directory path (primary key) to remove.
    """
    conn = sqlite3.connect(db_file, timeout=timeout)
    try:
        # "with conn" commits on success and rolls back on error, but does
        # not close the connection, hence the explicit close in "finally".
        with conn:
            cursor = conn.cursor()
            cursor.execute('''
                DELETE FROM path_recorder
                WHERE path = ?
            ''', (path,))
    finally:
        conn.close()
    return
def update_path_recorder_mult(cursor, update_from, update_to):
    """Rename the ``path_recorder`` row ``update_from`` to ``update_to``.

    ``path`` is replaced, and the first ``len(update_from)`` characters of
    ``path_display`` are swapped for ``update_to``.

    Args:
        cursor: An open ``sqlite3`` cursor; the caller owns the transaction.
        update_from: Currently stored path.
        update_to: New path.
    """
    rename_sql = '''
        UPDATE path_recorder
        SET path = ?,
        path_display = ? || SUBSTR(path_display, LENGTH(?) + 1)
        WHERE path = ?
    '''
    parameters = (update_to, update_to, update_from, update_from)
    cursor.execute(rename_sql, parameters)
def update_exif_data_mult(cursor, update_from, update_to):
    """Move all ``exif_data`` rows of directory ``update_from`` to ``update_to``.

    Every ``file`` that starts with ``update_from`` plus the path separator
    gets that prefix replaced by ``update_to`` plus the separator.

    The directory name is now escaped before being used as a LIKE pattern.
    Previously ``_`` and ``%`` in the path acted as wildcards, so renaming
    e.g. ``my_dir`` also rewrote files under ``myXdir``.

    Args:
        cursor: An open ``sqlite3`` cursor; the caller owns the transaction.
        update_from: Old directory path, without trailing separator.
        update_to: New directory path, without trailing separator.
    """
    update_from = update_from + os.path.sep
    update_to = update_to + os.path.sep
    # Escape the escape character itself first, then the LIKE wildcards.
    like_pattern = (
        update_from.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        + "%"
    )
    cursor.execute('''
        UPDATE exif_data
        SET file = ? || SUBSTR(file, LENGTH(?) + 1)
        WHERE file LIKE ? ESCAPE '\\'
    ''', (update_to, update_from, like_pattern))
    return
def update_ranking_mult(cursor, update_from, update_to):
    """Move all ``ranking`` rows of directory ``update_from`` to ``update_to``.

    Every ``file`` that starts with ``update_from`` plus the path separator
    gets that prefix replaced by ``update_to`` plus the separator.

    The directory name is now escaped before being used as a LIKE pattern.
    Previously ``_`` and ``%`` in the path acted as wildcards, so renaming
    e.g. ``my_dir`` also rewrote files under ``myXdir``.

    Args:
        cursor: An open ``sqlite3`` cursor; the caller owns the transaction.
        update_from: Old directory path, without trailing separator.
        update_to: New directory path, without trailing separator.
    """
    update_from = update_from + os.path.sep
    update_to = update_to + os.path.sep
    # Escape the escape character itself first, then the LIKE wildcards.
    like_pattern = (
        update_from.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        + "%"
    )
    cursor.execute('''
        UPDATE ranking
        SET file = ? || SUBSTR(file, LENGTH(?) + 1)
        WHERE file LIKE ? ESCAPE '\\'
    ''', (update_to, update_from, like_pattern))
    return
def transaction_begin():
    """Open a connection to ``db_file`` and start an explicit transaction.

    The connection's ``isolation_level`` is ``None``, so the sqlite3 module
    does not open transactions on its own; the ``BEGIN`` issued here stays
    open until the caller ends it (see ``transaction_end``).

    Returns:
        A ``(connection, cursor)`` tuple.
    """
    connection = sqlite3.connect(db_file, timeout=timeout, isolation_level=None)
    transaction_cursor = connection.cursor()
    transaction_cursor.execute("BEGIN")
    return connection, transaction_cursor
def transaction_end(conn, cursor):
    """Commit the open transaction and close the connection.

    Args:
        conn: Connection to close after the commit.
        cursor: Cursor on ``conn`` used to issue the ``COMMIT``.
    """
    commit_statement = "COMMIT"
    cursor.execute(commit_statement)
    conn.close()
def update_aes_data(cursor, file, value):
    """Insert or replace the ``aesthetic_score`` row of ``file`` in ``exif_data``.

    Args:
        cursor: An open ``sqlite3`` cursor; the caller owns the transaction.
        file: File path used as the ``file`` column value.
        value: Score to store.
    """
    upsert_sql = '''
        INSERT OR REPLACE
        INTO exif_data (file, key, value)
        VALUES (?, ?, ?)
    '''
    cursor.execute(upsert_sql, (file, "aesthetic_score", value))
def select_prompts(file):
    """Return the stored prompt and negative prompt of ``file``.

    Args:
        file: File path to look up in ``exif_data``.

    Returns:
        A ``(prompt, neg_prompt)`` tuple; each element is ``""`` when no
        corresponding row exists.
    """
    conn = sqlite3.connect(db_file, timeout=timeout)
    try:
        # "with conn" only commits/rolls back; it does not close the
        # connection, hence the explicit close in "finally".
        with conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT key, value
                FROM exif_data
                WHERE file = ?
                AND KEY in ('prompt', 'negative_prompt')
            ''', (file,))
            rows = cursor.fetchall()
    finally:
        conn.close()

    prompt = ""
    neg_prompt = ""
    for (key, value) in rows:
        if key == 'prompt':
            prompt = value
        elif key == 'negative_prompt':
            neg_prompt = value
    return prompt, neg_prompt
def load_exif_data(exif_cache):
    """Fill ``exif_cache`` with one concatenated info string per file.

    For each file the ``key: value`` pairs of ``exif_data`` are joined with
    ``', '``; the ``prompt`` and ``negative_prompt`` entries additionally get
    a trailing newline. The inner query orders rows prompt first, negative
    prompt second, then the remaining keys alphabetically.

    Args:
        exif_cache: Dict that is updated in place (file -> info string).

    Returns:
        The same ``exif_cache`` dict.
    """
    conn = sqlite3.connect(db_file, timeout=timeout)
    try:
        # "with conn" only commits/rolls back; it does not close the
        # connection, hence the explicit close in "finally".
        with conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT file, group_concat(
                    case when key = 'prompt' or key = 'negative_prompt' then key || ': ' || value || '\n'
                    else key || ': ' || value
                    end, ', ') AS string
                FROM (
                    SELECT *
                    FROM exif_data
                    ORDER BY
                        CASE WHEN key = 'prompt' THEN 0
                        WHEN key = 'negative_prompt' THEN 1
                        ELSE 2 END,
                        key
                )
                GROUP BY file
            ''')
            rows = cursor.fetchall()
    finally:
        conn.close()

    for row in rows:
        exif_cache[row[0]] = row[1]
    return exif_cache
def load_aes_data(aes_cache):
    """Fill ``aes_cache`` with the stored score of every file.

    Rows of ``exif_data`` whose key is ``aesthetic_score`` or ``Score`` are
    read; if a file has both, whichever row is returned last wins.

    The key names are now written as single-quoted SQL string literals.
    Double-quoted tokens are identifiers in SQL; SQLite only falls back to
    treating them as strings for compatibility, and builds with that
    fallback disabled reject the query.

    Args:
        aes_cache: Dict that is updated in place (file -> score value).

    Returns:
        The same ``aes_cache`` dict.
    """
    conn = sqlite3.connect(db_file, timeout=timeout)
    try:
        # "with conn" only commits/rolls back; it does not close the
        # connection, hence the explicit close in "finally".
        with conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT file, value
                FROM exif_data
                WHERE key in ('aesthetic_score', 'Score')
            ''')
            rows = cursor.fetchall()
    finally:
        conn.close()

    for row in rows:
        aes_cache[row[0]] = row[1]
    return aes_cache
def get_exif_dirs():
    """Return the directories of all files stored in ``exif_data``.

    Returns:
        A dict mapping each directory (``os.path.dirname`` of the stored
        file path) to itself.
    """
    conn = sqlite3.connect(db_file, timeout=timeout)
    try:
        # "with conn" only commits/rolls back; it does not close the
        # connection, hence the explicit close in "finally".
        with conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT file
                FROM exif_data
            ''')
            rows = cursor.fetchall()
    finally:
        conn.close()

    dirs = {}
    for row in rows:
        # "directory" instead of "dir" to avoid shadowing the builtin
        directory = os.path.dirname(row[0])
        dirs[directory] = directory
    return dirs