Add IP velocity S4 rule
All checks were successful
Build and Push Docker Image / test (push) Successful in 9s
Build and Push Docker Image / build_and_push (push) Successful in 19s

This commit is contained in:
Ankur Malik 2026-05-20 13:17:42 -04:00
parent 7aa04da902
commit 41ee4aacf3
5 changed files with 185 additions and 8 deletions

View File

@ -17,7 +17,13 @@ def __main__(
rejected_app_count: float, rejected_app_count: float,
app_dt_day_cnt: int, app_dt_day_cnt: int,
hd_score_iso_m2: float, hd_score_iso_m2: float,
hd_score_g2: float hd_score_g2: float,
application_customer_type: str = None,
input_ip_address: str = None,
input_ip_connection_type: str = None,
input_ip_isp: str = None,
input_ip_distinct_ssn_24h: float = None,
input_ip_distinct_zip_24h: float = None
) -> dict: ) -> dict:
# Create a dictionary instead of using pandas DataFrame # Create a dictionary instead of using pandas DataFrame
data = { data = {
@ -29,7 +35,13 @@ def __main__(
"rejected_app_count": rejected_app_count, "rejected_app_count": rejected_app_count,
"app_dt_day_cnt": app_dt_day_cnt, "app_dt_day_cnt": app_dt_day_cnt,
"hd_score_iso_m2": hd_score_iso_m2, "hd_score_iso_m2": hd_score_iso_m2,
"hd_score_g2": hd_score_g2 "hd_score_g2": hd_score_g2,
"application_customer_type": application_customer_type,
"input_ip_address": input_ip_address,
"input_ip_connection_type": input_ip_connection_type,
"input_ip_isp": input_ip_isp,
"input_ip_distinct_ssn_24h": input_ip_distinct_ssn_24h,
"input_ip_distinct_zip_24h": input_ip_distinct_zip_24h
} }
final = processing(data) final = processing(data)

View File

@ -37,6 +37,30 @@
"hd_score_iso_m2": { "hd_score_iso_m2": {
"type": ["number", "null"], "type": ["number", "null"],
"description": "HD fraud Score M2" "description": "HD fraud Score M2"
},
"application_customer_type": {
"type": ["string", "null"],
"description": "Application customer type for S4 velocity rule."
},
"input_ip_address": {
"type": ["string", "null"],
"description": "Current application input IP address for S4 velocity rule."
},
"input_ip_connection_type": {
"type": ["string", "null"],
"description": "Current application input IP connection type for S4 velocity rule."
},
"input_ip_isp": {
"type": ["string", "null"],
"description": "Current application input IP ISP for S4 velocity rule."
},
"input_ip_distinct_ssn_24h": {
"type": ["number", "null"],
"description": "Distinct SSN count for the same input IP in the past 24 hours."
},
"input_ip_distinct_zip_24h": {
"type": ["number", "null"],
"description": "Distinct ZIP count for the same input IP in the past 24 hours."
} }
}, },
"required": [] "required": []

View File

@ -26,6 +26,10 @@
"type": ["number", "null"], "type": ["number", "null"],
"description": "HD Fraud Score S3" "description": "HD Fraud Score S3"
}, },
"hd_score_s4": {
"type": ["number", "null"],
"description": "HD Fraud Score S4"
},
"hd_score_iso_m2": { "hd_score_iso_m2": {
"type": ["number", "null"], "type": ["number", "null"],
"description": "HD Fraud Score M2" "description": "HD Fraud Score M2"
@ -34,4 +38,3 @@
} }

View File

