'''
Created By Lewis Kamau Kimaru
Sema translator api backend
January 2024
Docker deployment
'''

from fastapi import FastAPI, HTTPException, Request
# NOTE(review): these two imports previously pointed at `fastapi_middleware`.
# FastAPI ships both names under `fastapi.middleware`; confirm no separate
# `fastapi_middleware` package was intended.
from fastapi.middleware import Middleware
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
import gradio as gr
import ctranslate2
import sentencepiece as spm
import fasttext
import uvicorn
import pytz
from datetime import datetime
import os

app = FastAPI()

# Allow any origin to call the API.
origins = ["*"]

# add_middleware() expects the middleware *class* followed by its options;
# it builds the Middleware wrapper itself, so the class must not be
# pre-wrapped in Middleware(...).
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Replace fastText's eprint hook with a no-op.
fasttext.FastText.eprint = lambda x: None


# Get time of request
def get_time():
    """Return the current time in the Africa/Nairobi timezone.

    Returns:
        A ``(full_date, curr_time)`` tuple of strings, where ``full_date``
        is formatted as ``"<weekday> | YYYY-MM-DD | HH:MM:SS"`` and
        ``curr_time`` as ``"HH:MM:SS"``.
    """
    now = datetime.now(pytz.timezone('Africa/Nairobi'))
    full_date = now.strftime('%A | %Y-%m-%d | %H:%M:%S')
    curr_time = now.strftime('%H:%M:%S')
    return full_date, curr_time


# Load the model and tokenizer ..... only once!
beam_size = 1  # beam width used for decoding; smaller values give faster inference
device = "cpu"  # or "cuda"

# Language Prediction model
print("\nimporting Language Prediction model")
lang_model_file = "lid218e.bin"
lang_model_full_path = os.path.join(os.path.dirname(__file__), lang_model_file)
lang_model = fasttext.load_model(lang_model_full_path)

# Load the source SentencePiece model
print("\nimporting SentencePiece model")
sp_model_file = "spm.model"
sp_model_full_path = os.path.join(os.path.dirname(__file__), sp_model_file)
sp = spm.SentencePieceProcessor()
sp.load(sp_model_full_path)

# Import The Translator model
print("\nimporting Translator model")
ct_model_file = "sematrans-3.3B"
ct_model_full_path = os.path.join(os.path.dirname(__file__), ct_model_file)
translator = ctranslate2.Translator(ct_model_full_path, device)

print('\nDone importing models\n')


def _translate_sentences(source_sents, source_lang, target_lang):
    """Translate already-stripped sentences and return the decoded strings.

    Shared pipeline behind translate_detect() and translate_enter():
    subword-encode, translate with the target language as forced prefix,
    decode, then drop the leading target-language tag from each result.

    Args:
        source_sents: list of sentences to translate.
        source_lang: language tag prepended to every tokenized sentence.
        target_lang: language tag used as the decoding prefix.

    Returns:
        A list with one translated string per input sentence.
    """
    target_prefix = [[target_lang]] * len(source_sents)

    # Subword the source sentences
    pieces_per_sent = sp.encode(source_sents, out_type=str)
    # NOTE(review): the trailing token is an empty string here. It looks like
    # an end-of-sentence marker (e.g. "</s>") may have been lost from the
    # source; confirm against the model's expected input format.
    pieces_per_sent = [[source_lang] + pieces + [""] for pieces in pieces_per_sent]

    # Translate the source sentences
    results = translator.translate_batch(
        pieces_per_sent,
        batch_type="tokens",
        max_batch_size=2024,
        beam_size=beam_size,
        target_prefix=target_prefix,
    )
    best_tokens = [result[0]['tokens'] for result in results]

    # Desubword the target sentences; the decoded text starts with the
    # target-language prefix, so slice it off by length.
    decoded = sp.decode(best_tokens)
    return [sent[len(target_lang):] for sent in decoded]


def translate_detect(userinput: str, target_lang: str):
    """Detect the language of ``userinput`` and translate it.

    Args:
        userinput: text to translate; surrounding whitespace is stripped.
        target_lang: language tag to translate into.

    Returns:
        A ``(source_lang, translations)`` tuple, where ``source_lang`` is the
        predicted label with the ``__label__`` prefix removed and
        ``translations`` is a one-element list holding the translated text.
    """
    source_sents = [userinput.strip()]

    # Predict the source language. fastText's predict() raises ValueError on
    # text containing a newline, so flatten newlines for detection only; the
    # text sent to the translator is left untouched.
    detection_text = source_sents[0].replace("\n", " ")
    predictions = lang_model.predict(detection_text, k=1)
    source_lang = predictions[0][0].replace('__label__', '')

    # Return the source language and the translated text
    return source_lang, _translate_sentences(source_sents, source_lang, target_lang)


