Spaces:
Sleeping
Sleeping
import warnings | |
warnings.filterwarnings('ignore', category=DeprecationWarning) | |
import io | |
import os | |
from tqdm import tqdm | |
os.environ['MKL_SERVICE_FORCE_INTEL'] = '1' | |
from pathlib import Path | |
from collections import OrderedDict | |
import hydra | |
import numpy as np | |
import torch | |
import tools.utils as utils | |
from tools.replay import load_episode | |
torch.backends.cudnn.benchmark = True | |
if os.name == "nt": | |
import msvcrt | |
def portable_lock(fp): | |
fp.seek(0) | |
msvcrt.locking(fp, msvcrt.LK_LOCK, 1) | |
def portable_unlock(fp): | |
fp.seek(0) | |
msvcrt.locking(fp, msvcrt.LK_UNLCK, 1) | |
else: | |
import fcntl | |
def portable_lock(fp): | |
fcntl.flock(fp, fcntl.LOCK_EX | fcntl.LOCK_NB) | |
def portable_unlock(fp): | |
fcntl.flock(fp, fcntl.LOCK_UN) | |
class Locker: | |
def __init__(self, lock_name): | |
# e.g. lock_name = "./lockfile.lck" | |
self.lock_name = lock_name | |
def __enter__(self,): | |
open_mode = os.O_RDWR | os.O_CREAT | os.O_TRUNC | |
self.fd = os.open(self.lock_name, open_mode) | |
portable_lock(self.fd) | |
def __exit__(self, _type, value, tb): | |
portable_unlock(self.fd) | |
os.close(self.fd) | |
try: | |
os.remove(self.lock_name) | |
except: | |
pass | |
class Workspace: | |
def __init__(self, cfg, savedir=None, workdir=None,): | |
self.workdir = Path.cwd() if workdir is None else workdir | |
print(f'workspace: {self.workdir}') | |
assert int(cfg.viclip_encode) == 1, "encoding only one (video or img)" | |
if cfg.viclip_encode: | |
self.key_to_add = 'clip_video' | |
self.key_to_process = getattr(cfg, 'key_to_process', 'observation') | |
self.cfg = cfg | |
self.device = torch.device(cfg.device) | |
# create envs | |
task = cfg.task | |
self.task = task | |
img_size = cfg.img_size | |
import envs.main as envs | |
self.train_env = envs.make(task, cfg.obs_type, cfg.action_repeat, cfg.seed, img_size=img_size, viclip_encode=cfg.viclip_encode, device='cuda') | |
self.dataset_path = Path(cfg.dataset_dir) | |
self.timer = utils.Timer() | |
self._global_step = 0 | |
self._global_episode = 0 | |
def process(self): | |
filenames = sorted(self.dataset_path.glob('**/*.npz')) | |
print(f"Found {len(filenames)} files") | |
episodes_to_process = {} | |
for idx, fname in tqdm(enumerate(filenames)): | |
lockname = str(fname.absolute()) + ".lck" | |
try: | |
with Locker(lockname): | |
episode = load_episode(fname) | |
# validate before continuing | |
if type(episode[self.key_to_add]) == np.ndarray and episode[self.key_to_add].size > 1 and episode[self.key_to_add].shape[0] == episode[self.key_to_process].shape[0]: | |
continue | |
else: | |
del episode[self.key_to_add] | |
add_data = self.train_env.process_episode(episode[self.key_to_process]) # .cpu().numpy() | |
if idx == 0: | |
print(add_data.shape) | |
episode[self.key_to_add] = add_data | |
# save episode | |
with io.BytesIO() as f1: | |
np.savez_compressed(f1, **episode) | |
f1.seek(0) | |
with fname.open('wb') as f2: | |
f2.write(f1.read()) | |
except BlockingIOError: | |
print(f"File busy: {str(fname)}") | |
continue | |
def start_processing(cfg, savedir, workdir): | |
from process_dataset import Workspace as W | |
root_dir = Path.cwd() | |
cfg.workdir = str(root_dir) | |
workspace = W(cfg, savedir, workdir) | |
workspace.root_dir = root_dir | |
snapshot = workspace.root_dir / 'last_snapshot.pt' | |
if snapshot.exists(): | |
print(f'resuming: {snapshot}') | |
workspace.load_snapshot(workspace.root_dir) | |
workspace.process() | |
def main(cfg): | |
start_processing(cfg, None, None) | |
if __name__ == '__main__': | |
main() |