214 lines
7.5 KiB
Python
214 lines
7.5 KiB
Python
import argparse
|
|
from contextlib import closing
|
|
from datetime import datetime
|
|
import os
|
|
import random
|
|
|
|
from scripts.iib.db.datamodel import DataBase
|
|
|
|
|
|
def get_creation_time_path(file_path: str) -> str:
|
|
"""
|
|
Get the creation time of a file and format it as YYYYMMDD_HHMMSS_<random>.
|
|
Format example: 20260123_011706_8662450
|
|
|
|
On Windows, this uses creation time. On Unix, falls back to modification time.
|
|
|
|
Args:
|
|
file_path (str): Path to the file.
|
|
|
|
Returns:
|
|
str: Formatted creation time string (YYYYMMDD_HHMMSS_<random>).
|
|
"""
|
|
# On Windows, st_ctime is creation time
|
|
# On Unix, st_ctime is metadata change time, so we use st_mtime
|
|
if os.name == 'nt':
|
|
timestamp = os.path.getctime(file_path)
|
|
else:
|
|
timestamp = os.path.getmtime(file_path)
|
|
|
|
dt = datetime.fromtimestamp(timestamp)
|
|
time_str = dt.strftime("%Y%m%d_%H%M%S")
|
|
random_fragment = random.randint(1000000, 9999999)
|
|
return f"{time_str}_{random_fragment}"
|
|
|
|
|
|
def check_database_exists(conn, file_path: str) -> bool:
|
|
"""
|
|
Check if a file path exists in the database.
|
|
|
|
Args:
|
|
conn: Database connection object.
|
|
file_path (str): Path to check.
|
|
|
|
Returns:
|
|
bool: True if the file path exists in the database.
|
|
"""
|
|
normalized_path = os.path.normpath(file_path)
|
|
|
|
with closing(conn.cursor()) as cur:
|
|
cur.execute(
|
|
"SELECT 1 FROM image WHERE path = ? LIMIT 1",
|
|
(normalized_path,)
|
|
)
|
|
return cur.fetchone() is not None
|
|
|
|
|
|
def update_database_paths(conn, old_path: str, new_path: str):
|
|
"""
|
|
Update database paths when a file is renamed.
|
|
|
|
Args:
|
|
conn: Database connection object.
|
|
old_path (str): Original file path.
|
|
new_path (str): New file path after rename.
|
|
"""
|
|
normalized_old = os.path.normpath(old_path)
|
|
normalized_new = os.path.normpath(new_path)
|
|
|
|
with closing(conn.cursor()) as cur:
|
|
# Update image table
|
|
cur.execute(
|
|
"UPDATE image SET path = ? WHERE path = ?",
|
|
(normalized_new, normalized_old)
|
|
)
|
|
|
|
|
|
def normalize_single_file(file_path: str, dry_run: bool, conn):
|
|
"""
|
|
Normalize a single file's name based on its creation time (YYYYMMDD_HHMMSS_<random>).
|
|
|
|
Args:
|
|
file_path (str): Path to the file to normalize.
|
|
dry_run (bool): If True, only print without renaming.
|
|
conn: Database connection object. If provided, will update database paths.
|
|
|
|
Returns:
|
|
tuple: (status, message) where status is 'normalized', 'skipped', or 'error'
|
|
"""
|
|
try:
|
|
# Get the directory and file extension
|
|
dir_path = os.path.dirname(file_path)
|
|
filename = os.path.basename(file_path)
|
|
name, ext = os.path.splitext(filename)
|
|
|
|
# Skip if already in the target format (YYYYMMDD_HHMMSS_<random>)
|
|
# Format: 8 digits + _ + 6 digits + _ + 7 digits = 23 characters
|
|
parts = name.split('_')
|
|
if len(parts) == 3 and len(parts[0]) == 8 and parts[0].isdigit() and len(parts[1]) == 6 and parts[1].isdigit() and len(parts[2]) == 7 and parts[2].isdigit():
|
|
return 'skipped', f"Skip (already normalized): {filename}"
|
|
|
|
# Get creation time formatted string
|
|
time_str = get_creation_time_path(file_path)
|
|
new_filename = f"{time_str}{ext}"
|
|
new_path = os.path.join(dir_path, new_filename)
|
|
|
|
# Handle duplicate filenames by appending a counter
|
|
counter = 1
|
|
while os.path.exists(new_path) and new_path != file_path:
|
|
new_filename = f"{time_str}_{counter}{ext}"
|
|
new_path = os.path.join(dir_path, new_filename)
|
|
counter += 1
|
|
|
|
if new_path == file_path:
|
|
return 'skipped', f"Skip (same name): {filename}"
|
|
|
|
if dry_run:
|
|
# Check if file exists in database and include in message
|
|
db_info = ""
|
|
if conn:
|
|
if check_database_exists(conn, file_path):
|
|
db_info = " [in database]"
|
|
else:
|
|
db_info = " [not in database]"
|
|
return 'normalized', f"Would normalize: {filename} -> {new_filename}{db_info}"
|
|
else:
|
|
os.rename(file_path, new_path)
|
|
# Update database if connection is provided
|
|
if conn:
|
|
update_database_paths(conn, file_path, new_path)
|
|
return 'normalized', f"Normalized: {filename} -> {new_filename}"
|
|
|
|
except Exception as e:
|
|
return 'error', f"Error normalizing {file_path}: {e}"
|
|
|
|
|
|
def normalize_filenames(dir_path: str, recursive: bool = False, dry_run: bool = False, db_path: str = None):
|
|
"""
|
|
Normalize all filenames in the specified directory to creation time format (YYYYMMDD_HHMMSS_<random>).
|
|
|
|
Args:
|
|
dir_path (str): Path to the directory containing files to normalize.
|
|
recursive (bool): Whether to recursively process subdirectories. Default is False.
|
|
dry_run (bool): If True, only print what would be normalized without actually renaming. Default is False.
|
|
db_path (str): Path to the IIB database file to update with new paths. Default is None.
|
|
"""
|
|
normalized_count = 0
|
|
skipped_count = 0
|
|
error_count = 0
|
|
|
|
# Setup database connection if db_path is provided
|
|
conn = None
|
|
if db_path:
|
|
DataBase.path = os.path.normpath(os.path.join(os.getcwd(), db_path))
|
|
conn = DataBase.get_conn()
|
|
|
|
files_to_process = []
|
|
|
|
if recursive:
|
|
# Walk through all files in directory and subdirectories
|
|
for root, _, files in os.walk(dir_path):
|
|
for filename in files:
|
|
files_to_process.append(os.path.join(root, filename))
|
|
else:
|
|
# Only process files in the specified directory (non-recursive)
|
|
for entry in os.listdir(dir_path):
|
|
full_path = os.path.join(dir_path, entry)
|
|
if os.path.isfile(full_path):
|
|
files_to_process.append(full_path)
|
|
|
|
# Process all files
|
|
for file_path in files_to_process:
|
|
status, message = normalize_single_file(file_path, dry_run, conn)
|
|
print(message)
|
|
if status == 'normalized':
|
|
normalized_count += 1
|
|
elif status == 'skipped':
|
|
skipped_count += 1
|
|
elif status == 'error':
|
|
error_count += 1
|
|
|
|
print(f"\nSummary: {'Dry run - ' if dry_run else ''}Normalized: {normalized_count}, Skipped: {skipped_count}, Errors: {error_count}")
|
|
|
|
if conn and not dry_run:
|
|
conn.commit()
|
|
print(f"Database updated at: {db_path}")
|
|
|
|
|
|
def setup_parser() -> argparse.ArgumentParser:
|
|
parser = argparse.ArgumentParser(
|
|
description="Normalize filenames to creation time format (YYYYMMDD_HHMMSS_<random>) and optionally update IIB database paths."
|
|
)
|
|
parser.add_argument(
|
|
"dir_path", type=str, help="Path to the directory containing files to normalize."
|
|
)
|
|
parser.add_argument(
|
|
"-r", "--recursive", action="store_true", help="Recursively process subdirectories."
|
|
)
|
|
parser.add_argument(
|
|
"--force", action="store_true", help="Actually perform the normalization. Without this flag, runs in dry-run mode."
|
|
)
|
|
parser.add_argument(
|
|
"--db_path", type=str, help="Path to the IIB database file to update with new paths. Default value is 'iib.db'.", default="iib.db"
|
|
)
|
|
return parser
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = setup_parser()
|
|
args = parser.parse_args()
|
|
|
|
# Default to dry-run mode unless --force is specified
|
|
dry_run = not args.force
|
|
normalize_filenames(args.dir_path, args.recursive, dry_run, args.db_path)
|