mirror of
https://github.com/vladmandic/sdnext.git
synced 2026-01-27 15:02:48 +03:00
308 lines
12 KiB
Python
308 lines
12 KiB
Python
import itertools
|
|
import os
|
|
from collections import UserDict
|
|
from dataclasses import dataclass, field
|
|
from typing import Callable, Dict, Iterator, List, Optional, Union
|
|
from installer import print_dict, log
|
|
|
|
|
|
class Directory:  # forward declaration
    # Empty placeholder: lets the type aliases that follow refer to `Directory`
    # before the real dataclass of the same name is defined.
    pass
|
|
|
|
# Aliases for collections of plain path strings.
FilePathList = List[str]
FilePathIterator = Iterator[str]
DirectoryPathList = List[str]
DirectoryPathIterator = Iterator[str]

# Aliases for collections of `Directory` objects.
DirectoryList = List[Directory]
DirectoryIterator = Iterator[Directory]
DirectoryCollection = Dict[str, Directory]

# Aliases used by the filtering helpers and by the `recursive` / `recurse` parameters,
# which accept either a flag or a callable.
ExtensionFilter = Callable
ExtensionList = list[str]
RecursiveType = Union[bool,Callable]
|
|
|
|
|
|
def real_path(directory_path:str) -> Union[str, None]:
    '''Return `directory_path` with `~` expanded and made absolute.

    Any exception raised while normalizing (for example a non-path argument)
    is swallowed and reported as `None`.
    '''
    resolved = None
    try:
        expanded = os.path.expanduser(directory_path)
        resolved = os.path.abspath(expanded)
    except Exception:
        # Best effort by design: callers treat `None` as "no usable path".
        resolved = None
    return resolved
|
|
|
|
|
|
@dataclass(frozen=True)
class Directory(Directory): # pylint: disable=E0102
    '''Snapshot of one directory: its path, modification time, file paths and sub-directory paths.

    The dataclass is frozen, so `mtime` is written with `object.__setattr__` and the
    `files` / `directories` lists are refreshed in place through slice assignment.
    '''
    path: str = field(default_factory=str)
    mtime: float = field(default_factory=float, init=False) # filled from disk in __post_init__
    files: FilePathList = field(default_factory=list)
    directories: DirectoryPathList = field(default_factory=list)

    def __post_init__(self):
        # Record the on-disk mtime at construction time (0.0 when `path` is not a directory).
        object.__setattr__(self, 'mtime', self.live_mtime)

    @classmethod
    def from_dict(cls, dict_object: dict) -> Directory:
        '''Build an instance straight from a dict, bypassing `__init__` and `__post_init__`.

        Missing keys become `None`; `mtime` is taken from the dict, not from disk.
        '''
        directory = cls.__new__(cls)
        for name in ('path', 'mtime', 'files', 'directories'):
            object.__setattr__(directory, name, dict_object.get(name))
        return directory

    def clear(self) -> None:
        '''Empty `files` and `directories`, reset `mtime`, and evict every listed sub-directory from the cache.'''
        self._update(Directory.from_dict({
            'path': None,
            'mtime': float(),
            'files': [],
            'directories': []
        }))

    def update(self, source_directory: Directory) -> Directory:
        '''Copy the state of `source_directory` into this instance (no-op when it is `self`) and return `self`.'''
        if source_directory is not self:
            self._update(source_directory)
        return self

    def _update(self, source:Directory) -> None:
        '''Replace this snapshot's contents with those of `source`.

        Sub-directories that are listed here but absent from `source` are evicted
        from the cache before the lists are overwritten in place.
        '''
        assert not source.path or source.path == self.path, f'When updating a directory, the paths must match.  Attemped to update Directory `{self.path}` with `{source.path}`'
        # A set keeps the "is this child still present?" test constant-time per entry.
        surviving = set(source.directories)
        for dead_path in self.directories:
            if dead_path not in surviving:
                delete_cached_directory(dead_path)
        # Slice assignment mutates the existing lists, so other holders of them see the refresh.
        self.directories[:] = source.directories
        self.files[:] = source.files
        object.__setattr__(self, 'mtime', source.mtime)

    def __str__(self) -> str:
        return str(print_dict(self, path=self.path, mtime=self.mtime, files=len(self.files), directories=len(self.directories))) # pylint: disable=unexpected-keyword-arg

    @property
    def exists(self) -> bool:
        '''True when `path` is non-empty and present on disk.'''
        # bool(): an empty or None path must yield False, not the path value itself.
        return bool(self.path) and os.path.exists(self.path)

    @property
    def is_directory(self) -> bool:
        '''True when `path` exists and is a directory.'''
        return self.exists and os.path.isdir(self.path)

    @property
    def live_mtime(self) -> float:
        '''Current on-disk modification time of `path`, or 0.0 when it is not a directory.'''
        if not self.is_directory:
            return 0.0
        try:
            return os.path.getmtime(self.path)
        except OSError:
            # The directory can disappear between the check above and the stat call.
            return 0.0

    @property
    def is_stale(self) -> bool:
        '''True when `path` is no longer a directory or its on-disk mtime differs from the recorded one.'''
        return not self.is_directory or self.mtime != self.live_mtime
