TomatoCocotree
上传
6a62ffb
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import os
import typing as tp
def _safe_readline(fd) -> str:
    """
    Read and return the next line from ``fd``, tolerating a bad start position.

    If decoding the line raises ``UnicodeDecodeError`` (presumably because
    ``fd`` was positioned in the middle of a multi-byte character), the
    position is moved back by one and the read is retried until it succeeds.
    """
    cursor = fd.tell()
    line = None
    while line is None:
        try:
            line = fd.readline()
        except UnicodeDecodeError:
            # step back one position to look for the start of the character
            cursor -= 1
            fd.seek(cursor)
    return line
def find_offsets(filename: str, num_chunks: int) -> tp.List[int]:
    """
    Given a file and a number of chunks, find byte offsets that split the
    file into chunks made of full lines.

    Args:
        filename: path of the file to split.
        num_chunks: number of chunks wanted; must be at least 1.

    Returns:
        A list of ``num_chunks + 1`` byte offsets. The first entry is 0, the
        last entry is the file size, and every entry in between sits right
        after a ``"\\n"`` byte (or at the end of the file). Chunk ``i`` spans
        ``offsets[i]`` to ``offsets[i + 1]``; neighbouring offsets can be
        equal when a single line is longer than the nominal chunk size.

    Raises:
        ValueError: if ``num_chunks`` is smaller than 1.
    """
    if num_chunks < 1:
        raise ValueError(
            "num_chunks must be a positive integer, got {}".format(num_chunks)
        )
    # The file is read in binary mode on purpose: in text mode the values
    # accepted by seek() and returned by tell() are opaque cookies rather than
    # byte positions, so seeking to an arbitrary byte offset is not reliable.
    # The newline byte never appears inside a UTF-8 multi-byte sequence, so
    # scanning raw bytes for the next "\n" always ends on a character boundary.
    with open(filename, "rb") as f:
        size = os.fstat(f.fileno()).st_size
        chunk_size = size // num_chunks
        offsets = [0] * (num_chunks + 1)
        for i in range(1, num_chunks):
            f.seek(chunk_size * i)
            # skip the rest of the line the nominal boundary landed in
            f.readline()
            offsets[i] = f.tell()
        offsets[-1] = size
    return offsets
class ChunkLineIterator:
    """
    Iterable over the lines of one chunk of an already opened file.

    Iteration seeks ``fd`` to ``start_offset`` and yields lines until the
    position reported by ``fd.tell()`` moves past ``end_offset`` or the file
    is exhausted. An ``end_offset`` that is not positive disables the end
    check, so lines are yielded up to the end of the file.
    """

    def __init__(self, fd, start_offset: int, end_offset: int):
        self._fd = fd
        self._start_offset = start_offset
        self._end_offset = end_offset

    def _is_beyond_chunk(self, position: int) -> bool:
        """Tell whether ``position`` lies after the end of this chunk."""
        limit = self._end_offset
        if limit <= 0:
            return False
        # f.tell() does not always report the byte position in the file;
        # sometimes it jumps to a very large number. A normal read is
        # unlikely to move from the end offset to more than 2**32 bytes
        # (4 GB) past it, so positions that far out are not treated as
        # "past the end"; this keeps the erratic behaviour of f.tell()
        # from cutting the chunk short.
        return limit < position < limit + 2**32

    def __iter__(self) -> tp.Iterable[str]:
        stream = self._fd
        stream.seek(self._start_offset)
        # next(f) breaks f.tell(), so lines are fetched with readline();
        # only the first read needs the decode-safe variant
        current = _safe_readline(stream)
        while current and not self._is_beyond_chunk(stream.tell()):
            yield current
            current = stream.readline()
class Chunker:
    """
    Context manager for reading one chunk of a file line by line.

    Entering opens ``path`` as UTF-8 text and returns a ChunkLineIterator
    bound to the ``start_offset`` / ``end_offset`` range; exiting closes the
    file.
    """

    def __init__(self, path: str, start_offset: int, end_offset: int):
        self.path, self.start_offset, self.end_offset = (
            path,
            start_offset,
            end_offset,
        )

    def __enter__(self) -> ChunkLineIterator:
        handle = open(self.path, mode="r", encoding="utf-8")
        # keep the handle on the instance so that __exit__ can close it
        self.fd = handle
        return ChunkLineIterator(handle, self.start_offset, self.end_offset)

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.fd.close()