From 6d94849e2dacea6f9722f240210dd9e92b0bf7f8 Mon Sep 17 00:00:00 2001
From: admin user
Date: Wed, 12 Mar 2025 16:14:28 +0000
Subject: [PATCH] Graph lookup block

---
 README.md            |  4 +-
 block.py             | 87 ++++++++++++++++++++++++++++++++++----------
 post_processing.py   | 77 +++++++++++++++++++++++++++++++++++++++
 pre_processing.py    | 28 ++++++++++++++
 processing.py        | 28 ++++++++++++++
 request_schema.json  | 33 ++++++++++++++++-
 requirements.txt     |  2 +-
 response_schema.json | 24 +++++++++++-
 8 files changed, 260 insertions(+), 23 deletions(-)
 create mode 100644 post_processing.py
 create mode 100644 pre_processing.py
 create mode 100644 processing.py

diff --git a/README.md b/README.md
index 59a3efc..cc23abd 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,3 @@
-**Hello world!!!**
+# MLG Lookup
+
+Lookup Graph API
diff --git a/block.py b/block.py
index 3b227f9..74af462 100644
--- a/block.py
+++ b/block.py
@@ -1,21 +1,70 @@
-@flowx_block
-def example_function(request: dict) -> dict:
+import logging
+from pre_processing import process_record
+from processing import processing
+from post_processing import post_processing, post_processing_duplicate
 
-    # Processing logic here...
+# Configure logging
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
+)
+logger = logging.getLogger(__name__)
 
