Advanced M series V1 model block
All checks were successful
Build and Push Docker Image / test (push) Successful in 55s
Build and Push Docker Image / build_and_push (push) Successful in 3m21s

This commit is contained in:
admin user 2025-03-12 16:12:18 +00:00
parent 4ca7b2486f
commit ead9a776da
11 changed files with 767 additions and 22 deletions

View File

@ -1 +1,3 @@
**Hello world!!!**
## Advanced M series V1 model block
M Series Model trained on historical data to identify fraudulent patterns.

189
block.py
View File

@ -1,21 +1,174 @@
@flowx_block
def example_function(request: dict) -> dict:
import pandas as pd
import logging
import json
import jmespath
import regex as re
from pre_processing import pre_processing
from processing import processing
from post_processing import post_processing
# Processing logic here...
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger(__name__)
return {
"meta_info": [
{
"name": "created_date",
"type": "string",
"value": "2024-11-05"
}
def extract_value(blob, expression):
try:
return jmespath.search(expression, blob)
except Exception:
return None
# Coalesce function to return the first non-None value
def coalesce(*args):
for value in args:
if value is not None:
return value
return None
# New sanitize blob function
def sanitize_blob(blob):
try:
blob = re.sub(r'"(\w+)":"(\{[^}]+\})"', r'"\1":\2', blob)
blob = re.sub(r'"tps_vendor_raw_response"\s*:\s*"\?\{', '"tps_vendor_raw_response":{', blob)
blob = blob.replace('\\"', '"')
blob = blob.replace('\\n', '')
blob = blob.replace('\\t', '')
blob = blob.replace('\\\\', '')
blob = re.sub(r'(\}\})"', r'\1', blob)
blob = re.sub(r',\s*([\}\]])', r'\1', blob)
return json.loads(blob)
except json.JSONDecodeError as e:
logger.error(f"JSON Decode Error: {e}")
error_pos = e.pos
snippet = blob[max(0, error_pos - 50): error_pos + 50]
logger.error(f"Error near:\n{snippet}")
return None
#---------------- Sanitise ends here
# Function to extract a value using JMESPath
# Expressions to extract values
expressions = {
"first_seen_days": [
"tps_vendor_raw_response.query.results[0].first_seen_days",
"emailage.emailriskscore.first_seen_days"
],
"fields": [
{
"name": "",
"type": "",
"value": ""
}
]
}
"ea_score": [
"tps_vendor_raw_response.query.results[0].EAScore",
"emailage.emailriskscore.eascore"
],
"email_creation_days": [
"tps_vendor_raw_response.query.results[0].email_creation_days"
],
"summary_risk_score": ["summary_risk_score"],
"digital_id_trust_score_rating": ["digital_id_trust_score_rating"],
"os_version": ["os_version"],
"account_email_worst_score": ["account_email_worst_score"],
"true_ip_score": ["true_ip_score"],
"ip_net_speed_cell": [
"tps_vendor_raw_response.query.results[0].ip_netSpeedCell",
# "true_ip_connection_type"
],
"account_email_score": ["account_email_score"],
"true_ip_worst_score": ["true_ip_worst_score"],
"proxy_ip_worst_score": ["proxy_ip_worst_score"],
"proxy_ip_score": ["proxy_ip_score"],
"fuzzy_device_score": ["fuzzy_device_score"],
"ip_region_confidence": ["tps_vendor_raw_response.query.results[0].ip_regionconf"],
"true_ip_state_confidence": ["true_ip_state_confidence"],
"fuzzy_device_worst_score": ["fuzzy_device_worst_score"],
"digital_id_confidence_rating": ["digital_id_confidence_rating"]
}
def __main__(
#Application->
application_key: str,
application_timestamp: str,
application_ssn : str,
application_email_address: str,
application_bank_account_number: str,
application_is_rejected: str,
application_date_of_birth: str,
#uprovaloanapplication->
educationlevel:str,
employmentstatus: str,
lengthatbank: str,
lengthatjob: str,
ownhome: str,
payfrequency: str,
monthsatresidence: str,
#thxresponse->
EventType: str,
DigitalIdConfidence: str,
RiskRating: str,
TmxSummaryReasonCode: str,
TrueIpGeo: str,
Blob:str,
DeviceId:str,
FuzzyDeviceId: str
) -> dict:
# Convert input parameters into a flat dictionary
data = {
"application_key" : application_key,
"application_timestamp" : application_timestamp,
"application_ssn " : application_ssn ,
"application_email_address" : application_email_address,
"application_bank_account_number" : application_bank_account_number,
"application_is_rejected" : application_is_rejected,
"application_date_of_birth" : application_date_of_birth,
"educationlevel" : educationlevel,
"employmentstatus" : employmentstatus,
"lengthatbank" : lengthatbank,
"lengthatjob" : lengthatjob,
"ownhome" : ownhome,
"payfrequency" : payfrequency,
"monthsatresidence" : monthsatresidence,
"EventType" : EventType,
"DigitalIdConfidence" : DigitalIdConfidence,
"RiskRating" : RiskRating,
"TmxSummaryReasonCode" : TmxSummaryReasonCode,
"TrueIpGeo" : TrueIpGeo,
"Blob" : Blob,
"DeviceId" : DeviceId,
"FuzzyDeviceId" : FuzzyDeviceId
}
# Convert dictionary to a single-row DataFrame
combined_df = pd.DataFrame([data])
combined_df.columns = combined_df.columns.str.lower()
combined_df["application_email_address"] = combined_df["application_email_address"].str.lower()
if Blob:
combined_df["blob"] = combined_df["blob"].apply(sanitize_blob)
# Step 2: Extract values using the expressions dictionary
for column, expressions_list in expressions.items():
combined_df[column] = combined_df["blob"].apply(lambda x: coalesce(*[extract_value(x, expr) for expr in expressions_list]))
logger.info("pre_flowx data")
logger.info(combined_df.iloc[0].drop('blob').to_dict())
else:
for column, expressions_list in expressions.items():
combined_df[column] = None
logger.info("pre_flowx data")
logger.info(combined_df.iloc[0].to_dict())
pre_processed_data = pre_processing(combined_df)
# logger.info(f"pre_processed_data: {pre_processed_data}")
logger.info("pre_processed data")
logger.info(pre_processed_data.iloc[0].to_dict())
df = processing(pre_processed_data)
logger.info("procesed_data")
logger.info(df.iloc[0].to_dict())
df["application_timestamp"] = df["application_timestamp"].astype(str)
# logger.info("prediction: %.8f", float(df['prediction'].iloc[0]))
result = post_processing(df)
# logger.info("Score: %.0f", float(result["hd_score_m1"]))
logger.info(result)
return result
# testing :
# __main__

View File

@ -0,0 +1,88 @@
{
"employmentstatus": [
"disability",
"fixed income",
"full time employed",
"other",
"part time employment",
"retired benefits",
"self employed",
"student",
"unemployed",
"welfare"
],
"TrueIpGeo": [
"other",
"us"
],
"digital_id_trust_score_rating": [
"high",
"low",
"neutral",
"very_high",
"very_low"
],
"educationlevel": [
"associate's degree",
"bachelor's degree",
"doctorate",
"high school",
"master's degree",
"other"
],
"os_version": [
"10",
"11",
"12",
"13",
"14",
"15",
"16",
"17",
"18",
"8",
"9",
"unknown"
],
"ip_net_speed_cell": [
"broadband",
"cable",
"dialup",
"dsl",
"fixed wireless",
"mobile",
"mobile wireless",
"ocx",
"satellite",
"t1",
"tx",
"wireless",
"xdsl"
],
"day_night": [
"Day",
"Night"
],
"digital_id_confidence_rating": [
"high",
"medium",
"very_high",
"very_low"
],
"RiskRating": [
"high",
"low",
"medium",
"neutral",
"trusted"
],
"payfrequency": [
"biweekly",
"semimonthly"
],
"ownhome": [
"false",
"true"
]
}

25
post_processing.py Normal file
View File

@ -0,0 +1,25 @@
import logging
import numpy as np
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger(__name__)
def post_processing(df):
try:
df['hd_score_m1'] = np.round(
np.minimum(df['prediction'] * 100 + 0.00001, 1) * 85 +
np.maximum(np.log2(df['prediction'] * 100 + 0.000001) * 185, 0),
0
)
logging.info(f"hd_score_m1 calculated: {df['hd_score_m1'].iloc[0]}")
except Exception as e:
logging.error(f"Error processing hd_score_m1 calculations: {e}")
return df[['application_key', 'application_timestamp', 'deviceid', 'fuzzydeviceid', 'application_email_address', 'hd_score_m1']].iloc[0].to_dict()

254
pre_processing.py Normal file
View File

@ -0,0 +1,254 @@
import pandas as pd
import numpy as np
import logging
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger(__name__)
def pre_processing(data_df):
# combined_df = pd.DataFrame([input_data])
# data = pd.DataFrame(data)
combined_df = data_df
combined_df["applicant_age"] = combined_df.apply(lambda row: pd.to_datetime(row["application_timestamp"]).year - pd.to_datetime(row["application_date_of_birth"]).year if pd.notnull(row["application_timestamp"]) and pd.notnull(row["application_date_of_birth"]) else None,axis=1
)
# Extracting Temporal features
combined_df['application_timestamp'] = pd.to_datetime(combined_df["application_timestamp"])
combined_df.loc[:, 'application_time'] = pd.to_datetime(combined_df['application_timestamp']).dt.time
combined_df['day'] = combined_df['application_timestamp'].dt.day
combined_df['day_of_week'] = combined_df['application_timestamp'].dt.weekday # 0=Monday, 6=Sunday
combined_df['day_sin'] = np.sin(2 * np.pi * combined_df['day'] / 31)
combined_df['day_cos'] = np.cos(2 * np.pi * combined_df['day'] / 31)
combined_df['day_of_week_sin'] = np.sin(2 * np.pi * combined_df['day_of_week'] / 7)
combined_df['day_of_week_cos'] = np.cos(2 * np.pi * combined_df['day_of_week'] / 7)
# combined_df['is_weekend'] = combined_df['day_of_week'].apply(lambda x: 1 if x >= 5 else 0)
# Create a day/night variable
def classify_day_night(hour):
if 6 <= hour < 18:
return 'Day'
else:
return 'Night'
# Extract hour from application_time
combined_df['hour'] = combined_df['application_time'].apply(lambda x: x.hour if pd.notnull(x) else np.nan)
combined_df['day_night'] = combined_df['hour'].apply(lambda hour: classify_day_night(hour) if pd.notnull(hour) else 'Unknown')
# combined_df['os_version'] = combined_df['os_version'].str.replace(r'[^a-zA-Z0-9]', '_', regex=True)
combined_df['os_version'] = combined_df['os_version'].apply(lambda x: x.split('.')[0] if isinstance(x, str) and '.' in x
else x.split('_')[0] if isinstance(x, str) and '_' in x
else x)
# Datatype conversions
# combined_df['Level_1_Link_Accept'] = combined_df['tmxsummaryreasoncode'].astype(str).str.contains('Level_1_Link_Accept', na=False, regex=True).astype(int)
combined_df['Identity_Negative_History'] = combined_df['tmxsummaryreasoncode'].astype(str).str.contains('Identity_Negative_History', na=False, regex=True).astype(int)
combined_df['Device_Negative_History'] = combined_df['tmxsummaryreasoncode'].astype(str).str.contains('Device_Negative_History', na=False, regex=True).astype(int)
combined_df['Level_1_Link_Reject'] = combined_df['tmxsummaryreasoncode'].astype(str).str.contains('Level_1_Link_Reject', na=False, regex=True).astype(int)
combined_df['IP_Negative_History'] = combined_df['tmxsummaryreasoncode'].astype(str).str.contains('IP_Negative_History', na=False, regex=True).astype(int)
combined_df['Identity_Spoofing'] = combined_df['tmxsummaryreasoncode'].astype(str).str.contains('Identity_Spoofing', na=False, regex=True).astype(int)
# combined_df['Bot'] = combined_df['tmxsummaryreasoncode'].astype(str).str.contains('Bot', na=False, regex=True).astype(int)
combined_df['digitalidconfidence'] = pd.to_numeric(combined_df['digitalidconfidence'], errors='coerce').astype('Int64')
# Rename Columns if Required
combined_df.rename(columns={
'DigitalIdConfidence': 'digitalidconfidence',
# 'inputipaddress_consistency': 'inputip_consistency',
# 'requestid_consistency': 'request_consistency',
# Add others as required if present in your DataFrame and needing renaming.
}, inplace=True)
# #Testing : remove below
# combined_df.to_csv('op-pre-processing_intermediate.csv', index=False)
dtype_dict = {
"applicant_age" : int,
"digitalidconfidence" : float,
"first_seen_days" : float,
"employmentstatus" : str,
"ea_score" : float,
"trueipgeo" : str,
"hour" : int,
"email_creation_days" : float,
"lengthatjob" : float,
"day_cos" : float,
"summary_risk_score" : float,
"digital_id_trust_score_rating" : str,
"day" : 'int32',
"lengthatbank" : float,
"day_of_week_cos" : float,
"Level_1_Link_Reject" : int,
"Identity_Negative_History" : int,
"educationlevel" : str,
"os_version" : str,
"account_email_worst_score" : float,
"true_ip_score" : float,
"ip_net_speed_cell" : str,
"account_email_score" : float,
"day_of_week" : 'int32',
"true_ip_worst_score" : float,
"proxy_ip_worst_score" : float,
"day_night" : str,
"proxy_ip_score" : float,
"monthsatresidence" : float,
"Device_Negative_History" : int,
"fuzzy_device_score" : float,
"day_sin" : float,
"ip_region_confidence" : float,
"true_ip_state_confidence" : float,
"IP_Negative_History" : int,
"fuzzy_device_worst_score" : float,
"digital_id_confidence_rating" : str,
"day_of_week_sin" : float,
"riskrating" : str,
"payfrequency" : str,
"ownhome" : str,
"Identity_Spoofing" : int
}
next_block_cols = ['application_key', 'application_timestamp', 'deviceid', 'fuzzydeviceid', 'application_email_address']
cols_to_keep = [col for col in dtype_dict.keys() if col in combined_df.columns]
final_cols = list(set(next_block_cols).union(set(cols_to_keep)))
# Type casting
for col, dtype in dtype_dict.items():
if col in combined_df.columns:
if dtype == int:
combined_df[col] = pd.to_numeric(combined_df[col], errors='coerce', downcast='integer')
elif dtype == float:
combined_df[col] = pd.to_numeric(combined_df[col], errors='coerce', downcast='float')
elif dtype == str:
combined_df[col] = combined_df[col].astype(str)
# cross check data type
capping_dict = {
"applicant_age": (18, 93),
"digitalidconfidence": (0, 9017),
"first_seen_days": (0, 10486),
"ea_score": (1, 930),
"hour": (0, 23),
"email_creation_days": (2438, 9661),
"lengthatjob": (1, 24),
"day_cos": (-0.9948693234, 1),
"summary_risk_score": (-100, 30),
"day": (1, 31),
"lengthatbank": (0, 25),
"day_of_week_cos": (-0.9009688679, 1),
"Level_1_Link_Reject": (0, 1),
"Identity_Negative_History": (0, 1),
"account_email_worst_score": (-52, 0),
"true_ip_score": (-38, 49),
"account_email_score": (-18, 9),
"day_of_week": (0, 6),
"true_ip_worst_score": (-100, 0),
"proxy_ip_worst_score": (-100, 0),
"proxy_ip_score": (-29, 60),
"monthsatresidence": (0, 25),
"Device_Negative_History": (0, 1),
"fuzzy_device_score": (-29, 14),
"day_sin": (-0.9987165072, 0.9987165072),
"ip_region_confidence": (75, 99),
# "true_ip_state_confidence": (5, 98),
"IP_Negative_History": (0, 1),
"fuzzy_device_worst_score": (-100, 0),
"day_of_week_sin": (-0.9749279122, 0.9749279122),
"Identity_Spoofing": (0, 1),
}
# Apply capping
for column, (cap_min, cap_max) in capping_dict.items():
if column in combined_df.columns:
combined_df[column] = combined_df[column].clip(lower=cap_min, upper=cap_max)
def handle_unknowns(X, column, known_values, default_treatment=None):
if column not in X.columns:
return X # Return X to avoid NoneType error
known_values = {str(val).lower() for val in known_values}
invalid_values = {None, "none", "nan", pd.NA}
X[column] = X[column].apply(
lambda x: str(x).lower() if pd.notna(x) and str(x).lower() in known_values
else (default_treatment if pd.notna(x) and str(x).lower() not in invalid_values else np.nan)
)
return X # Always return the DataFrame
unknown_treatments = {
"employmentstatus": {
"valid_values": [
"disability", "fixed income", "full time employed", "part time employment",
"retired benefits", "self employed", "student", "unemployed", "welfare"
],
"default_treatment": "other"
},
"trueipgeo": {
"valid_values": ["US"],
"default_treatment": "other"
},
"digital_id_trust_score_rating": {
"valid_values": ["very_high", "high", "neutral", "low"],
"default_treatment": "very_low"
},
"educationlevel": {
"valid_values": ["associate's degree", "bachelor's degree", "doctorate", "high school", "master's degree"],
"default_treatment": "other"
},
"os_version": {
"valid_values": [
'18', '17', '16', '15', '14', '13', '12', '11', '10', '9', '8'
],
"default_treatment": 'unknown'
},
"ip_net_speed_cell": {
"valid_values": [
"broadband", "cable", "dialup", "dsl", "fixed wireless", "mobile", "mobile wireless", "ocx", "satellite",
"t1", "tx", "wireless", "xdsl"
],
"default_treatment": "mobile"
},
"digital_id_confidence_rating": {
"valid_values": ["high", "medium", "very_high"],
"default_treatment": "very_low"
},
"riskrating": {
"valid_values": ["low", "medium", "neutral", "trusted"],
"default_treatment": "high"
},
"ownhome": {
"valid_values": ["true", "false"],
"default_treatment": np.nan
},
}
for column, treatment in unknown_treatments.items():
combined_df = handle_unknowns(combined_df, column, treatment["valid_values"], treatment["default_treatment"])
payfrequency_map = {
"biweekly": ["biweekly", "bi-weekly", "bi weekly", "bw"],
"semimonthly": ["semi-monthly", "semimonthly"]
}
combined_df['payfrequency'] = combined_df['payfrequency'].apply(
lambda x: next((key for key, values in payfrequency_map.items() if str(x).lower() in values), np.nan)
)
return combined_df[final_cols]

46
processing.py Normal file
View File

@ -0,0 +1,46 @@
import pandas as pd
import numpy as np
import xgboost as xgb
import joblib
import json
def processing(input_data):
df = pd.DataFrame(input_data)
# Load Model
model_path = "./xgboost_model.joblib"
# model_path = "C:/Users/abinisha/habemco_flowx/m1_v1/xgboost_model.joblib"
model = joblib.load(model_path)
df.rename(columns={'riskrating': 'RiskRating', 'trueipgeo': 'TrueIpGeo'}, inplace=True)
# Load Category Orders
category_orders_path ="./category_orders_train.json"
# category_orders_path = "C:/Users/abinisha/habemco_flowx/m1_v1/category_orders_train.json"
with open(category_orders_path, 'r') as f:
category_orders = json.load(f)
if df.empty:
raise ValueError("Input DataFrame is empty.")
# Ensure all expected features exist
expected_features = model.feature_names
for col, categories in category_orders.items():
df[col].replace([None, "", "null", np.nan, "nan", " "], np.nan, inplace=True)
df[col] = pd.Categorical(df[col], categories=categories, ordered=True)
# missing_features = [feature for feature in expected_features if feature not in df.columns]
# for feature in missing_features:
# df[feature] = np.nan # Use NaN to avoid dtype issues
# Create XGBoost DMatrix
dmatrix = xgb.DMatrix(df[expected_features], enable_categorical=True, missing=np.nan)
# Make predictions
predictions = model.predict(dmatrix)
df['prediction'] = predictions
return df

View File

@ -1 +1,95 @@
{}
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"application_key": {
"type": ["string", "null"],
"description": "Unique identifier for the application."
},
"application_timestamp": {
"type": ["string", "null"],
"description": "Timestamp when the application was submitted in UTC."
},
"application_ssn": {
"type": ["string", "null"],
"description": "Social Security Number of the applicant."
},
"application_email_address": {
"type": ["string", "null"],
"description": "Email address of the applicant."
},
"application_bank_account_number": {
"type": ["string", "null"],
"description": "Bank account number of the applicant."
},
"application_is_rejected": {
"type": ["boolean", "null"],
"description": "Indicates whether the application was rejected."
},
"application_date_of_birth": {
"type": ["string", "null"],
"description": "Date of birth of the applicant."
},
"EventType": {
"type": ["string", "null"],
"description": "Type of event associated with the application."
},
"RiskRating": {
"type": ["string", "null"],
"description": "Risk rating assigned to the application."
},
"TmxSummaryReasonCode": {
"type": ["string", "null"],
"description": "Reason code summary from third-party risk assessment."
},
"DigitalIdConfidence": {
"type": ["string", "null"],
"description": "Confidence score for the digital identity of the applicant."
},
"TrueIpGeo": {
"type": ["string", "null"],
"description": "Geolocation information of the true IP address used in the application."
},
"Blob": {
"type": ["string", "null"],
"description": "Raw data blob containing additional information related to the application."
},
"DeviceId": {
"type": ["string", "null"],
"description": "Unique identifier for the device used to submit the application."
},
"FuzzyDeviceId": {
"type": ["string", "null"],
"description": "Hashed or partially anonymized identifier for the device."
},
"ownhome": {
"type": ["boolean", "null"],
"description": "Indicates whether the applicant owns a home."
},
"employmentstatus": {
"type": ["string", "null"],
"description": "Employment status of the applicant."
},
"lengthatjob": {
"type": ["number", "null"],
"description": "Length of time (in months) the applicant has been at their current job."
},
"payfrequency": {
"type": ["string", "null"],
"description": "Frequency of pay for the applicant (e.g., weekly, biweekly, monthly)."
},
"lengthatbank": {
"type": ["string", "null"],
"description": "Length of time the applicant has been with their bank."
},
"educationlevel": {
"type": ["string", "null"],
"description": "Highest level of education attained by the applicant."
},
"monthsatresidence": {
"type": ["number", "null"],
"description": "Number of months the applicant has lived at their current residence."
}
},
"required": []
}

View File

@ -1 +1,6 @@
{}
pandas == 2.2.3
numpy == 2.2.3
xgboost == 2.1.4
joblib == 1.4.2
jmespath == 1.0.1
regex == 2023.12.25

View File

@ -1 +1,34 @@
{}
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"application_key": {
"type": ["string", "null"],
"description": "Application Key"
},
"application_timestamp": {
"type": ["string", "null"],
"description": "Application Timestamp"
},
"deviceid": {
"type": ["string", "null"],
"description": "Deviceid"
},
"fuzzydeviceid": {
"type": ["string", "null"],
"description": "Fuzzy Deviceid"
},
"application_email_address": {
"type": ["string", "null"],
"description": "Application Email Address"
},
"hd_score_m1": {
"type": ["number", "null"],
"description": "HD Fraud Score M1"
},
"action": {
"type": ["string", "null"],
"description": "Recommended Action."
}
}
}

45
test_block.py Normal file

File diff suppressed because one or more lines are too long

BIN
xgboost_model.joblib Normal file

Binary file not shown.