# Spaces: Paused  (appears to be page-scrape residue, not module code)
import logging.config | |
import re | |
from typing import List | |
import psutil | |
from langdetect import LangDetectException, detect | |
from mappingservice.constants import ENGLISH_KEYWORDS, SPANISH_KEYWORDS | |
from mappingservice.models import BedData, BedType | |
# Configure logging once at import time: DEBUG verbosity and a
# "timestamp - level - message" record layout.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.DEBUG,
)

# Module-scoped logger used by the helpers in this file.
logger = logging.getLogger(__name__)
def log_memory_usage():
    """Log the current process's memory usage at INFO level.

    Reads ``rss`` and ``vms`` from ``psutil.Process().memory_info()`` and
    reports both values in a single log record.
    """
    memory_info = psutil.Process().memory_info()
    # Go through the module-level ``logger`` rather than the root-level
    # ``logging.info`` helper so this record is routed like every other
    # message emitted from this file.
    logger.info(
        "Memory usage: RSS=%s, VMS=%s", memory_info.rss, memory_info.vms
    )
def safe_round(value, decimals=0):
    """Round *value* to *decimals* places, or return 0 for non-numbers.

    Only ``int`` and ``float`` instances (which includes ``bool``) are
    rounded; every other input, ``None`` and strings included, yields ``0``.
    """
    if not isinstance(value, (int, float)):
        return 0
    return round(value, decimals)
def process_predictions(predictions, score_key="score", label_key="label"):
    """Normalize raw predictions into ``{"label", "score"}`` dicts.

    Args:
        predictions: Iterable of prediction items. Items that are not dicts
            are ignored.
        score_key: Key under which each prediction stores its score.
        label_key: Key under which each prediction stores its label.

    Returns:
        A list of ``{"label": ..., "score": ...}`` dicts, one per prediction
        whose score is greater than 0, with the score rounded to 3 decimals
        and the label defaulting to ``"No data"`` when absent. Returns
        ``None`` when no prediction qualifies.
    """
    results = []
    for pred in predictions:
        if not isinstance(pred, dict):
            continue
        score = pred.get(score_key)
        # A missing score and an explicit null score are both treated as
        # "no score"; comparing None with 0 would raise TypeError.
        if score is None or not score > 0:
            continue
        results.append(
            {"label": pred.get(label_key, "No data"), "score": round(score, 3)}
        )
    # Callers receive None, not an empty list, when nothing qualified.
    return results or None
def parse_model_output(predictions, threshold=0.5):
    """Convert raw model predictions into a list of ``BedData`` entries.

    Args:
        predictions: Expected to be a list of dicts carrying ``"type"``,
            ``"count"`` and ``"score"`` keys. Any other shape (not a list, or
            a list containing non-dict items) yields an empty list.
        threshold: A prediction is kept only when its score is strictly
            greater than this value. Defaults to ``0.5``.

    Returns:
        One ``BedData`` per prediction whose score exceeds *threshold*, whose
        count is positive and whose ``"type"`` names a ``BedType`` member.
        Predictions with an unknown type are logged at DEBUG level and
        skipped.
    """
    bed_data_list = []

    if not isinstance(predictions, list) or not all(
        isinstance(pred, dict) for pred in predictions
    ):
        return bed_data_list

    for prediction in predictions:
        score = prediction.get("score")
        count = prediction.get("count")
        # Missing or null values can never qualify; skipping them here also
        # avoids comparing None with a number (TypeError).
        if score is None or count is None:
            continue
        if not (score > threshold and count > 0):
            continue

        bed_type_name = prediction.get("type")
        # ``__members__`` is the documented name -> member mapping of an Enum.
        if isinstance(bed_type_name, str) and bed_type_name in BedType.__members__:
            bed_data_list.append(
                BedData(type=BedType[bed_type_name], count=count)
            )
        else:
            logger.debug("Unsupported bed type: %s", bed_type_name)

    return bed_data_list
# Compiled once at import time and reused by get_bed_predictions().
# Groups: (optional count, bed type name, "bed(s)"/"cama(s)" suffix).
_BED_PATTERN = re.compile(
    r"(\d+)?\s*\b(king|queen|double|single|twin|bunk|sofa|rollaway|futon|"
    r"cama\s*king|cama\s*queen|cama\s*double|cama\s*single|cama\s*twin|"
    r"cama\s*bunk|cama\s*sofa|cama\s*rollaway|cama\s*futon)\b\s*(beds?|camas?)",
    re.IGNORECASE,
)
# Matches alternatives such as "king or queen"; groups are the two type names.
_OR_PATTERN = re.compile(
    r"\b(king|queen|double|single|twin)\s+or\s+(king|queen|double|single|twin)\b",
    re.IGNORECASE,
)
# Leading "cama" captured by the Spanish alternatives of _BED_PATTERN.
_CAMA_PREFIX = re.compile(r"^cama\s*")
# NOTE: a "default bed type by room name" pattern (e.g. "double room",
# "habitacion doble") was previously sketched here as commented-out code and
# is still not applied.