-    return {
-        "meta_info": [
-            {
-                "name": "created_date",
-                "type": "string",
-                "value": "2024-11-05"
-            }
-        ],
-        "fields": [
-            {
-                "name": "",
-                "type": "",
-                "value": ""
-            }
-        ]
-    }
+# API URLs
+# base_url = "http://localhost:8080/api/v1/clusters"
+base_url = "http://centurion-mlgraph.default.svc.cluster.local:8080/api/v1/clusters"
+cluster_name = "cluster_deviceid_email_fuzzydevice_direct_new"
+url_post = f"{base_url}/{cluster_name}/records"
+url_get = f"{base_url}/{cluster_name}/record"
+
+def __main__(
+    application_key: str,
+    application_timestamp: str,
+    deviceid: str,
+    fuzzydeviceid: str,
+    application_email_address: str,
+    hd_score_m1: float
+    ) -> dict:
+
+    data = {
+        "application_key": application_key,
+        "application_timestamp": application_timestamp,
+        "deviceid": deviceid,
+        "fuzzydeviceid": fuzzydeviceid,
+        "application_email_address": application_email_address,
+        "hd_score_m1": hd_score_m1,
+    }
+
+    # Step 1: Pre-process the record
+    pre_process_output = process_record(url_post, data)
+
+    if not pre_process_output or "error" in pre_process_output:
+        logger.error("Pre-processing failed.")
+        return {"error": "Pre-processing failed"}
+
+    # Step 2: Process the record (fetch required fields)
+    # post_process_output = post_processing(process_output, url_get)
+    # is_duplicate = pre_process_output.get("record_id") in pre_process_output.get("connected_records", [])
+    is_duplicate = str(pre_process_output.get("is_duplicate", "false")).strip().lower() == "true"
+    logger.info("is_duplicate=%s", is_duplicate)
+
+    # Run the appropriate processing function
+    if is_duplicate:
+        process_output = processing(url_get, pre_process_output)
+        post_process_output = post_processing_duplicate(process_output, url_get)
+    else:
+        post_process_output = post_processing(pre_process_output, url_get)
+
+    # Conditionally override the keys if they are missing or None
+    current_app_key = post_process_output.get("application_key")
+    if current_app_key is None:
+        post_process_output["application_key"] = application_key
+
+    current_hd_score = post_process_output.get("hd_score_m1")
+    if current_hd_score is None:
+        post_process_output["hd_score_m1"] = hd_score_m1
+
+    logger.info("Post Processed Record")
+    logger.info(post_process_output)
+
+    return post_process_output
diff --git a/post_processing.py b/post_processing.py
new file mode 100644
index 0000000..59fc1df
--- /dev/null
+++ b/post_processing.py
@@ -0,0 +1,77 @@
+import requests
+import logging
+
+# Configure logging
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
+)
+logger = logging.getLogger(__name__)
+
+def fetch_data(url, record_id):
+    """Fetches data for a given record_id from the API."""
+    try:
+        response = requests.get(f"{url}/{record_id}", headers={"Content-Type": "application/json"})
+        if response.status_code == 200:
+            output = response.json()
+            return output
+        else:
+            logger.error(f"Failed to fetch {record_id}: {response.status_code}")
+            return None
+    except requests.RequestException as e:
+        logger.error(f"Error fetching {record_id}: {e}")
+        return None
+
+def post_processing_duplicate(data, url):
+    """Main function to fetch and extract required fields."""
+    record_id = data.get('record_id')
+    connected_records = data.get('connected_records', [])
+
+    # Fetch main record data
+    record_data = fetch_data(url, record_id)
+    application_key = None
+    hd_score_m1 = None
+    if record_data:
+        application_key = record_data['data'].get('application_key')
+        hd_score_m1 = record_data['data'].get('hd_score_m1')
+
+    # Fetch application_key for connected records (one GET per record)
+    fetched = [fetch_data(url, rec_id) for rec_id in connected_records]
+    connected_keys = [
+        rec['data'].get('application_key') for rec in fetched
+        if rec and rec['data'].get('application_key')
+    ]
+
+    return {
+        "application_key": application_key,
+        "hd_score_m1": hd_score_m1,
+        "connected_application_keys": connected_keys
+    }
+
+
+
+def post_processing(data, url):
+    """Main function to fetch and extract required fields."""
+    record_id = data.get('hd_key')
+    connected_records = data.get('matches', []) + [record_id]  # Matches + hd_key
+
+    # Fetch main record data
+    record_data = fetch_data(url, record_id)
+    application_key = None
+    hd_score_m1 = None
+    if record_data:
+        application_key = record_data['data'].get('application_key')
+        hd_score_m1 = record_data['data'].get('hd_score_m1')
+
+    # Fetch application_key for connected records (one GET per record)
+    fetched = [fetch_data(url, rec_id) for rec_id in connected_records]
+    connected_keys = [
+        rec['data'].get('application_key') for rec in fetched
+        if rec and rec['data'].get('application_key')
+    ]
+
+    return {
+        "application_key": application_key,
+        "hd_score_m1": hd_score_m1,
+        "connected_application_keys": connected_keys
+    }
diff --git a/pre_processing.py b/pre_processing.py
new file mode 100644
index 0000000..925a025
--- /dev/null
+++ b/pre_processing.py
@@ -0,0 +1,28 @@
+import requests
+import logging
+
+# Configure logging
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
+)
+logger = logging.getLogger(__name__)
+
+def process_record(url, record):
+    try:
+        response = requests.post(
+            url,
+            json={"record": record},
+            headers={"Content-Type": "application/json"}
+        )
+
+        if response.status_code == 200:
+            output = response.json()
+            logger.info(f"Pre Processed record: {record['application_key']} - Response: {output}")
+            return output
+        else:
+            logger.error(f"Failed to process record {record['application_key']}. Status: {response.status_code}, Response: {response.text}")
+            return {"error": response.text}
+    except Exception as e:
+        logger.error(f"Error processing record {record['application_key']}: {str(e)}")
+        return {"error": str(e)}
diff --git a/processing.py b/processing.py
new file mode 100644
index 0000000..a00886a
--- /dev/null
+++ b/processing.py
@@ -0,0 +1,28 @@
+import requests
+import logging
+
+# Configure logging
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
+)
+logger = logging.getLogger(__name__)
+
+def processing(url, data):
+    record_id = data.get('hd_key')
+    try:
+        response = requests.get(
+            f"{url}/{record_id}",
+            headers={"Content-Type": "application/json"}
+        )
+
+        if response.status_code == 200:
+            output = response.json()
+            logger.info(f"Processed record: {record_id} - Response: {output}")
+            return output
+        else:
+            logger.error(f"Failed to process record {record_id}. Status: {response.status_code}, Response: {response.text}")
+            return {"error": response.text}
+    except Exception as e:
+        logger.error(f"Error processing record {record_id}: {str(e)}")
+        return {"error": str(e)}
diff --git a/request_schema.json b/request_schema.json
index 0967ef4..8277bcc 100644
--- a/request_schema.json
+++ b/request_schema.json
@@ -1 +1,32 @@
-{}
+{
+  "$schema": "http://json-schema.org/draft-07/schema#",
+  "type": "object",
+  "properties": {
+    "application_key": {
+      "type": ["string", "null"],
+      "description": "Unique identifier for the application."
+    },
+    "application_timestamp": {
+      "type": ["string", "null"],
+      "description": "Timestamp of the application."
+    },
+    "deviceid": {
+      "type": ["string", "null"],
+      "description": "Unique identifier for the device used in the application."
+    },
+    "fuzzydeviceid": {
+      "type": ["string", "null"],
+      "description": "Anonymized or hashed identifier for the device."
+    },
+    "application_email_address": {
+      "type": ["string", "null"],
+      "description": "Email address associated with the application."
+    },
+    "hd_score_m1": {
+      "type": ["number", "null"],
+      "description": "HD Score M1 value associated with the application."
+    }
+  },
+  "required": []
+}
+ 
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 0967ef4..b4c882a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1 +1 @@
-{}
+requests == 2.32.3
\ No newline at end of file
diff --git a/response_schema.json b/response_schema.json
index 0967ef4..36f14ad 100644
--- a/response_schema.json
+++ b/response_schema.json
@@ -1 +1,23 @@
-{}
+{
+  "$schema": "http://json-schema.org/draft-07/schema#",
+  "type": "object",
+  "properties": {
+    "application_key": {
+      "type": ["string", "null"],
+      "description": "Unique identifier for the application."
+    },
+    "hd_score_m1": {
+      "type": ["number", "null"],
+      "description": "HD fraud score M1."
+    },
+    "connected_application_keys": {
+      "type": "array",
+      "items": {
+        "type": ["string", "null"]
+      },
+      "description": "List of connected application keys associated with this application."
+    }
+  },
+  "required": []
+}
+ 
\ No newline at end of file