import pandas as pd
import json
import jmespath
import xgboost as xgb
import math
import joblib
import logging
from typing import Optional, List, Dict
from pre_processing import pre_processing
from processing import processing
from post_processing import post_processing

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger(__name__)


def __main__(application_key: str, application_timestamp: str, application_source_name: str,
             application_date_of_birth: str, ownhome: str, employmentstatus: str,
             lengthatjob: float, payfrequency: str, lengthatbank: str, inputipaddress: str,
             deviceid: str, fuzzydeviceid: str, trueip: str, dnsip: str, requestid: str,
             riskrating: str, tmxsummaryreasoncode: str, digitalidconfidence: str,
             results: Optional[List[Dict]] = None) -> dict:
    """Score one application for fraud.

    Assembles the raw request fields into a single-row DataFrame, optionally
    enriches it with values extracted (via JMESPath) from the first entry of
    the third-party ``results`` blob, then runs the
    pre_processing -> processing (XGBoost) -> post_processing pipeline.

    Returns:
        dict with keys ``score``, ``action`` and ``description``
        (the output of post_processing).
    """
    data = {
        "application_key": application_key,
        "application_timestamp": application_timestamp,
        "application_source_name": application_source_name,
        "application_date_of_birth": application_date_of_birth,
        "ownhome": ownhome,
        "employmentstatus": employmentstatus,
        "lengthatjob": lengthatjob,
        "payfrequency": payfrequency,
        "lengthatbank": lengthatbank,
        "inputipaddress": inputipaddress,
        "deviceid": deviceid,
        "fuzzydeviceid": fuzzydeviceid,
        "trueip": trueip,
        "dnsip": dnsip,
        "requestid": requestid,
        "riskrating": riskrating,
        "tmxsummaryreasoncode": tmxsummaryreasoncode,
        "digitalidconfidence": digitalidconfidence
    }

    data = pd.DataFrame([data])

    # Expected Python type for each value pulled out of `results`
    # (everything arrives as a string in this schema).
    expected_schema = {
        "ea_score": str,
        "ip_net_speed_cell": str,
        "ip_country_confidence": str,
        "ip_region_confidence": str,
        "fraud_risk": str,
        "first_seen_days": str,
        "domain_creation_days": str
    }

    # JMESPath expression used to extract each output column.
    expressions = {
        "ea_score": "EAScore",
        "ip_net_speed_cell": "ip_netSpeedCell",
        "ip_country_confidence": "ip_countryconf",
        "ip_region_confidence": "ip_regionconf",
        "fraud_risk": "fraudRisk",
        "first_seen_days": "first_seen_days",
        "domain_creation_days": "domain_creation_days",
    }

    if results:
        # Only the first result entry is consumed — presumably the blob
        # carries a single relevant record. TODO confirm with the producer.
        first_result = results[0]
        for column, expression in expressions.items():
            try:
                extracted_value = jmespath.search(expression, first_result)
                expected_type = expected_schema[column]

                if extracted_value is not None and not isinstance(extracted_value, expected_type):
                    try:
                        extracted_value = expected_type(extracted_value)
                    except (ValueError, TypeError) as cast_error:
                        logger.error("Failed to cast %s value to %s: %s",
                                     column, expected_type.__name__, cast_error)
                        extracted_value = None

                data[column] = extracted_value
            except Exception as e:
                # Best-effort enrichment: a bad expression/blob must not
                # abort scoring — log and fall back to None.
                logger.error("Error extracting value for %s: %s", column, e)
                data[column] = None
    else:
        # No enrichment blob: create the columns anyway so the downstream
        # feature pipeline sees a consistent schema.
        for column in expressions.keys():
            data[column] = None

    logger.info("pre_pre_processed_data: %s", data.to_dict(orient='records'))

    pre_processed_data = pre_processing(data)

    logger.info("pre_processed_data: %s", pre_processed_data)

    prediction = processing(pre_processed_data)

    # BUG FIX: `prediction` is the full array returned by the model;
    # float() on a multi-element array is deprecated/raises. Log the first
    # element, which is also what post_processing consumes below.
    logger.info("prediction: %.8f", float(prediction[0]))

    result = post_processing(prediction[0])

    logger.info("Score: %.0f", float(result["score"]))
    logger.info("Action: %s", result["action"])
    logger.info("Description: %s", result["description"])

    return {
        'score': result["score"],
        'action': result["action"],
        'description': result["description"]
    }
import logging
import math

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger(__name__)


