Advanced Graph Series model
All checks were successful
Build and Push Docker Image / test (push) Successful in 50s
Build and Push Docker Image / build_and_push (push) Successful in 2m21s

This commit is contained in:
admin user 2025-03-12 16:13:29 +00:00
parent 4ca7b2486f
commit 62dfadde20
10 changed files with 214 additions and 23 deletions

View File

@ -1 +1,2 @@
**Hello world!!!** # G Series Block
Loads and score G 1 v1 model

View File

@ -1,21 +1,34 @@
@flowx_block import logging
def example_function(request: dict) -> dict: from typing import List, Dict
from graph_pre_processing import pre_processing
from graph_processing import processing
from graph_post_processing import post_processing
# Processing logic here... # Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger(__name__)
return {
"meta_info": [ def __main__(results: List[Dict]) -> List[Dict]:
{ logger.info(f"data receiving in g1v1 block: {results}")
"name": "created_date", data = pre_processing(results)
"type": "string", logger.info(f"pre_processed_data, new_user_app_data: {data}")
"value": "2024-11-05"
} # df = processing(data)
], if data.get("cluster_size", 2) < 2:
"fields": [ data["prediction"] = 0
{ else:
"name": "", data = processing(data)
"type": "", logger.info("prediction: %.8f", float(data['prediction']))
"value": ""
} # Post-processing: calculate the Final Score and update the dataframe.
] final = post_processing(data)
} logger.info(final)
return final
# testing :
# __main__

31
graph_post_processing.py Normal file
View File

@ -0,0 +1,31 @@
import logging
import math
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger(__name__)
def post_processing(data):
try:
prediction = data.get("prediction", 0)
score_g1 = round(
min(prediction * 100 + 0.00001, 1) * 89 +
max(math.log2(prediction * 100 + 0.000001) * 193, 0),
0
)
data["hd_score_g1"] = score_g1
logger.info(f"score_g1 calculated: {score_g1}")
except Exception as e:
logger.error(f"Error processing score_g1 calculations: {e}")
return {
key: data.get(key, None)
for key in [
"hd_score_m1", "hd_score_g1", "cluster_size_users_v2",
"target_connected_30_sum", "email_cnt", "rejected_app_count",
"app_dt_day_cnt"
]
}

35
graph_pre_processing.py Normal file
View File

@ -0,0 +1,35 @@
import logging
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger(__name__)
def pre_processing(results):
result = results[0]
dtypes = {
"hd_score_m1": float,
"cluster_size_users_v2": float,
"target_connected_30_sum": float,
"email_cnt": float,
"rejected_app_count": float,
"app_dt_day_cnt": float
}
data = {
"hd_score_m1": result["hd_score_m1"],
"cluster_size_users_v2": result["cluster_size_users_v2"],
"target_connected_30_sum": result["target_connected_30_sum"],
"email_cnt": result["email_cnt"],
"rejected_app_count": result["rejected_app_count"],
"app_dt_day_cnt": result["app_dt_day_cnt"],
"cluster_size": result["cluster_size"],
}
for col, dtype in dtypes.items():
if col in data:
value = str(data[col]).strip()
data[col] = dtype(value) if value.replace(".", "", 1).isdigit() else None
return data

34
graph_processing.py Normal file
View File

@ -0,0 +1,34 @@
import xgboost as xgb
import pandas as pd
import joblib
import logging
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger(__name__)
def processing(data):
df = pd.DataFrame([data])
if df.empty:
logger.error("Input DataFrame is empty.")
# Load Model
model_path = "./xgboost_model.joblib"
# model_path ="C:/Users/abinisha/habemco_flowx/g1_v1/xgboost_model.joblib"
model = joblib.load(model_path)
expected_features = model.feature_names
df = df.applymap(lambda x: float('nan') if x is None else x)
dmatrix = xgb.DMatrix(df[expected_features], enable_categorical=True, missing=float('nan'))
prediction = model.predict(dmatrix)
df['prediction'] = prediction
return df.iloc[0].to_dict()

View File

@ -1 +1,11 @@
{} {
"$schema": "http://json-schema.org/draft-07/schema",
"type": "object",
"properties": {
"results": {
"type": ["array", "null"],
"items": {"type": "object"}
}
},
"required": []
}

View File

@ -1 +1,5 @@
{} joblib==1.4.2
pandas==2.2.3
xgboost==2.1.3
typing==3.6.1

View File

@ -1 +1,37 @@
{} {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"hd_score_m1": {
"type": ["number", "null"],
"description": "HD fraud Score M1"
},
"hd_score_g1": {
"type": ["number", "null"],
"description": "HD fraud Score G1"
},
"cluster_size_users_v2": {
"type": ["number", "null"],
"description": "Size of the user cluster in version 2."
},
"target_connected_30_sum": {
"type": ["number", "null"],
"description": "Sum of target connections within 30 days."
},
"email_cnt": {
"type": ["number", "null"],
"description": "Count of emails associated with the application."
},
"rejected_app_count": {
"type": ["number", "null"],
"description": "Count of rejected applications for the applicant."
},
"app_dt_day_cnt": {
"type": ["number", "null"],
"description": "Number of application days counted."
}
}
}

27
test_block.py Normal file
View File

@ -0,0 +1,27 @@
import unittest
import pandas as pd
from block import __main__
data = [{
# "application_key": "0A123C7F-BE45-4912-8E22-0904707325E7",
"hd_score_m1": 1211,
"cluster_size_users_v2": 2,
"target_connected_30_sum": 0,
"email_cnt": 1,
"rejected_app_count": 2,
"app_dt_day_cnt": 2,
"cluster_size": 3
}]
class TestBlock(unittest.TestCase):
def test_main_success(self):
blockResult = __main__(data)
# breakpoint()
self.assertIsInstance(blockResult, dict, "Result should be a dictionary.")
self.assertIn("hd_score_g1", blockResult, "Result dictionary should contain 'hd_score_g1' if success.")
if __name__ == "__main__":
unittest.main()

BIN
xgboost_model.joblib Normal file

Binary file not shown.