import json
import time
from collections.abc import Iterator

from pydantic import BaseModel

from .record import Record
from storage import Storage
from embedding import Embedding


class Document(BaseModel):
    '''Describe a stored document and how it is turned into records.

    Concrete subclasses override ``load_records`` to read the document
    body from storage and produce one ``Record`` per unit of content.
    '''

    name: str
    description: str | None = None
    status: str = 'uploading'  # uploading, processing, done, failed
    url: str | None = None

    # Set by the subclass constructors rather than passed as model fields.
    _embedding: Embedding
    _storage: Storage

    def load_records(self) -> Iterator[Record]:
        '''Produce the records contained in this document.

        The base implementation is a placeholder that does nothing and
        returns ``None``; subclasses override it with a generator.
        '''


class PlainTextDocument(Document):
    '''Document whose stored body is plain text, one record per line.'''

    def __init__(
            self,
            embedding: Embedding,
            storage: Storage,
            **kwargs):
        '''Create the document and attach its collaborators.

        Args:
            embedding: Object used to generate an embedding per line.
            storage: Object used to load the document body from ``url``.
            **kwargs: Model fields forwarded to ``Document``.
        '''
        super().__init__(**kwargs)
        self._embedding = embedding
        self._storage = storage

    def _enhance_line(self, line: str) -> str:
        '''Return the text that is embedded for ``line``.

        This implementation returns the line unchanged; it is the hook a
        subclass can override to embed something other than the raw line.
        '''
        return line

    def load_records(self) -> Iterator[Record]:
        '''Yield one ``Record`` for every non-blank line of the document.

        The body is loaded from storage using ``self.url`` and split on
        ``'\\n'``. Records are produced lazily, so the embedding for a
        line is only generated when that record is consumed.

        Yields:
            A ``Record`` whose content is the original line and whose
            metadata carries the embedding type, document name, 0-based
            line number and source line.
        '''
        raw_text = self._storage.load(self.url)
        for line_number, line in enumerate(raw_text.split('\n')):
            # Blank lines are skipped but still counted, so 'line_number'
            # keeps pointing at the position in the stored text.
            if not line.strip():
                continue
            enhanced_line = self._enhance_line(line)
            embedding = self._embedding.generate_embedding(enhanced_line)
            meta_data = {
                'embedding_type': self._embedding.type,
                'document_id': self.name,
                'line_number': line_number,
                # The raw line is stored, not the enhanced text.
                'source': line,
            }
            yield Record(
                embedding=embedding,
                meta_data=meta_data,
                content=line,
                document_id=self.name,
                timestamp=int(time.time()))


class JsonDocument(Document):
    '''Document whose stored body is JSON, one record per item.'''

    def __init__(
            self,
            embedding: Embedding,
            storage: Storage,
            **kwargs):
        '''Create the document and attach its collaborators.

        Args:
            embedding: Object used to generate an embedding per item.
            storage: Object used to load the document body from ``url``.
            **kwargs: Model fields forwarded to ``Document``.
        '''
        super().__init__(**kwargs)
        self._embedding = embedding
        self._storage = storage

    def load_records(self) -> Iterator[Record]:
        '''Yield one ``Record`` for every item of the JSON document.

        The body is loaded from storage using ``self.url`` and parsed
        with ``json.loads``. Each item is expected to look like::

            {
                'content': str,     # the content of the record
                'meta_data': dict   # optional meta data of the record
            }

        Metadata supplied by an item is merged with the generated
        metadata; on a key clash the generated value wins.

        Yields:
            A ``Record`` whose content is the item's ``'content'`` value.

        Raises:
            KeyError: If an item has no ``'content'`` key.
        '''
        raw_text = self._storage.load(self.url)
        items = json.loads(raw_text)
        for index, item in enumerate(items):
            # Pause 300ms before each embedding call.
            # NOTE(review): presumably throttles the embedding backend;
            # confirm before changing or removing.
            time.sleep(0.3)
            content = item['content']
            embedding = self._embedding.generate_embedding(content)
            meta_data = {
                'embedding_type': self._embedding.type,
                'document_id': self.name,
                'line_number': index,
                'source': content,
            }
            if 'meta_data' in item:
                # Item metadata goes first so generated keys override it.
                meta_data = {**item['meta_data'], **meta_data}
            yield Record(
                embedding=embedding,
                meta_data=meta_data,
                content=content,
                document_id=self.name,
                timestamp=int(time.time()))