def post_processing(value):
    """Map a raw model output onto the HD Fraud Score scale and pick an action.

    The score combines a capped linear component with a log2 component of the
    (percent-scaled) model output, then compares it against the decline
    threshold. On any failure the score is None and the action is 'Unknown'.

    Returns:
        dict with keys 'score', 'action' and 'description'.
    """
    try:
        scaled = value * 100
        linear_component = min(scaled + 0.00001, 1) * 85
        log_component = max(math.log(scaled + 0.000001, 2) * 185, 0)
        score = round(linear_component + log_component, 0)

        score_threshold = 1230
        if score >= score_threshold:
            action = "Application Decline"
            description = f"HD Fraud Score is above the risk threshold {score_threshold}, Recommended action: {action}."
        else:
            action = "Application Pass"
            description = f"HD Fraud Score is below the risk threshold {score_threshold}, Recommended action: {action}."

        return {'score': score, 'action': action, 'description': description}
    except Exception as e:
        logger.error(f"Error in post_processing: {e}")
        return {'score': None, 'action': 'Unknown', 'description': 'Error processing the score'}
+ ) + # logger.info({'score': score, 'action': action, 'description': description}) + return {'score': score, 'action': action, 'description': description} + except Exception as e: + logger.error(f"Error in post_processing: {e}") + return {'score': None, 'action': 'Unknown', 'description': 'Error processing the score'} diff --git a/pre_processing.py b/pre_processing.py new file mode 100644 index 0000000..ee51ed1 --- /dev/null +++ b/pre_processing.py @@ -0,0 +1,95 @@ +import pandas as pd +import json +import jmespath +import logging + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s - %(message)s", +) +logger = logging.getLogger(__name__) + +def pre_processing(input_data): + + # combined_df = pd.DataFrame([input_data]) + combined_df = input_data + combined_df["app_age"] = combined_df.apply( + lambda row: pd.to_datetime(row["application_timestamp"]).year - pd.to_datetime(row["application_date_of_birth"]).year + if pd.notnull(row["application_timestamp"]) and pd.notnull(row["application_date_of_birth"]) else None, + axis=1 + ) + + # for col in ["requestid", "inputipaddress", "deviceid", "fuzzydeviceid", "trueip", "dnsip"]: + # combined_df[f"{col}_consistency"] = combined_df.groupby("application_key")[col].transform("nunique") + + for col in ["requestid", "inputipaddress", "deviceid", "fuzzydeviceid", "trueip", "dnsip"]: + combined_df[f"{col}_consistency"] = combined_df[col].apply( + lambda x: 1 if pd.notnull(x) and str(x).lower() not in ("nan", "none", None) else 0 + ) + + combined_df.rename(columns={'inputipaddress_consistency': 'inputip_consistency'}, inplace=True) + combined_df.rename(columns={'requestid_consistency': 'request_consistency'}, inplace=True) + + combined_df['digitalidconfidence'] = pd.to_numeric(combined_df['digitalidconfidence'], errors='coerce').astype('Int64') + + for col in ["digitalidconfidence"]: + combined_df[f"avg_{col}"] = 
combined_df.groupby("application_key")[col].transform("mean") + combined_df[f"min_{col}"] = combined_df.groupby("application_key")[col].transform("min") + combined_df[f"max_{col}"] = combined_df.groupby("application_key")[col].transform("max") + + combined_df['Level_1_Link_Accept'] = combined_df['tmxsummaryreasoncode'].astype(str).str.contains('Level_1_Link_Accept', na=False, regex=True).astype(int) + combined_df['Identity_Negative_History_Max'] = combined_df['tmxsummaryreasoncode'].astype(str).str.contains('Identity_Negative_History', na=False, regex=True).astype(int) + combined_df['Level_1_Link_Accept_Max'] = combined_df['tmxsummaryreasoncode'].astype(str).str.contains('Level_1_Link_Accept', na=False, regex=True).astype(int) + combined_df['Device_Negative_History_Max'] = combined_df['tmxsummaryreasoncode'].astype(str).str.contains('Device_Negative_History', na=False, regex=True).astype(int) + combined_df['Level_1_Link_Reject_Max'] = combined_df['tmxsummaryreasoncode'].astype(str).str.contains('Level_1_Link_Reject', na=False, regex=True).astype(int) + combined_df['IP_Negative_History_Max'] = combined_df['tmxsummaryreasoncode'].astype(str).str.contains('IP_Negative_History', na=False, regex=True).astype(int) + combined_df['Identity_Spoofing_Max'] = combined_df['tmxsummaryreasoncode'].astype(str).str.contains('Identity_Spoofing', na=False, regex=True).astype(int) + combined_df['Bot_Max'] = combined_df['tmxsummaryreasoncode'].astype(str).str.contains('Bot', na=False, regex=True).astype(int) + + def map_fraud_risk(risk): + risk = str(risk).lower() + if "very low" in risk: + return "Very Low" + elif "low" in risk: + return "Low" + elif "moderate" in risk: + return "Moderate" + elif "review" in risk: + return "Review" + elif "very high" in risk: + return "Very High" + else: + return None + + combined_df["fraud_risk"] = combined_df["fraud_risk"].apply(map_fraud_risk) + + # combined_df.replace({'nan': None, 'None': None}, inplace=True) + combined_df.replace({'nan': None, 
import pandas as pd
import xgboost as xgb
import math
import joblib
import json


