File size: 4,951 Bytes
f006f31
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ce8daa8
 
 
 
f006f31
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import logging.config
import re
from typing import List

import psutil
from langdetect import LangDetectException, detect

from mappingservice.constants import ENGLISH_KEYWORDS, SPANISH_KEYWORDS
from mappingservice.models import BedData, BedType

# Process-wide logging setup: DEBUG level, records shaped as
# "<timestamp> - <level> - <message>".
_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.DEBUG)

# Logger named after this module.
logger = logging.getLogger(__name__)


def log_memory_usage():
    """Log the RSS and VMS figures psutil reports for this process.

    The values come from ``psutil.Process().memory_info()`` and are emitted
    at INFO level through the module-level ``logger`` so the record carries
    this module's logger name, like every other message in this file.
    """
    memory_info = psutil.Process().memory_info()
    # Lazy %-style arguments: the message is only formatted if INFO is enabled.
    logger.info(
        "Memory usage: RSS=%s, VMS=%s", memory_info.rss, memory_info.vms
    )


def safe_round(value, decimals=0):
    """Round *value* to *decimals* places; anything that is not int/float gives 0."""
    is_number = isinstance(value, (int, float))
    return round(value, decimals) if is_number else 0


def process_predictions(predictions, score_key="score", label_key="label"):
    """Normalize raw predictions into ``{"label": ..., "score": ...}`` dicts.

    Args:
        predictions: Iterable of prediction entries; ``None`` is treated as
            empty. Entries that are not dicts are ignored.
        score_key: Key under which each entry stores its score.
        label_key: Key under which each entry stores its label.

    Returns:
        A list with one dict per entry whose score is a number greater than
        zero, with the score rounded to 3 decimals and the label defaulting
        to ``"No data"``; ``None`` when no entry qualifies.
    """
    results = []
    for pred in predictions or ():
        if not isinstance(pred, dict):
            continue
        score = pred.get(score_key, 0)
        # An explicit None or a string score cannot be compared with 0;
        # skip such entries instead of raising TypeError.
        if not isinstance(score, (int, float)) or not score > 0:
            continue
        results.append(
            {"label": pred.get(label_key, "No data"), "score": round(score, 3)}
        )
    return results or None


def parse_model_output(predictions, threshold=0.5):
    """Convert model predictions into ``BedData`` entries.

    Args:
        predictions: Expected to be a list of dicts carrying ``"type"``,
            ``"count"`` and ``"score"`` keys. Any other shape (not a list,
            or a list holding a non-dict) yields an empty result.
        threshold: A prediction is kept only when its score is strictly
            greater than this value.

    Returns:
        A list of ``BedData``, one per prediction whose score exceeds
        *threshold*, whose count is positive and whose type names a
        ``BedType`` member. Predictions with a missing-but-explicit
        (``None``) or non-numeric score/count are skipped.
    """
    if not isinstance(predictions, list) or not all(
        isinstance(pred, dict) for pred in predictions
    ):
        return []

    bed_data_list = []
    for prediction in predictions:
        bed_type_name = prediction.get("type")
        count = prediction.get("count", 0)
        score = prediction.get("score", 0)

        # .get() defaults only cover absent keys; an explicit None or a
        # string would make the comparisons below raise TypeError.
        if not isinstance(score, (int, float)) or not isinstance(
            count, (int, float)
        ):
            continue
        if not (score > threshold and count > 0):
            continue

        # __members__ is the public name -> member mapping of an Enum class.
        if isinstance(bed_type_name, str) and bed_type_name in BedType.__members__:
            bed_data_list.append(
                BedData(type=BedType[bed_type_name], count=count)
            )
        else:
            logger.debug("Unsupported bed type: %s", bed_type_name)

    return bed_data_list


# "<optional count> <bed type> bed(s)/cama(s)". The type word may carry a
# leading "cama", which ends up inside the captured type group.
_BED_PATTERN = re.compile(
    r"(\d+)?\s*\b(king|queen|double|single|twin|bunk|sofa|rollaway|futon|"
    r"cama\s*king|cama\s*queen|cama\s*double|cama\s*single|cama\s*twin|"
    r"cama\s*bunk|cama\s*sofa|cama\s*rollaway|cama\s*futon)\b\s*(beds?|camas?)",
    re.IGNORECASE,
)

