Source code for gluoncv.utils.filesystem

"""Filesystem utility functions."""
import os
import errno
import contextlib
from pathlib import Path
import tarfile
import zipfile

def makedirs(path):
    """Create directory recursively if not exists.

    Similar to `mkdir -p`, you can skip checking existence before this function.

    Parameters
    ----------
    path : str
        Path of the desired dir

    Raises
    ------
    OSError
        If the directory cannot be created, including the case where `path`
        already exists but is not a directory.

    """
    # `exist_ok=True` tolerates an already existing *directory* only; an
    # existing regular file at `path` is still reported as an error instead
    # of being silently accepted.
    os.makedirs(path, exist_ok=True)
def try_import(package, message=None, fromlist=None):
    """Import `package`, optionally replacing the import error message.

    Parameters
    ----------
    package : str
        The name of the targeting package.
    message : str, default is None
        When given (and non-empty), an ImportError carrying this text is
        raised instead of the original import error.
    fromlist : list of str, default is None
        Forwarded unchanged to the builtin ``__import__``.

    Returns
    -------
    module if found, raise ImportError otherwise

    """
    try:
        module = __import__(package, fromlist=fromlist)
    except ImportError as err:
        if message:
            raise ImportError(message)
        raise err
    return module
def try_import_cv2():
    """Try import cv2 at runtime.

    Returns
    -------
    cv2 module if found. Raise ImportError otherwise

    """
    # Adjacent string literals are used instead of a backslash continuation
    # inside the literal, which would embed the source indentation in the
    # message shown to the user.
    msg = ("cv2 is required, you can install by package manager, e.g. 'apt-get', "
           "or `pip install opencv-python --user` "
           "(note that this is unofficial PYPI package).")
    return try_import('cv2', msg)
def try_import_munkres():
    """Try import munkres at runtime.

    Munkres (Hungarian) algorithm for the Assignment Problem

    Returns
    -------
    munkres module if found. Raise ImportError otherwise

    """
    msg = "munkres is required, you can install by `pip install munkres --user`. "
    return try_import('munkres', msg)


def try_import_colorama():
    """Try import colorama at runtime.

    Returns
    -------
    colorama module if found. Raise ImportError otherwise

    """
    # Adjacent literals keep source indentation out of the message text.
    msg = ("colorama is required, you can install by `pip install colorama --user` "
           "(note that this is unofficial PYPI package).")
    return try_import('colorama', msg)


def try_import_decord():
    """Try import decord at runtime.

    Returns
    -------
    Decord module if found. Raise ImportError otherwise

    """
    msg = ("Decord is required, you can install by `pip install decord --user` "
           "(note that this is unofficial PYPI package).")
    return try_import('decord', msg)


def try_import_mmcv():
    """Try import mmcv at runtime.

    Returns
    -------
    mmcv module if found. Raise ImportError otherwise

    """
    msg = ("mmcv is required, you can install by first `pip install Cython --user` "
           "and then `pip install mmcv --user` "
           "(note that this is unofficial PYPI package).")
    return try_import('mmcv', msg)


def try_import_rarfile():
    """Try import rarfile at runtime.

    Returns
    -------
    rarfile module if found. Raise ImportError otherwise

    """
    msg = ("rarfile is required, you can install by first `sudo apt-get install unrar` "
           "and then `pip install rarfile --user` "
           "(note that this is unofficial PYPI package).")
    return try_import('rarfile', msg)


def import_try_install(package, extern_url=None):
    """Try import the specified package.

    If the package not installed, try use pip to install and import if success.
    The whole import/install sequence runs while holding a `portalocker`
    lock on a per-package file in the system temporary directory.

    Parameters
    ----------
    package : str
        The name of the package trying to import.
    extern_url : str or None, optional
        The external url if package is not hosted on PyPI.
        For example, you can install a package using:
        "pip install git+http://github.com/user/repo/tarball/master/egginfo=xxx".
        In this case, you can pass the url to the extern_url.

    Returns
    -------
    <class 'Module'>
        The imported python module.

    """
    import importlib
    import tempfile
    import portalocker

    lockfile = os.path.join(tempfile.gettempdir(), package + '_install.lck')
    with portalocker.Lock(lockfile):
        try:
            return __import__(package)
        except ImportError:
            try:
                from pip import main as pipmain
            except ImportError:
                from pip._internal import main as pipmain
                from types import ModuleType
                # fix for pip 19.3
                if isinstance(pipmain, ModuleType):
                    from pip._internal.main import main as pipmain

            # trying to install package
            url = package if extern_url is None else extern_url
            pipmain(['install', '--user', url])  # will raise SystemExit Error if fails

            # Packages installed after interpreter start-up may be hidden by
            # the import system's directory caches; drop them before retrying.
            importlib.invalidate_caches()

            # trying to load again
            try:
                return __import__(package)
            except ImportError:
                import sys
                import site
                # The user site directory may not be on sys.path yet (e.g. it
                # did not exist when the interpreter started).
                user_site = site.getusersitepackages()
                if user_site not in sys.path:
                    sys.path.append(user_site)
                return __import__(package)
    # Defensive fallback: every path inside the `with` block above either
    # returns or raises.
    return __import__(package)
