import json
import logging

import jmespath
import pandas as pd
import regex as re

from pre_processing import pre_processing
from processing import processing
from post_processing import post_processing

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger(__name__)


def extract_value(blob, expression):
    """Evaluate a JMESPath *expression* against *blob*; return None on any failure.

    jmespath can raise on malformed expressions or unexpected structures;
    a missing value is not an error for our purposes, so swallow and return None.
    """
    try:
        return jmespath.search(expression, blob)
    except Exception:
        return None


def coalesce(*args):
    """Return the first argument that is not None, or None if all are None."""
    return next((value for value in args if value is not None), None)


def sanitize_blob(blob):
    """Repair a doubly-serialized/escaped JSON string and parse it.

    The vendor blob arrives with nested JSON objects embedded as quoted
    strings plus leftover escape characters. The substitutions below unwrap
    those layers so ``json.loads`` can parse the result.

    Returns the parsed object, or None when the cleaned text still fails to
    parse (a ~100-char window around the failure position is logged).
    """
    try:
        # Unwrap one-level nested objects serialized as strings: "k":"{...}" -> "k":{...}
        blob = re.sub(r'"(\w+)":"(\{[^}]+\})"', r'"\1":\2', blob)
        # Vendor-specific quirk: tps_vendor_raw_response sometimes opens with "?{
        blob = re.sub(r'"tps_vendor_raw_response"\s*:\s*"\?\{', '"tps_vendor_raw_response":{', blob)
        # Strip escape sequences left over from double serialization.
        blob = blob.replace('\\"', '"')
        blob = blob.replace('\\n', '')
        blob = blob.replace('\\t', '')
        blob = blob.replace('\\\\', '')
        # Drop the stray closing quote that followed an unwrapped nested object.
        blob = re.sub(r'(\}\})"', r'\1', blob)
        # Remove trailing commas before a closing brace/bracket (invalid JSON).
        blob = re.sub(r',\s*([\}\]])', r'\1', blob)
        return json.loads(blob)
    except json.JSONDecodeError as e:
        logger.error("JSON Decode Error: %s", e)
        # Log a window around the failing offset to aid debugging of bad blobs.
        error_pos = e.pos
        snippet = blob[max(0, error_pos - 50): error_pos + 50]
        logger.error("Error near:\n%s", snippet)
        return None


# JMESPath expressions per output column, in priority order: the first
# expression yielding a non-None value wins (see coalesce in __main__).
expressions = {
    "first_seen_days": [
        "tps_vendor_raw_response.query.results[0].first_seen_days",
        "emailage.emailriskscore.first_seen_days",
    ],
    "ea_score": [
        "tps_vendor_raw_response.query.results[0].EAScore",
        "emailage.emailriskscore.eascore",
    ],
    "email_creation_days": [
        "tps_vendor_raw_response.query.results[0].email_creation_days",
    ],
    "summary_risk_score": ["summary_risk_score"],
    "digital_id_trust_score_rating": ["digital_id_trust_score_rating"],
    "os_version": ["os_version"],
    "account_email_worst_score": ["account_email_worst_score"],
    "true_ip_score": ["true_ip_score"],
    "ip_net_speed_cell": [
        "tps_vendor_raw_response.query.results[0].ip_netSpeedCell",
        # "true_ip_connection_type"  # alternative source — intentionally disabled
    ],
    "account_email_score": ["account_email_score"],
    "true_ip_worst_score": ["true_ip_worst_score"],
    "proxy_ip_worst_score": ["proxy_ip_worst_score"],
    "proxy_ip_score": ["proxy_ip_score"],
    "fuzzy_device_score": ["fuzzy_device_score"],
    "ip_region_confidence": ["tps_vendor_raw_response.query.results[0].ip_regionconf"],
    "true_ip_state_confidence": ["true_ip_state_confidence"],
    "fuzzy_device_worst_score": ["fuzzy_device_worst_score"],
    "digital_id_confidence_rating": ["digital_id_confidence_rating"],
}


