Compare commits: main...mlg-lookup (7 commits)
| Author | SHA1 | Date |
|---|---|---|
|  | 7d067ca5cb |  |
|  | 204f2d50c7 |  |
|  | 2e162d9813 |  |
|  | 88b296a25a |  |
|  | a0e78fac57 |  |
|  | 43e4c2f412 |  |
|  | 6d94849e2d |  |
block.py (111 lines changed)
@@ -1,21 +1,94 @@
-@flowx_block
-def example_function(request: dict) -> dict:
-
-    # Processing logic here...
-
-    return {
-        "meta_info": [
-            {
-                "name": "created_date",
-                "type": "string",
-                "value": "2024-11-05"
-            }
-        ],
-        "fields": [
-            {
-                "name": "",
-                "type": "",
-                "value": ""
-            }
-        ]
-    }
+import logging
+import json
+import os
+from pre_processing import process_record
+from post_processing import post_processing
+
+# Configure logging
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
+)
+logger = logging.getLogger(__name__)
+
+def load_schema(path: str):
+    """Helper to read the JSON config file."""
+    with open(path, "r") as f:
+        return json.load(f)
+
+def get_cluster_name(namespace: str) -> str:
+    """
+    Fetches the clusterName for the given namespace from config.json.
+    Expects config.json to contain a list of { "namespace": "...", "clusterName": "..." }.
+    """
+    response_schema = load_schema("/app/config.json")  # Update path if needed
+    for item in response_schema:
+        if item.get("namespace") == namespace:
+            logger.info("Got the clusterName for namespace '%s'", namespace)
+            return item.get("clusterName")
+    logger.error("Provided Namespace '%s' not found in config.json.", namespace)
+    raise ValueError(f"Namespace '{namespace}' not found")
+
+def __main__(
+    application_key: str,
+    application_timestamp: str,
+    deviceid: str,
+    fuzzydeviceid: str,
+    application_email_address: str,
+    hd_score_m1: float,
+    hd_score_iso_m2: float
+) -> dict:
+
+    namespace = os.getenv("NAMESPACE", "staging")
+    base_url = "http://centurion-mlg.default.svc.cluster.local:8080/api/v1/clusters"
+    # base_url = "http://localhost:8080//api/v1/clusters"
+    cluster_name = get_cluster_name(namespace)
+    # cluster_name = "cluster_deviceid_email_fuzzydevice_direct_new"
+    url_post = f"{base_url}/{cluster_name}/records"
+    url_get = f"{base_url}/{cluster_name}/record"
+    url_get_for_graph = f"{base_url}/{cluster_name}/graph"
+
+    data = {
+        "application_key": application_key,
+        "application_timestamp": application_timestamp,
+        "deviceid": deviceid,
+        "fuzzydeviceid": fuzzydeviceid,
+        "application_email_address": application_email_address,
+        "hd_score_m1": hd_score_m1,
+    }
+
+    # Step 1: Pre-process the record
+    pre_process_output = process_record(url_post, data)
+
+    if not pre_process_output or "error" in pre_process_output:
+        logger.error("Pre-processing failed.")
+        return {"error": "Pre-processing failed"}
+
+    # Step 2: Process the record (fetch required fields)
+    # post_process_output = post_processing(process_output, url_get)
+    # is_duplicate = pre_process_output.get("record_id") in pre_process_output.get("connected_records", [])
+    is_duplicate = str(pre_process_output.get("is_duplicate", "false")).strip().lower() == "true"
+    # print(is_duplicate)
+
+    # Run the appropriate processing function
+    # if is_duplicate:
+    #     process_output = processing(url_get, pre_process_output)
+    #     post_process_output = post_processing_duplicate(process_output, url_get)
+    # else:
+    #     post_process_output = post_processing(pre_process_output, url_get)
+    post_process_output = post_processing(pre_process_output, url_get, url_get_for_graph)
+    # Conditionally override the keys if they are missing or None (or the string "null")
+    current_app_key = post_process_output.get("application_key")
+    if current_app_key is None:
+        post_process_output["application_key"] = application_key
+
+    current_hd_score = post_process_output.get("hd_score_m1")
+    if current_hd_score is None:
+        post_process_output["hd_score_m1"] = hd_score_m1
+
+    post_process_output["hd_score_iso_m2"] = hd_score_iso_m2
+
+    logger.info("Post Processed Record")
+    logger.info(post_process_output)
+
+    return post_process_output
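For reference, a minimal sketch of how the new `__main__` entrypoint might be exercised. The sample values below are invented, and an actual run needs the centurion-mlg service and `/app/config.json` to be reachable, so this is illustrative only:

```python
# Hypothetical invocation of block.py's entrypoint; all values are made-up samples.
import os
import block

os.environ["NAMESPACE"] = "staging"  # selects the clusterName via config.json

result = block.__main__(
    application_key="app-123",
    application_timestamp="2024-11-05T10:00:00Z",
    deviceid="dev-1",
    fuzzydeviceid="fuzzy-1",
    application_email_address="user@example.com",
    hd_score_m1=0.42,
    hd_score_iso_m2=0.37,
)
# Expected shape, per post_processing and the output schema further down:
# {"application_key": ..., "hd_score_m1": ..., "hd_score_iso_m2": 0.37,
#  "connected_application_keys": [...]}
print(result)
```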
config.json (new file, 14 lines)
[
  {
    "namespace": "staging",
    "clusterName": "cluster_deviceid_email_fuzzydevice_direct_new"
  },
  {
    "namespace": "production",
    "clusterName": "cluster_deviceid_email_fuzzydevice_direct_new_prod"
  },
  {
    "namespace": "production-beta",
    "clusterName": "cluster_deviceid_email_fuzzydevice_direct_new_prod"
  }
]
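As a quick illustration of how these entries are consumed by `get_cluster_name` in block.py, a sketch that patches `load_schema` so the hard-coded `/app/config.json` path is not required:

```python
# Sketch: namespace -> clusterName resolution, with load_schema stubbed out.
from unittest.mock import patch
import block

fake_config = [
    {"namespace": "staging", "clusterName": "cluster_deviceid_email_fuzzydevice_direct_new"},
    {"namespace": "production", "clusterName": "cluster_deviceid_email_fuzzydevice_direct_new_prod"},
]

with patch("block.load_schema", return_value=fake_config):
    assert block.get_cluster_name("staging") == "cluster_deviceid_email_fuzzydevice_direct_new"
    # An unknown namespace raises ValueError("Namespace '...' not found")
```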
post_processing.py (new file, 76 lines)
import requests
import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger(__name__)

def fetch_data(url, record_id):
    """Fetches data for a given record_id from the API."""
    try:
        response = requests.get(f"{url}/{record_id}", headers={"Content-Type": "application/json"})
        if response.status_code == 200:
            output = response.json()
            return output
        else:
            logger.error(f"Failed to fetch {record_id}: {response.status_code}")
            return None
    except requests.RequestException as e:
        logger.error(f"Error fetching {record_id}: {e}")
        return None

def fetch_all_nodes_data(url, cluster_id):
    try:
        response = requests.get(f"{url}/{cluster_id}", headers={"Content-Type": "application/json"})
        if response.status_code == 200:
            output = response.json()
            return output
        else:
            logger.error(f"Failed to fetch {cluster_id}: {response.status_code}")
            return None
    except requests.RequestException as e:
        logger.error(f"Error fetching {cluster_id}: {e}")
        return None

def post_processing(data, url, url_get_for_graph):
    """Main function to fetch and extract required fields."""
    record_id = data.get('hd_key')
    # connected_records = data.get('matches', []) + [record_id]  # Matches + hd_key
    cluster_id = data.get('cluster_id')

    # Fetch main record data
    application_key = None
    hd_score_m1 = None
    if record_id:
        record_data = fetch_data(url, record_id)
        if record_data and 'data' in record_data:
            application_key = record_data['data'].get('application_key')
            hd_score_m1 = record_data['data'].get('hd_score_m1')

    # Fetch application_key for connected records
    connected_keys = []
    if cluster_id:
        response_obj = fetch_all_nodes_data(url_get_for_graph, cluster_id)
        if response_obj and "nodes" in response_obj:
            # Extract and filter application_keys
            all_keys = [
                node.get('data', {}).get('application_key')
                for node in response_obj['nodes']
                if node.get('data') and node['data'].get('application_key') is not None
            ]
            # Exclude the current application_key and limit to 500 entries
            connected_keys = [
                key for key in all_keys if key != application_key
            ][:500]

    if application_key is not None and application_key not in connected_keys:
        connected_keys.append(application_key)

    return {
        "application_key": application_key,
        "hd_score_m1": hd_score_m1,
        "connected_application_keys": connected_keys
    }
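To make the data flow concrete, a hedged sketch of the response shapes `post_processing` assumes from the record and graph endpoints. The field names come from the code above; the values are invented:

```python
# Hypothetical API responses, inferred from how post_processing reads them.
record_response = {                       # GET {url_get}/{hd_key}
    "data": {"application_key": "app-123", "hd_score_m1": 0.42}
}
graph_response = {                        # GET {url_get_for_graph}/{cluster_id}
    "nodes": [
        {"data": {"application_key": "app-123"}},
        {"data": {"application_key": "app-456"}},
    ]
}
# With these responses, post_processing(data, url, url_get_for_graph) returns:
# {"application_key": "app-123",
#  "hd_score_m1": 0.42,
#  "connected_application_keys": ["app-456", "app-123"]}
```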
pre_processing.py (new file, 43 lines)
import requests
import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger(__name__)

def process_record(url, record):
    try:
        response = requests.post(
            url,
            json={"record": record},
            headers={"Content-Type": "application/json"}
        )

        if response.status_code == 200:
            output = response.json()
            logger.info(f"Pre Processed record: {record['application_key']} - Response: {output}")
            return output
        elif response.status_code == 404:
            # Special-case handling for "CLUSTER_NOT_FOUND" (or any 404).
            # Return a benign structure that includes 'hd_key' and 'matches'
            # so post_processing does not fail when it does:
            #   record_id = data.get('hd_key')
            #   connected_records = data.get('matches', []) + [record_id]
            logger.warning(
                f"Ignoring 404 for record {record['application_key']}. "
                f"Status: 404, Response: {response.text}"
            )
            return {
                "application_key": record.get("application_key"),
                "hd_key": None,
                "matches": []
            }
        else:
            logger.error(f"Failed to process record {record['application_key']}. Status: {response.status_code}, Response: {response.text}")
            return {"error": response.text}
    except Exception as e:
        logger.error(f"Error processing record {record['application_key']}: {str(e)}")
        return {"error": str(e)}
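The 404 branch is the subtle part of this file. A minimal sketch of how it could be verified with a mocked `requests.post`; the URL, payload, and response body here are placeholders:

```python
# Sketch: the CLUSTER_NOT_FOUND / 404 fallback returns a benign structure.
from unittest.mock import Mock, patch
from pre_processing import process_record

fake_404 = Mock(status_code=404, text='{"errorCode": "CLUSTER_NOT_FOUND"}')  # placeholder body

with patch("pre_processing.requests.post", return_value=fake_404):
    out = process_record("http://example.invalid/records", {"application_key": "app-123"})

assert out == {"application_key": "app-123", "hd_key": None, "matches": []}
```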
processing.py (new file, 28 lines)
import requests
import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger(__name__)

def processing(url, data):
    record_id = data.get('hd_key')
    try:
        response = requests.get(
            f"{url}/{record_id}",
            headers={"Content-Type": "application/json"}
        )

        if response.status_code == 200:
            output = response.json()
            logger.info(f"Processed record: {record_id} - Response: {output}")
            return output
        else:
            logger.error(f"Failed to process record {record_id}. Status: {response.status_code}, Response: {response.text}")
            return {"error": response.text}
    except Exception as e:
        logger.error(f"Error processing record {record_id}: {str(e)}")
        return {"error": str(e)}
@@ -1 +1,36 @@
-{}
+{
+  "$schema": "http://json-schema.org/draft-07/schema#",
+  "type": "object",
+  "properties": {
+    "application_key": {
+      "type": ["string", "null"],
+      "description": "Unique identifier for the application."
+    },
+    "application_timestamp": {
+      "type": ["string", "null"],
+      "description": "Timestamp of the application."
+    },
+    "deviceid": {
+      "type": ["string", "null"],
+      "description": "Unique identifier for the device used in the application."
+    },
+    "fuzzydeviceid": {
+      "type": ["string", "null"],
+      "description": "Anonymized or hashed identifier for the device."
+    },
+    "application_email_address": {
+      "type": ["string", "null"],
+      "description": "Email address associated with the application."
+    },
+    "hd_score_m1": {
+      "type": ["number", "null"],
+      "description": "HD Score M1 value associated with the application."
+    },
+    "hd_score_iso_m2": {
+      "type": ["number", "null"],
+      "description": "HD Score M2 Isotonic value associated with the application."
+    }
+  },
+  "required": []
+}
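This schema appears to describe the block's input payload, since its properties mirror the `__main__` parameters. A validation sketch, assuming the file is saved as `input_schema.json` and that the optional `jsonschema` package is available (it is not part of the requirements change):

```python
# Sketch: validate a sample input payload against the schema above.
import json
import jsonschema  # assumed extra dependency

with open("input_schema.json") as f:      # assumed file name
    schema = json.load(f)

sample = {
    "application_key": "app-123",
    "application_timestamp": "2024-11-05T10:00:00Z",
    "deviceid": "dev-1",
    "fuzzydeviceid": "fuzzy-1",
    "application_email_address": "user@example.com",
    "hd_score_m1": 0.42,
    "hd_score_iso_m2": 0.37,
}
jsonschema.validate(instance=sample, schema=schema)  # raises ValidationError if invalid
```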
@@ -1 +1 @@
-{}
+requests == 2.32.3
@@ -1 +1,27 @@
-{}
+{
+  "$schema": "http://json-schema.org/draft-07/schema#",
+  "type": "object",
+  "properties": {
+    "application_key": {
+      "type": ["string", "null"],
+      "description": "Unique identifier for the application."
+    },
+    "hd_score_m1": {
+      "type": ["number", "null"],
+      "description": "HD fraud score M1."
+    },
+    "hd_score_iso_m2": {
+      "type": ["number", "null"],
+      "description": "HD fraud score M2."
+    },
+    "connected_application_keys": {
+      "type": "array",
+      "items": {
+        "type": ["string", "null"]
+      },
+      "description": "List of connected application keys associated with this application."
+    }
+  },
+  "required": []
+}
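This schema appears to describe the block's output: the dict returned by `post_processing`, plus the `hd_score_iso_m2` override added in block.py, should satisfy it. Same assumptions as above (`jsonschema` installed, file saved as `output_schema.json`):

```python
# Sketch: the block's return value should validate against the output schema.
import json
import jsonschema  # assumed extra dependency

with open("output_schema.json") as f:     # assumed file name
    schema = json.load(f)

result = {
    "application_key": "app-123",
    "hd_score_m1": 0.42,
    "hd_score_iso_m2": 0.37,
    "connected_application_keys": ["app-456", "app-123"],
}
jsonschema.validate(instance=result, schema=schema)
```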