map-room / mappingservice /config_consul.py
Calin Rada
init
f006f31 unverified
import json
import logging.config
import random
from typing import Any
import requests
from pydantic import HttpUrl, SecretStr
from pydantic.fields import FieldInfo
from pydantic_settings import (
BaseSettings,
PydanticBaseSettingsSource,
SettingsConfigDict,
)
from requests.exceptions import HTTPError, RequestException, Timeout
logging.config.fileConfig("logging.conf", disable_existing_loggers=False)
logger = logging.getLogger(__name__)
logger.info("Logger Configured: config_consul")
class ConsulSettings(BaseSettings):
model_config = SettingsConfigDict(env_file="consul.env", env_file_encoding="utf-8")
tgx_consul_nodes: str
tgx_consul_auth_user: str
tgx_consul_auth_pass: SecretStr
tgx_consul_token: SecretStr
class ConsulClient:
def __init__(self, settings: ConsulSettings):
logger.info(
f"Consul init:[{settings.tgx_consul_nodes}], "
f"user:[{settings.tgx_consul_auth_user}]"
)
# Random get a valid url for consul
consul_nodes = settings.tgx_consul_nodes.split(";")
consul_url: HttpUrl = random.choice(consul_nodes)
self.headers = {
"X-Consul-Token": f"{settings.tgx_consul_token.get_secret_value()}"
}
self.auth = (
settings.tgx_consul_auth_user,
settings.tgx_consul_auth_pass.get_secret_value(),
)
self.url = consul_url
self.timeout = 20
def get_key_value(self, key: str, dc: str) -> json:
url = f"{self.url}/v1/kv/{key}?dc={dc}&raw=false"
logger.info(f"Donwloading keys from Consul: [{url}, {key}]")
response = requests.get(
url=url, headers=self.headers, auth=self.auth, timeout=self.timeout
)
response.raise_for_status()
return response.json()
class ConsulConfigSettingsSource(PydanticBaseSettingsSource):
def get_field_value(
self, field: FieldInfo, field_name: str
):
pass
def prepare_field_value(
self, field_name: str, field: FieldInfo, value: Any, value_is_complex: bool
) -> Any:
pass
def __call__(self):
"""
Customize settings soucre from Consul
Needs url and secrets to access consul to retrieve settings
"""
CONSUL_KEY = "mapping/apps/room/ml-api/config"
consul_settings = ConsulSettings()
consul_cli = ConsulClient(consul_settings)
logger.info(f"Getting consul key value: [{CONSUL_KEY}]")
ret_json = {}
try:
ret_json = consul_cli.get_key_value(CONSUL_KEY, "gc-we1d")
except (Timeout, HTTPError, RequestException) as err:
logger.warn(f"Error get from Consul {err}")
return {}
if not ret_json:
logger.warn(f"Consul key not found: [{CONSUL_KEY}]", "gc-we1d")
return {}
else:
logger.info("Consul key read succesfully")
return ret_json
# convert Service Account json to string
# ret_json["tgx_pubsub_events_credentials_service_account_json"] = json.dumps(
# ret_json["tgx_pubsub_events_credentials_service_account_json"])