Spaces:
Runtime error
Runtime error
import os | |
import ujson | |
from functools import partial | |
from colbert.utils.utils import print_message | |
from colbert.modeling.tokenization import QueryTokenizer, DocTokenizer, tensorize_triples | |
from colbert.utils.runs import Run | |
class EagerBatcher(): | |
def __init__(self, args, rank=0, nranks=1): | |
self.rank, self.nranks = rank, nranks | |
self.bsize, self.accumsteps = args.bsize, args.accumsteps | |
self.query_tokenizer = QueryTokenizer(args.query_maxlen) | |
self.doc_tokenizer = DocTokenizer(args.doc_maxlen) | |
self.tensorize_triples = partial(tensorize_triples, self.query_tokenizer, self.doc_tokenizer) | |
self.triples_path = args.triples | |
self._reset_triples() | |
def _reset_triples(self): | |
self.reader = open(self.triples_path, mode='r', encoding="utf-8") | |
self.position = 0 | |
def __iter__(self): | |
return self | |
def __next__(self): | |
queries, positives, negatives = [], [], [] | |
for line_idx, line in zip(range(self.bsize * self.nranks), self.reader): | |
if (self.position + line_idx) % self.nranks != self.rank: | |
continue | |
query, pos, neg = line.strip().split('\t') | |
queries.append(query) | |
positives.append(pos) | |
negatives.append(neg) | |
self.position += line_idx + 1 | |
if len(queries) < self.bsize: | |
raise StopIteration | |
return self.collate(queries, positives, negatives) | |
def collate(self, queries, positives, negatives): | |
assert len(queries) == len(positives) == len(negatives) == self.bsize | |
return self.tensorize_triples(queries, positives, negatives, self.bsize // self.accumsteps) | |
def skip_to_batch(self, batch_idx, intended_batch_size): | |
self._reset_triples() | |
Run.warn(f'Skipping to batch #{batch_idx} (with intended_batch_size = {intended_batch_size}) for training.') | |
_ = [self.reader.readline() for _ in range(batch_idx * intended_batch_size)] | |
return None | |