def translate_enter(userinput: str, source_lang: str, target_lang: str):
    """Translate ``userinput`` from an explicitly given source language.

    Args:
        userinput: text to translate; surrounding whitespace is stripped.
        source_lang: language tag of the input text.
        target_lang: language tag to translate into.

    Returns:
        The translated text as a single string.
    """
    source_sents = [userinput.strip()]
    return _translate_sentences(source_sents, source_lang, target_lang)[0]


async def _read_json_object(request: Request):
    """Parse the request body as a JSON object, or raise HTTP 422.

    Without this, a malformed body or a non-object payload (list, string,
    number) would surface as an unhandled exception instead of a client error.
    """
    try:
        payload = await request.json()
    except ValueError:
        # Covers json.JSONDecodeError and UnicodeDecodeError.
        raise HTTPException(status_code=422, detail="Request body must be valid JSON.") from None
    if not isinstance(payload, dict):
        raise HTTPException(status_code=422, detail="Request body must be a JSON object.")
    return payload


def _is_blank(value) -> bool:
    """Return True when ``value`` is not a string or has no non-space content."""
    return not isinstance(value, str) or not value.strip()


@app.get("/")
async def read_root():
    """Serve the landing page for the API root."""
    # NOTE(review): the page body is only the text " Sema "; any surrounding
    # HTML markup may have been lost from the source - confirm.
    gradio_interface = """ Sema """
    return HTMLResponse(content=gradio_interface)


@app.post("/translate_detect/")
async def translate_detect_endpoint(request: Request):
    """Translate text after auto-detecting its source language.

    Expects a JSON object with ``userinput`` and ``target_lang``.

    Returns:
        A dict with ``source_language`` and ``translated_text``.

    Raises:
        HTTPException: 422 when the body is not a JSON object or when a
            required field is missing, blank, or not a string.
    """
    datad = await _read_json_object(request)
    userinputd = datad.get("userinput")
    target_langd = datad.get("target_lang")
    dfull_date = get_time()[0]
    print(f"\nrequest: {dfull_date}\nTarget Language; {target_langd}, User Input: {userinputd}\n")

    if _is_blank(userinputd) or _is_blank(target_langd):
        raise HTTPException(status_code=422, detail="Both 'userinput' and 'target_lang' are required.")

    source_langd, translated_text_d = translate_detect(userinputd, target_langd)
    dcurrent_time = get_time()[1]
    print(f"\nresponse: {dcurrent_time}; ... Source_language: {source_langd}, Translated Text: {translated_text_d}\n\n")
    return {
        "source_language": source_langd,
        # translate_detect() returns a one-element list of translations.
        "translated_text": translated_text_d[0],
    }


@app.post("/translate_enter/")
async def translate_enter_endpoint(request: Request):
    """Translate text from a caller-supplied source language.

    Expects a JSON object with ``userinput``, ``source_lang`` and
    ``target_lang``.

    Returns:
        A dict with ``translated_text``.

    Raises:
        HTTPException: 422 when the body is not a JSON object or when a
            required field is missing, blank, or not a string.
    """
    datae = await _read_json_object(request)
    userinpute = datae.get("userinput")
    source_lange = datae.get("source_lang")
    target_lange = datae.get("target_lang")
    efull_date = get_time()[0]
    print(f"\nrequest: {efull_date}\nSource_language; {source_lange}, Target Language; {target_lange}, User Input: {userinpute}\n")

    # source_lang is checked too: it is prepended to the token sequence, so a
    # missing value would otherwise reach the translator as None.
    if _is_blank(userinpute) or _is_blank(source_lange) or _is_blank(target_lange):
        raise HTTPException(status_code=422, detail="'userinput', 'source_lang' and 'target_lang' are required.")

    translated_text_e = translate_enter(userinpute, source_lange, target_lange)
    ecurrent_time = get_time()[1]
    print(f"\nresponse: {ecurrent_time}; ... Translated Text: {translated_text_e}\n\n")
    return {
        "translated_text": translated_text_e,
    }


print("\nAPI starting .......\n")