Spaces:
Paused
Paused
import json | |
import logging.config | |
import random | |
from typing import Any | |
import requests | |
from pydantic import HttpUrl, SecretStr | |
from pydantic.fields import FieldInfo | |
from pydantic_settings import ( | |
BaseSettings, | |
PydanticBaseSettingsSource, | |
SettingsConfigDict, | |
) | |
from requests.exceptions import HTTPError, RequestException, Timeout | |
# Apply the handlers/formatters declared in logging.conf. Loggers that were
# created before this call are kept enabled (disable_existing_loggers=False).
logging.config.fileConfig(fname="logging.conf", disable_existing_loggers=False)

# Module-scoped logger used by everything defined in this file.
logger = logging.getLogger(__name__)
logger.info("Logger Configured: config_consul")
class ConsulSettings(BaseSettings):
    """Connection parameters needed to talk to Consul.

    The model is configured to read its values from the ``consul.env`` file,
    decoded as UTF-8. The two secrets are wrapped in ``SecretStr`` and must be
    unwrapped with ``get_secret_value()`` before use.
    """

    model_config = SettingsConfigDict(
        env_file="consul.env",
        env_file_encoding="utf-8",
    )

    # ";"-separated list of Consul base URLs; ConsulClient picks one of them.
    tgx_consul_nodes: str

    # User / password pair that ConsulClient passes as the request ``auth``.
    tgx_consul_auth_user: str
    tgx_consul_auth_pass: SecretStr

    # Token that ConsulClient sends in the "X-Consul-Token" header.
    tgx_consul_token: SecretStr
class ConsulClient:
    """Small HTTP client for reading keys from the Consul KV endpoint.

    One node is chosen at random from the configured list when the client is
    created; every request made through the instance goes to that node.
    """

    def __init__(self, settings: ConsulSettings):
        """Choose a Consul node and prepare the credentials for later requests.

        Args:
            settings: Provides the ";"-separated node list, the auth
                user/password pair and the Consul token.
        """
        logger.info(
            f"Consul init:[{settings.tgx_consul_nodes}], "
            f"user:[{settings.tgx_consul_auth_user}]"
        )

        # Normalise the node list before picking one: surrounding whitespace
        # and a trailing "/" are removed (the latter would otherwise produce
        # "//v1/kv/..."), and empty entries such as the one left by a
        # trailing ";" are dropped so they can never be selected.
        candidates = [
            node.strip().rstrip("/")
            for node in settings.tgx_consul_nodes.split(";")
        ]
        candidates = [node for node in candidates if node]

        if candidates:
            consul_url: str = random.choice(candidates)
        else:
            # Do not raise here: with an empty base URL the request itself
            # fails inside requests, which callers already handle.
            logger.warning("No usable Consul node found in tgx_consul_nodes")
            consul_url = ""

        self.headers = {
            "X-Consul-Token": f"{settings.tgx_consul_token.get_secret_value()}"
        }
        self.auth = (
            settings.tgx_consul_auth_user,
            settings.tgx_consul_auth_pass.get_secret_value(),
        )
        self.url = consul_url
        # Passed to requests as the ``timeout`` of every call.
        self.timeout = 20

    def get_key_value(self, key: str, dc: str) -> Any:
        """Fetch ``key`` from the KV endpoint of the selected node.

        Args:
            key: KV path appended after ``/v1/kv/``.
            dc: Value sent as the ``dc`` query parameter.

        Returns:
            The response body decoded with ``response.json()``.

        Raises:
            requests.exceptions.HTTPError: From ``raise_for_status()`` when
                the response carries an error status.
            requests.exceptions.RequestException: Any other failure raised
                by ``requests.get`` (connection problems, timeouts, ...).
        """
        # NOTE(review): the query sends ``raw=false`` and callers use the
        # decoded body directly as a settings mapping - confirm how the
        # Consul API interprets this parameter.
        url = f"{self.url}/v1/kv/{key}?dc={dc}&raw=false"
        logger.info(f"Donwloading keys from Consul: [{url}, {key}]")
        response = requests.get(
            url=url, headers=self.headers, auth=self.auth, timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()
class ConsulConfigSettingsSource(PydanticBaseSettingsSource):
    """Settings source that loads the settings mapping from one Consul key.

    ``__call__`` is overridden to return the whole mapping at once, so the
    per-field hooks below are not used for the lookup; they only return
    neutral values instead of ``None``.
    """

    # Consul KV path and datacenter queried by ``__call__``.
    CONSUL_KEY = "mapping/apps/room/ml-api/config"
    CONSUL_DATACENTER = "gc-we1d"

    def get_field_value(self, field: FieldInfo, field_name: str) -> tuple:
        """Return a "no value" triple ``(None, field_name, False)``.

        Values are not resolved field by field in this source; see
        ``__call__``.
        """
        return None, field_name, False

    def prepare_field_value(
        self, field_name: str, field: FieldInfo, value: Any, value_is_complex: bool
    ) -> Any:
        """Return ``value`` unchanged; this source does no per-field processing."""
        return value

    def __call__(self) -> dict:
        """Read the settings mapping stored under ``CONSUL_KEY`` in Consul.

        The Consul URL and secrets come from ``ConsulSettings``. The lookup is
        best-effort: a request failure, an empty result or a payload that is
        not a JSON object is logged as a warning and yields ``{}``.

        Returns:
            The decoded JSON object from Consul, or ``{}`` when it could not
            be obtained.
        """
        consul_key = self.CONSUL_KEY
        datacenter = self.CONSUL_DATACENTER

        consul_cli = ConsulClient(ConsulSettings())
        logger.info(f"Getting consul key value: [{consul_key}]")

        try:
            ret_json = consul_cli.get_key_value(consul_key, datacenter)
        except RequestException as err:
            # Timeout and HTTPError are subclasses of RequestException, so a
            # single clause covers the same failures as before.
            logger.warning(f"Error get from Consul {err}")
            return {}

        if not ret_json:
            # The datacenter is now part of the message; it used to be passed
            # as a stray positional argument with no placeholder, which made
            # the logging call fail while formatting the record.
            logger.warning(
                f"Consul key not found: [{consul_key}] in dc: [{datacenter}]"
            )
            return {}

        if not isinstance(ret_json, dict):
            # A settings source has to hand back a mapping; anything else is
            # treated like a failed lookup.
            logger.warning(
                f"Consul key [{consul_key}] did not contain a JSON object"
            )
            return {}

        # NOTE: an earlier revision had a disabled step here that re-serialised
        # ret_json["tgx_pubsub_events_credentials_service_account_json"] to a
        # string with json.dumps; it is intentionally not applied.
        logger.info("Consul key read succesfully")
        return ret_json