322 lines
9.5 KiB
Python
322 lines
9.5 KiB
Python
import hashlib
|
|
import json
|
|
import os
|
|
import time
|
|
import zlib
|
|
|
|
import gradio as gr
|
|
|
|
from scripts.mo.environment import env
|
|
from scripts.mo.models import ModelType
|
|
from scripts.mo.utils import get_model_files_in_dir, find_preview_file, link_preview, read_hash_cache, \
|
|
calculate_file_temp_hash, write_hash_cache, calculate_sha256
|
|
|
|
|
|
def _ui_state_report():
|
|
with gr.Column():
|
|
gr.Button('Generate state report')
|
|
|
|
|
|
def _on_local_files_scan_click():
|
|
result = []
|
|
|
|
def search_in_dir(model_type) -> list:
|
|
dir_path = env.get_model_path(model_type)
|
|
local = []
|
|
files = get_model_files_in_dir(dir_path)
|
|
for file in files:
|
|
preview_file = find_preview_file(file)
|
|
rec = {
|
|
'filename': os.path.basename(file),
|
|
'model_type': model_type.value,
|
|
'path': file,
|
|
}
|
|
|
|
if preview_file is not None and preview_file:
|
|
prev = {
|
|
'preview_filename': os.path.basename(preview_file),
|
|
'preview_path': preview_file,
|
|
'preview_link': link_preview(preview_file)
|
|
}
|
|
rec.update(prev)
|
|
|
|
local.append(rec)
|
|
return local
|
|
|
|
result.extend(search_in_dir(ModelType.CHECKPOINT))
|
|
result.extend(search_in_dir(ModelType.VAE))
|
|
result.extend(search_in_dir(ModelType.LORA))
|
|
result.extend(search_in_dir(ModelType.HYPER_NETWORK))
|
|
result.extend(search_in_dir(ModelType.EMBEDDING))
|
|
result.extend(search_in_dir(ModelType.LYCORIS))
|
|
|
|
return gr.JSON.update(value=json.dumps(result))
|
|
|
|
|
|
def _ui_local_files():
|
|
with gr.Column():
|
|
scan_button = gr.Button('Scan Local Model files')
|
|
|
|
local_files_json = gr.JSON(label='Local files')
|
|
|
|
scan_button.click(fn=_on_local_files_scan_click,
|
|
outputs=local_files_json)
|
|
|
|
|
|
def _on_read_hash_click():
|
|
cache = read_hash_cache()
|
|
return [
|
|
gr.JSON.update(value=json.dumps(cache)),
|
|
gr.Button.update(visible=False)
|
|
]
|
|
|
|
|
|
def calculate_crc32(file_path):
|
|
# Initialize the CRC32 checksum
|
|
crc32 = 0
|
|
|
|
try:
|
|
# Open the file in binary mode
|
|
with open(file_path, "rb") as file:
|
|
# Read the file in chunks to conserve memory
|
|
chunk_size = 1024 # You can adjust this according to your needs
|
|
while True:
|
|
data = file.read(chunk_size)
|
|
if not data:
|
|
break
|
|
crc32 = zlib.crc32(data, crc32)
|
|
|
|
except FileNotFoundError:
|
|
print(f"File not found: {file_path}")
|
|
return None
|
|
|
|
# Ensure the CRC32 value is a positive integer
|
|
crc32 = crc32 & 0xFFFFFFFF
|
|
|
|
return hex(crc32)[2:]
|
|
|
|
|
|
def calculate_md5(file_path):
|
|
# Create an instance of the MD5 hash object
|
|
md5_hash = hashlib.md5()
|
|
|
|
try:
|
|
# Open the file in binary mode
|
|
with open(file_path, "rb") as file:
|
|
# Read the file in chunks to conserve memory
|
|
chunk_size = 8192 # You can adjust this according to your needs
|
|
while True:
|
|
data = file.read(chunk_size)
|
|
if not data:
|
|
break
|
|
md5_hash.update(data)
|
|
|
|
except FileNotFoundError:
|
|
print(f"File not found: {file_path}")
|
|
return None
|
|
|
|
# Get the MD5 hash value as a hexadecimal string
|
|
md5_hex = md5_hash.hexdigest()
|
|
|
|
return md5_hex
|
|
|
|
|
|
def calculate_adler32(file_path):
|
|
# Initialize the Adler-32 checksum
|
|
adler32_checksum = zlib.adler32(b'', 0)
|
|
|
|
try:
|
|
# Open the file in binary mode
|
|
with open(file_path, "rb") as file:
|
|
# Read the file in chunks to conserve memory
|
|
chunk_size = 1024 # You can adjust this according to your needs
|
|
while True:
|
|
data = file.read(chunk_size)
|
|
if not data:
|
|
break
|
|
adler32_checksum = zlib.adler32(data, adler32_checksum)
|
|
|
|
# Ensure the Adler-32 value is a positive integer
|
|
adler32_checksum &= 0xFFFFFFFF
|
|
|
|
return hex(adler32_checksum)[2:]
|
|
|
|
except FileNotFoundError:
|
|
print(f"File not found: {file_path}")
|
|
return None
|
|
|
|
|
|
def _on_calculate_hash_click():
|
|
result = []
|
|
|
|
def calc_in_dir(model_type) -> list:
|
|
dir_path = env.get_model_path(model_type)
|
|
local = []
|
|
files = get_model_files_in_dir(dir_path)
|
|
for file in files:
|
|
start_ms = int(time.time() * 1000)
|
|
sha256 = calculate_sha256(file)
|
|
time_spent_sha256 = int(time.time() * 1000) - start_ms
|
|
|
|
start_ms = int(time.time() * 1000)
|
|
crc32 = calculate_crc32(file)
|
|
time_spent_crc32 = int(time.time() * 1000) - start_ms
|
|
|
|
start_ms = int(time.time() * 1000)
|
|
md5 = calculate_md5(file)
|
|
time_spent_md5 = int(time.time() * 1000) - start_ms
|
|
|
|
start_ms = int(time.time() * 1000)
|
|
adler32 = calculate_adler32(file)
|
|
time_spent_adler32 = int(time.time() * 1000) - start_ms
|
|
|
|
rec = {
|
|
'path': file,
|
|
'file_size': os.path.getsize(file),
|
|
'temp_hash': calculate_file_temp_hash(file),
|
|
'sha256': sha256,
|
|
'sha256_time_ms': time_spent_sha256,
|
|
'crc32': crc32,
|
|
'crc32_time_ms': time_spent_crc32,
|
|
'md5': md5,
|
|
'md5_time_ms': time_spent_md5,
|
|
'adler32': adler32,
|
|
'adler32_time_ms': time_spent_adler32
|
|
}
|
|
local.append(rec)
|
|
return local
|
|
|
|
result.extend(calc_in_dir(ModelType.CHECKPOINT))
|
|
result.extend(calc_in_dir(ModelType.VAE))
|
|
result.extend(calc_in_dir(ModelType.LORA))
|
|
result.extend(calc_in_dir(ModelType.HYPER_NETWORK))
|
|
result.extend(calc_in_dir(ModelType.EMBEDDING))
|
|
result.extend(calc_in_dir(ModelType.LYCORIS))
|
|
|
|
return [
|
|
gr.JSON.update(value=json.dumps(result)),
|
|
gr.Button.update(visible=True)
|
|
]
|
|
|
|
|
|
def _on_compare_hash_click():
|
|
result = []
|
|
|
|
cache = read_hash_cache()
|
|
|
|
def find_in_cache(file_path, temp_hash):
|
|
for entry in cache:
|
|
if entry.get('path') == file_path and entry.get('temp_hash') == temp_hash and \
|
|
entry.get('sha256') is not None:
|
|
return entry['sha256']
|
|
|
|
def search_in_dir(model_type) -> list:
|
|
dir_path = env.get_model_path(model_type)
|
|
local = []
|
|
files = get_model_files_in_dir(dir_path)
|
|
for file in files:
|
|
temp_hash = calculate_file_temp_hash(file)
|
|
|
|
rec = {
|
|
'path': file,
|
|
'temp_hash': temp_hash,
|
|
'sha256': find_in_cache(file, temp_hash)
|
|
}
|
|
|
|
local.append(rec)
|
|
return local
|
|
|
|
result.extend(search_in_dir(ModelType.CHECKPOINT))
|
|
result.extend(search_in_dir(ModelType.VAE))
|
|
result.extend(search_in_dir(ModelType.LORA))
|
|
result.extend(search_in_dir(ModelType.HYPER_NETWORK))
|
|
result.extend(search_in_dir(ModelType.EMBEDDING))
|
|
result.extend(search_in_dir(ModelType.LYCORIS))
|
|
|
|
return [
|
|
gr.JSON.update(value=json.dumps(result)),
|
|
gr.Button.update(visible=False)
|
|
]
|
|
|
|
|
|
def _on_hash_cache_save_click(json_data):
|
|
write_hash_cache(json_data)
|
|
|
|
|
|
def _ui_hash_cache():
|
|
with gr.Column():
|
|
read_button = gr.Button('Read hash cache')
|
|
compare_hash_button = gr.Button('Compare hash with cache')
|
|
calculate_button = gr.Button('Calculate hashes')
|
|
save_hash_button = gr.Button('Save hash', visible=False)
|
|
|
|
hash_cache_json = gr.JSON(label='Local files')
|
|
|
|
read_button.click(fn=_on_read_hash_click, outputs=[hash_cache_json, save_hash_button])
|
|
calculate_button.click(fn=_on_calculate_hash_click, outputs=[hash_cache_json, save_hash_button])
|
|
compare_hash_button.click(fn=_on_compare_hash_click, outputs=[hash_cache_json, save_hash_button])
|
|
|
|
save_hash_button.click(fn=_on_hash_cache_save_click, inputs=hash_cache_json)
|
|
|
|
|
|
def _on_remove_duplicates_click():
|
|
records = env.storage.get_all_records()
|
|
counter_set = set()
|
|
duplicates_list = []
|
|
|
|
for record in records:
|
|
key = f'{record.name}-{record.url}'
|
|
if key in counter_set:
|
|
duplicates_list.append(record)
|
|
else:
|
|
counter_set.add(key)
|
|
|
|
for record in duplicates_list:
|
|
env.storage.remove_record(record.id_)
|
|
|
|
return f'{len(duplicates_list)} duplicates has been removed.'
|
|
|
|
|
|
def _on_remove_all_records_click():
|
|
records = env.storage.get_all_records()
|
|
for record in records:
|
|
env.storage.remove_record(record.id_)
|
|
|
|
return "All records has been removed."
|
|
|
|
|
|
def _ui_debug_utils():
|
|
with gr.Row():
|
|
with gr.Column():
|
|
remove_duplicates_button = gr.Button("Remove Records duplicate")
|
|
remove_all_records = gr.Button("Remove all Records")
|
|
with gr.Column():
|
|
debug_html_output = gr.HTML()
|
|
|
|
remove_duplicates_button.click(fn=_on_remove_duplicates_click, outputs=[debug_html_output])
|
|
remove_all_records.click(fn=_on_remove_all_records_click, outputs=[debug_html_output])
|
|
|
|
|
|
def debug_ui_block():
|
|
with gr.Column():
|
|
with gr.Row():
|
|
gr.Markdown('## Debug')
|
|
gr.Markdown('')
|
|
gr.Markdown('')
|
|
gr.Markdown('')
|
|
back_button = gr.Button('Back')
|
|
|
|
with gr.Tab('State report'):
|
|
_ui_state_report()
|
|
|
|
with gr.Tab('Local files'):
|
|
_ui_local_files()
|
|
|
|
with gr.Tab('Hash cache'):
|
|
_ui_hash_cache()
|
|
|
|
with gr.Tab('Utils'):
|
|
_ui_debug_utils()
|
|
|
|
back_button.click(fn=None, _js='navigateBack')
|