Pass IP velocity context through g1
All checks were successful
Build and Push Docker Image / build_and_push (push) Successful in 4m55s
Build and Push Docker Image / test (push) Successful in 2m45s

This commit is contained in:
Ankur Malik 2026-05-20 13:11:44 -04:00
parent 9f3cb9ca4f
commit d7a7b3d761
3 changed files with 74 additions and 0 deletions

View File

@ -11,6 +11,15 @@ logging.basicConfig(
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
VELOCITY_RULE_FIELDS = [
"application_customer_type",
"input_ip_address",
"input_ip_connection_type",
"input_ip_isp",
"input_ip_distinct_ssn_24h",
"input_ip_distinct_zip_24h",
]
def __main__(results: List[Dict]) -> Dict: def __main__(results: List[Dict]) -> Dict:
logger.info("data receiving in g1v1 block: %s", results) logger.info("data receiving in g1v1 block: %s", results)
@ -36,6 +45,8 @@ def __main__(results: List[Dict]) -> Dict:
final_g1 = post_processing_g1(g1_processed) final_g1 = post_processing_g1(g1_processed)
final_g2 = post_processing_g2(g2_processed) final_g2 = post_processing_g2(g2_processed)
final = {**final_g1, **final_g2} final = {**final_g1, **final_g2}
for field in VELOCITY_RULE_FIELDS:
final[field] = results[0].get(field)
logger.info(final) logger.info(final)
return final return final

View File

@ -37,6 +37,30 @@
"hd_score_iso_m2": { "hd_score_iso_m2": {
"type": ["number", "null"], "type": ["number", "null"],
"description": "HD fraud Score M2" "description": "HD fraud Score M2"
},
"application_customer_type": {
"type": ["string", "null"],
"description": "Application customer type for downstream rules."
},
"input_ip_address": {
"type": ["string", "null"],
"description": "Current application input IP address for downstream rules."
},
"input_ip_connection_type": {
"type": ["string", "null"],
"description": "Current application input IP connection type for downstream rules."
},
"input_ip_isp": {
"type": ["string", "null"],
"description": "Current application input IP ISP for downstream rules."
},
"input_ip_distinct_ssn_24h": {
"type": ["number", "null"],
"description": "Distinct SSN count for the same input IP in the past 24 hours."
},
"input_ip_distinct_zip_24h": {
"type": ["number", "null"],
"description": "Distinct ZIP count for the same input IP in the past 24 hours."
} }
} }
} }

View File

@ -1,7 +1,11 @@
import json
import unittest import unittest
from pathlib import Path
from block import __main__ from block import __main__
BASE_DIR = Path(__file__).resolve().parent
data = [{ data = [{
"application_key": "A3CDD39F-10F8-40B0-A4C9-0E1558B75131", "application_key": "A3CDD39F-10F8-40B0-A4C9-0E1558B75131",
"hd_score_m1": 1101.0, "hd_score_m1": 1101.0,
@ -42,6 +46,41 @@ class TestBlock(unittest.TestCase):
self.assertIn("hd_score_g1", block_result) self.assertIn("hd_score_g1", block_result)
self.assertIn("hd_score_g2", block_result) self.assertIn("hd_score_g2", block_result)
def test_main_passes_velocity_rule_fields_through(self):
row = dict(data[0])
row.update({
"cluster_size": 1,
"application_customer_type": "Direct New",
"input_ip_address": "203.0.113.10",
"input_ip_connection_type": "broadband",
"input_ip_isp": "local isp",
"input_ip_distinct_ssn_24h": 3,
"input_ip_distinct_zip_24h": 2,
})
block_result = __main__([row])
self.assertEqual(block_result["application_customer_type"], "Direct New")
self.assertEqual(block_result["input_ip_address"], "203.0.113.10")
self.assertEqual(block_result["input_ip_connection_type"], "broadband")
self.assertEqual(block_result["input_ip_isp"], "local isp")
self.assertEqual(block_result["input_ip_distinct_ssn_24h"], 3)
self.assertEqual(block_result["input_ip_distinct_zip_24h"], 2)
def test_response_schema_includes_velocity_rule_fields(self):
schema = json.loads((BASE_DIR / "response_schema.json").read_text())
expected_types = {
"application_customer_type": ["string", "null"],
"input_ip_address": ["string", "null"],
"input_ip_connection_type": ["string", "null"],
"input_ip_isp": ["string", "null"],
"input_ip_distinct_ssn_24h": ["number", "null"],
"input_ip_distinct_zip_24h": ["number", "null"],
}
for field, expected_type in expected_types.items():
self.assertEqual(schema["properties"][field]["type"], expected_type)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()