def try_import_dali():
    """Try import NVIDIA DALI at runtime.

    Returns
    -------
    The ``nvidia.dali`` module (with ``Pipeline`` exposed at its top level)
    when the import succeeds. Otherwise a placeholder class named ``dali``
    whose nested ``Pipeline`` raises NotImplementedError on instantiation,
    explaining why DALI is unavailable.

    """
    try:
        module = __import__('nvidia.dali', fromlist=['pipeline', 'ops', 'types'])
        module.Pipeline = module.pipeline.Pipeline
        return module
    except ImportError:
        reason = "DALI not found, please check if you installed it correctly."
    except RuntimeError as err:
        reason = "No CUDA-capable device is detected ({}).".format(err)

    # Import failed: hand back a stand-in that defers the error until a
    # pipeline is actually constructed.
    class dali:
        class Pipeline:
            def __init__(self):
                raise NotImplementedError(reason)

    return dali
def try_import_html5lib():
    """Try import html5lib at runtime.

    Returns
    -------
    html5lib module if found. Raise ImportError otherwise

    """
    msg = "html5lib is required, you can install by package manager, " \
          "e.g. `pip install html5lib --user` (note that this is unofficial PYPI package)."
    return try_import('html5lib', msg)


def try_import_gdfDownloader():
    """Try import googleDriveFileDownloader at runtime.

    Returns
    -------
    googleDriveFileDownloader module if found. Raise ImportError otherwise

    """
    msg = "googleDriveFileDownloader is required, you can install by package manager, " \
          "e.g. `pip install googleDriveFileDownloader --user` " \
          "(note that this is unofficial PYPI package)."
    return try_import('googleDriveFileDownloader', msg)


def _common_archive_root(names):
    """Return the leading path shared by all archive member names.

    ``os.path.commonprefix`` compares character by character, so for members
    such as ``data/a1.txt`` and ``data/a2.txt`` it yields ``data/a``. When the
    raw prefix does not end on a path-component boundary it is cut back to the
    last ``/``. An empty list gives an empty string.
    """
    prefix = os.path.commonprefix(names)
    if not prefix or prefix.endswith('/'):
        return prefix
    if all(name == prefix or name.startswith(prefix + '/') for name in names):
        # The prefix is itself a complete member / directory name.
        return prefix
    return prefix[:prefix.rfind('/') + 1]


def unzip(zip_file_path, root='./', strict=False):
    """Unzips files located at `zip_file_path` into parent directory specified by `root`.

    Parameters
    ----------
    zip_file_path : str
        Path of the zip archive.
    root : str, default is './'
        Directory to extract into; a leading ``~`` is expanded.
    strict : bool, default is False
        If True, always extract. If False, extraction is skipped when the
        last member of the archive already exists under `root`.

    Returns
    -------
    str
        `root` joined with the path prefix common to all archive members.

    """
    # NOTE(review): extractall() trusts the member paths stored in the
    # archive; only use this on archives from a trusted source.
    root = os.path.expanduser(root)
    with zipfile.ZipFile(zip_file_path) as zf:
        names = zf.namelist()
        # An empty archive has no "last member" to probe for.
        already_extracted = bool(names) and os.path.exists(os.path.join(root, names[-1]))
        if strict or not already_extracted:
            zf.extractall(root)
        folder = _common_archive_root(names)
    return os.path.join(root, folder)


def untar(tar_file_path, root='./', strict=False):
    """Untars files located at `tar_file_path` into parent directory specified by `root`.

    Parameters
    ----------
    tar_file_path : str
        Path of the tar archive. The compression is detected transparently,
        so gzip, bzip2, xz and uncompressed archives are all accepted.
    root : str, default is './'
        Directory to extract into; a leading ``~`` is expanded.
    strict : bool, default is False
        If True, always extract. If False, extraction is skipped when the
        last member of the archive already exists under `root`.

    Returns
    -------
    str
        `root` joined with the path prefix common to all archive members.

    """
    # NOTE(review): extractall() trusts the member paths stored in the
    # archive; only use this on archives from a trusted source.
    root = os.path.expanduser(root)
    with tarfile.open(tar_file_path, 'r:*') as tf:
        names = tf.getnames()
        # An empty archive has no "last member" to probe for.
        already_extracted = bool(names) and os.path.exists(os.path.join(root, names[-1]))
        if strict or not already_extracted:
            tf.extractall(root)
        folder = _common_archive_root(names)
    return os.path.join(root, folder)


