blocks-transformer/graph_processing.py
Ankur Malik 9f3cb9ca4f
All checks were successful
Build and Push Docker Image / test (push) Successful in 25s
Build and Push Docker Image / build_and_push (push) Successful in 2m37s
Add G2 pipeline, models, and schema for g1_v1
2025-11-26 11:50:21 -05:00

62 lines
1.5 KiB
Python

import xgboost as xgb
import pandas as pd
import joblib
import logging
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger(__name__)
def processing_g1(data):
df = pd.DataFrame([data])
if df.empty:
logger.error("Input DataFrame is empty.")
# Load Model
# model_path = "C:/Users/abinisha/habemco_flowx/g1_v1/xgboost_model_G1.joblib"
model_path = "./xgboost_model_G1.joblib"
model = joblib.load(model_path)
expected_features = model.feature_names
df = df.applymap(lambda x: float("nan") if x is None else x)
dmatrix = xgb.DMatrix(df[expected_features], enable_categorical=True, missing=float("nan"))
prediction = model.predict(dmatrix)
df["prediction"] = prediction
return df.iloc[0].to_dict()
def processing_g2(data):
df = pd.DataFrame([data])
if df.empty:
logger.error("Input DataFrame is empty.")
# model_path = "C:/Users/abinisha/habemco_flowx/g1_v1/xgboost_model_G2.joblib"
model_path = "./xgboost_model_G2.joblib"
model = joblib.load(model_path)
expected_features = model.feature_names
df = df.reindex(columns=expected_features)
df = df.applymap(lambda x: float("nan") if x is None else x)
dmatrix = xgb.DMatrix(df[expected_features], enable_categorical=True, missing=float("nan"))
prediction = model.predict(dmatrix)
df["prediction_g2"] = prediction
return df.iloc[0].to_dict()
# Backward compatibility alias
processing = processing_g1