From 41ee4aacf3c136c404ee70863d17ced5a1bfa10e Mon Sep 17 00:00:00 2001 From: Ankur Malik Date: Wed, 20 May 2026 13:17:42 -0400 Subject: [PATCH] Add IP velocity S4 rule --- block.py | 18 +++++++-- request_schema.json | 24 ++++++++++++ response_schema.json | 5 ++- rules_processing.py | 54 +++++++++++++++++++++++++- test_block.py | 92 ++++++++++++++++++++++++++++++++++++++++++-- 5 files changed, 185 insertions(+), 8 deletions(-) diff --git a/block.py b/block.py index 1bd4de3..a1f8426 100644 --- a/block.py +++ b/block.py @@ -17,7 +17,13 @@ def __main__( rejected_app_count: float, app_dt_day_cnt: int, hd_score_iso_m2: float, - hd_score_g2: float + hd_score_g2: float, + application_customer_type: str = None, + input_ip_address: str = None, + input_ip_connection_type: str = None, + input_ip_isp: str = None, + input_ip_distinct_ssn_24h: float = None, + input_ip_distinct_zip_24h: float = None ) -> dict: # Create a dictionary instead of using pandas DataFrame data = { @@ -29,10 +35,16 @@ def __main__( "rejected_app_count": rejected_app_count, "app_dt_day_cnt": app_dt_day_cnt, "hd_score_iso_m2": hd_score_iso_m2, - "hd_score_g2": hd_score_g2 + "hd_score_g2": hd_score_g2, + "application_customer_type": application_customer_type, + "input_ip_address": input_ip_address, + "input_ip_connection_type": input_ip_connection_type, + "input_ip_isp": input_ip_isp, + "input_ip_distinct_ssn_24h": input_ip_distinct_ssn_24h, + "input_ip_distinct_zip_24h": input_ip_distinct_zip_24h } final = processing(data) logger.info(f"scores of application: {final}") - return final \ No newline at end of file + return final diff --git a/request_schema.json b/request_schema.json index 77de915..70074cd 100644 --- a/request_schema.json +++ b/request_schema.json @@ -37,6 +37,30 @@ "hd_score_iso_m2": { "type": ["number", "null"], "description": "HD fraud Score M2" + }, + "application_customer_type": { + "type": ["string", "null"], + "description": "Application customer type for S4 velocity rule." + }, + "input_ip_address": { + "type": ["string", "null"], + "description": "Current application input IP address for S4 velocity rule." + }, + "input_ip_connection_type": { + "type": ["string", "null"], + "description": "Current application input IP connection type for S4 velocity rule." + }, + "input_ip_isp": { + "type": ["string", "null"], + "description": "Current application input IP ISP for S4 velocity rule." + }, + "input_ip_distinct_ssn_24h": { + "type": ["number", "null"], + "description": "Distinct SSN count for the same input IP in the past 24 hours." + }, + "input_ip_distinct_zip_24h": { + "type": ["number", "null"], + "description": "Distinct ZIP count for the same input IP in the past 24 hours." } }, "required": [] diff --git a/response_schema.json b/response_schema.json index 09d11c1..e0558a6 100644 --- a/response_schema.json +++ b/response_schema.json @@ -26,6 +26,10 @@ "type": ["number", "null"], "description": "HD Fraud Score S3" }, + "hd_score_s4": { + "type": ["number", "null"], + "description": "HD Fraud Score S4" + }, "hd_score_iso_m2": { "type": ["number", "null"], "description": "HD Fraud Score M2" @@ -34,4 +38,3 @@ } - diff --git a/rules_processing.py b/rules_processing.py index d68dd75..99fd041 100644 --- a/rules_processing.py +++ b/rules_processing.py @@ -1,4 +1,5 @@ import logging +import re # Configure logging logging.basicConfig( @@ -7,6 +8,50 @@ logging.basicConfig( ) logger = logging.getLogger(__name__) +INVALID_INPUT_IP_VALUES = {"", "nan", "null", "none", "n/a", "0.0.0.0", "unknown"} +BLOCKED_CONNECTION_TYPES = {"mobile wireless", "tx"} +BLOCKED_ISP_PATTERN = re.compile( + r"cloudflare|fastly|icloud|private relay|google llc|amazon technologies|datacamp|microsoft" +) + + +def _normalize_string(value) -> str: + if value is None: + return "" + return str(value).strip().lower() + + +def _normalize_count(value) -> int: + if value is None: + return 0 + try: + return int(float(value)) + except (TypeError, ValueError): + return 0 + + +def _calculate_s4(data: dict) -> int: + customer_type = _normalize_string(data.get("application_customer_type")) + input_ip_address = _normalize_string(data.get("input_ip_address")) + connection_type = _normalize_string(data.get("input_ip_connection_type")) + isp = _normalize_string(data.get("input_ip_isp")) + distinct_ssn = _normalize_count(data.get("input_ip_distinct_ssn_24h")) + distinct_zip = _normalize_count(data.get("input_ip_distinct_zip_24h")) + + if customer_type != "direct new": + return 0 + if input_ip_address in INVALID_INPUT_IP_VALUES: + return 0 + if connection_type in BLOCKED_CONNECTION_TYPES: + return 0 + if BLOCKED_ISP_PATTERN.search(isp): + return 0 + if distinct_ssn < 3 or distinct_zip < 2: + return 0 + + return min(1200, 1191 + (distinct_ssn - 3) + (distinct_zip - 2)) + + def processing(data: dict) -> dict: try: hd_score_s1 = ( @@ -42,6 +87,13 @@ def processing(data: dict) -> dict: logger.error(f"Error processing score_s3 calculations: {e}") return {} + try: + hd_score_s4 = _calculate_s4(data) + logger.info(f"score_s4 calculated: {hd_score_s4}") + except Exception as e: + logger.error(f"Error processing score_s4 calculations: {e}") + return {} + # Return the final results as a dictionary return { "hd_score_m1": data["hd_score_m1"], @@ -49,8 +101,8 @@ def processing(data: dict) -> dict: "hd_score_s1": hd_score_s1, "hd_score_s2": hd_score_s2, "hd_score_s3": hd_score_s3, + "hd_score_s4": hd_score_s4, "hd_score_iso_m2": data["hd_score_iso_m2"], "hd_score_g2": data["hd_score_g2"] } - diff --git a/test_block.py b/test_block.py index ae99298..95e922f 100644 --- a/test_block.py +++ b/test_block.py @@ -1,16 +1,102 @@ +import json import unittest +from pathlib import Path from block import __main__ -data = {'hd_score_m1': 1093.0, 'hd_score_g1': 0.0, 'cluster_size_users_v2': 1.0, 'target_connected_30_sum': 0.0, 'email_cnt': 1.0, 'rejected_app_count': 0.0, 'app_dt_day_cnt': 1.0, 'hd_score_iso_m2': 1001.0, "hd_score_g2": 0.0} +BASE_DIR = Path(__file__).resolve().parent + +data = { + "hd_score_m1": 1093.0, + "hd_score_g1": 0.0, + "cluster_size_users_v2": 1.0, + "target_connected_30_sum": 0.0, + "email_cnt": 1.0, + "rejected_app_count": 0.0, + "app_dt_day_cnt": 1.0, + "hd_score_iso_m2": 1001.0, + "hd_score_g2": 0.0 +} + + +def s4_data(**overrides): + values = { + **data, + "application_customer_type": "Direct New", + "input_ip_address": "203.0.113.10", + "input_ip_connection_type": "broadband", + "input_ip_isp": "local isp", + "input_ip_distinct_ssn_24h": 3, + "input_ip_distinct_zip_24h": 2, + } + values.update(overrides) + return values + class TestBlock(unittest.TestCase): def test_main_success(self): - blockResult = __main__(**data) + blockResult = __main__(**data) - # breakpoint() self.assertIsInstance(blockResult, dict, "Result should be a dictionary.") self.assertIn("hd_score_s1", blockResult, "Result dictionary should contain 'hd_score_s1' if success.") + def test_s4_triggers_for_direct_new_valid_ip_velocity(self): + block_result = __main__(**s4_data()) + + self.assertEqual(block_result["hd_score_s4"], 1191) + + def test_s4_score_caps_at_1200(self): + block_result = __main__(**s4_data( + input_ip_distinct_ssn_24h=30, + input_ip_distinct_zip_24h=30, + )) + + self.assertEqual(block_result["hd_score_s4"], 1200) + + def test_s4_requires_direct_new(self): + block_result = __main__(**s4_data(application_customer_type="Returning")) + + self.assertEqual(block_result["hd_score_s4"], 0) + + def test_s4_rejects_invalid_ip(self): + block_result = __main__(**s4_data(input_ip_address="0.0.0.0")) + + self.assertEqual(block_result["hd_score_s4"], 0) + + def test_s4_rejects_blocked_connection_type(self): + block_result = __main__(**s4_data(input_ip_connection_type="Mobile Wireless")) + + self.assertEqual(block_result["hd_score_s4"], 0) + + def test_s4_rejects_blocked_isp(self): + block_result = __main__(**s4_data(input_ip_isp="Cloudflare Inc.")) + + self.assertEqual(block_result["hd_score_s4"], 0) + + def test_s4_requires_distinct_ssn_and_zip_thresholds(self): + block_result = __main__(**s4_data(input_ip_distinct_zip_24h=1)) + + self.assertEqual(block_result["hd_score_s4"], 0) + + def test_s4_schema_contract(self): + request_schema = json.loads((BASE_DIR / "request_schema.json").read_text()) + response_schema = json.loads((BASE_DIR / "response_schema.json").read_text()) + + for field in ( + "application_customer_type", + "input_ip_address", + "input_ip_connection_type", + "input_ip_isp", + ): + self.assertEqual(request_schema["properties"][field]["type"], ["string", "null"]) + + for field in ( + "input_ip_distinct_ssn_24h", + "input_ip_distinct_zip_24h", + ): + self.assertEqual(request_schema["properties"][field]["type"], ["number", "null"]) + + self.assertEqual(response_schema["properties"]["hd_score_s4"]["type"], ["number", "null"]) + if __name__ == "__main__": unittest.main()