|
|
|
|
|
|
|
|
|
|
class DirectoryCache(UserDict, DirectoryCollection):
    '''Dict-like store of `Directory` objects keyed by directory path.'''

    def __delattr__(self, directory_path: str) -> None:
        '''Evict `directory_path` and the sub-directories it lists, then clear the entry.

        NOTE(review): `del cache[key]` dispatches to `__delitem__`, not to this hook;
        this method only runs for attribute deletion (`delattr(cache, name)`).
        Confirm whether `__delitem__` was the intended override.
        '''
        directory: Directory = get_directory(directory_path, fetch=False)
        if directory:
            # An explicit loop: a bare `map(...)` is lazy and would never run the deletions.
            # Iterate over a copy because `clear()` below rewrites the list in place.
            for child_path in list(directory.directories):
                delete_cached_directory(child_path)
            directory.clear()
        del self.data[directory_path]
|
|
|
|
|
|
def clean_directory(directory: Directory, /, recursive: RecursiveType=False) -> bool:
    '''Re-validate a cached `Directory` against disk and report whether it was already clean.

    - Not a directory any more: it is evicted from the cache; returns False.
    - Stale (mtime changed): it is refreshed from disk; returns False.
    - Otherwise each listed sub-directory is checked. When `recursive` allows it,
      children are cleaned recursively and children that no longer resolve are
      removed from `directory.directories`, which makes the result False.

    `recursive` is either a flag or a callable that is given `directory.path`.
    Errors while checking a single child are logged at debug level and do not
    abort the remaining children.
    '''
    if not directory.is_directory:
        delete_cached_directory(directory.path)
        return False
    if directory.is_stale:
        refreshed = fetch_directory(directory.path)
        if refreshed is None:
            # The directory became unreadable between the checks above and the rescan.
            delete_cached_directory(directory.path)
        else:
            directory.update(refreshed)
        return False
    is_clean = True
    # Iterate over a copy: entries may be removed from the live list below.
    for directory_path in directory.directories[:]:
        try:
            recurse = recursive and (not callable(recursive) or recursive(directory.path))
            # Use a separate name for the child so the parent stays reachable for the purge below.
            child = get_directory(directory_path, fetch=recurse)
            if child:
                if child.is_directory:
                    if recurse:
                        is_clean = clean_directory(child, recursive=recurse) and is_clean
                    continue
                delete_cached_directory(directory_path)
            # If we had intended to fetch this directory, but didn't, that means it doesn't exist. Purge.
            if recurse:
                directory.directories.remove(directory_path)
                is_clean = False
        except Exception as e:
            log.debug(f'Files cache clean: path="{directory_path}" {e}')
    return is_clean
|
|
|
|
|
|
def get_directory(directory_or_path: Union[str, Directory], /, fetch:bool=True) -> Union[Directory, None]:
    '''Return the cached `Directory` for a path, optionally scanning it when it is not cached yet.

    A `Directory` argument that still is a directory on disk is returned as is;
    otherwise its `path` is looked up like a plain string. A cached entry is
    re-validated with `clean_directory` before being returned. When nothing is
    cached and `fetch` is truthy the path is scanned and, if the scan produced
    a result, stored in the cache.

    Returns `None` when the path cannot be normalized, or when no entry is
    cached after the steps above.
    '''
    if isinstance(directory_or_path, Directory):
        if directory_or_path.is_directory:
            return directory_or_path
        directory_or_path = directory_or_path.path
    directory_or_path = real_path(directory_or_path)
    if directory_or_path is None:
        # Without this guard a failed normalization reaches `os.scandir(None)`,
        # which lists the current working directory.
        return None
    cached = cache_folders.get(directory_or_path, None)
    if cached:
        # May evict the entry, hence the fresh lookup on return.
        clean_directory(cached)
    elif fetch:
        directory = fetch_directory(directory_path=directory_or_path)
        if directory:
            cache_folders[directory_or_path] = directory
    return cache_folders[directory_or_path] if directory_or_path in cache_folders else None
