import logging
import json
import os

from pre_processing import process_record
from post_processing import post_processing

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger(__name__)

# Default location of the namespace -> clusterName mapping file.
DEFAULT_CONFIG_PATH = "/app/config.json"


def load_schema(path: str):
    """Read and parse the JSON config file at *path*.

    Args:
        path: Filesystem path of the JSON file.

    Returns:
        The decoded JSON document (whatever ``json.load`` produces).

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    # Explicit UTF-8 so decoding does not depend on the process locale.
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


def get_cluster_name(namespace: str, config_path: str = DEFAULT_CONFIG_PATH) -> str:
    """Return the clusterName configured for *namespace*.

    The config file is expected to contain a list of objects shaped like
    ``{"namespace": "...", "clusterName": "..."}``. The first entry whose
    ``namespace`` equals the requested one is used.

    Args:
        namespace: Namespace to look up.
        config_path: Path of the JSON mapping file; defaults to
            ``/app/config.json``.

    Returns:
        The ``clusterName`` of the first matching entry.

    Raises:
        ValueError: If no entry matches *namespace*, or the matching entry
            has no usable ``clusterName``.
    """
    response_schema = load_schema(config_path)

    for item in response_schema:
        if item.get("namespace") != namespace:
            continue

        cluster_name = item.get("clusterName")
        if not cluster_name:
            # Without this check the caller would build URLs containing "None".
            logger.error(
                "Namespace '%s' has no clusterName in '%s'.", namespace, config_path
            )
            raise ValueError(f"Namespace '{namespace}' has no clusterName configured")

        logger.info("Got the clusterName for namespace '%s'", namespace)
        return cluster_name

    logger.error("Provided Namespace '%s' not found in config.json.", namespace)
    raise ValueError(f"Namespace '{namespace}' not found")


def __main__(
    application_key: str,
    application_timestamp: str,
    deviceid: str,
    fuzzydeviceid: str,
    application_email_address: str,
    hd_score_m1: float,
    hd_score_iso_m2: float
) -> dict:
    """Submit one application record to the cluster service and post-process it.

    The target cluster is resolved from the ``NAMESPACE`` environment variable
    (default ``"staging"``) via ``get_cluster_name``. The record is passed to
    ``process_record`` together with the ``/records`` URL, and that result is
    passed to ``post_processing`` together with the ``/record`` and ``/graph``
    URLs.

    Args:
        application_key: Included in the submitted record; also used to fill
            ``application_key`` in the result when that value is ``None``.
        application_timestamp: Included in the submitted record.
        deviceid: Included in the submitted record.
        fuzzydeviceid: Included in the submitted record.
        application_email_address: Included in the submitted record.
        hd_score_m1: Included in the submitted record; also used to fill
            ``hd_score_m1`` in the result when that value is ``None``.
        hd_score_iso_m2: Not part of the submitted record; always written
            into the result under ``hd_score_iso_m2``.

    Returns:
        The post-processed record, or ``{"error": "..."}`` when the
        pre-processing or post-processing step fails.

    Raises:
        ValueError: If the namespace cannot be resolved to a cluster name.
    """
    namespace = os.getenv("NAMESPACE", "staging")
    base_url = "http://centurion-mlg.default.svc.cluster.local:8080/api/v1/clusters"

    cluster_name = get_cluster_name(namespace)

    url_post = f"{base_url}/{cluster_name}/records"
    url_get = f"{base_url}/{cluster_name}/record"
    url_get_for_graph = f"{base_url}/{cluster_name}/graph"

    data = {
        "application_key": application_key,
        "application_timestamp": application_timestamp,
        "deviceid": deviceid,
        "fuzzydeviceid": fuzzydeviceid,
        "application_email_address": application_email_address,
        "hd_score_m1": hd_score_m1,
    }

    # Step 1: Pre-process the record
    pre_process_output = process_record(url_post, data)

    if not pre_process_output or "error" in pre_process_output:
        logger.error("Pre-processing failed.")
        return {"error": "Pre-processing failed"}

    # Step 2: Post-process the record (fetch required fields)
    post_process_output = post_processing(pre_process_output, url_get, url_get_for_graph)

    # Mirror the pre-processing failure path instead of crashing on `.get`
    # when the helper returns nothing usable.
    if not isinstance(post_process_output, dict):
        logger.error("Post-processing failed.")
        return {"error": "Post-processing failed"}

    # Fall back to the caller-supplied values when the keys are missing or None.
    if post_process_output.get("application_key") is None:
        post_process_output["application_key"] = application_key

    if post_process_output.get("hd_score_m1") is None:
        post_process_output["hd_score_m1"] = hd_score_m1

    # The iso score always comes from the caller.
    post_process_output["hd_score_iso_m2"] = hd_score_iso_m2

    logger.info("Post Processed Record")
    logger.info(post_process_output)

    return post_process_output