Spaces:
Running
Running
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import os | |
import typing as tp | |
def _safe_readline(fd) -> str: | |
pos = fd.tell() | |
while True: | |
try: | |
return fd.readline() | |
except UnicodeDecodeError: | |
pos -= 1 | |
fd.seek(pos) # search where this character begins | |
def find_offsets(filename: str, num_chunks: int) -> tp.List[int]:
    """
    Given a file and a number of chunks, find byte offsets that split the
    file into chunks aligned on full lines.

    Args:
        filename: path of the file to split.
        num_chunks: number of chunks wanted; must be at least 1.

    Returns:
        A list of ``num_chunks + 1`` byte offsets. The first entry is 0, the
        last is the file size, and every entry in between is the position
        just after the newline that ends the line containing byte
        ``(size // num_chunks) * i``. Chunk ``i`` spans
        ``offsets[i]`` to ``offsets[i + 1]``; consecutive offsets may be
        equal when the file has fewer lines than chunks.

    Raises:
        ValueError: if ``num_chunks`` is smaller than 1.
    """
    if num_chunks < 1:
        raise ValueError(f"num_chunks must be >= 1, got {num_chunks}")
    # The file is read in binary mode on purpose: byte offsets are only
    # well-defined for binary streams (text-mode tell() returns an opaque
    # cookie and seeking a text stream to an arbitrary offset is undefined).
    # Landing in the middle of a multi-byte UTF-8 character is harmless here
    # because the newline byte never occurs inside such a character.
    with open(filename, "rb") as f:
        size = os.fstat(f.fileno()).st_size
        chunk_size = size // num_chunks
        offsets = [0] * (num_chunks + 1)
        for i in range(1, num_chunks):
            f.seek(chunk_size * i)
            # Skip the remainder of the current line so that the chunk
            # starts at the beginning of the next one.
            f.readline()
            offsets[i] = f.tell()
        offsets[-1] = size
        return offsets
class ChunkLineIterator:
    """
    Iterable over the lines of one chunk of an already opened file.

    Iteration seeks ``fd`` to ``start_offset`` and yields lines until the
    position reported by ``fd.tell()`` moves past ``end_offset`` or the file
    is exhausted. An ``end_offset`` that is not positive disables the end
    check, so lines are yielded until the end of the file.
    """

    def __init__(self, fd, start_offset: int, end_offset: int):
        self._fd = fd
        self._start_offset = start_offset
        self._end_offset = end_offset

    def __iter__(self) -> tp.Iterable[str]:
        self._fd.seek(self._start_offset)
        # readline() is used everywhere because next(f) breaks f.tell().
        # Only the first read goes through _safe_readline.
        line = _safe_readline(self._fd)
        while line:
            if self._is_past_end(self._fd.tell()):
                return
            yield line
            line = self._fd.readline()

    def _is_past_end(self, pos: int) -> bool:
        """
        Tell whether ``pos`` lies beyond the end of the chunk.

        f.tell() does not always give the byte position in the file; it
        sometimes jumps to a very large number. A normal read is unlikely to
        move from the end offset to more than 2**32 bytes (4 GB) past it, so
        positions that far out are treated as such a jump and are not taken
        as the end of the chunk.
        """
        end = self._end_offset
        return end > 0 and end < pos < end + 2**32
class Chunker:
    """
    Context manager to read a chunk of a file line by line.

    Entering opens ``path`` as UTF-8 text and returns a ChunkLineIterator
    built from the open file and the two offsets; leaving closes the file.
    """

    def __init__(self, path: str, start_offset: int, end_offset: int):
        self.path, self.start_offset, self.end_offset = (
            path,
            start_offset,
            end_offset,
        )

    def __enter__(self) -> ChunkLineIterator:
        # The handle is kept on the instance so that __exit__ can close it.
        handle = open(self.path, "r", encoding="utf-8")
        self.fd = handle
        return ChunkLineIterator(handle, self.start_offset, self.end_offset)

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # Returns None, so an exception raised inside the with-body propagates.
        self.fd.close()