Spaces:
Paused
Paused
File size: 3,180 Bytes
f006f31 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
import json
import logging.config
import random
from typing import Any
import requests
from pydantic import HttpUrl, SecretStr
from pydantic.fields import FieldInfo
from pydantic_settings import (
BaseSettings,
PydanticBaseSettingsSource,
SettingsConfigDict,
)
from requests.exceptions import HTTPError, RequestException, Timeout
# Configure logging at import time from the "logging.conf" file (path is
# relative to the current working directory). disable_existing_loggers=False
# keeps loggers that were created before this call enabled.
logging.config.fileConfig("logging.conf", disable_existing_loggers=False)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Emitted once per import, right after logging has been configured.
logger.info("Logger Configured: config_consul")
class ConsulSettings(BaseSettings):
    """Connection settings required to reach Consul.

    Values are loaded by pydantic-settings; the ``consul.env`` file (read as
    UTF-8) is configured as the dotenv source. Every field is required.
    """

    model_config = SettingsConfigDict(
        env_file="consul.env",
        env_file_encoding="utf-8",
    )

    # ";"-separated list of Consul node base URLs.
    tgx_consul_nodes: str
    # HTTP basic-auth user name sent with each request.
    tgx_consul_auth_user: str
    # HTTP basic-auth password (kept as a secret so it is masked in reprs).
    tgx_consul_auth_pass: SecretStr
    # Token sent in the "X-Consul-Token" header.
    tgx_consul_token: SecretStr
class ConsulClient:
    """Small HTTP client for reading keys from the Consul KV endpoint.

    One node is chosen at random from the ``;``-separated
    ``tgx_consul_nodes`` setting when the client is created, and every
    request made by this instance goes to that node.
    """

    def __init__(self, settings: ConsulSettings):
        """Pick a Consul node and prepare the credentials for requests.

        Args:
            settings: Consul connection settings (node list, basic-auth
                user/password and token).
        """
        logger.info(
            f"Consul init:[{settings.tgx_consul_nodes}], "
            f"user:[{settings.tgx_consul_auth_user}]"
        )
        # Drop blank entries (e.g. produced by a trailing ";" or stray
        # whitespace) so that an empty URL is never selected when at least
        # one real node is configured.
        consul_nodes = [
            node.strip()
            for node in settings.tgx_consul_nodes.split(";")
            if node.strip()
        ]
        # Random get a valid url for consul. With no usable node the URL is
        # left empty, so the problem surfaces as a requests exception on the
        # first call instead of failing here.
        consul_url: str = random.choice(consul_nodes) if consul_nodes else ""
        self.headers = {
            "X-Consul-Token": f"{settings.tgx_consul_token.get_secret_value()}"
        }
        self.auth = (
            settings.tgx_consul_auth_user,
            settings.tgx_consul_auth_pass.get_secret_value(),
        )
        # Strip a trailing "/" so the request URL does not contain "//v1/...".
        self.url = consul_url.rstrip("/")
        # Timeout passed to every request (seconds, per the requests API).
        self.timeout = 20

    def get_key_value(self, key: str, dc: str) -> Any:
        """Fetch ``key`` from the KV endpoint of the selected Consul node.

        Args:
            key: KV path, appended as-is after ``/v1/kv/``.
            dc: Value sent as the ``dc`` query parameter.

        Returns:
            The response body decoded from JSON.

        Raises:
            requests.exceptions.HTTPError: If the response status is an
                error (raised by ``raise_for_status``).
            requests.exceptions.RequestException: For connection problems,
                timeouts or an invalid URL.
        """
        # NOTE(review): the query string sends "raw=false"; confirm against
        # the Consul KV API how the server treats that value.
        url = f"{self.url}/v1/kv/{key}?dc={dc}&raw=false"
        logger.info(f"Donwloading keys from Consul: [{url}, {key}]")
        response = requests.get(
            url=url, headers=self.headers, auth=self.auth, timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()
class ConsulConfigSettingsSource(PydanticBaseSettingsSource):
    """Settings source that reads its values from a single Consul KV entry.

    Calling the instance downloads the JSON stored under ``CONSUL_KEY`` in
    datacenter ``CONSUL_DC`` and returns it. Any request failure or an empty
    result is logged as a warning and yields an empty dict, so loading the
    settings stays best-effort.
    """

    # KV path and datacenter queried by ``__call__``. They are class
    # attributes so the datacenter is written once and a subclass can
    # override either value.
    CONSUL_KEY = "mapping/apps/room/ml-api/config"
    CONSUL_DC = "gc-we1d"

    def get_field_value(
        self, field: FieldInfo, field_name: str
    ):
        """Per-field lookup hook; intentionally does nothing (returns None).

        ``__call__`` below returns the whole mapping at once and never calls
        this method. It is presumably defined only to satisfy the base-class
        interface.
        """
        pass

    def prepare_field_value(
        self, field_name: str, field: FieldInfo, value: Any, value_is_complex: bool
    ) -> Any:
        """Per-field conversion hook; intentionally does nothing (returns None).

        Not called by ``__call__`` in this class.
        """
        pass

    def __call__(self):
        """
        Customize settings source from Consul
        Needs url and secrets to access consul to retrieve settings

        Returns:
            The decoded JSON stored under ``CONSUL_KEY``, or ``{}`` when the
            request fails or the result is empty.
        """
        consul_settings = ConsulSettings()
        consul_cli = ConsulClient(consul_settings)
        logger.info(f"Getting consul key value: [{self.CONSUL_KEY}]")
        try:
            ret_json = consul_cli.get_key_value(self.CONSUL_KEY, self.CONSUL_DC)
        except RequestException as err:
            # Timeout and HTTPError are subclasses of RequestException, so
            # this single clause covers the same failures as before.
            logger.warning(f"Error get from Consul {err}")
            return {}
        if not ret_json:
            # Use %-style placeholders for both values; passing the
            # datacenter as an extra argument without a placeholder makes
            # the logging module fail while formatting the record.
            logger.warning(
                "Consul key not found: [%s], dc: [%s]",
                self.CONSUL_KEY,
                self.CONSUL_DC,
            )
            return {}
        logger.info("Consul key read succesfully")
        # Disabled step, kept for reference:
        # convert Service Account json to string
        # ret_json["tgx_pubsub_events_credentials_service_account_json"] = json.dumps(
        #     ret_json["tgx_pubsub_events_credentials_service_account_json"])
        return ret_json
|