419 lines
13 KiB
Python
419 lines
13 KiB
Python
from sqlite3 import Connection, connect
|
|
from typing import Dict, List, Optional
|
|
from scripts.tool import (
|
|
cwd,
|
|
get_modified_date,
|
|
human_readable_size,
|
|
tags_translate,
|
|
is_dev,
|
|
)
|
|
from contextlib import closing
|
|
import os
|
|
import threading
|
|
|
|
|
|
class DataBase:
|
|
local = threading.local()
|
|
|
|
_initing = False
|
|
|
|
num = 0
|
|
|
|
@classmethod
|
|
def get_conn(clz) -> Connection:
|
|
# for : sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread
|
|
if hasattr(clz.local, "conn"):
|
|
return clz.local.conn
|
|
else:
|
|
conn = clz.init()
|
|
clz.local.conn = conn
|
|
return conn
|
|
|
|
@classmethod
|
|
def init(clz):
|
|
# 创建连接并打开数据库
|
|
conn = connect(os.path.join(cwd, "iib.db"))
|
|
Floder.create_table(conn)
|
|
ImageTag.create_table(conn)
|
|
Tag.create_table(conn)
|
|
Image.create_table(conn)
|
|
clz.num += 1
|
|
if is_dev:
|
|
print(f"当前连接数{clz.num}")
|
|
return conn
|
|
|
|
|
|
class Image:
|
|
def __init__(self, path, exif=None, size=0, date="", id=None):
|
|
self.path = path
|
|
self.exif = exif
|
|
self.id = id
|
|
self.size = size
|
|
self.date = date
|
|
|
|
def to_file_info(self):
|
|
return {
|
|
"type": "file",
|
|
"id": self.id,
|
|
"date": self.date,
|
|
"created_date": self.date,
|
|
"size": human_readable_size(self.size),
|
|
"bytes": self.size,
|
|
"name": os.path.basename(self.path),
|
|
"fullpath": self.path,
|
|
}
|
|
|
|
def save(self, conn):
|
|
with closing(conn.cursor()) as cur:
|
|
cur.execute(
|
|
"INSERT OR REPLACE INTO image (path, exif, size, date) VALUES (?, ?, ?, ?)",
|
|
(self.path, self.exif, self.size, self.date),
|
|
)
|
|
self.id = cur.lastrowid
|
|
|
|
@classmethod
|
|
def get(cls, conn: Connection, id_or_path):
|
|
with closing(conn.cursor()) as cur:
|
|
cur.execute(
|
|
"SELECT * FROM image WHERE id = ? OR path = ?", (id_or_path, id_or_path)
|
|
)
|
|
row = cur.fetchone()
|
|
if row is None:
|
|
return None
|
|
else:
|
|
return cls.from_row(row)
|
|
|
|
@classmethod
|
|
def get_by_ids(cls, conn: Connection, ids: List[int]) -> List["Image"]:
|
|
if not ids:
|
|
return []
|
|
|
|
query = """
|
|
SELECT * FROM image
|
|
WHERE id IN ({})
|
|
""".format(
|
|
",".join("?" * len(ids))
|
|
)
|
|
|
|
with closing(conn.cursor()) as cur:
|
|
cur.execute(query, ids)
|
|
rows = cur.fetchall()
|
|
|
|
images = []
|
|
for row in rows:
|
|
images.append(cls.from_row(row))
|
|
return images
|
|
|
|
@classmethod
|
|
def create_table(cls, conn):
|
|
with closing(conn.cursor()) as cur:
|
|
cur.execute(
|
|
"""CREATE TABLE IF NOT EXISTS image (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
path TEXT UNIQUE,
|
|
exif TEXT,
|
|
size INTEGER,
|
|
date TEXT
|
|
)"""
|
|
)
|
|
cur.execute("CREATE INDEX IF NOT EXISTS image_idx_path ON image(path)")
|
|
|
|
@classmethod
|
|
def count(cls, conn):
|
|
with closing(conn.cursor()) as cur:
|
|
cur.execute("SELECT COUNT(*) FROM image")
|
|
count = cur.fetchone()[0]
|
|
return count
|
|
|
|
@classmethod
|
|
def from_row(cls, row: tuple):
|
|
image = cls(path=row[1], exif=row[2], size=row[3], date=row[4])
|
|
image.id = row[0]
|
|
return image
|
|
|
|
@classmethod
|
|
def remove(cls, conn: Connection, image_id: int) -> None:
|
|
with closing(conn.cursor()) as cur:
|
|
cur.execute("DELETE FROM image WHERE id = ?", (image_id,))
|
|
conn.commit()
|
|
|
|
|
|
class Tag:
|
|
def __init__(self, name: str, score: int, type: str, count=0):
|
|
self.name = name
|
|
self.score = score
|
|
self.type = type
|
|
self.count = count
|
|
self.id = None
|
|
self.display_name = tags_translate.get(name)
|
|
|
|
def save(self, conn):
|
|
with closing(conn.cursor()) as cur:
|
|
cur.execute(
|
|
"INSERT OR REPLACE INTO tag (id, name, score, type, count) VALUES (?, ?, ?, ?, ?)",
|
|
(self.id, self.name, self.score, self.type, self.count),
|
|
)
|
|
self.id = cur.lastrowid
|
|
|
|
@classmethod
|
|
def remove(cls, conn, tag_id):
|
|
with closing(conn.cursor()) as cur:
|
|
cur.execute("DELETE FROM tag WHERE id = ?", (tag_id,))
|
|
conn.commit()
|
|
|
|
@classmethod
|
|
def get(cls, conn: Connection, id):
|
|
with closing(conn.cursor()) as cur:
|
|
cur.execute("SELECT * FROM tag WHERE id = ?", (id,))
|
|
row = cur.fetchone()
|
|
if row is None:
|
|
return None
|
|
else:
|
|
return cls.from_row(row)
|
|
|
|
@classmethod
|
|
def get_all_custom_tag(cls, conn):
|
|
with closing(conn.cursor()) as cur:
|
|
cur.execute("SELECT * FROM tag where type = 'custom'")
|
|
rows = cur.fetchall()
|
|
tags: list[Tag] = []
|
|
for row in rows:
|
|
tags.append(cls.from_row(row))
|
|
return tags
|
|
|
|
@classmethod
|
|
def get_all(cls, conn):
|
|
with closing(conn.cursor()) as cur:
|
|
cur.execute("SELECT * FROM tag")
|
|
rows = cur.fetchall()
|
|
tags: list[Tag] = []
|
|
for row in rows:
|
|
tags.append(cls.from_row(row))
|
|
return tags
|
|
|
|
@classmethod
|
|
def get_or_create(cls, conn: Connection, name: str, type: str):
|
|
assert name and type
|
|
with closing(conn.cursor()) as cur:
|
|
cur.execute(
|
|
"SELECT tag.* FROM tag WHERE name = ? and type = ?", (name, type)
|
|
)
|
|
row = cur.fetchone()
|
|
if row is None:
|
|
tag = cls(name=name, score=0, type=type)
|
|
tag.save(conn)
|
|
return tag
|
|
else:
|
|
return cls.from_row(row)
|
|
|
|
@classmethod
|
|
def from_row(cls, row: tuple):
|
|
tag = cls(name=row[1], score=row[2], type=row[3], count=row[4])
|
|
tag.id = row[0]
|
|
return tag
|
|
|
|
@classmethod
|
|
def create_table(cls, conn):
|
|
with closing(conn.cursor()) as cur:
|
|
cur.execute(
|
|
"""CREATE TABLE IF NOT EXISTS tag (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
name TEXT,
|
|
score INTEGER,
|
|
type TEXT,
|
|
count INTEGER,
|
|
UNIQUE(name, type) ON CONFLICT REPLACE
|
|
);
|
|
"""
|
|
)
|
|
cur.execute("CREATE INDEX IF NOT EXISTS tag_idx_name ON tag(name)")
|
|
cur.execute(
|
|
"""INSERT OR IGNORE INTO tag(name, score, type, count)
|
|
VALUES ("like", 0, "custom", 0);
|
|
"""
|
|
)
|
|
|
|
|
|
class ImageTag:
|
|
def __init__(self, image_id: int, tag_id: int):
|
|
assert tag_id and image_id
|
|
self.image_id = image_id
|
|
self.tag_id = tag_id
|
|
|
|
def save(self, conn):
|
|
with closing(conn.cursor()) as cur:
|
|
cur.execute(
|
|
"INSERT INTO image_tag (image_id, tag_id) VALUES (?, ?)",
|
|
(self.image_id, self.tag_id),
|
|
)
|
|
|
|
@classmethod
|
|
def get_tags_for_image(
|
|
cls,
|
|
conn: Connection,
|
|
image_id: int,
|
|
tag_id: Optional[int] = None,
|
|
type: Optional[str] = None,
|
|
):
|
|
with closing(conn.cursor()) as cur:
|
|
query = "SELECT tag.* FROM tag INNER JOIN image_tag ON tag.id = image_tag.tag_id WHERE image_tag.image_id = ?"
|
|
params = [image_id]
|
|
if tag_id:
|
|
query += " AND image_tag.tag_id = ?"
|
|
params.append(tag_id)
|
|
if type:
|
|
query += " AND tag.type = ?"
|
|
params.append(type)
|
|
cur.execute(query, tuple(params))
|
|
rows = cur.fetchall()
|
|
return [Tag.from_row(x) for x in rows]
|
|
|
|
@classmethod
|
|
def get_images_for_tag(cls, conn: Connection, tag_id):
|
|
with closing(conn.cursor()) as cur:
|
|
cur.execute(
|
|
"SELECT image.* FROM image INNER JOIN image_tag ON image.id = image_tag.image_id WHERE image_tag.tag_id = ?",
|
|
(tag_id,),
|
|
)
|
|
rows = cur.fetchall()
|
|
images = []
|
|
for row in rows:
|
|
images.append(Image.from_row(row))
|
|
return images
|
|
|
|
@classmethod
|
|
def create_table(cls, conn):
|
|
with closing(conn.cursor()) as cur:
|
|
cur.execute(
|
|
"""CREATE TABLE IF NOT EXISTS image_tag (
|
|
image_id INTEGER,
|
|
tag_id INTEGER,
|
|
FOREIGN KEY (image_id) REFERENCES image(id),
|
|
FOREIGN KEY (tag_id) REFERENCES tag(id),
|
|
PRIMARY KEY (image_id, tag_id)
|
|
)"""
|
|
)
|
|
|
|
@classmethod
|
|
def get_images_by_tags(
|
|
cls, conn: Connection, tag_dict: Dict[str, List[int]], limit: int = 500
|
|
) -> List[Image]:
|
|
query = """
|
|
SELECT image.id, image.path, image.size,image.date
|
|
FROM image
|
|
INNER JOIN image_tag ON image.id = image_tag.image_id
|
|
"""
|
|
|
|
where_clauses = []
|
|
params = []
|
|
|
|
for operator, tag_ids in tag_dict.items():
|
|
if operator == "and":
|
|
where_clauses.append(
|
|
"tag_id IN ({})".format(",".join("?" * len(tag_ids)))
|
|
)
|
|
params.extend(tag_ids)
|
|
elif operator == "not":
|
|
where_clauses.append(
|
|
"tag_id NOT IN ({})".format(",".join("?" * len(tag_ids)))
|
|
)
|
|
params.extend(tag_ids)
|
|
|
|
if where_clauses:
|
|
query += " WHERE " + " AND ".join(where_clauses)
|
|
|
|
query += " GROUP BY image.id"
|
|
|
|
if "and" in tag_dict:
|
|
query += " HAVING COUNT(DISTINCT tag_id) = ?"
|
|
params.append(len(tag_dict["and"]))
|
|
|
|
query += " ORDER BY date DESC LIMIT ?"
|
|
params.append(limit)
|
|
with closing(conn.cursor()) as cur:
|
|
cur.execute(query, params)
|
|
rows = cur.fetchall()
|
|
return [
|
|
Image(id=row[0], path=row[1], size=row[2], date=row[3]) for row in rows
|
|
]
|
|
|
|
@classmethod
|
|
def remove(
|
|
cls,
|
|
conn: Connection,
|
|
image_id: Optional[int] = None,
|
|
tag_id: Optional[int] = None,
|
|
) -> None:
|
|
assert image_id or tag_id
|
|
with closing(conn.cursor()) as cur:
|
|
if tag_id and image_id:
|
|
cur.execute(
|
|
"DELETE FROM image_tag WHERE image_id = ? and tag_id = ?",
|
|
(image_id, tag_id),
|
|
)
|
|
elif tag_id:
|
|
cur.execute("DELETE FROM image_tag WHERE tag_id = ?", (tag_id,))
|
|
else:
|
|
cur.execute("DELETE FROM image_tag WHERE image_id = ?", (image_id,))
|
|
conn.commit()
|
|
|
|
|
|
class Floder:
|
|
def __init__(self, id: int, path: str, modified_date: str):
|
|
self.id = id
|
|
self.path = path
|
|
self.modified_date = modified_date
|
|
|
|
@classmethod
|
|
def create_table(cls, conn):
|
|
with closing(conn.cursor()) as cur:
|
|
cur.execute(
|
|
"""CREATE TABLE IF NOT EXISTS folders
|
|
(id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
path TEXT,
|
|
modified_date TEXT)"""
|
|
)
|
|
cur.execute("CREATE INDEX IF NOT EXISTS folders_idx_path ON folders(path)")
|
|
|
|
@classmethod
|
|
def check_need_update(cls, conn: Connection, folder_path: str):
|
|
folder_path = os.path.normpath(folder_path)
|
|
with closing(conn.cursor()) as cur:
|
|
cur.execute("SELECT * FROM folders WHERE path=?", (folder_path,))
|
|
folder_record = cur.fetchone() # 如果这个文件夹没有记录,或者修改时间与数据库不同,则需要修改
|
|
return not folder_record or (
|
|
folder_record[2] != get_modified_date(folder_path)
|
|
)
|
|
|
|
@classmethod
|
|
def update_modified_date_or_create(cls, conn: Connection, folder_path: str):
|
|
folder_path = os.path.normpath(folder_path)
|
|
with closing(conn.cursor()) as cur:
|
|
cur.execute("SELECT * FROM folders WHERE path = ?", (folder_path,))
|
|
row = cur.fetchone()
|
|
if row:
|
|
cur.execute(
|
|
"UPDATE folders SET modified_date = ? WHERE path = ?",
|
|
(get_modified_date(folder_path), folder_path),
|
|
)
|
|
else:
|
|
cur.execute(
|
|
"INSERT INTO folders (path, modified_date) VALUES (?, ?)",
|
|
(folder_path, get_modified_date(folder_path)),
|
|
)
|
|
|
|
@classmethod
|
|
def get_expired_dirs(cls, conn: Connection):
|
|
dirs: List[str] = []
|
|
with closing(conn.cursor()) as cur:
|
|
cur.execute("SELECT * FROM folders")
|
|
result_set = cur.fetchall()
|
|
for row in result_set:
|
|
folder_path = row[1]
|
|
if (
|
|
os.path.exists(folder_path)
|
|
and get_modified_date(folder_path) != row[2]
|
|
):
|
|
dirs.append(folder_path)
|
|
return dirs
|