@contextlib.contextmanager
def temporary_filename(suffix=None):
    """Context that introduces a temporary file.

    Creates a temporary file, yields its name, and upon context exit, deletes it.
    (In contrast, tempfile.NamedTemporaryFile() provides a 'file' object and
    deletes the file as soon as that file object is closed, so the temporary file
    cannot be safely re-opened by another library or process.)

    Parameters
    ----------
    suffix: desired filename extension (e.g. '.mp4').

    Yields
    ------
    The name of the temporary file.

    """
    import tempfile
    # Create the file before entering the try block: if creation fails there
    # is nothing to clean up, and the original error must not be masked by a
    # NameError raised from the cleanup code.
    with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as f:
        tmp_name = f.name
    try:
        yield tmp_name
    finally:
        os.unlink(tmp_name)


class _DisplayablePath:
    """A util class for displaying the tree structure of root path.

    Example:

    >>> paths = _DisplayablePath.make_tree(Path('doc'))
    >>> for path in paths:
    >>>     print(path.displayable())

    Parameters
    ----------
    path : str
        The path.
    parent_path : _DisplayablePath or None
        The parent node, None for the root.
    is_last : bool
        Whether it's the last node in this depth.

    """
    # Connector drawn directly in front of an entry name.
    display_filename_prefix_middle = '├──'
    display_filename_prefix_last = '└──'
    # Per-ancestor indentation. Both are four columns wide so that they line
    # up with the connector plus the single space that follows it.
    display_parent_prefix_middle = '    '
    display_parent_prefix_last = '│   '

    def __init__(self, path, parent_path, is_last):
        self.path = Path(str(path))
        self.parent = parent_path
        self.is_last = is_last
        if self.parent:
            self.depth = self.parent.depth + 1
        else:
            self.depth = 0

    # pylint: disable=inconsistent-return-statements
    @classmethod
    def make_tree(cls, root, parent=None, is_last=False, criteria=None, max_depth=1):
        """Make tree structure from root.

        This is a generator yielding `_DisplayablePath` nodes, the node for
        `root` first, followed by its children sorted case-insensitively.

        Parameters
        ----------
        root : str
            The root dir.
        parent : _DisplayablePath
            The parent displayable path.
        is_last : bool
            Whether it's the last in this level.
        criteria : function
            The criteria used to filter dir/path.
        max_depth : int
            Maximum depth for search.

        """
        root = Path(str(root))
        criteria = criteria or cls._default_criteria

        displayable_root = cls(root, parent, is_last)
        if displayable_root.depth > max_depth:
            # Too deep: stop without yielding anything for this subtree.
            return displayable_root
        yield displayable_root

        children = sorted((path for path in root.iterdir() if criteria(path)),
                          key=lambda s: str(s).lower())
        last_index = len(children) - 1
        for index, path in enumerate(children):
            is_last = index == last_index
            if path.is_dir() and displayable_root.depth < max_depth - 1:
                yield from cls.make_tree(path,
                                         parent=displayable_root,
                                         is_last=is_last,
                                         criteria=criteria,
                                         max_depth=max_depth)
            else:
                yield cls(path, displayable_root, is_last)

    @classmethod
    def _default_criteria(cls, path):
        """Accept every path."""
        _ = path
        return True

    @property
    def displayname(self):
        """Entry name, with a trailing '/' appended for directories."""
        if self.path.is_dir():
            return self.path.name + '/'
        return self.path.name

    def displayable(self):
        """Display string"""
        if self.parent is None:
            return self.displayname

        _filename_prefix = (self.display_filename_prefix_last
                            if self.is_last
                            else self.display_filename_prefix_middle)

        parts = ['{!s} {!s}'.format(_filename_prefix, self.displayname)]

        # Walk up the ancestors (excluding the root), collecting one
        # indentation segment per level: blank under a last child, a vertical
        # bar under a child that still has siblings below it.
        parent = self.parent
        while parent and parent.parent is not None:
            parts.append(self.display_parent_prefix_middle
                         if parent.is_last
                         else self.display_parent_prefix_last)
            parent = parent.parent

        return ''.join(reversed(parts))


class PathTree:
    """A directory tree structure viewer.

    Parameters
    ----------
    root : str or pathlib.Path
        The root directory.
    max_depth : int
        Max depth for recursive sub-folders, please be conservative to not spam the filesystem.

    """
    def __init__(self, root, max_depth=1):
        self._root = Path(root)
        self._max_depth = max_depth

    def __str__(self):
        # The tree is walked on every call: make_tree() returns a one-shot
        # generator, so caching it would make every str() after the first
        # come back empty.
        nodes = _DisplayablePath.make_tree(self._root, max_depth=self._max_depth)
        return '\n'.join(node.displayable() for node in nodes)


def try_import_skimage():
    """Try import scikit-image at runtime.

    Returns
    -------
    scikit-image module if found. Raise ImportError otherwise

    """
    msg = "skimage is required, you can install by package manager, e.g. " \
          "`pip install scikit-image --user` (note that this is unofficial PYPI package)."
    return try_import('skimage', msg)