Add hd_score_g2 support and reason-based scoring
All checks were successful
Build and Push Docker Image / test (push) Successful in 1m5s
Build and Push Docker Image / build_and_push (push) Successful in 21s

This commit is contained in:
Ankur Malik 2025-11-26 11:58:53 -05:00
parent 20b2f4afdb
commit b3be1ebf66
4 changed files with 50 additions and 57 deletions

View File

@ -14,7 +14,8 @@ def __main__(
hd_score_s1: float,
hd_score_s2: float,
hd_score_s3: float,
hd_score_iso_m2: float
hd_score_iso_m2: float,
hd_score_g2: float
) -> dict:
data = {
@ -24,6 +25,7 @@ def __main__(
"hd_score_s2": hd_score_s2,
"hd_score_s3": hd_score_s3,
"hd_score_iso_m2": hd_score_iso_m2,
"hd_score_g2": hd_score_g2
}
final = processing(data)

View File

@ -10,6 +10,10 @@
"type": ["number", "null"],
"description": "HD Fraud Score G1"
},
"hd_score_g2": {
"type": ["number", "null"],
"description": "HD Fraud Score G1"
},
"hd_score_s1": {
"type": ["number", "null"],
"description": "HD Fraud Score S1"

View File

@ -12,26 +12,46 @@ def processing(data: dict) -> dict:
score_threshold = 1200
score_threshold2 = 1225
# ---- Compute hd_score1, hd_score2, final hd_score ----
# ---- Compute hd_score based on hierarchy ----
try:
hd_score1 = max([
data["hd_score_s1"],
data["hd_score_s2"],
data["hd_score_g1"],
data["hd_score_s3"],
data["hd_score_m1"],
])
reason_hierarchy = [
("hd_score_s1", "S1 Triggered", score_threshold, True),
("hd_score_s2", "S2 Triggered", score_threshold, True),
("hd_score_g1", "G1 Triggered", score_threshold, True),
("hd_score_s3", "S3 Triggered", score_threshold, True),
("hd_score_m1", "M1 Triggered", score_threshold, True),
("hd_score_iso_m2", "M2 Triggered", score_threshold2, False),
("hd_score_g2", "G2 Triggered", score_threshold2, False),
]
hd_score2 = data["hd_score_iso_m2"]
winning_score = None
winning_reason = "Pass"
winning_threshold = None
if hd_score1 > score_threshold:
hd_score = hd_score1
elif hd_score2 >= score_threshold2:
hd_score = hd_score2
else:
hd_score = max(hd_score1, hd_score2)
for key, reason, threshold, use_gt in reason_hierarchy:
score = data.get(key)
if score is None:
continue
passes = (score > threshold) if use_gt else (score >= threshold)
if passes:
winning_score = score
winning_reason = reason
winning_threshold = threshold
break
# logging.info(f"Computed hd_score1={hd_score1}, hd_score2={hd_score2}, final hd_score={hd_score}")
if winning_score is None:
winning_score = max([
data.get("hd_score_s1", 0),
data.get("hd_score_s2", 0),
data.get("hd_score_g1", 0),
data.get("hd_score_s3", 0),
data.get("hd_score_m1", 0),
data.get("hd_score_iso_m2", 0),
data.get("hd_score_g2", 0),
])
winning_reason = "Pass"
hd_score = winning_score
except Exception as e:
logging.error(f"Error calculating hd_score values: {e}")
@ -39,7 +59,7 @@ def processing(data: dict) -> dict:
# ---- recommended_action ----
try:
if hd_score1 > score_threshold or hd_score2 >= score_threshold2:
if winning_reason != "Pass":
recommended_action = "Decline Application"
else:
recommended_action = "Pass Application"
@ -52,14 +72,10 @@ def processing(data: dict) -> dict:
# ---- description ----
try:
if hd_score1 > score_threshold:
if winning_reason != "Pass":
threshold_used = winning_threshold if winning_threshold is not None else score_threshold
description = (
f"HD Fraud Score is above the risk threshold {score_threshold}, "
f"Recommended action: Decline Application"
)
elif hd_score2 >= score_threshold2:
description = (
f"HD Fraud Score is above the risk threshold {score_threshold2}, "
f"HD Fraud Score is above the risk threshold {threshold_used}, "
f"Recommended action: Decline Application"
)
else:
@ -74,35 +90,8 @@ def processing(data: dict) -> dict:
return {}
# ---- hd_action_reasoncode ----
try:
reason_hierarchy = [
("hd_score_s1", "S1 Triggered"),
("hd_score_s2", "S2 Triggered"),
("hd_score_g1", "G1 Triggered"),
("hd_score_s3", "S3 Triggered"),
("hd_score_m1", "M1 Triggered"),
("hd_score_iso_m2", "M2 Triggered"), # NEW
]
hd_action_reasoncode = "Pass"
for key, reason in reason_hierarchy:
score = data[key]
if key == "hd_score_iso_m2":
if score >= score_threshold2:
hd_action_reasoncode = reason
break
else:
if score > score_threshold:
hd_action_reasoncode = reason
break
logging.info(f"hd_action_reasoncode: {hd_action_reasoncode}")
except Exception as e:
logging.error(f"Error determining action_reasoncode: {e}")
return {}
hd_action_reasoncode = winning_reason
logging.info(f"hd_action_reasoncode: {hd_action_reasoncode}")
# ---- Return only original 4 fields ----
return {
@ -111,5 +100,3 @@ def processing(data: dict) -> dict:
"description": description,
"hd_action_reasoncode": hd_action_reasoncode,
}

View File

@ -1,7 +1,7 @@
import unittest
from block import __main__
data = {'hd_score_m1': 1173.0, 'hd_score_g1': 1203.0, 'hd_score_s1': 1240.0, 'hd_score_s2': 0, 'hd_score_s3': 0, 'hd_score_iso_m2': 1001.0}
data = {'hd_score_m1': 1173.0, 'hd_score_g1': 1203.0, 'hd_score_s1': 1240.0, 'hd_score_s2': 0, 'hd_score_s3': 0, 'hd_score_iso_m2': 1001.0, 'hd_score_g2': 0.0}
class TestBlock(unittest.TestCase):
def test_main_success(self):