import json
import logging
import os

from pre_processing import process_record
from post_processing import post_processing

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger(__name__)


def load_schema(path: str):
    """Helper to read the JSON config file."""
    with open(path, "r") as f:
        return json.load(f)


def get_cluster_name(namespace: str) -> str:
    """
    Fetches the clusterName for the given namespace from config.json.
    Expects config.json to contain a list of { "namespace": "...", "clusterName": "..." }.
    """
    response_schema = load_schema("/app/config.json")  # Update path if needed
    for item in response_schema:
        if item.get("namespace") == namespace:
            logger.info("Got the clusterName for namespace '%s'", namespace)
            return item.get("clusterName")
    logger.error("Provided namespace '%s' not found in config.json.", namespace)
    raise ValueError(f"Namespace '{namespace}' not found")


def __main__(
    application_key: str,
    application_timestamp: str,
    deviceid: str,
    fuzzydeviceid: str,
    application_email_address: str,
    hd_score_m1: float,
) -> dict:
    namespace = os.getenv("NAMESPACE", "staging")
    base_url = "http://centurion-mlg.default.svc.cluster.local:8080/api/v1/clusters"
    cluster_name = get_cluster_name(namespace)
    url_post = f"{base_url}/{cluster_name}/records"
    url_get = f"{base_url}/{cluster_name}/record"
    url_get_for_graph = f"{base_url}/{cluster_name}/graph"

    data = {
        "application_key": application_key,
        "application_timestamp": application_timestamp,
        "deviceid": deviceid,
        "fuzzydeviceid": fuzzydeviceid,
        "application_email_address": application_email_address,
        "hd_score_m1": hd_score_m1,
    }

    # Step 1: Pre-process the record
    pre_process_output = process_record(url_post, data)
    if not pre_process_output or "error" in pre_process_output:
        logger.error("Pre-processing failed.")
        return {"error": "Pre-processing failed"}

    # Step 2: Post-process the record (fetch required fields)
    # post_process_output = post_processing(process_output, url_get)
    # is_duplicate = pre_process_output.get("record_id") in pre_process_output.get("connected_records", [])
    is_duplicate = str(pre_process_output.get("is_duplicate", "false")).strip().lower() == "true"

    # Run the appropriate processing function
    # if is_duplicate:
    #     process_output = processing(url_get, pre_process_output)
    #     post_process_output = post_processing_duplicate(process_output, url_get)
    # else:
    #     post_process_output = post_processing(pre_process_output, url_get)
    post_process_output = post_processing(pre_process_output, url_get, url_get_for_graph)

    # Conditionally override the keys if they are missing or None (or the string "null")
    current_app_key = post_process_output.get("application_key")
    if current_app_key is None or current_app_key == "null":
        post_process_output["application_key"] = application_key

    current_hd_score = post_process_output.get("hd_score_m1")
    if current_hd_score is None or current_hd_score == "null":
        post_process_output["hd_score_m1"] = hd_score_m1

    logger.info("Post-processed record: %s", post_process_output)
    return post_process_output
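

# Minimal local invocation sketch. This is an assumption about how the entrypoint
# might be exercised (the hosting runtime may call __main__ directly); the field
# values below are placeholders, and running it requires /app/config.json plus a
# reachable centurion-mlg service.
if __name__ == "__main__":
    result = __main__(
        application_key="app-key-placeholder",
        application_timestamp="2025-07-11T15:10:24Z",
        deviceid="device-placeholder",
        fuzzydeviceid="fuzzy-device-placeholder",
        application_email_address="user@example.com",
        hd_score_m1=0.0,
    )
    logger.info("Result: %s", result)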