"""Post-processing of model outputs into rounded ``hd_score_*`` values.

The scoring functions read model output columns (``prediction``, ``pd_m2``,
``pd_m2_iso``) from a DataFrame and add rounded score columns. The
``post_processing*`` entry points flatten the first row of the scored
frames into a plain dict.
"""

import logging
from typing import Dict

import numpy as np
import pandas as pd

from pre_processing import THX_FIELDS

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger(__name__)


def _hd_score(values: pd.Series, linear_weight: float, log_weight: float) -> pd.Series:
    """Return the rounded score for a column of model outputs.

    The score is the sum of two terms, rounded to 0 decimals:

    * ``min(values * 100 + 0.00001, 1) * linear_weight``
    * ``max(log2(values * 100 + 0.000001) * log_weight, 0)``

    The two additive offsets differ (1e-5 vs 1e-6); both are kept exactly
    as in the original formula.
    """
    scaled = values * 100.0
    linear_part = np.minimum(scaled + 0.00001, 1.0) * linear_weight
    log_part = np.maximum(np.log2(scaled + 0.000001) * log_weight, 0.0)
    return np.round(linear_part + log_part, 0)


def post_processing_m1(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* with an ``hd_score_m1`` column added.

    The score is computed from the ``prediction`` column. Scoring is
    best-effort: if it fails (for example the column is missing), the error
    is logged with its traceback and the copy is returned without the score
    column. The input frame is never modified.
    """
    df = df.copy()
    try:
        df["hd_score_m1"] = _hd_score(df["prediction"], 85.0, 185.0)
    except Exception as e:
        # Deliberately broad: callers read the score with ``.get`` and
        # tolerate its absence, so a scoring failure must not propagate.
        logger.exception("Error processing hd_score_m1 calculations: %s", e)
        return df

    # Only log a sample value when there is a row to sample; an empty frame
    # is not a calculation error.
    if not df.empty:
        logger.info("hd_score_m1 calculated: %s", df["hd_score_m1"].iloc[0])
    return df


def post_processing_m2(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* with ``hd_score_m2`` and ``hd_score_iso_m2`` added.

    ``hd_score_m2`` is computed from ``pd_m2`` and ``hd_score_iso_m2`` from
    ``pd_m2_iso``. Scoring is best-effort: on failure the error is logged
    with its traceback and the copy is returned with whichever columns were
    assigned before the failure. The input frame is never modified.
    """
    df = df.copy()
    try:
        # Order matters: if the second column fails, the first is kept.
        df["hd_score_m2"] = _hd_score(df["pd_m2"], 75.0, 180.0)
        df["hd_score_iso_m2"] = _hd_score(df["pd_m2_iso"], 97.0, 246.0)
    except Exception as e:
        # Deliberately broad: callers read the scores with ``.get`` and
        # tolerate their absence, so a scoring failure must not propagate.
        logger.exception("Error processing hd_score_m2 calculations: %s", e)
        return df

    if not df.empty:
        logger.info("hd_score_m2 calculated: %s", df["hd_score_m2"].iloc[0])
        logger.info("hd_score_iso_m2 calculated: %s", df["hd_score_iso_m2"].iloc[0])
    return df


def _safe_get(df: pd.DataFrame, column: str):
    """Return the first-row value of *column*, normalizing NaN/None to None.

    Returns ``None`` when the column is absent or the frame has no rows.
    ``list`` and ``dict`` values are returned unchanged.
    """
    if column not in df.columns or df.empty:
        return None
    val = df[column].iloc[0]
    if isinstance(val, (list, dict)):
        return val
    try:
        if pd.isna(val):
            return None
    except (TypeError, ValueError):
        # ValueError: ``pd.isna`` returned an array (array-like cell) whose
        # truth value is ambiguous; treat the value as present.
        pass
    return val


def _timestamp_to_str(value: object) -> object:
    """Return ``str(value)``, or ``None`` when *value* is None/NaN/NaT.

    Without the missing-value check, ``str`` would turn NaN/NaT into the
    literal strings ``"nan"``/``"NaT"``.
    """
    if value is None:
        return None
    try:
        if pd.isna(value):
            return None
    except (TypeError, ValueError):
        pass
    return str(value)


def _base_fields(row: pd.Series) -> Dict[str, object]:
    """Build the fields shared by both entry points from one scored row.

    Keys missing from *row* map to ``None`` (``Series.get`` default).
    """
    return {
        "application_key": row.get("application_key"),
        "application_timestamp": _timestamp_to_str(row.get("application_timestamp")),
        "deviceid": row.get("deviceid"),
        "fuzzydeviceid": row.get("fuzzydeviceid"),
        "application_email_address": row.get("application_email_address"),
        "hd_score_m1": row.get("hd_score_m1"),
    }


def post_processing_all(df_m1: pd.DataFrame, df_m2: pd.DataFrame, df_thx: pd.DataFrame) -> Dict[str, object]:
    """Score both model frames and flatten everything into one result dict.

    The first row of each scored frame is used. The result holds the base
    application fields and ``hd_score_m1`` from *df_m1*, the two m2 scores
    from *df_m2*, ``"action": None``, and one entry per name in
    ``THX_FIELDS`` (first-row value from *df_thx*) that is not already a key
    of the result.

    Raises:
        IndexError: if *df_m1* or *df_m2* has no rows.
    """
    df_m1_scored = post_processing_m1(df_m1)
    df_m2_scored = post_processing_m2(df_m2)
    row_m1 = df_m1_scored.iloc[0]
    row_m2 = df_m2_scored.iloc[0]

    result = _base_fields(row_m1)
    result["hd_score_m2"] = row_m2.get("hd_score_m2")
    result["hd_score_iso_m2"] = row_m2.get("hd_score_iso_m2")
    result["action"] = None

    # Fields already present in the result take precedence over THX values.
    flattened_thx = {field: _safe_get(df_thx, field) for field in THX_FIELDS if field not in result}
    result.update(flattened_thx)
    return result


# Legacy entry point for backward compatibility
def post_processing(df: pd.DataFrame) -> Dict[str, object]:
    """Score *df* with the m1 formula and return the first row's base fields.

    Raises:
        IndexError: if *df* has no rows.
    """
    df_scored = post_processing_m1(df)
    return _base_fields(df_scored.iloc[0])