Create and opt in to dict type safety from JSON

Opt in via the keyword-only argument `dict_only`
pull/4479/head
awsr 2025-12-10 05:03:18 -08:00
parent 1b44c4e498
commit c22ca8d76e
No known key found for this signature in database
14 changed files with 34 additions and 26 deletions

View File

@ -77,7 +77,7 @@ def civit_update_metadata(raw:bool=False):
model.id = d['modelId']
download_civit_meta(model.fn, model.id)
fn = os.path.splitext(item['filename'])[0] + '.json'
model.meta = readfile(fn, silent=True)
model.meta = readfile(fn, silent=True, dict_only=True)
model.name = model.meta.get('name', model.name)
model.versions = len(model.meta.get('modelVersions', []))
versions = model.meta.get('modelVersions', [])

View File

@ -12,7 +12,7 @@ progress_ok = True
def init_cache():
global cache_data # pylint: disable=global-statement
if cache_data is None:
cache_data = {} if not os.path.isfile(cache_filename) else shared.readfile(cache_filename, lock=True)
cache_data = {} if not os.path.isfile(cache_filename) else shared.readfile(cache_filename, lock=True, dict_only=True)
def dump_cache():
@ -22,7 +22,7 @@ def dump_cache():
def cache(subsection):
global cache_data # pylint: disable=global-statement
if cache_data is None:
cache_data = {} if not os.path.isfile(cache_filename) else shared.readfile(cache_filename, lock=True)
cache_data = {} if not os.path.isfile(cache_filename) else shared.readfile(cache_filename, lock=True, dict_only=True)
s = cache_data.get(subsection, {})
cache_data[subsection] = s
return s

View File

@ -2,6 +2,7 @@ import os
import sys
import time
import json
from typing import overload, Literal
import fasteners
import orjson
from installer import log
@ -10,7 +11,13 @@ from installer import log
locking_available = True # used by file read/write locking
def readfile(filename, silent=False, lock=False):
@overload
def readfile(filename: str, silent: bool = False, lock: bool = False) -> dict | list: ...
@overload
def readfile(filename: str, silent: bool = False, lock: bool = False, *, dict_only: Literal[True]) -> dict: ...
def readfile(filename: str, silent=False, lock=False, *, dict_only=False) -> dict | list:
global locking_available # pylint: disable=global-statement
data = {}
lock_file = None
@ -51,6 +58,11 @@ def readfile(filename, silent=False, lock=False):
os.remove(f"{filename}.lock")
except Exception:
locking_available = False
if isinstance(data, list) and dict_only:
data0 = data[0]
if isinstance(data0, dict):
return data0
return {}
return data

View File

@ -107,9 +107,7 @@ class NetworkOnDisk:
if self.filename is not None:
fn = os.path.splitext(self.filename)[0] + '.json'
if os.path.exists(fn):
data = shared.readfile(fn, silent=True)
if type(data) is list:
data = data[0]
data = shared.readfile(fn, silent=True, dict_only=True)
return data
def get_desc(self):

View File

@ -163,7 +163,7 @@ class Options():
log.debug(f'Settings: fn="{filename}" created')
self.save(filename)
return
self.data = readfile(filename, lock=True)
self.data = readfile(filename, lock=True, dict_only=True)
if self.data.get('quicksettings') is not None and self.data.get('quicksettings_list') is None:
self.data['quicksettings_list'] = [i.strip() for i in self.data.get('quicksettings').split(',')]
unknown_settings = []

View File

@ -329,7 +329,7 @@ def select_checkpoint(op='model', sd_model_checkpoint=None):
def init_metadata():
global sd_metadata # pylint: disable=global-statement
if sd_metadata is None:
sd_metadata = shared.readfile(sd_metadata_file, lock=True) if os.path.isfile(sd_metadata_file) else {}
sd_metadata = shared.readfile(sd_metadata_file, lock=True, dict_only=True) if os.path.isfile(sd_metadata_file) else {}
def extract_thumbnail(filename, data):
@ -349,7 +349,7 @@ def extract_thumbnail(filename, data):
def read_metadata_from_safetensors(filename):
global sd_metadata # pylint: disable=global-statement
if sd_metadata is None:
sd_metadata = shared.readfile(sd_metadata_file, lock=True) if os.path.isfile(sd_metadata_file) else {}
sd_metadata = shared.readfile(sd_metadata_file, lock=True, dict_only=True) if os.path.isfile(sd_metadata_file) else {}
res = sd_metadata.get(filename, None)
if res is not None:
return res

View File