def _resolve_bed_type(raw_name):
    """Return the ``BedType`` member named by *raw_name*, or ``None``."""
    name = raw_name.strip().lower()
    if name not in BedType.__members__:
        # "cama king" -> "king": the captured text includes the "cama" prefix,
        # which would otherwise never match a member name.
        name = _CAMA_PREFIX.sub("", name)
    return BedType.__members__.get(name)


def get_bed_predictions(description: str):
    """Extract bed types and counts from a free-text room description.

    The description is lower-cased and scanned for two kinds of mentions:

    * ``"<n> <type> bed(s)"`` / ``"<type> cama(s)"`` -- the count is the
      leading number, or 1 when no number is present. If the same bed type is
      mentioned more than once, the last mention's count is kept.
    * ``"<type> or <type>"`` -- each alternative is recorded with a count of 1
      unless that bed type was already recorded.

    Args:
        description: Text to scan.

    Returns:
        A list with one ``BedData`` per distinct recognized bed type, in order
        of first appearance. Empty (with a WARNING logged) when nothing was
        recognized.
    """
    logger.debug("Extracting bed predictions from description: %s", description)
    description = description.lower()

    bed_type_counts = {}

    for count_text, raw_name, _suffix in _BED_PATTERN.findall(description):
        bed_type = _resolve_bed_type(raw_name)
        if bed_type is not None:
            bed_type_counts[bed_type] = int(count_text) if count_text else 1

    for alternatives in _OR_PATTERN.findall(description):
        for raw_name in alternatives:
            bed_type = _resolve_bed_type(raw_name)
            if bed_type is not None:
                # In "king or queen bed" the second alternative has already
                # been counted by _BED_PATTERN; incrementing again would
                # report two queen beds. Only fill in types not yet seen.
                bed_type_counts.setdefault(bed_type, 1)

    bed_data_list = [
        BedData(type=bed_type, count=count)
        for bed_type, count in bed_type_counts.items()
    ]

    if not bed_data_list:
        logger.warning("No valid bed data found from extracted information.")

    return bed_data_list
def extract_bed_numbers(description: str):
    """Return every number that directly precedes the text "bed".

    Digits (``"2 beds"``) and the words "one", "two" and "three" are
    recognized, case-insensitively. Results are integers in order of
    appearance; an empty list is returned when nothing matches.
    """
    spelled_out = {"one": 1, "two": 2, "three": 3}
    pattern = re.compile(
        r"(\d+|\bone\b|\btwo\b|\bthree\b)\s*bed", re.IGNORECASE
    )
    # Each hit is either a digit string or a number word; words are mapped
    # through the lookup table, digit strings fall through to int().
    return [
        int(spelled_out.get(token.lower(), token))
        for token in pattern.findall(description)
    ]
def validate_bed_data(beds: List[BedData]) -> List[BedData]:
    """Keep only the beds that have a truthy type and a positive count.

    Logs an INFO message naming the input when no bed passes the filter.
    The returned list is always a new list; the input is not modified.
    """
    valid_beds = []
    for bed in beds:
        if bed.type and bed.count > 0:
            valid_beds.append(bed)

    if valid_beds:
        return valid_beds

    logger.info(f"No valid beds found in {beds}")
    return valid_beds
def is_list_of_lists(variable):
    """Return True when *variable* is a list whose items are all lists.

    An empty list qualifies; any non-list container (tuple, str, ...) does
    not.
    """
    return isinstance(variable, list) and all(
        isinstance(element, list) for element in variable
    )
def predict_language(text):
    """Guess whether *text* is Spanish (``"es"``) or English (``"en"``).

    Keyword lists are checked first against the lower-cased text, Spanish
    before English; the first list with a keyword contained in the text
    decides. Otherwise ``langdetect.detect`` is consulted and its answer is
    accepted only if it is ``"en"`` or ``"es"``.

    Returns:
        ``"es"``, ``"en"``, or ``None`` when the language cannot be
        determined (including when detection raises
        ``LangDetectException``).
    """
    lowered = text.lower()

    # Order matters: Spanish keywords take precedence over English ones.
    for code, keywords in (("es", SPANISH_KEYWORDS), ("en", ENGLISH_KEYWORDS)):
        if any(keyword in lowered for keyword in keywords):
            return code

    try:
        detected = detect(text)
    except LangDetectException:
        # Detection is best-effort; a failure simply means "unknown".
        return None

    return detected if detected in {"en", "es"} else None