blocks-transformer/graph_post_processing.py
Ankur Malik 9f3cb9ca4f
All checks were successful
Build and Push Docker Image / test (push) Successful in 25s
Build and Push Docker Image / build_and_push (push) Successful in 2m37s
Add G2 pipeline, models, and schema for g1_v1
2025-11-26 11:50:21 -05:00

56 lines
1.7 KiB
Python

import logging
import math
from decimal import Decimal, ROUND_HALF_UP
# Root-logger defaults for this module's output: INFO threshold and a
# "timestamp [LEVEL] logger-name - message" line layout.
# logging.basicConfig() does nothing when the root logger already has
# handlers, so an application that configured logging earlier keeps its setup.
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(name)s - %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-scoped logger used by the post-processing functions in this file.
logger = logging.getLogger(__name__)
def post_processing_g1(data):
    """Compute the G1 score from ``data["prediction"]`` and return the output fields.

    The score is::

        round(min(p * 100 + 0.00001, 1) * 89
              + max(log2(p * 100 + 0.000001) * 193, 0), 0)

    where ``p`` is ``data["prediction"]`` (``0`` when the key is absent). On
    success the score is written back into ``data`` under ``"hd_score_g1"``
    (the input mapping is mutated) and logged at INFO level.

    Scoring is best-effort: any failure (non-numeric, negative, NaN or
    infinite prediction, ...) is logged with its traceback and the score is
    simply not written; the function still returns normally.

    Args:
        data: Dict-like object supporting ``.get`` and item assignment.

    Returns:
        A new dict with exactly the keys ``hd_score_m1``, ``hd_score_g1``,
        ``cluster_size_users_v2``, ``target_connected_30_sum``, ``email_cnt``,
        ``rejected_app_count``, ``app_dt_day_cnt`` and ``hd_score_iso_m2``,
        each taken from ``data`` (``None`` when missing).
    """
    try:
        prediction = data.get("prediction", 0)

        # NaN/inf would otherwise pass through min/max/round untouched and be
        # stored as a NaN/inf score; treat them as a failed calculation.
        if not math.isfinite(prediction):
            raise ValueError(f"prediction is not finite: {prediction!r}")

        scaled = prediction * 100

        # NOTE(review): the two epsilons differ (1e-5 vs 1e-6) and the
        # 89 / 193 weights are unexplained in this file; kept exactly as-is.
        capped_part = min(scaled + 0.00001, 1) * 89
        # The epsilon keeps log2 defined at prediction == 0; negative log
        # contributions are floored at 0.
        log_part = max(math.log2(scaled + 0.000001) * 193, 0)

        # round(x, 0) returns a float and rounds exact halves to even.
        # NOTE(review): post_processing_g2 rounds half-up to an int; confirm
        # the difference is intentional.
        score_g1 = round(capped_part + log_part, 0)

        data["hd_score_g1"] = score_g1
        logger.info("score_g1 calculated: %s", score_g1)
    except Exception as e:
        # Deliberate best-effort boundary: keep going, but keep the traceback.
        logger.exception("Error processing score_g1 calculations: %s", e)

    output_keys = (
        "hd_score_m1", "hd_score_g1", "cluster_size_users_v2",
        "target_connected_30_sum", "email_cnt", "rejected_app_count",
        "app_dt_day_cnt", "hd_score_iso_m2",
    )
    return {key: data.get(key) for key in output_keys}
def post_processing_g2(data):
    """Compute the G2 score and return it as ``{"hd_score_g2": score}``.

    The prediction is read from ``data["prediction_g2"]``; ``data["prediction"]``
    is used only when the ``prediction_g2`` key is absent. The raw score is::

        (p * 100) * 20 + log2((p + 0.000001) * 100) * 41.6

    rounded half-up to the nearest integer and floored at 0.

    Scoring is best-effort: when no prediction is available, or the
    calculation fails (non-numeric, NaN, infinite or sufficiently negative
    prediction, ...), the failure is logged with its traceback where
    applicable and the score is ``None``.

    Args:
        data: Dict-like object supporting ``.get``. It is not modified.

    Returns:
        ``{"hd_score_g2": int}`` on success, ``{"hd_score_g2": None}`` otherwise.
    """
    prediction = data.get("prediction_g2", data.get("prediction"))
    hd_score_g2 = None

    if prediction is None:
        return {"hd_score_g2": hd_score_g2}

    try:
        prediction_val = float(prediction)
        raw_score = (prediction_val * 100) * 20 + math.log((prediction_val + 0.000001) * 100, 2) * 41.6

        # SQL-like rounding (half up); going through str() rounds the float's
        # shortest decimal representation rather than its binary expansion.
        rounded = Decimal(str(raw_score)).quantize(Decimal("1"), rounding=ROUND_HALF_UP)

        # Clamp with an int zero so the score is always an int (clamping with
        # 0.0 would return a float for negative raw scores).
        hd_score_g2 = max(int(rounded), 0)
        logger.info("score_g2 calculated: %s", hd_score_g2)
    except Exception as e:
        # Deliberate best-effort boundary: keep going, but keep the traceback.
        logger.exception("Error processing score_g2 calculations: %s", e)

    return {"hd_score_g2": hd_score_g2}
# Backward compatibility alias: the un-suffixed name ``post_processing``
# is bound to the G1 implementation, so callers that use the un-suffixed
# name get post_processing_g1.
post_processing = post_processing_g1