# blocks-transformer/pre_processing.py
import logging
import math
import re
from pathlib import Path
from typing import Dict, Iterable, List, Tuple, Union
import numpy as np
import pandas as pd
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger(__name__)
BASE_DIR = Path(__file__).resolve().parent
M2_LATLONG_REF_PATH = BASE_DIR / "latitute_longitute_reference.csv"
THX_FIELDS = [
"application_key",
"application_timestamp",
"digital_id_first_seen",
"summary_risk_score",
"cpu_clock",
"account_login_first_seen",
"account_telephone_first_seen",
"true_ip_first_seen",
"ssn_hash_first_seen",
"account_email_attributes",
"tps_ip_latitude",
"tps_ip_longitude",
]
# Hardcoded M2 data dictionary (replaces file lookup)
M2_DATA_DICTIONARY: Dict[str, Dict[str, Union[float, str, None]]] = {
"account_email_attributes_challenge_passed": {"data_type": "int", "valid_min": 0.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
"account_email_attributes_challenged": {"data_type": "int", "valid_min": 0.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
"account_email_first_seen_age": {"data_type": "int", "valid_min": -999.0, "valid_max": 999999.0, "observed_cap_min": None, "observed_cap_max": None},
"account_login_first_seen_age": {"data_type": "int", "valid_min": -999.0, "valid_max": 999999.0, "observed_cap_min": None, "observed_cap_max": None},
"account_name_activities_trusted_prob": {"data_type": "int", "valid_min": 0.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
"account_telephone_attributes_challenge_failed": {"data_type": "int", "valid_min": 0.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
"account_telephone_attributes_loan_app": {"data_type": "int", "valid_min": 0.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
"account_telephone_attributes_trusted": {"data_type": "int", "valid_min": 0.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
"account_telephone_first_seen_age": {"data_type": "int", "valid_min": -999.0, "valid_max": 999999.0, "observed_cap_min": None, "observed_cap_max": None},
"accphone_gbl_velocity_hour": {"data_type": "float", "valid_min": 0.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
"applicant_age": {"data_type": "int", "valid_min": 0.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
"browser_spoof_score": {"data_type": "float", "valid_min": 0.0, "valid_max": 999999.0, "observed_cap_min": None, "observed_cap_max": None},
"day": {"data_type": "int", "valid_min": 0.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
"day_cos": {"data_type": "float", "valid_min": -999.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
"day_sin": {"data_type": "float", "valid_min": -999.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
"di_autofill_count_login": {"data_type": "float", "valid_min": 0.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
"digital_id_attributes_challenged": {"data_type": "int", "valid_min": 0.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
"digital_id_attributes_trusted": {"data_type": "int", "valid_min": 0.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
"digital_id_day_diff": {"data_type": "int", "valid_min": -999999.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
"digital_id_first_seen_age": {"data_type": "int", "valid_min": -999.0, "valid_max": 999999.0, "observed_cap_min": None, "observed_cap_max": None},
"digital_id_month_diff": {"data_type": "int", "valid_min": -999.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
"digital_id_trust_score": {"data_type": "float", "valid_min": 0.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
"digitalidconfidence": {"data_type": "float", "valid_min": 0.0, "valid_max": 999999.0, "observed_cap_min": None, "observed_cap_max": None},
"dist_dnsip_ref_km": {"data_type": "float", "valid_min": 0.0, "valid_max": 999999.0, "observed_cap_min": None, "observed_cap_max": None},
"dist_em_ip_ref_km": {"data_type": "float", "valid_min": 0.0, "valid_max": 999999.0, "observed_cap_min": None, "observed_cap_max": None},
"dist_inputip_ref_km": {"data_type": "float", "valid_min": 0.0, "valid_max": 999999.0, "observed_cap_min": None, "observed_cap_max": None},
"dist_proxyip_ref_km": {"data_type": "float", "valid_min": 0.0, "valid_max": 999999.0, "observed_cap_min": None, "observed_cap_max": None},
"dist_trueip_dnsip_km": {"data_type": "float", "valid_min": 0.0, "valid_max": 999999.0, "observed_cap_min": None, "observed_cap_max": None},
"dist_trueip_em_ip_km": {"data_type": "float", "valid_min": 0.0, "valid_max": 999999.0, "observed_cap_min": None, "observed_cap_max": None},
"dist_trueip_ref_km": {"data_type": "float", "valid_min": 0.0, "valid_max": 999999.0, "observed_cap_min": None, "observed_cap_max": None},
"domain_creation_days": {"data_type": "float", "valid_min": 0.0, "valid_max": 999999.0, "observed_cap_min": None, "observed_cap_max": None},
"emailtofullnameconfidence": {"data_type": "float", "valid_min": 0.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
"emailtolastnameconfidence": {"data_type": "float", "valid_min": 0.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
"first_seen_days": {"data_type": "float", "valid_min": 0.0, "valid_max": 999999.0, "observed_cap_min": None, "observed_cap_max": None},
"fraudrisk": {"data_type": "string", "valid_min": None, "valid_max": None, "observed_cap_min": None, "observed_cap_max": None},
"fuzzy_device_first_seen_age": {"data_type": "int", "valid_min": -999.0, "valid_max": 999999.0, "observed_cap_min": None, "observed_cap_max": None},
"fuzzy_device_id_confidence": {"data_type": "float", "valid_min": 0.0, "valid_max": 999999.0, "observed_cap_min": None, "observed_cap_max": None},
"hour": {"data_type": "int", "valid_min": 0.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
"input_ip_connection_type": {"data_type": "string", "valid_min": None, "valid_max": None, "observed_cap_min": None, "observed_cap_max": None},
"iptophoneconfidence": {"data_type": "float", "valid_min": 0.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
"national_id_first_seen_age": {"data_type": "int", "valid_min": 0.0, "valid_max": 999999.0, "observed_cap_min": None, "observed_cap_max": None},
"overalldigitalidentityscore": {"data_type": "float", "valid_min": 0.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
"policy_score": {"data_type": "float", "valid_min": -999.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
"proxy_ip_first_seen_age": {"data_type": "int", "valid_min": 0.0, "valid_max": 999999.0, "observed_cap_min": None, "observed_cap_max": None},
"proxy_ip_worst_score": {"data_type": "float", "valid_min": -999.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
"proxy_score": {"data_type": "float", "valid_min": 0.0, "valid_max": 999999.0, "observed_cap_min": None, "observed_cap_max": None},
"reasoncode_new_smartid_lt_1wk_global": {"data_type": "int", "valid_min": 0.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
"riskrating": {"data_type": "string", "valid_min": None, "valid_max": None, "observed_cap_min": None, "observed_cap_max": None},
"ssn_hash_first_seen_age": {"data_type": "int", "valid_min": -999.0, "valid_max": 999999.0, "observed_cap_min": None, "observed_cap_max": None},
"totalhits": {"data_type": "float", "valid_min": 0.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
"true_ip_activities_trusted_prob": {"data_type": "int", "valid_min": 0.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
"true_ip_attributes_trusted": {"data_type": "int", "valid_min": 0.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
"true_ip_attributes_trusted_conf": {"data_type": "int", "valid_min": 0.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
"true_ip_first_seen_age": {"data_type": "int", "valid_min": -999.0, "valid_max": 999999.0, "observed_cap_min": None, "observed_cap_max": None},
"true_ip_worst_score": {"data_type": "float", "valid_min": -999.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
"uniquehits": {"data_type": "float", "valid_min": 0.0, "valid_max": 999.0, "observed_cap_min": None, "observed_cap_max": None},
}
# Hardcoded one-hot config (parsed_feature, model_var, contains)
M2_ONEHOT_CONFIG: List[Tuple[str, str, str]] = [
("reasoncode", "reasoncode_new_smartid_lt_1wk_global", "new_smartid_lt_1wk_global"),
("account_name_activities", "account_name_activities_trusted_prob", "trusted_prob"),
("account_email_attributes", "account_email_attributes_challenged", "challenged"),
("account_email_attributes", "account_email_attributes_challenge_passed", "challenge_passed"),
("true_ip_attributes", "true_ip_attributes_trusted", "trusted"),
("true_ip_attributes", "true_ip_attributes_trusted_conf", "trusted_conf"),
("digital_id_attributes", "digital_id_attributes_challenged", "challenged"),
("digital_id_attributes", "digital_id_attributes_trusted", "trusted"),
("account_telephone_attributes", "account_telephone_attributes_challenge_failed", "challenge_failed"),
("account_telephone_attributes", "account_telephone_attributes_loan_app", "loan_app"),
("account_telephone_attributes", "account_telephone_attributes_trusted", "trusted"),
("true_ip_activities", "true_ip_activities_trusted_prob", "trusted_prob"),
]
# ----------------------------
# Helpers
# ----------------------------
def _handle_unknowns(X: pd.DataFrame, column: str, known_values: Iterable[str], default_treatment=None):
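    """
    Lower-case `column` and map values outside `known_values` to
    `default_treatment`; missing values and the sentinels "none"/"nan"
    become np.nan. Mutates and returns X; no-op if the column is absent.
    """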
if column not in X.columns:
return X
known_values = {str(val).lower() for val in known_values}
    # The probe is always str(x).lower(), so only string sentinels can match here.
    invalid_values = {"none", "nan"}
X[column] = X[column].apply(
lambda x: str(x).lower()
if pd.notna(x) and str(x).lower() in known_values
else (default_treatment if pd.notna(x) and str(x).lower() not in invalid_values else np.nan)
)
return X
def _haversine_km(lat1, lon1, lat2, lon2):
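    """Great-circle distance in km between two (lat, lon) points; None if any coordinate is missing or non-numeric."""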
    # Guard against None *and* NaN: a NaN coordinate would otherwise clamp to a
    # spurious 0.0 km result in the haversine term below.
    if any(pd.isna(v) for v in (lat1, lon1, lat2, lon2)):
        return None
try:
rlat1 = float(lat1) * math.pi / 180.0
rlat2 = float(lat2) * math.pi / 180.0
dlat = (float(lat2) - float(lat1)) * math.pi / 180.0
dlon = (float(lon2) - float(lon1)) * math.pi / 180.0
except Exception:
return None
a = (
math.sin(dlat / 2.0) ** 2
+ math.cos(rlat1) * math.cos(rlat2) * math.sin(dlon / 2.0) ** 2
)
a = min(1.0, max(0.0, a))
return 2 * 6371.0088 * math.asin(math.sqrt(a))
def _prep_latlong_ref():
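    """
    Load the postal-code -> (lat, lon) reference CSV, falling back to all
    columns if the expected subset cannot be selected. Postal codes are
    lower-cased for matching. Returns an empty frame when the file is absent.
    """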
if not M2_LATLONG_REF_PATH.exists():
logger.warning("latitute_longitute_reference.csv missing at %s", M2_LATLONG_REF_PATH)
return pd.DataFrame()
try:
ref = pd.read_csv(M2_LATLONG_REF_PATH, usecols=["postal_code_ref", "latitute_ref", "longitude_ref"])
except Exception:
ref = pd.read_csv(M2_LATLONG_REF_PATH)
# keep lower string version for matching
if "postal_code_ref" in ref.columns:
ref["postal_code_ref"] = ref["postal_code_ref"].astype(str).str.lower()
return ref
def _normalize_zip_for_ref(zip_val):
"""
Normalize zip/postal code values so they match reference CSV keys.
- Floats like 89503.0 -> "89503"
- Int-like strings "89503.0" -> "89503"
Note: we intentionally avoid zero-filling to preserve behaviour seen in UAT references
where leading-zero ZIPs are not matched to the reference table.
"""
if pd.isna(zip_val):
return None
if isinstance(zip_val, (int, float)) and not isinstance(zip_val, bool):
return str(int(zip_val)).lower()
zip_str = str(zip_val).strip()
if zip_str.replace(".", "", 1).isdigit():
try:
return str(int(float(zip_str))).lower()
except Exception:
pass
return zip_str.lower() if zip_str else None
# ----------------------------
# M1 Pre-processing (existing behaviour)
# ----------------------------
def pre_processing_m1(data_df: pd.DataFrame) -> pd.DataFrame:
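    """
    M1 feature pipeline: derives applicant age and cyclical time features,
    one-hot TMX reason-code flags, then applies dtype coercion, value capping,
    and unknown-category treatments, returning only the model and
    pass-through columns.
    """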
combined_df = data_df.copy()
combined_df["applicant_age"] = combined_df.apply(
lambda row: pd.to_datetime(row["application_timestamp"]).year - pd.to_datetime(row["application_date_of_birth"]).year
if pd.notnull(row["application_timestamp"]) and pd.notnull(row["application_date_of_birth"])
else None,
axis=1,
)
combined_df["application_timestamp"] = pd.to_datetime(combined_df["application_timestamp"])
combined_df.loc[:, "application_time"] = pd.to_datetime(combined_df["application_timestamp"]).dt.time
combined_df["day"] = combined_df["application_timestamp"].dt.day
combined_df["day_of_week"] = combined_df["application_timestamp"].dt.weekday
combined_df["day_sin"] = np.sin(2 * np.pi * combined_df["day"] / 31)
combined_df["day_cos"] = np.cos(2 * np.pi * combined_df["day"] / 31)
combined_df["day_of_week_sin"] = np.sin(2 * np.pi * combined_df["day_of_week"] / 7)
combined_df["day_of_week_cos"] = np.cos(2 * np.pi * combined_df["day_of_week"] / 7)
def classify_day_night(hour):
if 6 <= hour < 18:
return "Day"
return "Night"
combined_df["hour"] = combined_df["application_time"].apply(lambda x: x.hour if pd.notnull(x) else np.nan)
combined_df["day_night"] = combined_df["hour"].apply(lambda hour: classify_day_night(hour) if pd.notnull(hour) else "Unknown")
combined_df["os_version"] = combined_df["os_version"].apply(
lambda x: x.split(".")[0] if isinstance(x, str) and "." in x else x.split("_")[0] if isinstance(x, str) and "_" in x else x
)
combined_df["Identity_Negative_History"] = combined_df["tmxsummaryreasoncode"].astype(str).str.contains(
"Identity_Negative_History", na=False, regex=True
).astype(int)
combined_df["Device_Negative_History"] = combined_df["tmxsummaryreasoncode"].astype(str).str.contains(
"Device_Negative_History", na=False, regex=True
).astype(int)
combined_df["Level_1_Link_Reject"] = combined_df["tmxsummaryreasoncode"].astype(str).str.contains(
"Level_1_Link_Reject", na=False, regex=True
).astype(int)
combined_df["IP_Negative_History"] = combined_df["tmxsummaryreasoncode"].astype(str).str.contains(
"IP_Negative_History", na=False, regex=True
).astype(int)
combined_df["Identity_Spoofing"] = combined_df["tmxsummaryreasoncode"].astype(str).str.contains(
"Identity_Spoofing", na=False, regex=True
).astype(int)
combined_df["digitalidconfidence"] = pd.to_numeric(combined_df["digitalidconfidence"], errors="coerce").astype("Int64")
combined_df.rename(
columns={
"DigitalIdConfidence": "digitalidconfidence",
},
inplace=True,
)
dtype_dict = {
"applicant_age": int,
"digitalidconfidence": float,
"first_seen_days": float,
"employmentstatus": str,
"ea_score": float,
"trueipgeo": str,
"hour": int,
"email_creation_days": float,
"lengthatjob": float,
"day_cos": float,
"summary_risk_score": float,
"digital_id_trust_score_rating": str,
"day": "int32",
"lengthatbank": float,
"day_of_week_cos": float,
"Level_1_Link_Reject": int,
"Identity_Negative_History": int,
"educationlevel": str,
"os_version": str,
"account_email_worst_score": float,
"true_ip_score": float,
"ip_net_speed_cell": str,
"account_email_score": float,
"day_of_week": "int32",
"true_ip_worst_score": float,
"proxy_ip_worst_score": float,
"day_night": str,
"proxy_ip_score": float,
"monthsatresidence": float,
"Device_Negative_History": int,
"fuzzy_device_score": float,
"day_sin": float,
"ip_region_confidence": float,
"true_ip_state_confidence": float,
"IP_Negative_History": int,
"fuzzy_device_worst_score": float,
"day_of_week_sin": float,
"riskrating": str,
"payfrequency": str,
"ownhome": str,
"Identity_Spoofing": int,
}
    next_block_cols = ["application_key", "application_timestamp", "deviceid", "fuzzydeviceid", "application_email_address"]
    cols_to_keep = [col for col in dtype_dict.keys() if col in combined_df.columns]
    # Deterministic, duplicate-free column order: pass-through keys first, then model variables.
    final_cols = list(dict.fromkeys(next_block_cols + cols_to_keep))
for col, dtype in dtype_dict.items():
if col in combined_df.columns:
if dtype == int:
combined_df[col] = pd.to_numeric(combined_df[col], errors="coerce", downcast="integer")
elif dtype == float:
combined_df[col] = pd.to_numeric(combined_df[col], errors="coerce", downcast="float")
elif dtype == str:
combined_df[col] = combined_df[col].astype(str)
capping_dict = {
"applicant_age": (18, 93),
"digitalidconfidence": (0, 9017),
"first_seen_days": (0, 10486),
"ea_score": (1, 930),
"hour": (0, 23),
"email_creation_days": (2438, 9661),
"lengthatjob": (1, 24),
"day_cos": (-0.9948693234, 1),
"summary_risk_score": (-100, 30),
"day": (1, 31),
"lengthatbank": (0, 25),
"day_of_week_cos": (-0.9009688679, 1),
"Level_1_Link_Reject": (0, 1),
"Identity_Negative_History": (0, 1),
"account_email_worst_score": (-52, 0),
"true_ip_score": (-38, 49),
"account_email_score": (-18, 9),
"day_of_week": (0, 6),
"true_ip_worst_score": (-100, 0),
"proxy_ip_worst_score": (-100, 0),
"proxy_ip_score": (-29, 60),
"monthsatresidence": (0, 25),
"Device_Negative_History": (0, 1),
"fuzzy_device_score": (-29, 14),
"day_sin": (-0.9987165072, 0.9987165072),
"ip_region_confidence": (75, 99),
"IP_Negative_History": (0, 1),
"fuzzy_device_worst_score": (-100, 0),
"day_of_week_sin": (-0.9749279122, 0.9749279122),
"Identity_Spoofing": (0, 1),
}
for column, (cap_min, cap_max) in capping_dict.items():
if column in combined_df.columns:
combined_df[column] = combined_df[column].clip(lower=cap_min, upper=cap_max)
unknown_treatments = {
"employmentstatus": {
"valid_values": [
"disability",
"fixed income",
"full time employed",
"part time employment",
"retired benefits",
"self employed",
"student",
"unemployed",
"welfare",
],
"default_treatment": "other",
},
"trueipgeo": {"valid_values": ["US"], "default_treatment": "other"},
"digital_id_trust_score_rating": {"valid_values": ["very_high", "high", "neutral", "low"], "default_treatment": "very_low"},
"educationlevel": {
"valid_values": ["associate's degree", "bachelor's degree", "doctorate", "high school", "master's degree"],
"default_treatment": "other",
},
"os_version": {
"valid_values": ["18", "17", "16", "15", "14", "13", "12", "11", "10", "9", "8"],
"default_treatment": "unknown",
},
"ip_net_speed_cell": {
"valid_values": [
"broadband",
"cable",
"dialup",
"dsl",
"fixed wireless",
"mobile",
"mobile wireless",
"ocx",
"satellite",
"t1",
"tx",
"wireless",
"xdsl",
],
"default_treatment": "mobile",
},
"digital_id_confidence_rating": {"valid_values": ["high", "medium", "very_high"], "default_treatment": "very_low"},
"riskrating": {"valid_values": ["low", "medium", "neutral", "trusted"], "default_treatment": "high"},
"ownhome": {"valid_values": ["true", "false"], "default_treatment": np.nan},
}
for column, treatment in unknown_treatments.items():
combined_df = _handle_unknowns(combined_df, column, treatment["valid_values"], treatment["default_treatment"])
payfrequency_map = {"biweekly": ["biweekly", "bi-weekly", "bi weekly", "bw"], "semimonthly": ["semi-monthly", "semimonthly"]}
combined_df["payfrequency"] = combined_df["payfrequency"].apply(
lambda x: next((key for key, values in payfrequency_map.items() if str(x).lower() in values), np.nan)
)
return combined_df[final_cols]
# ----------------------------
# M2 Pre-processing
# ----------------------------
def _apply_onehot_features(df: pd.DataFrame) -> pd.DataFrame:
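    """
    Set each M2_ONEHOT_CONFIG model variable to 1 when the configured
    substring occurs in the first row's parsed feature value (a string or a
    list of strings), matched both raw and with punctuation normalised to
    spaces. Assumes a single-row frame; missing features yield 0.
    """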
    df = df.copy()

    def _normalize(text: str) -> str:
        # Collapse runs of non-alphanumeric characters to single spaces for fuzzy matching.
        return re.sub(r"[^a-z0-9]+", " ", text.lower())

    def _matches(value, token: str) -> bool:
        text = str(value).lower()
        return token in text or _normalize(token) in _normalize(text)

    for parsed_feature, model_var, contains_val in M2_ONEHOT_CONFIG:
        value = df.get(parsed_feature, pd.Series([None])).iloc[0]
        if isinstance(value, list):
            flag = int(any(_matches(v, contains_val) for v in value))
        elif isinstance(value, str):
            flag = int(_matches(value, contains_val))
        else:
            flag = 0
        df[model_var] = flag
    return df
def _extract_first_seen_days(ts_value, app_ts):
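    """Whole days from `ts_value` to `app_ts` (midnight-aligned); None if either fails to parse."""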
ts = pd.to_datetime(ts_value, errors="coerce", utc=True)
app = pd.to_datetime(app_ts, errors="coerce", utc=True)
# align to naive for subtraction
if isinstance(ts, pd.Timestamp) and ts.tzinfo is not None:
ts = ts.tz_localize(None)
if isinstance(app, pd.Timestamp) and app.tzinfo is not None:
app = app.tz_localize(None)
if pd.isna(ts) or pd.isna(app):
return None
return (app.normalize() - ts.normalize()).days
def _to_naive_ts(val):
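    """Parse to a UTC timestamp and drop the timezone; NaT on failure."""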
ts = pd.to_datetime(val, errors="coerce", utc=True)
if isinstance(ts, pd.Timestamp) and ts.tzinfo is not None:
ts = ts.tz_localize(None)
return ts
def _month_diff(earlier, later):
"""Month difference (earlier - later) using year/month buckets."""
ts_earlier = _to_naive_ts(earlier)
ts_later = _to_naive_ts(later)
if pd.isna(ts_earlier) or pd.isna(ts_later):
return None
return (ts_earlier.year - ts_later.year) * 12 + (ts_earlier.month - ts_later.month)
def pre_processing_m2(data_df: pd.DataFrame) -> pd.DataFrame:
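    """
    M2 feature pipeline: lower-cases columns, derives calendar features and
    one-hot attribute flags, computes haversine distances against the
    postal-code reference point, first-seen ages, and applies the M2
    data-dictionary caps. Assumes a single-row frame (scalar lookups use
    .iloc[0]).
    """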
df = data_df.copy()
df.columns = df.columns.str.lower()
# Timestamp-derived features (align with M1 behaviour to keep probabilities consistent)
df["application_timestamp"] = pd.to_datetime(df["application_timestamp"], errors="coerce", utc=True)
df["day"] = df["application_timestamp"].dt.day
df["hour"] = df["application_timestamp"].dt.hour
df["day_sin"] = np.sin(2 * np.pi * df["day"] / 31)
df["day_cos"] = np.cos(2 * np.pi * df["day"] / 31)
def _classify_day_night(hour_val):
if pd.isna(hour_val):
return np.nan
return "day" if 6 <= hour_val < 18 else "night"
df["day_night"] = df["hour"].apply(_classify_day_night)
# Apply onehot flags from attributes
df = _apply_onehot_features(df)
# Distances
lat_ref = _prep_latlong_ref()
if not lat_ref.empty and "zip" in df.columns:
zip_value = df["zip"].iloc[0]
zip_lookup = _normalize_zip_for_ref(zip_value)
ref_row = lat_ref[lat_ref["postal_code_ref"] == zip_lookup] if zip_lookup else pd.DataFrame()
lat_ref_val = ref_row["latitute_ref"].iloc[0] if not ref_row.empty else None
lon_ref_val = ref_row["longitude_ref"].iloc[0] if not ref_row.empty else None
else:
lat_ref_val = None
lon_ref_val = None
df["dist_inputip_ref_km"] = df.apply(
lambda r: _haversine_km(lat_ref_val, lon_ref_val, r.get("input_ip_latitude"), r.get("input_ip_longitude")), axis=1
)
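    # `or` falls back to tps_ip_* when ip_* is None/0/empty; note NaN is truthy,
    # so a NaN coordinate does not trigger the fallback.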
df["dist_em_ip_ref_km"] = df.apply(
lambda r: _haversine_km(lat_ref_val, lon_ref_val, r.get("ip_latitude") or r.get("tps_ip_latitude"), r.get("ip_longitude") or r.get("tps_ip_longitude")),
axis=1,
)
df["dist_proxyip_ref_km"] = df.apply(
lambda r: _haversine_km(lat_ref_val, lon_ref_val, r.get("proxy_ip_latitude"), r.get("proxy_ip_longitude")), axis=1
)
df["dist_dnsip_ref_km"] = df.apply(
lambda r: _haversine_km(lat_ref_val, lon_ref_val, r.get("dns_ip_latitude"), r.get("dns_ip_longitude")), axis=1
)
df["dist_trueip_ref_km"] = df.apply(
lambda r: _haversine_km(lat_ref_val, lon_ref_val, r.get("true_ip_latitude"), r.get("true_ip_longitude")), axis=1
)
df["dist_trueip_em_ip_km"] = df.apply(
lambda r: _haversine_km(r.get("true_ip_latitude"), r.get("true_ip_longitude"), r.get("ip_latitude") or r.get("tps_ip_latitude"), r.get("ip_longitude") or r.get("tps_ip_longitude")),
axis=1,
)
df["dist_trueip_dnsip_km"] = df.apply(
lambda r: _haversine_km(r.get("true_ip_latitude"), r.get("true_ip_longitude"), r.get("dns_ip_latitude"), r.get("dns_ip_longitude")),
axis=1,
)
# Ages
app_ts_val = df.get("application_timestamp", pd.Series([None])).iloc[0]
def _safe_day_diff(row):
if not row.get("digital_id_first_seen"):
return None
val = _extract_first_seen_days(row.get("digital_id_first_seen"), app_ts_val)
return -val if val is not None else None
df["digital_id_day_diff"] = df.apply(_safe_day_diff, axis=1)
df["digital_id_month_diff"] = df.apply(lambda r: _month_diff(r.get("digital_id_first_seen"), app_ts_val), axis=1)
for col_name in [
"digital_id_first_seen",
"account_email_first_seen",
"account_login_first_seen",
"account_telephone_first_seen",
"true_ip_first_seen",
"ssn_hash_first_seen",
"fuzzy_device_first_seen",
"national_id_first_seen",
"proxy_ip_first_seen",
]:
out_col = f"{col_name}_age"
df[out_col] = df.apply(lambda r: _extract_first_seen_days(r.get(col_name), app_ts_val), axis=1)
# applicant_age for consistency if not present
if "applicant_age" not in df.columns:
df["applicant_age"] = df.apply(
lambda row: pd.to_datetime(row["application_timestamp"]).year - pd.to_datetime(row["application_date_of_birth"]).year
if pd.notnull(row.get("application_timestamp")) and pd.notnull(row.get("application_date_of_birth"))
else None,
axis=1,
)
# Safe casting and capping using data dictionary
for var_name, rules in M2_DATA_DICTIONARY.items():
if var_name not in df.columns:
continue
col = pd.to_numeric(df[var_name], errors="coerce") if rules.get("data_type") in ["float", "int"] else df[var_name]
if rules.get("data_type") == "int":
col = col.astype("float")
valid_min = rules.get("valid_min")
valid_max = rules.get("valid_max")
observed_min = rules.get("observed_cap_min")
observed_max = rules.get("observed_cap_max")
if observed_min is not None or observed_max is not None:
col = col.clip(lower=observed_min, upper=observed_max)
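        # Validity-range filtering below is currently disabled; only observed caps are applied.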
# if valid_min is not None:
# col = col.where(col >= valid_min, np.nan)
# if valid_max is not None:
# col = col.where(col <= valid_max, np.nan)
df[var_name] = col
return df
def pre_processing_all(data_df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
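    """
    Run both pipelines on copies of the input and return
    (m1_features, m2_features, thx_passthrough). Missing THX fields are added
    as None; present ones are stringified (NaN becomes the string "nan").
    """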
# Ensure requested THX fields exist so downstream packaging always has keys
df_base = data_df.copy()
for field in THX_FIELDS:
if field in df_base.columns:
df_base[field] = df_base[field].astype(str)
else:
df_base[field] = None
df_thx = df_base[THX_FIELDS].copy()
df_m1 = pre_processing_m1(df_base.copy())
df_m2 = pre_processing_m2(df_base.copy())
return df_m1, df_m2, df_thx
# Backwards compatible entry point (used by legacy code/tests if any)
def pre_processing(data_df: pd.DataFrame) -> pd.DataFrame:
df_m1, _, _ = pre_processing_all(data_df)
return df_m1
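

if __name__ == "__main__":
    # Minimal local smoke test: a single hypothetical application row covering
    # the columns both pipelines touch directly. All field values below are
    # illustrative only and do not come from any real application.
    sample = pd.DataFrame(
        [
            {
                "application_key": "APP-000001",
                "application_timestamp": "2025-01-15T14:30:00Z",
                "application_date_of_birth": "1990-06-01",
                "application_email_address": "user@example.com",
                "deviceid": "dev-abc",
                "fuzzydeviceid": "fuzzy-abc",
                "os_version": "17.2",
                "tmxsummaryreasoncode": "Identity_Negative_History",
                "digitalidconfidence": "4200",
                "payfrequency": "bi-weekly",
            }
        ]
    )
    df_m1, df_m2, df_thx = pre_processing_all(sample)
    logger.info("M1 features: %s", sorted(df_m1.columns))
    logger.info("M2 features: %d columns", df_m2.shape[1])
    logger.info("THX passthrough: %s", df_thx.to_dict(orient="records"))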