Spaces:
Running
Running
#!/usr/bin/env python3 | |
# Copyright (c) Facebook, Inc. and its affiliates. | |
# | |
# This source code is licensed under the MIT license found in the | |
# LICENSE file in the root directory of this source tree. | |
import logging | |
import os | |
import shutil | |
from typing import List, Optional | |
logger = logging.getLogger(__file__) | |
try: | |
from iopath.common.file_io import g_pathmgr as IOPathManager | |
try: | |
# [FB only - for now] AWS PathHandler for PathManager | |
from .fb_pathhandlers import S3PathHandler | |
IOPathManager.register_handler(S3PathHandler()) | |
except KeyError: | |
logging.warning("S3PathHandler already registered.") | |
except ImportError: | |
logging.debug( | |
"S3PathHandler couldn't be imported. Either missing fb-only files, or boto3 module." | |
) | |
except ImportError: | |
IOPathManager = None | |
class PathManager: | |
""" | |
Wrapper for insulating OSS I/O (using Python builtin operations) from | |
iopath's PathManager abstraction (for transparently handling various | |
internal backends). | |
""" | |
def open( | |
path: str, | |
mode: str = "r", | |
buffering: int = -1, | |
encoding: Optional[str] = None, | |
errors: Optional[str] = None, | |
newline: Optional[str] = None, | |
): | |
if IOPathManager: | |
return IOPathManager.open( | |
path=path, | |
mode=mode, | |
buffering=buffering, | |
encoding=encoding, | |
errors=errors, | |
newline=newline, | |
) | |
return open( | |
path, | |
mode=mode, | |
buffering=buffering, | |
encoding=encoding, | |
errors=errors, | |
newline=newline, | |
) | |
def copy(src_path: str, dst_path: str, overwrite: bool = False) -> bool: | |
if IOPathManager: | |
return IOPathManager.copy( | |
src_path=src_path, dst_path=dst_path, overwrite=overwrite | |
) | |
return shutil.copyfile(src_path, dst_path) | |
def get_local_path(path: str, **kwargs) -> str: | |
if IOPathManager: | |
return IOPathManager.get_local_path(path, **kwargs) | |
return path | |
def exists(path: str) -> bool: | |
if IOPathManager: | |
return IOPathManager.exists(path) | |
return os.path.exists(path) | |
def isfile(path: str) -> bool: | |
if IOPathManager: | |
return IOPathManager.isfile(path) | |
return os.path.isfile(path) | |
def ls(path: str) -> List[str]: | |
if IOPathManager: | |
return IOPathManager.ls(path) | |
return os.listdir(path) | |
def mkdirs(path: str) -> None: | |
if IOPathManager: | |
return IOPathManager.mkdirs(path) | |
os.makedirs(path, exist_ok=True) | |
def rm(path: str) -> None: | |
if IOPathManager: | |
return IOPathManager.rm(path) | |
os.remove(path) | |
def chmod(path: str, mode: int) -> None: | |
if not PathManager.path_requires_pathmanager(path): | |
os.chmod(path, mode) | |
def register_handler(handler) -> None: | |
if IOPathManager: | |
return IOPathManager.register_handler(handler=handler) | |
def copy_from_local( | |
local_path: str, dst_path: str, overwrite: bool = False, **kwargs | |
) -> None: | |
if IOPathManager: | |
return IOPathManager.copy_from_local( | |
local_path=local_path, dst_path=dst_path, overwrite=overwrite, **kwargs | |
) | |
return shutil.copyfile(local_path, dst_path) | |
def path_requires_pathmanager(path: str) -> bool: | |
"""Do we require PathManager to access given path?""" | |
if IOPathManager: | |
for p in IOPathManager._path_handlers.keys(): | |
if path.startswith(p): | |
return True | |
return False | |
def supports_rename(path: str) -> bool: | |
# PathManager doesn't yet support renames | |
return not PathManager.path_requires_pathmanager(path) | |
def rename(src: str, dst: str): | |
os.rename(src, dst) | |
""" | |
ioPath async PathManager methods: | |
""" | |
def opena( | |
path: str, | |
mode: str = "r", | |
buffering: int = -1, | |
encoding: Optional[str] = None, | |
errors: Optional[str] = None, | |
newline: Optional[str] = None, | |
): | |
""" | |
Return file descriptor with asynchronous write operations. | |
""" | |
global IOPathManager | |
if not IOPathManager: | |
logging.info("ioPath is initializing PathManager.") | |
try: | |
from iopath.common.file_io import PathManager | |
IOPathManager = PathManager() | |
except Exception: | |
logging.exception("Failed to initialize ioPath PathManager object.") | |
return IOPathManager.opena( | |
path=path, | |
mode=mode, | |
buffering=buffering, | |
encoding=encoding, | |
errors=errors, | |
newline=newline, | |
) | |
def async_close() -> bool: | |
""" | |
Wait for files to be written and clean up asynchronous PathManager. | |
NOTE: `PathManager.async_close()` must be called at the end of any | |
script that uses `PathManager.opena(...)`. | |
""" | |
global IOPathManager | |
if IOPathManager: | |
return IOPathManager.async_close() | |
return False | |