import pandas as pd import logging import json import jmespath import regex as re from pre_processing import pre_processing from processing import processing from post_processing import post_processing import json_repair # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s - %(message)s", ) logger = logging.getLogger(__name__) _JSON_LIKE = re.compile(r'^\s*\?*[\{\[].*[\}\]]\s*$', re.DOTALL) def extract_value(blob, expression): try: return jmespath.search(expression, blob) except Exception: return None def coalesce(*args): for value in args: if value is not None: return value return None # New sanitize blob function def deep_repair(obj): # 1) If it's a string that *looks* like JSON (with or without one leading '?'), # strip exactly one leading '?', reparses, and recurse. if isinstance(obj, str): s = obj.strip() if _JSON_LIKE.match(s): # strip one leading '?' if present if s.startswith('?'): s = s[1:] parsed = json_repair.loads(s) return deep_repair(parsed) return obj # 2) Dict → recurse on each value if isinstance(obj, dict): return {k: deep_repair(v) for k, v in obj.items()} # 3) List → recurse on each element if isinstance(obj, list): return [deep_repair(v) for v in obj] # 4) Otherwise, leave it alone return obj def sanitize_blob(blob): try: return deep_repair(blob) except Exception as e: logger.error("Failed to sanitize blob: %s", e) return None # Expressions to extract values expressions = { "first_seen_days": [ # 1) any vendor under integration_hub_results → first_seen_days "(Blob.integration_hub_results.*.tps_vendor_raw_response.query.results[0].first_seen_days)[0]", # 2) the flat “dotted” key "Blob.\"emailage.emailriskscore.first_seen_days\"", # 3) fallback to the top level tps_vendor_raw_response path "Blob.tps_vendor_raw_response.query.results[0].first_seen_days", ], "ea_score": [ # 1) any vendor under integration_hub_results 'Blob.integration_hub_results.*.tps_vendor_raw_response.query.results[0].EAScore', # 2) the flat “dotted” key 'Blob."emailage.emailriskscore.eascore"', # 3) fallback to the top level tps_vendor_raw_response 'Blob.tps_vendor_raw_response.query.results[0].EAScore', ], "email_creation_days": [ # 1) any vendor under integration_hub_results → results[0].email_creation_days "(Blob.integration_hub_results.*" ".tps_vendor_raw_response.query.results[0].email_creation_days)[0]", # 2) fallback to the top level tps_vendor_raw_response path "Blob.tps_vendor_raw_response.query.results[0].email_creation_days", ], "summary_risk_score": ["Blob.summary_risk_score"], "digital_id_trust_score_rating": ["Blob.digital_id_trust_score_rating"], "os_version": ["Blob.os_version"], "account_email_worst_score": ["Blob.account_email_worst_score"], "true_ip_score": ["Blob.true_ip_score"], "ip_net_speed_cell": [ # 1) any vendor under integration_hub_results → results[0].ip_netSpeedCell "(Blob.integration_hub_results.*" ".tps_vendor_raw_response.query.results[0].ip_netSpeedCell)[0]", # 2) fallback to the top level tps_vendor_raw_response path "Blob.tps_vendor_raw_response.query.results[0].ip_netSpeedCell", ], "account_email_score": ["Blob.account_email_score"], "true_ip_worst_score": ["Blob.true_ip_worst_score"], "proxy_ip_worst_score": ["Blob.proxy_ip_worst_score"], "proxy_ip_score": ["Blob.proxy_ip_score"], "fuzzy_device_score": ["Blob.fuzzy_device_score"], "ip_region_confidence": [ # 1) any vendor under integration_hub_results → results[0].ip_regionconf "(Blob.integration_hub_results.*" ".tps_vendor_raw_response.query.results[0].ip_regionconf)[0]", # 2) fallback to the top level tps_vendor_raw_response path "Blob.tps_vendor_raw_response.query.results[0].ip_regionconf", ], "true_ip_state_confidence": ["Blob.true_ip_state_confidence"], "fuzzy_device_worst_score": ["Blob.fuzzy_device_worst_score"], "digital_id_confidence_rating": ["Blob.digital_id_confidence_rating"], "trueipgeo": ["TrueIpGeo","Blob.true_ip_geo"], } def __main__( # Application-> application_key: str, application_timestamp: str, application_ssn: str, application_email_address: str, application_bank_account_number: str, application_is_rejected: str, application_date_of_birth: str, # uprovaloanapplication-> educationlevel: str, employmentstatus: str, lengthatbank: str, lengthatjob: str, ownhome: str, payfrequency: str, monthsatresidence: str, state: str, zip: str, # thxresponse-> EventType: str, DigitalIdConfidence: str, RiskRating: str, TmxSummaryReasonCode: str, TrueIpGeo: str, Blob: str, DeviceId: str, FuzzyDeviceId: str ) -> dict: # Convert input parameters into a flat dictionary data = { "application_key": application_key, "application_timestamp": application_timestamp, "application_ssn ": application_ssn, "application_email_address": application_email_address, "application_bank_account_number": application_bank_account_number, "application_is_rejected": application_is_rejected, "application_date_of_birth": application_date_of_birth, "educationlevel": educationlevel, "employmentstatus": employmentstatus, "lengthatbank": lengthatbank, "lengthatjob": lengthatjob, "ownhome": ownhome, "payfrequency": payfrequency, "monthsatresidence": monthsatresidence, "state": state, "zip": zip, "EventType": EventType, "DigitalIdConfidence": DigitalIdConfidence, "RiskRating": RiskRating, "TmxSummaryReasonCode": TmxSummaryReasonCode, "TrueIpGeo": TrueIpGeo, "Blob": Blob, "DeviceId": DeviceId, "FuzzyDeviceId": FuzzyDeviceId } # Convert dictionary to a single-row DataFrame combined_df = pd.DataFrame([data]) combined_df.columns = combined_df.columns.str.lower() combined_df["application_email_address"] = combined_df["application_email_address"].str.lower() if Blob: combined_df["blob"] = combined_df["blob"].apply(sanitize_blob) # Step 2: Extract values using the expressions dictionary for column, expressions_list in expressions.items(): combined_df[column] = combined_df["blob"].apply(lambda x: coalesce( *[extract_value(x, expr) for expr in expressions_list])) logger.info("pre_flowx data") logger.info(combined_df.iloc[0].drop('blob').to_dict()) else: for column, expressions_list in expressions.items(): combined_df[column] = None logger.info("pre_flowx data") logger.info(combined_df.iloc[0].to_dict()) pre_processed_data = pre_processing(combined_df) # logger.info(f"pre_processed_data: {pre_processed_data}") logger.info("pre_processed data") logger.info(pre_processed_data.iloc[0].to_dict()) df = processing(pre_processed_data) logger.info("processed_data") logger.info(df.iloc[0].to_dict()) df["application_timestamp"] = df["application_timestamp"].astype(str) # logger.info("prediction: %.8f", float(df['prediction'].iloc[0])) result = post_processing(df) logger.info("post_processed_data") logger.info(result) # State Check state_value = combined_df["state"].iloc[0] zip_value = combined_df["zip"].iloc[0] if (pd.notnull(state_value) and state_value == "ZZ") or (pd.notnull(zip_value) and zip_value == "86445"): result["hd_score_m1"] = 1250 logger.info("post_processed_data after state check") logger.info(result) return result # testing : # __main__