90 lines
3.3 KiB
Python
90 lines
3.3 KiB
Python
import logging
|
|
import json
|
|
import os
|
|
from pre_processing import process_record
|
|
from processing import processing
|
|
from post_processing import post_processing, post_processing_duplicate
|
|
|
|
# Root logging setup: records at INFO and above, each line stamped with time,
# level and the emitting logger's name.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
    level=logging.INFO,
)

# Logger for this module, used by every function below.
logger = logging.getLogger(__name__)
|
|
|
|
def load_schema(path: str):
    """Read a JSON file and return its decoded content.

    Args:
        path: Filesystem path of the JSON document.

    Returns:
        The decoded JSON value; its type is whatever the file's top level is
        (a list for the cluster config read by ``get_cluster_name``).

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    # JSON text is UTF-8 by specification (RFC 8259); be explicit rather than
    # depending on the container's locale-derived default encoding.
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)
|
|
|
|
def get_cluster_name(namespace: str, config_path: str = "/app/config.json") -> str:
    """Look up the cluster name configured for *namespace*.

    The config file is expected to hold a JSON list of objects shaped like
    ``{"namespace": "...", "clusterName": "..."}``. The first entry whose
    ``namespace`` equals *namespace* is used.

    Args:
        namespace: Namespace to resolve (``__main__`` passes ``$NAMESPACE``).
        config_path: Path of the JSON config file. Defaults to
            ``/app/config.json``, the previously hard-coded location.

    Returns:
        The ``clusterName`` of the first matching entry.

    Raises:
        ValueError: If no entry matches *namespace*, or the matching entry has
            no non-empty ``clusterName``.
    """
    entries = load_schema(config_path)

    for item in entries:
        if item.get("namespace") != namespace:
            continue

        cluster_name = item.get("clusterName")
        if not cluster_name:
            # Returning None/"" here would let callers build URLs such as
            # ".../None/records"; fail loudly instead.
            logger.error("Namespace '%s' has no clusterName in config.", namespace)
            raise ValueError(f"Namespace '{namespace}' has no clusterName configured")

        logger.info("Got the clusterName for namespace '%s'", namespace)
        return cluster_name

    logger.error("Provided Namespace '%s' not found in config.json.", namespace)
    raise ValueError(f"Namespace '{namespace}' not found")
|
|
|
|
def _is_missing(value) -> bool:
    """Return True if *value* is None or the string "null" (case/whitespace-insensitive)."""
    if value is None:
        return True
    return isinstance(value, str) and value.strip().lower() == "null"


def __main__(
    application_key: str,
    application_timestamp: str,
    deviceid: str,
    fuzzydeviceid: str,
    application_email_address: str,
    hd_score_m1: float,
) -> dict:
    """Run one application record through pre-processing and post-processing.

    Resolves the cluster for the ``NAMESPACE`` environment variable (default
    ``"staging"``) and passes the record plus the cluster's ``/records`` URL
    to ``process_record``. It then chooses the duplicate or non-duplicate
    post-processing path from the ``is_duplicate`` field of that result.

    Args:
        application_key: Record key; also back-filled into the result.
        application_timestamp: Record timestamp, forwarded unchanged.
        deviceid: Device identifier, forwarded unchanged.
        fuzzydeviceid: Fuzzy device identifier, forwarded unchanged.
        application_email_address: Email address, forwarded unchanged.
        hd_score_m1: Score; also back-filled into the result.

    Returns:
        The post-processed record. ``application_key`` and ``hd_score_m1`` are
        filled in from the arguments when the downstream result lacks them
        (absent, None, or the string "null"). On failure, a dict of the form
        ``{"error": "..."}`` is returned instead.

    Raises:
        ValueError: Propagated from ``get_cluster_name`` when the namespace
            cannot be resolved to a cluster.
    """
    namespace = os.getenv("NAMESPACE", "staging")
    base_url = "http://centurion-mlg.default.svc.cluster.local:8080/api/v1/clusters"
    cluster_name = get_cluster_name(namespace)
    url_post = f"{base_url}/{cluster_name}/records"
    url_get = f"{base_url}/{cluster_name}/record"

    data = {
        "application_key": application_key,
        "application_timestamp": application_timestamp,
        "deviceid": deviceid,
        "fuzzydeviceid": fuzzydeviceid,
        "application_email_address": application_email_address,
        "hd_score_m1": hd_score_m1,
    }

    # Step 1: Pre-process the record.
    pre_process_output = process_record(url_post, data)
    if not pre_process_output or "error" in pre_process_output:
        logger.error("Pre-processing failed.")
        return {"error": "Pre-processing failed"}

    # Step 2: Decide the processing path. str() makes both a bool True and a
    # string such as "True" compare equal to "true".
    is_duplicate = (
        str(pre_process_output.get("is_duplicate", "false")).strip().lower() == "true"
    )
    logger.debug("is_duplicate=%s", is_duplicate)

    # Step 3: Duplicates go through the extra processing step first.
    if is_duplicate:
        process_output = processing(url_get, pre_process_output)
        post_process_output = post_processing_duplicate(process_output, url_get)
    else:
        post_process_output = post_processing(pre_process_output, url_get)

    # Guard before calling .get(): a None/non-dict result would otherwise
    # crash with AttributeError.
    if not isinstance(post_process_output, dict):
        logger.error("Post-processing failed.")
        return {"error": "Post-processing failed"}

    # Back-fill keys the downstream result is missing, None, or the string "null".
    if _is_missing(post_process_output.get("application_key")):
        post_process_output["application_key"] = application_key
    if _is_missing(post_process_output.get("hd_score_m1")):
        post_process_output["hd_score_m1"] = hd_score_m1

    logger.info("Post Processed Record")
    logger.info(post_process_output)
    return post_process_output
|