Compare commits

..

4 Commits

Author SHA1 Message Date
Ankur Malik
c98f0148e3 Add Scores S4 review handling
All checks were successful
Build and Push Docker Image / test (push) Successful in 10s
Build and Push Docker Image / build_and_push (push) Successful in 19s
2026-05-20 13:24:17 -04:00
Ankur Malik
b3be1ebf66 Add hd_score_g2 support and reason-based scoring
All checks were successful
Build and Push Docker Image / test (push) Successful in 1m5s
Build and Push Docker Image / build_and_push (push) Successful in 21s
2025-11-26 11:58:53 -05:00
Ankur Malik
20b2f4afdb Update Scores block processing and schema
All checks were successful
Build and Push Docker Image / test (push) Successful in 8s
Build and Push Docker Image / build_and_push (push) Successful in 17s
2025-11-23 23:41:46 -05:00
d1e90e8a14 Composite score block
All checks were successful
Build and Push Docker Image / test (push) Successful in 10s
Build and Push Docker Image / build_and_push (push) Successful in 20s
2025-03-12 16:16:45 +00:00
7 changed files with 309 additions and 23 deletions

View File

@ -1 +1 @@
**Hello world!!!** # Block for assigning a composite score

View File

@ -1,21 +1,39 @@
@flowx_block import logging
def example_function(request: dict) -> dict: from score_processing import processing
# Processing logic here... # Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger(__name__)
return { def __main__(
"meta_info": [ hd_score_m1: float,
{ hd_score_g1: float,
"name": "created_date", hd_score_s1: float,
"type": "string", hd_score_s2: float,
"value": "2024-11-05" hd_score_s3: float,
} hd_score_iso_m2: float,
], hd_score_g2: float,
"fields": [ hd_score_s4: float = 0
{ ) -> dict:
"name": "",
"type": "", data = {
"value": "" "hd_score_m1": hd_score_m1,
} "hd_score_g1": hd_score_g1,
] "hd_score_s1": hd_score_s1,
"hd_score_s2": hd_score_s2,
"hd_score_s3": hd_score_s3,
"hd_score_iso_m2": hd_score_iso_m2,
"hd_score_g2": hd_score_g2,
"hd_score_s4": hd_score_s4
} }
final = processing(data)
logger.info(f"Scores of application: {final}")
return final
# testing :
# __main__

View File

@ -1 +1,39 @@
{} {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"hd_score_m1": {
"type": ["number", "null"],
"description": "HD Fraud Score M1"
},
"hd_score_g1": {
"type": ["number", "null"],
"description": "HD Fraud Score G1"
},
"hd_score_g2": {
"type": ["number", "null"],
"description": "HD Fraud Score G1"
},
"hd_score_s1": {
"type": ["number", "null"],
"description": "HD Fraud Score S1"
},
"hd_score_s2": {
"type": ["number", "null"],
"description": "HD Fraud Score S2"
},
"hd_score_s3": {
"type": ["number", "null"],
"description": "HD Fraud Score S3"
},
"hd_score_s4": {
"type": ["number", "null"],
"description": "HD Fraud Score S4"
},
"hd_score_iso_m2": {
"type": ["number", "null"],
"description": "HD Fraud Score M2"
}
},
"required": []
}

View File

@ -1 +1 @@
{}

View File

@ -1 +1,25 @@
{} {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"hd_score": {
"type": ["number", "null"],
"description": "HD fraud Score"
},
"recommended_action": {
"type": ["string", "null"],
"description": "Recommended Action"
},
"description": {
"type": ["string", "null"],
"description": "Description of Recommended Action"
},
"hd_action_reasoncode": {
"type": ["string", "null"],
"description": "HD Action Reasoncode"
}
}
}

117
score_processing.py Normal file
View File

