Spaces:
Running
on
Zero
Running
on
Zero
File size: 3,172 Bytes
113884e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
from utils.dataset_utils import *
class SingleVideoDataset(Dataset):
def __init__(
self,
tokenizer = None,
width: int = 256,
height: int = 256,
n_sample_frames: int = 4,
frame_step: int = 1,
single_video_path: str = "",
single_video_prompt: str = "",
use_caption: bool = False,
use_bucketing: bool = False,
**kwargs
):
self.tokenizer = tokenizer
self.use_bucketing = use_bucketing
self.frames = []
self.index = 1
self.vid_types = (".mp4", ".avi", ".mov", ".webm", ".flv", ".mjpeg")
self.n_sample_frames = n_sample_frames
self.frame_step = frame_step
self.single_video_path = single_video_path
self.single_video_prompt = single_video_prompt
self.width = width
self.height = height
def create_video_chunks(self):
vr = decord.VideoReader(self.single_video_path)
vr_range = range(0, len(vr), self.frame_step)
self.frames = list(self.chunk(vr_range, self.n_sample_frames))
return self.frames
def chunk(self, it, size):
it = iter(it)
return iter(lambda: tuple(islice(it, size)), ())
def get_frame_batch(self, vr, resize=None):
index = self.index
frames = vr.get_batch(self.frames[self.index])
if type(frames) == decord.ndarray.NDArray:
frames = torch.from_numpy(frames.asnumpy())
video = rearrange(frames, "f h w c -> f c h w")
if resize is not None: video = resize(video)
return video
def get_frame_buckets(self, vr):
h, w, c = vr[0].shape
width, height = sensible_buckets(self.width, self.height, w, h)
resize = T.transforms.Resize((height, width), antialias=True)
return resize
def process_video_wrapper(self, vid_path):
video, vr = process_video(
vid_path,
self.use_bucketing,
self.width,
self.height,
self.get_frame_buckets,
self.get_frame_batch
)
return video, vr
def single_video_batch(self, index):
train_data = self.single_video_path
self.index = index
if train_data.endswith(self.vid_types):
video, _ = self.process_video_wrapper(train_data)
prompt = self.single_video_prompt
prompt_ids = get_prompt_ids(prompt, self.tokenizer)
return video, prompt, prompt_ids
else:
raise ValueError(f"Single video is not a video type. Types: {self.vid_types}")
@staticmethod
def __getname__(): return 'single_video'
def __len__(self):
return len(self.create_video_chunks())
def __getitem__(self, index):
video, prompt, prompt_ids = self.single_video_batch(index)
example = {
"pixel_values": (video / 127.5 - 1.0),
"prompt_ids": prompt_ids[0],
"text_prompt": prompt,
'dataset': self.__getname__()
}
return example |