@ -152,7 +152,7 @@ def guess_by_diffusers(fn, current_guess):
return current_guess, None
index = os.path.join(fn, 'model_index.json')
if os.path.exists(index) and os.path.isfile(index):
index = shared.readfile(index, silent=True)
index = shared.readfile(index, silent=True, dict_only=True)
name = index.get('_name_or_path', None)
if name is not None and name in exclude_by_name:
return current_guess, None
@ -171,7 +171,7 @@ def guess_by_diffusers(fn, current_guess):
is_quant = True
break
if folder.endswith('config.json'):
quantization_config = shared.readfile(folder, silent=True).get("quantization_config", None)
quantization_config = shared.readfile(folder, silent=True, dict_only=True).get("quantization_config", None)
if quantization_config is not None:
is_quant = True
break
@ -182,7 +182,7 @@ def guess_by_diffusers(fn, current_guess):
is_quant = True
break
if f.endswith('config.json'):
quantization_config = shared.readfile(f, silent=True).get("quantization_config", None)
quantization_config = shared.readfile(f, silent=True, dict_only=True).get("quantization_config", None)
if quantization_config is not None:
is_quant = True
break

View File

@ -608,9 +608,9 @@ def load_sdnq_module(fn: str, module_name: str, load_method: str):
quantization_config_path = os.path.join(fn, module_name, 'quantization_config.json')
model_config_path = os.path.join(fn, module_name, 'config.json')
if os.path.exists(quantization_config_path):
quantization_config = shared.readfile(quantization_config_path, silent=True)
quantization_config = shared.readfile(quantization_config_path, silent=True, dict_only=True)
elif os.path.exists(model_config_path):
quantization_config = shared.readfile(model_config_path, silent=True).get("quantization_config", None)
quantization_config = shared.readfile(model_config_path, silent=True, dict_only=True).get("quantization_config", None)
if quantization_config is None:
return None, module_name, 0
model_name = os.path.join(fn, module_name)

View File

@ -52,7 +52,7 @@ def load_unet(model, repo_id:str=None):
config_file = os.path.splitext(unet_dict[shared.opts.sd_unet])[0] + '.json'
if os.path.exists(config_file):
config = shared.readfile(config_file)
config = shared.readfile(config_file, dict_only=True)
else:
config = None
config_file = 'default'

View File

@ -440,7 +440,7 @@ class ExtraNetworksPage:
def update_all_previews(self, items):
global preview_map # pylint: disable=global-statement
if preview_map is None:
preview_map = shared.readfile('html/previews.json', silent=True)
preview_map = shared.readfile('html/previews.json', silent=True, dict_only=True)
t0 = time.time()
reference_path = os.path.abspath(os.path.join('models', 'Reference'))
possible_paths = list(set([os.path.dirname(item['filename']) for item in items] + [reference_path]))
@ -520,12 +520,10 @@ class ExtraNetworksPage:
t0 = time.time()
fn = os.path.splitext(path)[0] + '.json'
if not data and os.path.exists(fn):
data = shared.readfile(fn, silent=True)
data = shared.readfile(fn, silent=True, dict_only=True)
fn = os.path.join(path, 'model_index.json')
if not data and os.path.exists(fn):
data = shared.readfile(fn, silent=True)
if type(data) is list:
data = data[0]
data = shared.readfile(fn, silent=True, dict_only=True)
t1 = time.time()
self.info_time += t1-t0
return data
@ -868,7 +866,7 @@ def create_ui(container, button_parent, tabname, skip_indexing = False):
if hasattr(item, 'mtime') and item.mtime is not None:
stat_mtime = item.mtime
desc = item.description
fullinfo = shared.readfile(os.path.splitext(item.filename)[0] + '.json', silent=True)
fullinfo = shared.readfile(os.path.splitext(item.filename)[0] + '.json', silent=True, dict_only=True)
if 'modelVersions' in fullinfo: # sanitize massive objects
fullinfo['modelVersions'] = []
info = fullinfo

View File

@ -40,7 +40,7 @@ class ExtraNetworksPageCheckpoints(ui_extra_networks.ExtraNetworksPage):
shared.log.debug(f'Networks: type="reference" autodownload={shared.opts.sd_checkpoint_autodownload} enable={shared.opts.extra_network_reference_enable}')
return []
count = { 'total': 0, 'ready': 0, 'hidden': 0, 'experimental': 0, 'base': 0 }
shared.reference_models = readfile(os.path.join('html', 'reference.json'))
shared.reference_models = readfile(os.path.join('html', 'reference.json'), dict_only=True)
for k, v in shared.reference_models.items():
count['total'] += 1
url = v['path']

View File

@ -118,7 +118,7 @@ class UiLoadsave:
def read_from_file(self):
from modules.shared import readfile
return readfile(self.filename)
return readfile(self.filename, dict_only=True)
def write_to_file(self, current_ui_settings):
from modules.shared import writefile

View File

@ -23,7 +23,7 @@ class Upscaler:
def __init__(self, create_dirs=True):
global models # pylint: disable=global-statement
if models is None:
models = shared.readfile('html/upscalers.json')
models = shared.readfile('html/upscalers.json', dict_only=True)
self.mod_pad_h = None
self.tile_size = shared.opts.upscaler_tile_size
self.tile_pad = shared.opts.upscaler_tile_overlap

2
wiki

@ -1 +1 @@
Subproject commit 762006f354b7f1a9735bb85698bd32a16121126a
Subproject commit 12af554d26d12ce84d43e35f81da89bdbeac4057