map-room / mappingservice /config_consul.py
Calin Rada
init
f006f31 unverified
raw
history blame
3.18 kB
import json
import logging.config
import random
from typing import Any
import requests
from pydantic import HttpUrl, SecretStr
from pydantic.fields import FieldInfo
from pydantic_settings import (
BaseSettings,
PydanticBaseSettingsSource,
SettingsConfigDict,
)
from requests.exceptions import HTTPError, RequestException, Timeout
logging.config.fileConfig("logging.conf", disable_existing_loggers=False)
logger = logging.getLogger(__name__)
logger.info("Logger Configured: config_consul")
class ConsulSettings(BaseSettings):
model_config = SettingsConfigDict(env_file="consul.env", env_file_encoding="utf-8")
tgx_consul_nodes: str
tgx_consul_auth_user: str
tgx_consul_auth_pass: SecretStr
tgx_consul_token: SecretStr
class ConsulClient:
def __init__(self, settings: ConsulSettings):
logger.info(
f"Consul init:[{settings.tgx_consul_nodes}], "
f"user:[{settings.tgx_consul_auth_user}]"
)
# Random get a valid url for consul
consul_nodes = settings.tgx_consul_nodes.split(";")
consul_url: HttpUrl = random.choice(consul_nodes)
self.headers = {
"X-Consul-Token": f"{settings.tgx_consul_token.get_secret_value()}"
}
self.auth = (
settings.tgx_consul_auth_user,
settings.tgx_consul_auth_pass.get_secret_value(),
)
self.url = consul_url
self.timeout = 20
def get_key_value(self, key: str, dc: str) -> json:
url = f"{self.url}/v1/kv/{key}?dc={dc}&raw=false"
logger.info(f"Donwloading keys from Consul: [{url}, {key}]")
response = requests.get(
url=url, headers=self.headers, auth=self.auth, timeout=self.timeout
)
response.raise_for_status()
return response.json()
class ConsulConfigSettingsSource(PydanticBaseSettingsSource):
def get_field_value(
self, field: FieldInfo, field_name: str
):
pass
def prepare_field_value(
self, field_name: str, field: FieldInfo, value: Any, value_is_complex: bool
) -> Any:
pass
def __call__(self):
"""
Customize settings soucre from Consul
Needs url and secrets to access consul to retrieve settings
"""
CONSUL_KEY = "mapping/apps/room/ml-api/config"
consul_settings = ConsulSettings()
consul_cli = ConsulClient(consul_settings)
logger.info(f"Getting consul key value: [{CONSUL_KEY}]")
ret_json = {}
try:
ret_json = consul_cli.get_key_value(CONSUL_KEY, "gc-we1d")
except (Timeout, HTTPError, RequestException) as err:
logger.warn(f"Error get from Consul {err}")
return {}
if not ret_json:
logger.warn(f"Consul key not found: [{CONSUL_KEY}]", "gc-we1d")
return {}
else:
logger.info("Consul key read succesfully")
return ret_json
# convert Service Account json to string
# ret_json["tgx_pubsub_events_credentials_service_account_json"] = json.dumps(
# ret_json["tgx_pubsub_events_credentials_service_account_json"])