@ -1,4 +1,5 @@
import logging import logging
import re
# Configure logging # Configure logging
logging.basicConfig( logging.basicConfig(
@ -7,6 +8,50 @@ logging.basicConfig(
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
INVALID_INPUT_IP_VALUES = {"", "nan", "null", "none", "n/a", "0.0.0.0", "unknown"}
BLOCKED_CONNECTION_TYPES = {"mobile wireless", "tx"}
BLOCKED_ISP_PATTERN = re.compile(
r"cloudflare|fastly|icloud|private relay|google llc|amazon technologies|datacamp|microsoft"
)
def _normalize_string(value) -> str:
if value is None:
return ""
return str(value).strip().lower()
def _normalize_count(value) -> int:
if value is None:
return 0
try:
return int(float(value))
except (TypeError, ValueError):
return 0
def _calculate_s4(data: dict) -> int:
customer_type = _normalize_string(data.get("application_customer_type"))
input_ip_address = _normalize_string(data.get("input_ip_address"))
connection_type = _normalize_string(data.get("input_ip_connection_type"))
isp = _normalize_string(data.get("input_ip_isp"))
distinct_ssn = _normalize_count(data.get("input_ip_distinct_ssn_24h"))
distinct_zip = _normalize_count(data.get("input_ip_distinct_zip_24h"))
if customer_type != "direct new":
return 0
if input_ip_address in INVALID_INPUT_IP_VALUES:
return 0
if connection_type in BLOCKED_CONNECTION_TYPES:
return 0
if BLOCKED_ISP_PATTERN.search(isp):
return 0
if distinct_ssn < 3 or distinct_zip < 2:
return 0
return min(1200, 1191 + (distinct_ssn - 3) + (distinct_zip - 2))
def processing(data: dict) -> dict: def processing(data: dict) -> dict:
try: try:
hd_score_s1 = ( hd_score_s1 = (
@ -42,6 +87,13 @@ def processing(data: dict) -> dict:
logger.error(f"Error processing score_s3 calculations: {e}") logger.error(f"Error processing score_s3 calculations: {e}")
return {} return {}
try:
hd_score_s4 = _calculate_s4(data)
logger.info(f"score_s4 calculated: {hd_score_s4}")
except Exception as e:
logger.error(f"Error processing score_s4 calculations: {e}")
return {}
# Return the final results as a dictionary # Return the final results as a dictionary
return { return {
"hd_score_m1": data["hd_score_m1"], "hd_score_m1": data["hd_score_m1"],
@ -49,8 +101,8 @@ def processing(data: dict) -> dict:
"hd_score_s1": hd_score_s1, "hd_score_s1": hd_score_s1,
"hd_score_s2": hd_score_s2, "hd_score_s2": hd_score_s2,
"hd_score_s3": hd_score_s3, "hd_score_s3": hd_score_s3,
"hd_score_s4": hd_score_s4,
"hd_score_iso_m2": data["hd_score_iso_m2"], "hd_score_iso_m2": data["hd_score_iso_m2"],
"hd_score_g2": data["hd_score_g2"] "hd_score_g2": data["hd_score_g2"]
} }

View File

@ -1,16 +1,102 @@
import json
import unittest import unittest
from pathlib import Path
from block import __main__ from block import __main__
data = {'hd_score_m1': 1093.0, 'hd_score_g1': 0.0, 'cluster_size_users_v2': 1.0, 'target_connected_30_sum': 0.0, 'email_cnt': 1.0, 'rejected_app_count': 0.0, 'app_dt_day_cnt': 1.0, 'hd_score_iso_m2': 1001.0, "hd_score_g2": 0.0} BASE_DIR = Path(__file__).resolve().parent
data = {
"hd_score_m1": 1093.0,
"hd_score_g1": 0.0,
"cluster_size_users_v2": 1.0,
"target_connected_30_sum": 0.0,
"email_cnt": 1.0,
"rejected_app_count": 0.0,
"app_dt_day_cnt": 1.0,
"hd_score_iso_m2": 1001.0,
"hd_score_g2": 0.0
}
def s4_data(**overrides):
values = {
**data,
"application_customer_type": "Direct New",
"input_ip_address": "203.0.113.10",
"input_ip_connection_type": "broadband",
"input_ip_isp": "local isp",
"input_ip_distinct_ssn_24h": 3,
"input_ip_distinct_zip_24h": 2,
}
values.update(overrides)
return values
class TestBlock(unittest.TestCase): class TestBlock(unittest.TestCase):
def test_main_success(self): def test_main_success(self):
blockResult = __main__(**data) blockResult = __main__(**data)
# breakpoint()
self.assertIsInstance(blockResult, dict, "Result should be a dictionary.") self.assertIsInstance(blockResult, dict, "Result should be a dictionary.")
self.assertIn("hd_score_s1", blockResult, "Result dictionary should contain 'hd_score_s1' if success.") self.assertIn("hd_score_s1", blockResult, "Result dictionary should contain 'hd_score_s1' if success.")
def test_s4_triggers_for_direct_new_valid_ip_velocity(self):
block_result = __main__(**s4_data())
self.assertEqual(block_result["hd_score_s4"], 1191)
def test_s4_score_caps_at_1200(self):
block_result = __main__(**s4_data(
input_ip_distinct_ssn_24h=30,
input_ip_distinct_zip_24h=30,
))
self.assertEqual(block_result["hd_score_s4"], 1200)
def test_s4_requires_direct_new(self):
block_result = __main__(**s4_data(application_customer_type="Returning"))
self.assertEqual(block_result["hd_score_s4"], 0)
def test_s4_rejects_invalid_ip(self):
block_result = __main__(**s4_data(input_ip_address="0.0.0.0"))
self.assertEqual(block_result["hd_score_s4"], 0)
def test_s4_rejects_blocked_connection_type(self):
block_result = __main__(**s4_data(input_ip_connection_type="Mobile Wireless"))
self.assertEqual(block_result["hd_score_s4"], 0)
def test_s4_rejects_blocked_isp(self):
block_result = __main__(**s4_data(input_ip_isp="Cloudflare Inc."))
self.assertEqual(block_result["hd_score_s4"], 0)
def test_s4_requires_distinct_ssn_and_zip_thresholds(self):
block_result = __main__(**s4_data(input_ip_distinct_zip_24h=1))
self.assertEqual(block_result["hd_score_s4"], 0)
def test_s4_schema_contract(self):
request_schema = json.loads((BASE_DIR / "request_schema.json").read_text())
response_schema = json.loads((BASE_DIR / "response_schema.json").read_text())
for field in (
"application_customer_type",
"input_ip_address",
"input_ip_connection_type",
"input_ip_isp",
):
self.assertEqual(request_schema["properties"][field]["type"], ["string", "null"])
for field in (
"input_ip_distinct_ssn_24h",
"input_ip_distinct_zip_24h",
):
self.assertEqual(request_schema["properties"][field]["type"], ["number", "null"])
self.assertEqual(response_schema["properties"]["hd_score_s4"]["type"], ["number", "null"])
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()