@ -0,0 +1,117 @@
import logging
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger(__name__)
def processing(data: dict) -> dict:
score_threshold = 1200
score_threshold2 = 1225
review_threshold = 1191
# ---- Compute hd_score based on hierarchy ----
try:
reason_hierarchy = [
("hd_score_s1", "S1 Triggered", score_threshold, True),
("hd_score_s2", "S2 Triggered", score_threshold, True),
("hd_score_g1", "G1 Triggered", score_threshold, True),
("hd_score_s3", "S3 Triggered", score_threshold, True),
("hd_score_m1", "M1 Triggered", score_threshold, True),
("hd_score_iso_m2", "M2 Triggered", score_threshold2, False),
("hd_score_g2", "G2 Triggered", score_threshold2, False),
]
winning_score = None
winning_reason = "Pass"
winning_threshold = None
for key, reason, threshold, use_gt in reason_hierarchy:
score = data.get(key)
if score is None:
continue
passes = (score > threshold) if use_gt else (score >= threshold)
if passes:
winning_score = score
winning_reason = reason
winning_threshold = threshold
break
if winning_score is None:
winning_score = max([
data.get("hd_score_s1", 0),
data.get("hd_score_s2", 0),
data.get("hd_score_g1", 0),
data.get("hd_score_s3", 0),
data.get("hd_score_m1", 0),
data.get("hd_score_iso_m2", 0),
data.get("hd_score_g2", 0),
])
hd_score_s4 = data.get("hd_score_s4", 0) or 0
if hd_score_s4 >= review_threshold:
winning_score = hd_score_s4
winning_reason = "S4 Velocity Triggered"
else:
if review_threshold <= winning_score <= score_threshold:
winning_score = winning_score - 10
winning_reason = "Pass"
hd_score = winning_score
except Exception as e:
logging.error(f"Error calculating hd_score values: {e}")
return {}
# ---- recommended_action ----
try:
if winning_reason == "S4 Velocity Triggered":
recommended_action = "Review Application"
elif winning_reason != "Pass":
recommended_action = "Decline Application"
else:
recommended_action = "Pass Application"
logging.info(f"recommended_action: {recommended_action}")
except Exception as e:
logging.error(f"Error assigning recommended_action: {e}")
return {}
# ---- description ----
try:
if winning_reason == "S4 Velocity Triggered":
description = (
"HD Fraud Score is in the review range, "
"Recommended action: Review Application"
)
elif winning_reason != "Pass":
threshold_used = winning_threshold if winning_threshold is not None else score_threshold
description = (
f"HD Fraud Score is above the risk threshold {threshold_used}, "
f"Recommended action: Decline Application"
)
else:
description = (
"HD Fraud Score is below the risk threshold, Recommended action: Pass Application"
)
logging.info(f"description: {description}")
except Exception as e:
logging.error(f"Error creating description: {e}")
return {}
# ---- hd_action_reasoncode ----
hd_action_reasoncode = winning_reason
logging.info(f"hd_action_reasoncode: {hd_action_reasoncode}")
# ---- Return only original 4 fields ----
return {
"hd_score": hd_score,
"recommended_action": recommended_action,
"description": description,
"hd_action_reasoncode": hd_action_reasoncode,
}

89
test_block.py Normal file
View File

@ -0,0 +1,89 @@
import json
import unittest
from pathlib import Path
from block import __main__
BASE_DIR = Path(__file__).resolve().parent
data = {
"hd_score_m1": 1173.0,
"hd_score_g1": 1203.0,
"hd_score_s1": 1240.0,
"hd_score_s2": 0,
"hd_score_s3": 0,
"hd_score_iso_m2": 1001.0,
"hd_score_g2": 0.0,
}
def score_data(**overrides):
values = {
"hd_score_m1": 1100.0,
"hd_score_g1": 0.0,
"hd_score_s1": 0.0,
"hd_score_s2": 0.0,
"hd_score_s3": 0.0,
"hd_score_iso_m2": 1001.0,
"hd_score_g2": 0.0,
"hd_score_s4": 0.0,
}
values.update(overrides)
return values
class TestBlock(unittest.TestCase):
def test_main_success(self):
block_result = __main__(**data)
self.assertIsInstance(block_result, dict, "Result should be a dictionary.")
self.assertIn("hd_score", block_result, "Result dictionary should contain 'hd_score' if success.")
def test_s4_sets_review_application_when_no_decline(self):
block_result = __main__(**score_data(hd_score_s4=1191))
self.assertEqual(block_result["hd_score"], 1191)
self.assertEqual(block_result["recommended_action"], "Review Application")
self.assertEqual(block_result["hd_action_reasoncode"], "S4 Velocity Triggered")
self.assertIn("Review Application", block_result["description"])
def test_s4_review_can_use_top_of_reserved_band(self):
block_result = __main__(**score_data(hd_score_s4=1200))
self.assertEqual(block_result["hd_score"], 1200)
self.assertEqual(block_result["recommended_action"], "Review Application")
def test_decline_precedence_beats_s4_review(self):
block_result = __main__(**score_data(hd_score_s1=1225, hd_score_s4=1200))
self.assertEqual(block_result["hd_score"], 1225)
self.assertEqual(block_result["recommended_action"], "Decline Application")
self.assertEqual(block_result["hd_action_reasoncode"], "S1 Triggered")
def test_non_s4_pass_score_in_reserved_band_shifts_down(self):
block_result = __main__(**score_data(hd_score_m1=1195, hd_score_s4=0))
self.assertEqual(block_result["hd_score"], 1185)
self.assertEqual(block_result["recommended_action"], "Pass Application")
self.assertEqual(block_result["hd_action_reasoncode"], "Pass")
def test_non_s4_pass_score_below_reserved_band_stays_same(self):
block_result = __main__(**score_data(hd_score_m1=1190, hd_score_s4=0))
self.assertEqual(block_result["hd_score"], 1190)
self.assertEqual(block_result["recommended_action"], "Pass Application")
def test_request_schema_adds_s4_and_response_schema_is_unchanged(self):
request_schema = json.loads((BASE_DIR / "request_schema.json").read_text())
response_schema = json.loads((BASE_DIR / "response_schema.json").read_text())
self.assertEqual(request_schema["properties"]["hd_score_s4"]["type"], ["number", "null"])
self.assertEqual(
set(response_schema["properties"]),
{"hd_score", "recommended_action", "description", "hd_action_reasoncode"},
)
if __name__ == "__main__":
unittest.main()