import os
import json
import shutil
import errno
import html
import re
from datetime import datetime, timezone, timedelta
import gradio as gr
from modules import extensions, shared, paths, errors, ui_symbols, call_queue
debug = shared.log.debug if os.environ.get('SD_EXT_DEBUG', None) is not None else lambda *args, **kwargs: None
extensions_index = "https://vladmandic.github.io/sd-data/pages/extensions.json"
hide_tags = ["localization"]
exclude_extensions = ['sdnext-modernui', 'sdnext-kanvas']
extensions_list = []
sort_ordering = {
"default": (True, lambda x: x.get('sort_default', '')),
"user extensions": (True, lambda x: x.get('sort_user', '')),
"trending": (True, lambda x: x.get('sort_trending', -1)),
"update available": (True, lambda x: x.get('sort_update', '')),
"updated date": (True, lambda x: x.get('updated', '2000-01-01T00:00')),
"created date": (True, lambda x: x.get('created', '2000-01-01T00:00')),
"name": (False, lambda x: x.get('name', '').lower()),
"enabled": (False, lambda x: x.get('sort_enabled', '').lower()),
"size": (True, lambda x: x.get('size', 0)),
"stars": (True, lambda x: x.get('stars', 0)),
"commits": (True, lambda x: x.get('commits', 0)),
"issues": (True, lambda x: x.get('issues', 0)),
}
extensions_data_file = os.path.join("data", "extensions.json")
re_snake_case = re.compile(r'_(?=[a-zA-z0-9])')
re_camelCase = re.compile(r'(?<=[a-z])([A-Z])')
def get_installed(ext):
installed = [e for e in extensions.extensions if (e.remote or '').startswith(ext['url'].replace('.git', ''))]
return installed[0] if len(installed) > 0 else None
def list_extensions():
global extensions_list # pylint: disable=global-statement
extensions_list = shared.readfile(extensions_data_file, silent=True, as_type="list")
if len(extensions_list) == 0:
shared.log.info("Extension list: No information found. Refresh required.")
found = []
for ext in extensions.extensions:
ext.read_info()
for ext in extensions_list:
installed = get_installed(ext)
if installed:
found.append(installed)
debug(f'Extension installed from index: {ext}')
for ext in [e for e in extensions.extensions if e not in found]: # installed but not in index
entry = {
"name": ext.name or "",
"description": ext.description or "",
"url": ext.remote or "",
"tags": [],
"stars": 0,
"issues": 0,
"commits": 0,
"size": 0,
"long": ext.git_name or ext.name or "",
"added": ext.ctime,
"created": ext.ctime,
"updated": ext.mtime,
}
if ext.name not in exclude_extensions:
extensions_list.append(entry)
debug(f'Extension installed without index: {entry}')
def apply_changes(disable_list, update_list, disable_all):
if shared.cmd_opts.disable_extension_access:
shared.log.error('Extension: apply changes disallowed because public access is enabled and insecure is not specified')
return
shared.log.debug(f'Extensions apply: disable={disable_list} update={update_list}')
disabled = json.loads(disable_list)
assert type(disabled) == list, f"wrong disable_list data for apply_changes: {disable_list}"
update = json.loads(update_list)
assert type(update) == list, f"wrong update_list data for apply_changes: {update_list}"
update = set(update)
for ext in extensions.extensions:
if ext.name not in update:
continue
try:
ext.git_fetch()
except Exception as e:
errors.display(e, f'extensions apply update: {ext.name}')
shared.opts.disabled_extensions = disabled
shared.opts.disable_all_extensions = disable_all
shared.opts.save()
shared.restart_server(restart=True)
def check_updates(_id_task, disable_list, search_text, sort_column):
if shared.cmd_opts.disable_extension_access:
shared.log.error('Extension: apply changes disallowed because public access is enabled and insecure is not specified')
return create_html(search_text, sort_column)
disabled = json.loads(disable_list)
assert type(disabled) == list, f"wrong disable_list data for apply_and_restart: {disable_list}"
exts = [ext for ext in extensions.extensions if ext.remote is not None and ext.name not in disabled]
shared.log.info(f'Extensions update check: update={len(exts)} disabled={len(disable_list)}')
shared.state.job_count = len(exts)
for ext in exts:
shared.state.textinfo = ext.name
try:
ext.check_updates()
if ext.can_update:
ext.git_fetch()
ext.read_info()
commit_date = ext.commit_date or 1577836800
shared.log.info(f'Extensions updated: {ext.name} {ext.commit_hash[:8]} {extensions.format_dt(extensions.ts2utc(commit_date), seconds=True)}')
else:
commit_date = ext.commit_date or 1577836800
shared.log.debug(f'Extensions no update available: {ext.name} {ext.commit_hash[:8]} {extensions.format_dt(extensions.ts2utc(commit_date), seconds=True)}')
except FileNotFoundError as e:
if 'FETCH_HEAD' not in str(e):
raise
except Exception as e:
errors.display(e, f'extensions check update: {ext.name}')
shared.state.nextjob()
return create_html(search_text, sort_column), "Extension update complete | Restart required"
def normalize_git_url(url: str | None) -> str:
return '' if url is None else url.strip().removesuffix('.git')
def install_extension_from_url(dirname, url, branch_name, search_text, sort_column):
if shared.cmd_opts.disable_extension_access:
shared.log.error('Extension: apply changes disallowed because public access is enabled and insecure is not specified')
return ['', '']
url = normalize_git_url(url)
if not url:
shared.log.error('Extension: url is not specified')
return ['', '']
if not dirname:
dirname = url.split('/')[-1]
target_dir = os.path.join(extensions.extensions_dir, dirname)
shared.log.info(f'Installing extension: {url} into {target_dir}')
if os.path.exists(target_dir):
shared.log.error(f'Extension: path="{target_dir}" directory already exists')
return ['', '']
if any(normalize_git_url(x.remote) == url for x in extensions.extensions):
return ['', "Extension with this URL is already installed"]
tmpdir = os.path.join(paths.data_path, "tmp", dirname)
try:
import git
shutil.rmtree(tmpdir, True)
args = {
'url': url,
'to_path': tmpdir,
'allow_unsafe_protocols': True,
'allow_unsafe_options': True,
'filter': ['blob:none'],
}
if branch_name:
args['branch'] = branch_name
ssh = os.environ.get('GIT_SSH_COMMAND', None)
if ssh:
args['env'] = {'GIT_SSH_COMMAND':ssh}
shared.log.debug(f'GIT: {args}')
with git.Repo.clone_from(**args) as repo:
repo.remote().fetch(verbose=True)
for submodule in repo.submodules:
submodule.update()
try:
os.rename(tmpdir, target_dir)
except OSError as err:
if err.errno == errno.EXDEV:
shutil.move(tmpdir, target_dir)
else:
raise err
from launch import run_extension_installer
run_extension_installer(target_dir)
shutil.rmtree(tmpdir, True)
extensions.list_extensions()
return [create_html(search_text, sort_column), html.escape(f"Extension installed: {target_dir} | Restart required")]
except Exception as e:
# errors.display(e, 'GIT')
shutil.rmtree(tmpdir, True)
shared.log.error(f'Error installing extension: {url} {e}')
return ['', str(e).replace('\n', '
')]
def install_extension(extension_to_install, search_text, sort_column):
shared.log.info(f'Extension install: {extension_to_install}')
code, message = install_extension_from_url(None, extension_to_install, None, search_text, sort_column)
return code, message
def uninstall_extension(extension_path, search_text, sort_column):
def errorRemoveReadonly(func, path, exc):
import stat
excvalue = exc[1]
shared.log.debug(f'Exception during cleanup: {func} {path} {excvalue.strerror}')
if func in (os.rmdir, os.remove, os.unlink) and excvalue.errno == errno.EACCES:
shared.log.debug(f'Retrying cleanup: {path}')
os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
func(path)
found = [extension for extension in extensions.extensions if os.path.abspath(extension.path) == os.path.abspath(extension_path)]
if len(found) > 0 and os.path.isdir(extension_path):
found = found[0]
try:
shutil.rmtree(found.path, ignore_errors=False, onerror=errorRemoveReadonly) # pylint: disable=deprecated-argument
# extensions.extensions = [extension for extension in extensions.extensions if os.path.abspath(found.path) != os.path.abspath(extension_path)]
except Exception as e:
shared.log.warning(f'Extension uninstall failed: {found.path} {e}')
list_extensions()
global extensions_list # pylint: disable=global-statement
extensions_list = [ext for ext in extensions_list if ext['name'] != found.name]
shared.log.info(f'Extension uninstalled: {found.path}')
code = create_html(search_text, sort_column)
return code, f"Extension uninstalled: {found.path} | Restart required"
else:
shared.log.warning(f'Extension uninstall cannot find extension: {extension_path}')
code = create_html(search_text, sort_column)
return code, f"Extension uninstalled failed: {extension_path}"
def update_extension(extension_path, search_text, sort_column):
exts = [extension for extension in extensions.extensions if os.path.abspath(extension.path) == os.path.abspath(extension_path)]
shared.state.job_count = len(exts)
for ext in exts:
shared.log.debug(f'Extensions update start: {ext.name} {ext.commit_hash} {ext.commit_date}')
shared.state.textinfo = ext.name
try:
ext.check_updates()
if ext.can_update:
ext.git_fetch()
ext.read_info()
commit_date = ext.commit_date or 1577836800
shared.log.info(f'Extensions updated: {ext.name} {ext.commit_hash[:8]} {extensions.format_dt(extensions.ts2utc(commit_date), seconds=True)}')
else:
commit_date = ext.commit_date or 1577836800
shared.log.info(f'Extensions no update available: {ext.name} {ext.commit_hash[:8]} {extensions.format_dt(extensions.ts2utc(commit_date), seconds=True)}')
except FileNotFoundError as e:
if 'FETCH_HEAD' not in str(e):
raise
except Exception as e:
shared.log.error(f'Extensions update failed: {ext.name}')
errors.display(e, f'extensions check update: {ext.name}')
shared.log.debug(f'Extensions update finish: {ext.name} {ext.commit_hash} {ext.commit_date}')
shared.state.nextjob()
return create_html(search_text, sort_column), f"Extension updated | {extension_path} | Restart required"
def refresh_extensions_list(search_text, sort_column):
global extensions_list # pylint: disable=global-statement
import ssl
import urllib.request
try:
shared.log.debug(f'Updating extensions list: url={extensions_index}')
context = ssl._create_unverified_context() # pylint: disable=protected-access
with urllib.request.urlopen(extensions_index, timeout=3.0, context=context) as response:
text = response.read()
extensions_list = json.loads(text)
with open(extensions_data_file, "w", encoding="utf-8") as outfile:
json_object = json.dumps(extensions_list, indent=2)
outfile.write(json_object)
shared.log.info(f'Updated extensions list: items={len(extensions_list)} url={extensions_index}')
except Exception as e:
shared.log.warning(f'Updated extensions list failed: {extensions_index} {e}')
list_extensions()
code = create_html(search_text, sort_column)
return code, f'Extensions | {len(extensions.extensions)} registered | {len(extensions_list)} available'
def search_extensions(search_text, sort_column):
code = create_html(search_text, sort_column)
return code, f'Search | {search_text} | {sort_column}'
def make_wrappable_html(text: str) -> str:
text = html.escape(text)
text = re_snake_case.sub("
| Extension | Description | Type | Current version | |||
|---|---|---|---|---|---|---|
| {enabled_code} | {status} | {make_wrappable_html(ext.get("name", "unknown"))} {tags_text} |
{html.escape(ext.get("description", ""))}
Created: {html.escape(dt('created'))} | Added: {html.escape(dt('added'))} | Pushed: {html.escape(dt('pushed'))} | Updated: {html.escape(dt('updated'))} {author} | Stars: {html.escape(str(ext.get('stars', 0)))} | Size: {html.escape(str(ext.get('size', 0)))} | Commits: {html.escape(str(ext.get('commits', 0)))} | Issues: {html.escape(str(ext.get('issues', 0)))} | Trending: {html.escape(str(ext['sort_trending']))} |
{type_code} | {version_code} | {install_code} |