import os import json import shutil import errno import html from datetime import datetime, timedelta import gradio as gr from modules import extensions, shared, paths, errors, ui_symbols debug = shared.log.debug if os.environ.get('SD_EXT_DEBUG', None) is not None else lambda *args, **kwargs: None extensions_index = "https://vladmandic.github.io/sd-data/pages/extensions.json" hide_tags = ["localization"] extensions_list = [] sort_ordering = { "default": (True, lambda x: x.get('sort_default', '')), "user extensions": (True, lambda x: x.get('sort_user', '')), "trending": (True, lambda x: x.get('sort_trending', -1)), "update available": (True, lambda x: x.get('sort_update', '')), "updated date": (True, lambda x: x.get('updated', '2000-01-01T00:00')), "created date": (True, lambda x: x.get('created', '2000-01-01T00:00')), "name": (False, lambda x: x.get('name', '').lower()), "enabled": (False, lambda x: x.get('sort_enabled', '').lower()), "size": (True, lambda x: x.get('size', 0)), "stars": (True, lambda x: x.get('stars', 0)), "commits": (True, lambda x: x.get('commits', 0)), "issues": (True, lambda x: x.get('issues', 0)), } def get_installed(ext) -> extensions.Extension: installed: extensions.Extension = [e for e in extensions.extensions if (e.remote or '').startswith(ext['url'].replace('.git', ''))] return installed[0] if len(installed) > 0 else None def list_extensions(): global extensions_list # pylint: disable=global-statement fn = os.path.join(paths.script_path, "html", "extensions.json") extensions_list = shared.readfile(fn, silent=True) or [] if type(extensions_list) != list: shared.log.warning(f'Invalid extensions list: file={fn}') extensions_list = [] if len(extensions_list) == 0: shared.log.info('Extension list is empty: refresh required') found = [] for ext in extensions.extensions: ext.read_info() for ext in extensions_list: installed = get_installed(ext) if installed: found.append(installed) debug(f'Extension installed from index: {ext}') for ext in [e for e in extensions.extensions if e not in found]: # installed but not in index entry = { "name": ext.name or "", "description": ext.description or "", "url": ext.remote or "", "tags": [], "stars": 0, "issues": 0, "commits": 0, "size": 0, "long": ext.git_name or ext.name or "", "added": ext.ctime, "created": ext.ctime, "updated": ext.mtime, } extensions_list.append(entry) debug(f'Extension installed without index: {entry}') def check_access(): assert not shared.cmd_opts.disable_extension_access, "extension access disabled because of command line flags" def apply_changes(disable_list, update_list, disable_all): check_access() shared.log.debug(f'Extensions apply: disable={disable_list} update={update_list}') disabled = json.loads(disable_list) assert type(disabled) == list, f"wrong disable_list data for apply_changes: {disable_list}" update = json.loads(update_list) assert type(update) == list, f"wrong update_list data for apply_changes: {update_list}" update = set(update) for ext in extensions.extensions: if ext.name not in update: continue try: ext.git_fetch() except Exception as e: errors.display(e, f'extensions apply update: {ext.name}') shared.opts.disabled_extensions = disabled shared.opts.disable_all_extensions = disable_all shared.opts.save(shared.config_filename) shared.restart_server(restart=True) def check_updates(_id_task, disable_list, search_text, sort_column): check_access() disabled = json.loads(disable_list) assert type(disabled) == list, f"wrong disable_list data for apply_and_restart: {disable_list}" exts = [ext for ext in extensions.extensions if ext.remote is not None and ext.name not in disabled] shared.log.info(f'Extensions update check: update={len(exts)} disabled={len(disable_list)}') shared.state.job_count = len(exts) for ext in exts: shared.state.textinfo = ext.name try: ext.check_updates() if ext.can_update: ext.git_fetch() ext.read_info() commit_date = ext.commit_date or 1577836800 shared.log.info(f'Extensions updated: {ext.name} {ext.commit_hash[:8]} {datetime.utcfromtimestamp(commit_date)}') else: commit_date = ext.commit_date or 1577836800 shared.log.debug(f'Extensions no update available: {ext.name} {ext.commit_hash[:8]} {datetime.utcfromtimestamp(commit_date)}') except FileNotFoundError as e: if 'FETCH_HEAD' not in str(e): raise except Exception as e: errors.display(e, f'extensions check update: {ext.name}') shared.state.nextjob() return create_html(search_text, sort_column), "Extension update complete | Restart required" def make_commit_link(commit_hash, remote, text=None): if text is None: text = commit_hash[:8] if remote.startswith("https://github.com/"): if remote.endswith(".git"): remote = remote[:-4] href = remote + "/commit/" + commit_hash return f'{text}' else: return text def normalize_git_url(url): if url is None: return "" url = url.replace(".git", "") return url def install_extension_from_url(dirname, url, branch_name, search_text, sort_column): check_access() assert url, 'No URL specified' if dirname is None or dirname == "": *parts, last_part = url.split('/') # pylint: disable=unused-variable last_part = normalize_git_url(last_part) dirname = last_part target_dir = os.path.join(extensions.extensions_dir, dirname) shared.log.info(f'Installing extension: {url} into {target_dir}') assert not os.path.exists(target_dir), f'Extension directory already exists: {target_dir}' normalized_url = normalize_git_url(url) assert len([x for x in extensions.extensions if normalize_git_url(x.remote) == normalized_url]) == 0, 'Extension with this URL is already installed' tmpdir = os.path.join(paths.data_path, "tmp", dirname) if url.endswith('.git'): url = url.replace('.git', '') try: import git shutil.rmtree(tmpdir, True) if not branch_name: # if no branch is specified, use the default branch with git.Repo.clone_from(url, tmpdir, filter=['blob:none']) as repo: repo.remote().fetch() for submodule in repo.submodules: submodule.update() else: with git.Repo.clone_from(url, tmpdir, filter=['blob:none'], branch=branch_name) as repo: repo.remote().fetch() for submodule in repo.submodules: submodule.update() try: os.rename(tmpdir, target_dir) except OSError as err: if err.errno == errno.EXDEV: shutil.move(tmpdir, target_dir) else: raise err from launch import run_extension_installer run_extension_installer(target_dir) extensions.list_extensions() return [create_html(search_text, sort_column), html.escape(f"Extension installed: {target_dir} | Restart required")] except Exception as e: shared.log.error(f'Error installing extension: {url} {e}') finally: shutil.rmtree(tmpdir, True) return [] def install_extension(extension_to_install, search_text, sort_column): shared.log.info(f'Extension install: {extension_to_install}') code, message = install_extension_from_url(None, extension_to_install, None, search_text, sort_column) return code, message def uninstall_extension(extension_path, search_text, sort_column): def errorRemoveReadonly(func, path, exc): import stat excvalue = exc[1] shared.log.debug(f'Exception during cleanup: {func} {path} {excvalue.strerror}') if func in (os.rmdir, os.remove, os.unlink) and excvalue.errno == errno.EACCES: shared.log.debug(f'Retrying cleanup: {path}') os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) func(path) found = [extension for extension in extensions.extensions if os.path.abspath(extension.path) == os.path.abspath(extension_path)] if len(found) > 0 and os.path.isdir(extension_path): found = found[0] try: shutil.rmtree(found.path, ignore_errors=False, onerror=errorRemoveReadonly) # extensions.extensions = [extension for extension in extensions.extensions if os.path.abspath(found.path) != os.path.abspath(extension_path)] except Exception as e: shared.log.warning(f'Extension uninstall failed: {found.path} {e}') list_extensions() global extensions_list # pylint: disable=global-statement extensions_list = [ext for ext in extensions_list if ext['name'] != found.name] shared.log.info(f'Extension uninstalled: {found.path}') code = create_html(search_text, sort_column) return code, f"Extension uninstalled: {found.path} | Restart required" else: shared.log.warning(f'Extension uninstall cannot find extension: {extension_path}') code = create_html(search_text, sort_column) return code, f"Extension uninstalled failed: {extension_path}" def update_extension(extension_path, search_text, sort_column): exts = [extension for extension in extensions.extensions if os.path.abspath(extension.path) == os.path.abspath(extension_path)] shared.state.job_count = len(exts) for ext in exts: shared.log.debug(f'Extensions update start: {ext.name} {ext.commit_hash} {ext.commit_date}') shared.state.textinfo = ext.name try: ext.check_updates() if ext.can_update: ext.git_fetch() ext.read_info() commit_date = ext.commit_date or 1577836800 shared.log.info(f'Extensions updated: {ext.name} {ext.commit_hash[:8]} {datetime.utcfromtimestamp(commit_date)}') else: commit_date = ext.commit_date or 1577836800 shared.log.info(f'Extensions no update available: {ext.name} {ext.commit_hash[:8]} {datetime.utcfromtimestamp(commit_date)}') except FileNotFoundError as e: if 'FETCH_HEAD' not in str(e): raise except Exception as e: shared.log.error(f'Extensions update failed: {ext.name}') errors.display(e, f'extensions check update: {ext.name}') shared.log.debug(f'Extensions update finish: {ext.name} {ext.commit_hash} {ext.commit_date}') shared.state.nextjob() return create_html(search_text, sort_column), f"Extension updated | {extension_path} | Restart required" def refresh_extensions_list(search_text, sort_column): global extensions_list # pylint: disable=global-statement import urllib.request try: shared.log.debug(f'Updating extensions list: url={extensions_index}') with urllib.request.urlopen(extensions_index, timeout=3.0) as response: text = response.read() extensions_list = json.loads(text) with open(os.path.join(paths.script_path, "html", "extensions.json"), "w", encoding="utf-8") as outfile: json_object = json.dumps(extensions_list, indent=2) outfile.write(json_object) shared.log.info(f'Updated extensions list: items={len(extensions_list)} url={extensions_index}') except Exception as e: shared.log.warning(f'Updated extensions list failed: {extensions_index} {e}') list_extensions() code = create_html(search_text, sort_column) return code, f'Extensions | {len(extensions.extensions)} registered | {len(extensions_list)} available' def search_extensions(search_text, sort_column): code = create_html(search_text, sort_column) return code, f'Search | {search_text} | {sort_column}' def create_html(search_text, sort_column): # shared.log.debug(f'Extensions manager: refresh list search="{search_text}" sort="{sort_column}"') code = """
| Status | Enabled | Extension | Description | Type | Current version | |
|---|---|---|---|---|---|---|
| {status} | {enabled_code} | {html.escape(ext.get("name", "unknown"))} {tags_text} |
{html.escape(ext.get("description", ""))}
Created {html.escape(dt('created'))} | Added {html.escape(dt('added'))} | Pushed {html.escape(dt('pushed'))} | Updated {html.escape(dt('updated'))} {author} | Stars {html.escape(str(ext.get('stars', 0)))} | Size {html.escape(str(ext.get('size', 0)))} | Commits {html.escape(str(ext.get('commits', 0)))} | Issues {html.escape(str(ext.get('issues', 0)))} | Trending {html.escape(str(ext['sort_trending']))} |
{type_code} | {version_code} | {install_code} |