blocks-transformer/graph_pre_processing.py
Ankur Malik 9f3cb9ca4f
All checks were successful
Build and Push Docker Image / test (push) Successful in 25s
Build and Push Docker Image / build_and_push (push) Successful in 2m37s
Add G2 pipeline, models, and schema for g1_v1
2025-11-26 11:50:21 -05:00

92 lines
2.6 KiB
Python

import logging
import numpy as np
# Logging setup, executed once when this module is imported.
# basicConfig configures the root logger (INFO threshold, timestamped format);
# per the standard library it does nothing if the root logger already has
# handlers installed.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
    level=logging.INFO,
)

# Module-scoped logger named after this module's import path.
logger = logging.getLogger(__name__)
# Feature names that pre_processing_g2 reads from a result record and coerces
# to float, in this order.
# NOTE(review): the order presumably matches what the downstream G2 model
# expects -- confirm before reordering.
G2_PREDICTORS = [
    "hd_score_m2",
    "rejected_app_count",
    "hd_score_m2_connected_max",
    "hd_score_m2_connected_avg",
    "applicant_age_connected_max",
    "applicant_age_connected_avg",
    "account_tel_first_seen_min_conn",
    "account_tel_first_seen_max_conn",
    "account_tel_first_seen_avg_conn",
    "ssn_hash_first_seen_min_conn",
    "ssn_hash_first_seen_avg_conn",
    "account_login_first_seen_min_conn",
    "digital_id_first_seen_max_conn",
    "true_ip_first_seen_min_conn",
    "true_ip_first_seen_max_conn",
    "dist_em_ip_ref_km_min_conn",
    "pct_acc_email_attr_challenged_1_conn",
    "account_login_first_seen_range_conn",
    "account_login_first_seen_stddev_conn",
    "cpu_clock_range_conn",
    "summary_risk_score_max_conn",
]
def _coerce_float(value):
    """Convert ``value`` to ``float``, falling back to ``np.nan``.

    Args:
        value: Any object. ``None`` is treated as missing.

    Returns:
        ``float(value)`` when the conversion succeeds, otherwise ``np.nan``
        (for ``None``, unparseable strings, unsupported types, and integers
        too large to be represented as a float).
    """
    if value is None:
        return np.nan
    try:
        return float(value)
    except (TypeError, ValueError, OverflowError):
        # TypeError: unsupported type; ValueError: unparseable text;
        # OverflowError: float() raises it for ints beyond the double range.
        return np.nan
def pre_processing_g1(results):
    """Build the G1 feature dict from the first record in ``results``.

    Only ``results[0]`` is read. Seven fields are parsed as floats from their
    stripped string form; ``cluster_size`` is passed through unchanged.

    A numeric field becomes ``None`` when its text is not a finite number
    (e.g. ``None``, ``""``, ``"abc"``, ``"nan"``, ``"inf"``). Signed values and
    scientific notation (``"-1.5"``, ``"2e-3"``) are parsed as numbers.

    Args:
        results: Indexable collection whose first item supports ``item[key]``
            lookups for every field listed below.

    Returns:
        dict with keys ``hd_score_m1``, ``cluster_size_users_v2``,
        ``target_connected_30_sum``, ``email_cnt``, ``rejected_app_count``,
        ``app_dt_day_cnt``, ``cluster_size`` and ``hd_score_iso_m2``.

    Raises:
        IndexError: if ``results`` is an empty sequence.
        KeyError: if the first record is a dict missing one of the fields.
    """
    result = results[0]

    # Fields that must end up as float (or None when not parseable).
    float_columns = (
        "hd_score_m1",
        "cluster_size_users_v2",
        "target_connected_30_sum",
        "email_cnt",
        "rejected_app_count",
        "app_dt_day_cnt",
        "hd_score_iso_m2",
    )

    data = {
        "hd_score_m1": result["hd_score_m1"],
        "cluster_size_users_v2": result["cluster_size_users_v2"],
        "target_connected_30_sum": result["target_connected_30_sum"],
        "email_cnt": result["email_cnt"],
        "rejected_app_count": result["rejected_app_count"],
        "app_dt_day_cnt": result["app_dt_day_cnt"],
        "cluster_size": result["cluster_size"],
        "hd_score_iso_m2": result["hd_score_iso_m2"],
    }

    for col in float_columns:
        # Parse the textual form so that non-string inputs such as None or
        # booleans are rejected rather than silently converted.
        text = str(data[col]).strip()
        try:
            number = float(text)
        except ValueError:
            number = None
        else:
            # "nan" / "inf" parse successfully but are treated as missing.
            if not np.isfinite(number):
                number = None
        data[col] = number

    return data
def pre_processing_g2(results):
    """Build the G2 feature dict from the first record in ``results``.

    Only ``results[0]`` is read, and it is copied so the caller's record is
    not modified. When the record carries ``rejected_app_count_g2``, that
    value replaces ``rejected_app_count`` before features are extracted.

    Args:
        results: Indexable collection whose first item can be passed to
            ``dict(...)``.

    Returns:
        dict mapping every name in ``G2_PREDICTORS`` to its value coerced by
        ``_coerce_float`` (absent keys are looked up as ``None``), plus
        ``cluster_size`` copied as-is (``None`` when absent).
    """
    record = dict(results[0])

    # The G2-specific rejected-application count always wins when present.
    if "rejected_app_count_g2" in record:
        record["rejected_app_count"] = record["rejected_app_count_g2"]

    features = {name: _coerce_float(record.get(name)) for name in G2_PREDICTORS}
    features["cluster_size"] = record.get("cluster_size")
    return features
# Backward-compatibility alias: ``pre_processing`` is bound to the very same
# function object as ``pre_processing_g1``, so both names behave identically.
pre_processing = pre_processing_g1