def processing(input_data):
    """Run the XGBoost fraud model over pre-processed feature records.

    Args:
        input_data: list of feature dicts (the output of pre_processing).

    Returns:
        Array of raw model predictions, one per input row.

    Raises:
        ValueError: if the input produces an empty DataFrame.
    """
    df = pd.DataFrame(input_data)

    # Fail fast before loading the model artifacts from disk.
    if df.empty:
        raise ValueError("Input DataFrame is empty.")

    model = joblib.load("./xgboost_model.joblib")
    with open('./category_orders_train.json', 'r') as f:
        category_orders = json.load(f)

    categorical_columns = ["application_source_name", "ownhome", "employmentstatus",
                           "payfrequency", "fraud_risk", "ip_net_speed_cell", "riskrating"]
    for col in categorical_columns:
        if col in df.columns:
            # BUG FIX: the original replace list contained math.isnan (a
            # function object, not a value), so genuine NaN/None entries were
            # never mapped to "none" and reached the model as missing. Fill
            # missing values first, then normalize the sentinel strings.
            df[col] = df[col].str.lower().fillna("none").replace(["", "null"], "none")
            # Pin the category order to the training-time order; unseen
            # labels become NaN (missing) in the Categorical.
            df[col] = pd.Categorical(df[col], categories=category_orders[col])
        else:
            df[col] = pd.Categorical(["none"], categories=category_orders.get(col, ["none"]))

    # Everything else is numeric: stringify + lower-case, blank out the usual
    # null sentinels, then coerce (unparseable values become NaN). The
    # original's `else` branch here was unreachable since the column list is
    # derived from df.columns.
    non_categorical_columns = [col for col in df.columns if col not in categorical_columns]
    for col in non_categorical_columns:
        cleaned = df[col].astype(str).str.lower().replace(["null", "nan", "", None], pd.NA)
        df[col] = pd.to_numeric(cleaned, errors="coerce")

    # Ensure every feature the booster was trained on exists (missing ones
    # are fed as None -> NaN).
    expected_features = model.feature_names
    for feature in expected_features:
        if feature not in df.columns:
            df[feature] = None

    dmatrix = xgb.DMatrix(df[expected_features], enable_categorical=True)
    return model.predict(dmatrix)
+ }, + "employmentstatus": { + "type": ["string", "null"], + "description": "Employment status of the applicant." + }, + "lengthatjob": { + "type": ["number", "null"], + "description": "Length of time the applicant has been at their current job." + }, + "payfrequency": { + "type": ["string", "null"], + "description": "Frequency of pay for the applicant." + }, + "lengthatbank": { + "type": ["string", "null"], + "description": "Length of time the applicant has been with their bank." + }, + "inputipaddress": { + "type": ["string", "null"], + "description": "IP address of the device used to submit the application." + }, + "deviceid": { + "type": ["string", "null"], + "description": "Unique identifier for the device used to submit the application." + }, + "fuzzydeviceid": { + "type": ["string", "null"], + "description": "Hashed or partially anonymized identifier for the device." + }, + "trueip": { + "type": ["string", "null"], + "description": "Actual IP address of the applicant's device." + }, + "dnsip": { + "type": ["string", "null"], + "description": "DNS IP address of the device used to submit the application." + }, + "requestid": { + "type": ["string", "null"], + "description": "Unique identifier for the application request." + }, + "riskrating": { + "type": ["string", "null"], + "description": "Risk rating assigned to the application." + }, + "tmxsummaryreasoncode": { + "type": ["string", "null"], + "description": "Reason code summary from third-party risk assessment." + }, + "digitalidconfidence": { + "type": ["string", "null"], + "description": "Confidence score for the digital identity of the applicant." + }, + "results": { + "type": ["array", "null"], + "items": {"type": "object"}, + "description": "ThreatMetrixResponse emailage blob." 
+ } + }, + "required": [] +} diff --git a/requirements.txt b/requirements.txt index 0967ef4..bc2511b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,5 @@ -{} +pandas ==2.2.2 +joblib ==1.3.2 +xgboost ==1.7.5 +jmespath==1.0.1 +numpy==1.23.5 \ No newline at end of file diff --git a/response_schema.json b/response_schema.json index 0967ef4..6137a2c 100644 --- a/response_schema.json +++ b/response_schema.json @@ -1 +1,18 @@ -{} +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "score": { + "type": "number", + "description": "HD Fraud Score." + }, + "action": { + "type": "string", + "description": "Recommended Action." + }, + "description": { + "type": "string", + "description": "Description" + } + } + } diff --git a/xgboost_model.joblib b/xgboost_model.joblib new file mode 100644 index 0000000..72b5520 Binary files /dev/null and b/xgboost_model.joblib differ