File size: 3,180 Bytes
f006f31
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import json
import logging.config
import random
from typing import Any

import requests
from pydantic import HttpUrl, SecretStr
from pydantic.fields import FieldInfo
from pydantic_settings import (
    BaseSettings,
    PydanticBaseSettingsSource,
    SettingsConfigDict,
)
from requests.exceptions import HTTPError, RequestException, Timeout

# Configure logging from the "logging.conf" file. The path is relative, so it
# is looked up from the current working directory. This runs as an import-time
# side effect of this module.
# disable_existing_loggers=False leaves loggers created before this call
# enabled.
logging.config.fileConfig("logging.conf", disable_existing_loggers=False)
logger = logging.getLogger(__name__)
logger.info("Logger Configured: config_consul")


class ConsulSettings(BaseSettings):
    """Connection settings and credentials used to reach Consul.

    ``model_config`` names ``consul.env`` (read as UTF-8) as the dotenv file
    for these values. None of the four fields declares a default, so each one
    must be supplied.
    """

    model_config = SettingsConfigDict(env_file="consul.env", env_file_encoding="utf-8")
    # Consul base URLs; ConsulClient splits this value on ";".
    tgx_consul_nodes: str
    # User name of the (user, password) auth pair sent with each request.
    tgx_consul_auth_user: str
    # Password of that pair; read through get_secret_value() by ConsulClient.
    tgx_consul_auth_pass: SecretStr
    # Token sent in the "X-Consul-Token" request header.
    tgx_consul_token: SecretStr


class ConsulClient:
    """Minimal HTTP client for reading keys from the Consul KV store.

    One node is picked at random from the configured list when the client is
    created and is then used for every request made through this instance.
    """

    def __init__(self, settings: ConsulSettings):
        """Select a Consul node and prepare headers and credentials.

        Args:
            settings: Consul settings; ``tgx_consul_nodes`` holds one or more
                base URLs separated by ``;``.
        """
        logger.info(
            f"Consul init:[{settings.tgx_consul_nodes}], "
            f"user:[{settings.tgx_consul_auth_user}]"
        )

        # Pick one node at random. Blank entries (e.g. from a trailing ";")
        # and surrounding whitespace would yield an unusable base URL, and a
        # trailing "/" would put "//" in the request path, so normalise first.
        candidates = [
            node.strip().rstrip("/")
            for node in settings.tgx_consul_nodes.split(";")
        ]
        candidates = [node for node in candidates if node]

        self.headers = {
            "X-Consul-Token": f"{settings.tgx_consul_token.get_secret_value()}"
        }
        self.auth = (
            settings.tgx_consul_auth_user,
            settings.tgx_consul_auth_pass.get_secret_value(),
        )
        # With no usable node the URL stays empty, so the failure surfaces as
        # a request error in get_key_value() rather than in the constructor.
        self.url: str = random.choice(candidates) if candidates else ""
        # Timeout passed to every requests.get() call.
        self.timeout = 20

    def get_key_value(self, key: str, dc: str) -> Any:
        """Fetch ``key`` from the KV store of datacenter ``dc``.

        Args:
            key: KV path, appended to ``/v1/kv/``.
            dc: Value of the ``dc`` query parameter.

        Returns:
            The response body decoded as JSON.

        Raises:
            requests.exceptions.RequestException: on connection problems,
                timeouts, or an error status (via ``raise_for_status``).
        """
        # NOTE(review): Consul appears to enable raw mode whenever the `raw`
        # parameter is present, whatever its value - confirm before changing
        # this query string, since callers use the decoded body directly.
        url = f"{self.url}/v1/kv/{key}?dc={dc}&raw=false"
        logger.info(f"Donwloading keys from Consul: [{url}, {key}]")
        response = requests.get(
            url=url, headers=self.headers, auth=self.auth, timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()


class ConsulConfigSettingsSource(PydanticBaseSettingsSource):
    """Settings source that loads the configuration from a single Consul key.

    ``__call__`` is overridden to return the complete settings mapping, so the
    per-field hooks below are stubs that always return ``None``.
    """

    def get_field_value(
            self, field: FieldInfo, field_name: str
    ):
        """Stub implementation; always returns ``None``."""
        return None

    def prepare_field_value(
            self, field_name: str, field: FieldInfo, value: Any, value_is_complex: bool
    ) -> Any:
        """Stub implementation; always returns ``None``."""
        return None

    def __call__(self) -> Any:
        """Return the settings stored in Consul, or ``{}`` when unavailable.

        The Consul URL and secrets are loaded through ``ConsulSettings``.
        Request failures, an undecodable body, and a missing or empty key are
        logged as warnings and produce an empty mapping instead of raising.
        """
        CONSUL_KEY = "mapping/apps/room/ml-api/config"
        CONSUL_DC = "gc-we1d"

        consul_settings = ConsulSettings()
        consul_cli = ConsulClient(consul_settings)

        logger.info(f"Getting consul key value: [{CONSUL_KEY}]")
        try:
            consul_values = consul_cli.get_key_value(CONSUL_KEY, CONSUL_DC)
        except (RequestException, ValueError) as err:
            # RequestException already covers Timeout and HTTPError.
            # ValueError covers a body that is not valid JSON on requests
            # versions whose decode error is not a RequestException.
            logger.warning(f"Error get from Consul {err}")
            return {}

        if not consul_values:
            logger.warning(
                f"Consul key not found: [{CONSUL_KEY}], dc: [{CONSUL_DC}]"
            )
            return {}

        # NOTE: an earlier revision converted the
        # "tgx_pubsub_events_credentials_service_account_json" entry to a
        # string with json.dumps() before returning; that step is disabled.
        logger.info("Consul key read succesfully")
        return consul_values