Advanced S Series Models/Rules
All checks were successful
Build and Push Docker Image / test (push) Successful in 1m43s
Build and Push Docker Image / build_and_push (push) Successful in 29s

This commit is contained in:
admin user 2025-03-12 16:15:38 +00:00
parent 4ca7b2486f
commit 54b5fecdb9
7 changed files with 170 additions and 23 deletions

View File

@ -1 +1 @@
-**Hello world!!!**
+# S Series Model

View File

@ -1,21 +1,34 @@
import logging

from rules_processing import processing

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger(__name__)


def __main__(
    hd_score_m1: float,
    hd_score_g1: float,
    cluster_size_users_v2: int,
    target_connected_30_sum: float,
    email_cnt: int,
    rejected_app_count: float,
    app_dt_day_cnt: int,
) -> dict:
    """Scoring-block entry point: bundle the raw application features and
    delegate to rules_processing.processing for the S-series scores.

    Returns:
        The dict produced by processing() — the m1/g1 pass-through scores
        plus the derived hd_score_s1/s2/s3 — or {} when processing failed.
    """
    # Create a dictionary instead of using pandas DataFrame — a single
    # row of features does not need a frame.
    data = {
        "hd_score_m1": hd_score_m1,
        "hd_score_g1": hd_score_g1,
        "cluster_size_users_v2": cluster_size_users_v2,
        "target_connected_30_sum": target_connected_30_sum,
        "email_cnt": email_cnt,
        "rejected_app_count": rejected_app_count,
        "app_dt_day_cnt": app_dt_day_cnt,
    }
    final = processing(data)
    logger.info(f"scores of application: {final}")
    return final

View File

@ -1 +1,35 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"hd_score_m1": {
"type": ["number", "null"],
"description": "HD fraud Score M1"
},
"hd_score_g1": {
"type": ["number", "null"],
"description": "HD Fraud Score G1"
},
"cluster_size_users_v2": {
"type": ["number", "null"],
"description": "Size of the user cluster in version 2."
},
"target_connected_30_sum": {
"type": ["number", "null"],
"description": "Sum of target connections within 30 days."
},
"email_cnt": {
"type": ["number", "null"],
"description": "Count of emails associated with the application."
},
"rejected_app_count": {
"type": ["number", "null"],
"description": "Count of rejected applications for the applicant."
},
"app_dt_day_cnt": {
"type": ["number", "null"],
"description": "Number of application days counted."
}
},
"required": []
}

View File

@ -1 +1,3 @@
{}

View File

@ -1 +1,29 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"hd_score_m1": {
"type": ["number", "null"],
"description": "HD Fraud Score M1"
},
"hd_score_g1": {
"type": ["number", "null"],
"description": "HD Fraud Score G1"
},
"hd_score_s1": {
"type": ["number", "null"],
"description": "HD Fraud Score S1"
},
"hd_score_s2": {
"type": ["number", "null"],
"description": "HD Fraud Score S2"
},
"hd_score_s3": {
"type": ["number", "null"],
"description": "HD Fraud Score S3"
}
}
}

54
rules_processing.py Normal file
View File

@ -0,0 +1,54 @@
import logging

# Root-logger configuration for this rules module: timestamped,
# level-tagged lines so score calculations can be traced per request.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)

# Module-level logger shared by the processing helpers below.
logger = logging.getLogger(__name__)
def processing(data: dict) -> dict:
    """Derive the S-series fraud scores (s1, s2, s3) from raw application features.

    Args:
        data: Feature dict with keys "hd_score_m1", "hd_score_g1",
            "cluster_size_users_v2", "target_connected_30_sum" and
            "app_dt_day_cnt". Per the block's input schema every value
            may be None; missing/None features score as 0.

    Returns:
        Dict with the pass-through m1/g1 scores plus the computed
        hd_score_s1/s2/s3, or {} when a score could not be computed.
    """
    # getLogger caches by name, so this is the same object as the
    # module-level `logger` configured at the top of this file.
    log = logging.getLogger(__name__)

    # All inputs are nullable per the block's JSON schema; coalesce
    # None/missing to 0 so a null feature yields a 0 score instead of a
    # TypeError that aborts the whole evaluation. (Previously only
    # target_connected_30_sum was handled this way.)
    cluster_size = data.get("cluster_size_users_v2") or 0
    hd_score_m1 = data.get("hd_score_m1") or 0
    app_dt_day_cnt = data.get("app_dt_day_cnt") or 0
    target_connected_30_sum = data.get("target_connected_30_sum") or 0

    # Rule S1: cluster of 3+ users combined with an already-high M1 score.
    try:
        hd_score_s1 = (
            min(1225 + (cluster_size * 5), 1390)
            if cluster_size >= 3 and hd_score_m1 >= 1140
            else 0
        )
        log.info("score_s1 calculated: %s", hd_score_s1)
    except Exception as e:
        log.error("Error processing score_s1 calculations: %s", e)
        return {}

    # Rule S2: cluster of 2+ users whose applications fall on a single day.
    try:
        hd_score_s2 = (
            min(1215 + (cluster_size * 5), 1380)
            if cluster_size >= 2 and app_dt_day_cnt == 1
            else 0
        )
        log.info("score_s2 calculated: %s", hd_score_s2)
    except Exception as e:
        log.error("Error processing score_s2 calculations: %s", e)
        return {}

    # Rule S3: any connected targets within the last 30 days.
    try:
        hd_score_s3 = (
            min(1250 + (target_connected_30_sum * 5), 1400)
            if target_connected_30_sum >= 1
            else 0
        )
        log.info("score_s3 calculated: %s", hd_score_s3)
    except Exception as e:
        log.error("Error processing score_s3 calculations: %s", e)
        return {}

    # Return the final results as a dictionary; m1/g1 are passed through
    # unmodified (possibly None). .get avoids an uncaught KeyError here.
    return {
        "hd_score_m1": data.get("hd_score_m1"),
        "hd_score_g1": data.get("hd_score_g1"),
        "hd_score_s1": hd_score_s1,
        "hd_score_s2": hd_score_s2,
        "hd_score_s3": hd_score_s3,
    }

16
test_block.py Normal file
View File

@ -0,0 +1,16 @@
import unittest

from block import __main__

# Representative feature payload mirroring the block's JSON input schema.
data = {
    "hd_score_m1": 1173.0,
    "hd_score_g1": 1203.0,
    "cluster_size_users_v2": 3.0,
    "target_connected_30_sum": 0.0,
    "email_cnt": 1.0,
    "rejected_app_count": 2.0,
    "app_dt_day_cnt": 3.0,
}


class TestBlock(unittest.TestCase):
    """Smoke test for the S-series scoring block entry point."""

    def test_main_success(self):
        """A well-formed payload yields a dict containing hd_score_s1."""
        block_result = __main__(**data)
        self.assertIsInstance(block_result, dict, "Result should be a dictionary.")
        self.assertIn(
            "hd_score_s1",
            block_result,
            "Result dictionary should contain 'hd_score_s1' if success.",
        )


if __name__ == "__main__":
    unittest.main()