# "<type> or <type>" alternatives, e.g. "king or queen".
_OR_PATTERN = re.compile(
    r"\b(king|queen|double|single|twin)\s+or\s+(king|queen|double|single|twin)\b",
    re.IGNORECASE,
)

# Leading "cama" (plus optional whitespace) captured by _BED_PATTERN.
_CAMA_PREFIX = re.compile(r"^cama\s*")


def _resolve_bed_type(raw_name):
    """Return the ``BedType`` member named by *raw_name*, or ``None``."""
    name = raw_name.strip().lower()
    members = BedType.__members__
    if name not in members:
        # A capture such as "cama king" presumably never equals a member
        # name as-is, so retry with the bare type word ("king").
        name = _CAMA_PREFIX.sub("", name)
    return members.get(name)


def get_bed_predictions(description: str):
    """Extract bed types and counts from a free-text room description.

    The description is lower-cased and scanned twice:

    * counted mentions ("2 king beds", "queen bed"): the count defaults to 1
      and a later mention of the same type replaces the earlier count;
    * "X or Y" alternatives: each named type has 1 added to its count.
      NOTE(review): a type seen by both scans is therefore counted by both;
      confirm this is intended.

    Args:
        description: Text to scan. ``None`` or an empty string yields an
            empty result.

    Returns:
        A list of ``BedData``, one per recognised ``BedType``; empty (with a
        warning logged) when nothing is recognised.
    """
    logger.debug("Extracting bed predictions from description: %s", description)

    if not description:
        logger.warning("No valid bed data found from extracted information.")
        return []

    description = description.lower()
    bed_type_counts = {}

    for count_text, bed_type_name, _noun in _BED_PATTERN.findall(description):
        bed_type = _resolve_bed_type(bed_type_name)
        if bed_type:
            bed_type_counts[bed_type] = int(count_text) if count_text else 1

    for alternatives in _OR_PATTERN.findall(description):
        for bed_type_name in alternatives:
            bed_type = _resolve_bed_type(bed_type_name)
            if bed_type:
                bed_type_counts[bed_type] = bed_type_counts.get(bed_type, 0) + 1

    bed_data_list = [
        BedData(type=bed_type, count=count)
        for bed_type, count in bed_type_counts.items()
    ]

    if not bed_data_list:
        logger.warning("No valid bed data found from extracted information.")

    return bed_data_list

def extract_bed_numbers(description: str):
    """Collect the numbers that directly precede the text "bed" in *description*.

    Digit runs and the words "one", "two" and "three" (any letter case) are
    recognised; results are returned as ints in order of appearance.
    """
    spelled_out = {"one": 1, "two": 2, "three": 3}
    quantity_before_bed = re.compile(
        r"(\d+|\bone\b|\btwo\b|\bthree\b)\s*bed", re.IGNORECASE
    )
    return [
        int(spelled_out.get(token.lower(), token))
        for token in quantity_before_bed.findall(description)
    ]


def validate_bed_data(beds: List[BedData]) -> List[BedData]:
    """Return the beds that have a truthy ``type`` and a positive ``count``.

    An INFO record is logged when no bed passes the check.
    """
    kept = []
    for candidate in beds:
        if not candidate.type:
            continue
        if candidate.count > 0:
            kept.append(candidate)

    if kept:
        return kept

    logger.info(f"No valid beds found in {beds}")
    return kept


def is_list_of_lists(variable):
    """Tell whether *variable* is a list whose every element is itself a list.

    An empty list qualifies, since it has no element that fails the check.
    """
    return isinstance(variable, list) and all(
        isinstance(element, list) for element in variable
    )


def predict_language(text):
    """Guess whether *text* is Spanish ("es") or English ("en").

    Keyword lists are checked first, Spanish before English, using substring
    matching on the lower-cased text. If neither list matches, ``detect`` is
    consulted and its answer is accepted only when it is "en" or "es".

    Returns:
        "es", "en", or ``None`` when no decision can be made (including when
        ``detect`` raises ``LangDetectException``).
    """
    lowered = text.lower()

    if any(keyword in lowered for keyword in SPANISH_KEYWORDS):
        return "es"
    if any(keyword in lowered for keyword in ENGLISH_KEYWORDS):
        return "en"

    try:
        detected = detect(text)
    except LangDetectException:
        # Best effort: an undetectable text simply has no predicted language.
        return None

    return detected if detected in {"en", "es"} else None