def __main__(
    # Application ->
    application_key: str,
    application_timestamp: str,
    application_ssn: str,
    application_email_address: str,
    application_bank_account_number: str,
    application_is_rejected: str,
    application_date_of_birth: str,
    # uprovaloanapplication ->
    educationlevel: str,
    employmentstatus: str,
    lengthatbank: str,
    lengthatjob: str,
    ownhome: str,
    payfrequency: str,
    monthsatresidence: str,
    state: str,
    zip: str,  # shadows builtin zip(); kept for caller compatibility
    # thxresponse ->
    EventType: str,
    DigitalIdConfidence: str,
    RiskRating: str,
    TmxSummaryReasonCode: str,
    TrueIpGeo: str,
    Blob: str,
    DeviceId: str,
    FuzzyDeviceId: str,
) -> dict:
    """Score a single loan application through the pre/processing/post pipeline.

    Builds a one-row DataFrame from the flat inputs, optionally sanitizes the
    ThreatMetrix ``Blob`` and extracts vendor features via JMESPath, runs
    pre_processing -> processing -> post_processing, then applies a test-state
    override before returning the post-processed result dict.
    """
    # Convert input parameters into a flat dictionary.
    data = {
        "application_key": application_key,
        "application_timestamp": application_timestamp,
        # NOTE(review): key previously had a trailing space ("application_ssn ");
        # fixed so the resulting column is addressable as "application_ssn".
        "application_ssn": application_ssn,
        "application_email_address": application_email_address,
        "application_bank_account_number": application_bank_account_number,
        "application_is_rejected": application_is_rejected,
        "application_date_of_birth": application_date_of_birth,
        "educationlevel": educationlevel,
        "employmentstatus": employmentstatus,
        "lengthatbank": lengthatbank,
        "lengthatjob": lengthatjob,
        "ownhome": ownhome,
        "payfrequency": payfrequency,
        "monthsatresidence": monthsatresidence,
        "state": state,
        "zip": zip,
        "EventType": EventType,
        "DigitalIdConfidence": DigitalIdConfidence,
        "RiskRating": RiskRating,
        "TmxSummaryReasonCode": TmxSummaryReasonCode,
        "TrueIpGeo": TrueIpGeo,
        "Blob": Blob,
        "DeviceId": DeviceId,
        "FuzzyDeviceId": FuzzyDeviceId,
    }

    # Convert dictionary to a single-row DataFrame; normalize column case so
    # downstream stages see a consistent lowercase schema.
    combined_df = pd.DataFrame([data])
    combined_df.columns = combined_df.columns.str.lower()
    combined_df["application_email_address"] = combined_df["application_email_address"].str.lower()

    if Blob:
        combined_df["blob"] = combined_df["blob"].apply(sanitize_blob)
        # Extract each feature column: first non-None JMESPath hit wins.
        for column, expressions_list in expressions.items():
            combined_df[column] = combined_df["blob"].apply(
                lambda x: coalesce(*[extract_value(x, expr) for expr in expressions_list])
            )
        logger.info("pre_flowx data")
        # Exclude the (large, already-parsed) blob from the log line.
        logger.info(combined_df.iloc[0].drop('blob').to_dict())
    else:
        # No blob supplied: still create every feature column so the
        # downstream pipeline sees a stable schema.
        for column, expressions_list in expressions.items():
            combined_df[column] = None
        logger.info("pre_flowx data")
        logger.info(combined_df.iloc[0].to_dict())

    pre_processed_data = pre_processing(combined_df)
    logger.info("pre_processed data")
    logger.info(pre_processed_data.iloc[0].to_dict())

    df = processing(pre_processed_data)
    logger.info("processed_data")
    logger.info(df.iloc[0].to_dict())

    # Timestamps must be strings for post_processing / serialization.
    df["application_timestamp"] = df["application_timestamp"].astype(str)

    result = post_processing(df)
    logger.info("post_processed_data")
    logger.info(result)

    # State check: test/sentinel applications (state "ZZ" or zip "86445")
    # get a fixed override score.
    state_value = combined_df["state"].iloc[0]
    zip_value = combined_df["zip"].iloc[0]
    if (pd.notnull(state_value) and state_value == "ZZ") or (
        pd.notnull(zip_value) and zip_value == "86445"
    ):
        result["hd_score_m1"] = 1250

    logger.info("post_processed_data after state check")
    logger.info(result)
    return result