import json import logging.config import random from typing import Any import requests from pydantic import HttpUrl, SecretStr from pydantic.fields import FieldInfo from pydantic_settings import ( BaseSettings, PydanticBaseSettingsSource, SettingsConfigDict, ) from requests.exceptions import HTTPError, RequestException, Timeout logging.config.fileConfig("logging.conf", disable_existing_loggers=False) logger = logging.getLogger(__name__) logger.info("Logger Configured: config_consul") class ConsulSettings(BaseSettings): model_config = SettingsConfigDict(env_file="consul.env", env_file_encoding="utf-8") tgx_consul_nodes: str tgx_consul_auth_user: str tgx_consul_auth_pass: SecretStr tgx_consul_token: SecretStr class ConsulClient: def __init__(self, settings: ConsulSettings): logger.info( f"Consul init:[{settings.tgx_consul_nodes}], " f"user:[{settings.tgx_consul_auth_user}]" ) # Random get a valid url for consul consul_nodes = settings.tgx_consul_nodes.split(";") consul_url: HttpUrl = random.choice(consul_nodes) self.headers = { "X-Consul-Token": f"{settings.tgx_consul_token.get_secret_value()}" } self.auth = ( settings.tgx_consul_auth_user, settings.tgx_consul_auth_pass.get_secret_value(), ) self.url = consul_url self.timeout = 20 def get_key_value(self, key: str, dc: str) -> json: url = f"{self.url}/v1/kv/{key}?dc={dc}&raw=false" logger.info(f"Donwloading keys from Consul: [{url}, {key}]") response = requests.get( url=url, headers=self.headers, auth=self.auth, timeout=self.timeout ) response.raise_for_status() return response.json() class ConsulConfigSettingsSource(PydanticBaseSettingsSource): def get_field_value( self, field: FieldInfo, field_name: str ): pass def prepare_field_value( self, field_name: str, field: FieldInfo, value: Any, value_is_complex: bool ) -> Any: pass def __call__(self): """ Customize settings soucre from Consul Needs url and secrets to access consul to retrieve settings """ CONSUL_KEY = "mapping/apps/room/ml-api/config" consul_settings = ConsulSettings() consul_cli = ConsulClient(consul_settings) logger.info(f"Getting consul key value: [{CONSUL_KEY}]") ret_json = {} try: ret_json = consul_cli.get_key_value(CONSUL_KEY, "gc-we1d") except (Timeout, HTTPError, RequestException) as err: logger.warn(f"Error get from Consul {err}") return {} if not ret_json: logger.warn(f"Consul key not found: [{CONSUL_KEY}]", "gc-we1d") return {} else: logger.info("Consul key read succesfully") return ret_json # convert Service Account json to string # ret_json["tgx_pubsub_events_credentials_service_account_json"] = json.dumps( # ret_json["tgx_pubsub_events_credentials_service_account_json"])