Add input IP parsing
All checks were successful
Build and Push Docker Image / test (push) Successful in 3m14s
Build and Push Docker Image / build_and_push (push) Successful in 5m57s

This commit is contained in:
Ankur Malik 2026-05-20 13:05:44 -04:00
parent 67c2174ab3
commit d4b8ad0b8c
5 changed files with 37 additions and 1 deletions

View File

@ -175,8 +175,10 @@ expressions = {
"proxy_ip_longitude": ["Blob.proxy_ip_longitude"],
"dns_ip_latitude": ["Blob.dns_ip_latitude"],
"dns_ip_longitude": ["Blob.dns_ip_longitude"],
"input_ip_address": ["Blob.input_ip_address"],
"input_ip_latitude": ["Blob.input_ip_latitude"],
"input_ip_longitude": ["Blob.input_ip_longitude"],
"input_ip_isp": ["Blob.input_ip_isp"],
# First-seen timestamps for age deltas
"digital_id_first_seen": ["Blob.digital_id_first_seen"],
"account_email_first_seen": ["Blob.account_email_first_seen"],
@ -227,6 +229,7 @@ def __main__(
DeviceId: str,
FuzzyDeviceId: str,
ReasonCode: str,
input_ip_address: str = None,
) -> dict:
# Convert input parameters into a flat dictionary
data = {
@ -255,6 +258,7 @@ def __main__(
"DeviceId": DeviceId,
"FuzzyDeviceId": FuzzyDeviceId,
"ReasonCode": ReasonCode,
"input_ip_address": input_ip_address,
}
# Convert dictionary to a single-row DataFrame
@ -283,7 +287,11 @@ def __main__(
return coalesce(*values)
extracted = combined_df["blob"].apply(_extract_with_fallback)
if column in combined_df.columns:
if column == "input_ip_address" and column in combined_df.columns:
existing = combined_df[column]
has_existing = existing.notnull() & (existing.astype(str).str.strip() != "")
combined_df[column] = existing.where(has_existing, extracted)
elif column in combined_df.columns:
combined_df[column] = extracted.where(extracted.notnull(), combined_df[column])
else:
combined_df[column] = extracted
@ -292,6 +300,8 @@ def __main__(
# logger.info(combined_df.iloc[0].drop("blob").to_dict())
else:
for column in expressions:
if column == "input_ip_address" and column in combined_df.columns:
continue
combined_df[column] = None
# logger.info("pre_flowx data")
# logger.info(combined_df.iloc[0].to_dict())

View File

@ -29,6 +29,9 @@ THX_FIELDS = [
"account_email_attributes",
"tps_ip_latitude",
"tps_ip_longitude",
"input_ip_address",
"input_ip_connection_type",
"input_ip_isp",
]
# Hardcoded M2 data dictionary (replaces file lookup)

View File

@ -101,6 +101,10 @@
"ReasonCode": {
"type": ["string", "null"],
"description": "Reason code from ThreatMetrix."
},
"input_ip_address": {
"type": ["string", "null"],
"description": "Input IP address from ThreatMetrix."
}
},
"required": []

View File

@ -74,6 +74,18 @@
"type": ["string", "null"],
"description": "TPS IP longitude"
},
"input_ip_address": {
"type": ["string", "null"],
"description": "Input IP address from ThreatMetrix"
},
"input_ip_connection_type": {
"type": ["string", "null"],
"description": "Input IP connection type from ThreatMetrix"
},
"input_ip_isp": {
"type": ["string", "null"],
"description": "Input IP ISP from ThreatMetrix"
},
"action": {
"type": ["string", "null"],
"description": "Recommended Action."

View File

@ -57,11 +57,18 @@ class TestBlock(unittest.TestCase):
"hd_score_iso_m2"
]:
self.assertIn(key, result)
self.assertEqual(result["input_ip_address"], "2600:387:15:5211::a")
self.assertEqual(result["input_ip_connection_type"], "tx")
self.assertEqual(result["input_ip_isp"], "at&t enterprises llc")
# self.assertAlmostEqual(float(result["hd_score_m1"]), 1145.0, delta=1.0)
# self.assertAlmostEqual(float(result["hd_score_m2"]), 1182.0, delta=1.0)
# self.assertAlmostEqual(float(result["hd_score_iso_m2"]), 1061.0, delta=1.0)
# THX payload is flattened; no nested thx dict should remain
def test_top_level_input_ip_address_wins_over_blob(self):
    """Check that a caller-supplied input_ip_address is returned unchanged.

    The shared fixture is copied and given an explicit top-level
    ``input_ip_address``; the result must carry that exact value.
    """
    # Copy the fixture so the override never leaks into other tests.
    payload = dict(input_json)
    payload["input_ip_address"] = "top-level-ip"

    outcome = __main__(**payload)

    self.assertEqual(outcome["input_ip_address"], "top-level-ip")
if __name__ == "__main__":
unittest.main()