|
|
|
|
|
|
def fetch_directory(directory_path: str) -> Union[Directory, None]:
    '''Scan `directory_path` without recursion and return its `Directory`, or `None` when the scan yields nothing.'''
    # `_walk` is a generator; only its first item is needed here.
    return next(_walk(directory_path, recurse=False), None)
|
|
|
|
|
|
def _walk(top, recurse:RecursiveType=True) -> DirectoryIterator:
    '''Yield a `Directory` for `top`, then for its sub-directories when `recurse` allows it.

    `recurse` is either a flag or a callable given each sub-directory path; a
    falsy result skips that sub-tree. Nothing is yielded when `top` cannot be
    scanned. An empty directory yields a `Directory` with no files and no
    sub-directories.
    '''
    # reimplemented `path.walk()`
    nondirs = []
    walk_dirs = []
    try:
        scandir_it = os.scandir(top)
    except OSError:
        return
    with scandir_it:
        try:
            # A default is required: letting StopIteration escape inside a generator
            # is turned into RuntimeError by Python, which broke empty directories.
            entry = next(scandir_it, None)
        except OSError:
            return
        while entry:
            try:
                is_dir = entry.is_dir()
            except OSError:
                # Treat an entry that cannot be inspected as a non-directory, as `os.walk` does.
                is_dir = False
            if not is_dir:
                nondirs.append(entry.path)
            elif entry.is_symlink() and not os.path.exists(entry.path):
                log.error(f'Files broken symlink: {entry.path}')
            else:
                walk_dirs.append(entry.path)
            try:
                entry = next(scandir_it)
            except Exception:
                # End of listing, or a read error part-way through: keep what was collected.
                entry = None
    yield Directory(top, nondirs, walk_dirs)
    if recurse:
        for new_path in walk_dirs:
            if callable(recurse) and not recurse(new_path):
                continue
            yield from _walk(new_path, recurse=recurse)
|
|
|
|
|
|
def _cached_walk(top, recurse:RecursiveType=True) -> Directory:
    '''Yield the cached `Directory` for `top`, then walk its sub-directories through the cache.

    Nothing is yielded when `top` does not resolve. Sub-directories whose name
    starts with `models--` are skipped, as are those rejected by a callable `recurse`.
    '''
    root = get_directory(top)
    if root:
        yield root
        children = root.directories if recurse else ()
        for child_directory in children:
            is_models_entry = os.path.basename(child_directory).startswith('models--')
            if is_models_entry or (callable(recurse) and not recurse(child_directory)):
                continue
            yield from _cached_walk(child_directory, recurse=recurse)
|
|
|
|
|
|
def walk(top, recurse:RecursiveType=True, cached=True) -> Directory:
    '''Yield `Directory` objects starting at `top`, through the cache by default or straight from disk when `cached` is falsy.'''
    if cached:
        yield from _cached_walk(top, recurse=recurse)
    else:
        yield from _walk(top, recurse=recurse)
|
|
|
|
|
|
def delete_cached_directory(directory_path:str) -> bool:
    '''Remove `directory_path` from the cache if present.

    Returns True when an entry was removed and False when the path was not
    cached, matching the declared `bool` return type.
    '''
    # No `global` statement needed: the cache object is only mutated, never rebound.
    if directory_path in cache_folders:
        del cache_folders[directory_path]
        return True
    return False
|
|
|
|
|
|
def is_directory(dir_path:str) -> bool:
    '''Return True when `dir_path` is non-empty and names an existing directory, otherwise False.'''
    # bool(): an empty or None argument must give False rather than the argument itself.
    # `os.path.isdir` is already False for paths that do not exist.
    return bool(dir_path) and os.path.isdir(dir_path)
|
|
|
|
|
|
def directory_mtime(directory_path:str, /, recursive:RecursiveType=True) -> float:
    '''Return the largest recorded `mtime` among the directories resolved for `directory_path`.

    The result is never below 0.0, and is 0.0 when no directory resolves.
    '''
    mtimes = (directory.mtime for directory in get_directories(directory_path, recursive=recursive))
    # `default` covers the empty case; unpacking an empty list into `max(0, *[])` raises TypeError.
    return float(max(0, max(mtimes, default=0)))
|
|
|
|
|
|
def unique_directories(directories:DirectoryPathList, /, recursive:RecursiveType=True) -> DirectoryPathIterator:
    '''Yield each distinct, non-empty directory path once, in ascending order.

    When `recursive` is truthy, a path located beneath an already yielded path
    is redundant and is skipped.

    NOTE(review): a callable `recursive` is only tested for truthiness here and
    is never called; the earlier code that consulted it was unreachable.
    Confirm whether per-path filtering is wanted at this level.
    '''
    kept_prefixes = []
    for directory in sorted(unique_paths(directories)):
        # Compare against every kept parent, not just the previous entry: plain string
        # order can place a sibling such as `/a-b` between `/a` and `/a/b`.
        if recursive and any(directory.startswith(prefix) for prefix in kept_prefixes):
            continue
        yield directory
        # Trailing separator so `/a` matches `/a/b` but not `/ab`.
        kept_prefixes.append(os.path.join(directory, ''))
|
|
|
|
|
|
def unique_paths(directory_paths:DirectoryPathList) -> DirectoryPathIterator:
    '''Normalize the given paths and return them without duplicates, in first-seen order, as a dict keys view.

    Empty inputs and paths that fail to normalize are dropped.
    '''
    seen = {}
    for directory_path in directory_paths:
        if not directory_path:
            continue
        real_directory_path = real_path(directory_path)
        if real_directory_path:
            # A dict is used as an insertion-ordered set.
            seen[real_directory_path] = True
    return seen.keys()
|
|
|
|
|
|
def get_directories(*directory_paths: DirectoryPathList, fetch:bool=True, recursive:RecursiveType=True) -> DirectoryCollection:
    '''Resolve the given paths to `Directory` objects, lazily.

    Paths are de-duplicated first; paths that do not resolve are left out.
    The return value is a `filter` iterator.
    '''
    unique = unique_directories(directory_paths, recursive=recursive)
    resolved = map(lambda directory_path: get_directory(directory_path, fetch=fetch), unique)
    return filter(bool, resolved)
|
|
|
|
|
|
def directory_files(*directories_or_paths: Union[DirectoryPathList, DirectoryList], recursive: RecursiveType=True) -> FilePathIterator:
    '''Lazily yield the file paths of each given directory, followed by those of its sub-directories when `recursive` is truthy.

    Arguments that do not resolve to a `Directory` are skipped. A callable
    `recursive` is applied to each sub-directory path to decide whether to descend.
    '''
    # Predicate for sub-directory paths: the caller's callable, else "any non-empty path".
    descend = recursive if callable(recursive) else bool
    for directory_object in filter(bool, map(get_directory, directories_or_paths)):
        yield from directory_object.files
        if not recursive:
            continue
        for child_path in filter(descend, directory_object.directories):
            child = get_directory(child_path)
            if child:
                yield from directory_files(child, recursive=recursive)
|
|
|
|
|
|
def extension_filter(ext_filter: Optional[ExtensionList]=None, ext_blacklist: Optional[ExtensionList]=None) -> ExtensionFilter:
    '''Build a case-insensitive predicate over file paths.

    The predicate is True when the path ends with one of `ext_filter` (or
    `ext_filter` is empty) and does not end with any of `ext_blacklist`.
    '''
    allowed = tuple(map(str.upper, ext_filter)) if ext_filter else ext_filter
    blocked = tuple(map(str.upper, ext_blacklist)) if ext_blacklist else ext_blacklist

    def matches(fp:str):
        if not allowed and not blocked:
            # Nothing to check, so the path itself is never inspected.
            return True
        upper_path = fp.upper()
        if allowed and not upper_path.endswith(allowed):
            return False
        return not (blocked and upper_path.endswith(blocked))

    return matches
|
|
|
|
|
|
def not_hidden(filepath: str) -> bool:
    '''Return True unless the final component of `filepath` starts with a dot.'''
    _, name = os.path.split(filepath)
    return not name.startswith('.')
|
|
|
|
|
|
def filter_files(file_paths: FilePathList, ext_filter: Optional[ExtensionList]=None, ext_blacklist: Optional[ExtensionList]=None) -> FilePathIterator:
    '''Lazily keep the paths accepted by `extension_filter(ext_filter, ext_blacklist)`.'''
    keep = extension_filter(ext_filter, ext_blacklist)
    return filter(keep, file_paths)
|
|
|
|
|
|
def list_files(*directory_paths:DirectoryPathList, ext_filter: Optional[ExtensionList]=None, ext_blacklist: Optional[ExtensionList]=None, recursive:RecursiveType=True) -> FilePathIterator:
    '''Lazily list the files under the given directories, filtered by extension.

    `recursive` is forwarded both to the directory lookup and to the per-directory
    file listing; `ext_filter` / `ext_blacklist` are forwarded to `filter_files`.
    '''
    directories = get_directories(*directory_paths, recursive=recursive)
    per_directory = (directory_files(directory, recursive=recursive) for directory in directories)
    all_files = itertools.chain.from_iterable(per_directory)
    return filter_files(all_files, ext_filter, ext_blacklist)
|
|
|
|
|
|
# Module-level cache instance shared by the lookup and eviction helpers; starts empty.
cache_folders = DirectoryCache()
|