Ankur Malik 9f3cb9ca4f
All checks were successful
Build and Push Docker Image / test (push) Successful in 25s
Build and Push Docker Image / build_and_push (push) Successful in 2m37s
Add G2 pipeline, models, and schema for g1_v1
2025-11-26 11:50:21 -05:00

45 lines
1.4 KiB
Python

import logging
from typing import List, Dict
from graph_pre_processing import pre_processing_g1, pre_processing_g2
from graph_processing import processing_g1, processing_g2
from graph_post_processing import post_processing_g1, post_processing_g2
# Configure logging for the whole block: timestamped, level-tagged records
# at INFO so the per-stage payload logs below are visible in pipeline runs.
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
# Module-level logger named after this module, per stdlib convention.
logger = logging.getLogger(__name__)
def __main__(results: List[Dict]) -> Dict:
    """Entry point for the g1_v1 block.

    Runs the G1 and G2 pipelines (pre-process -> process -> post-process)
    over *results* and returns the two post-processed dicts merged into one.

    NOTE(review): the name ``__main__`` is the pipeline platform's expected
    entry-point convention for this block — do not rename.
    """
    logger.info("data receiving in g1v1 block: %s", results)

    # Each pipeline gets its own independently pre-processed view of the input.
    prepared_g1 = pre_processing_g1(results)
    prepared_g2 = pre_processing_g2(results)
    logger.info("pre_processed_data_g1: %s", prepared_g1)
    logger.info("pre_processed_data_g2: %s", prepared_g2)

    # cluster_size may be absent (default 2) or present-but-None (also 2).
    size = prepared_g1.get("cluster_size", 2)
    if size is None:
        size = 2

    if size < 2:
        # Too few members to score: short-circuit both models with 0.
        scored_g1 = dict(prepared_g1, prediction=0)
        scored_g2 = dict(prepared_g2, prediction_g2=0)
    else:
        scored_g1 = processing_g1(prepared_g1)
        scored_g2 = processing_g2(prepared_g2)

    logger.info("prediction_g1: %.8f", float(scored_g1.get("prediction", 0)))
    logger.info("prediction_g2: %.8f", float(scored_g2.get("prediction_g2", 0)))

    # Merge post-processed outputs; G2 keys overwrite G1 keys on collision,
    # matching the original {**final_g1, **final_g2} order.
    merged = {**post_processing_g1(scored_g1), **post_processing_g2(scored_g2)}
    logger.info(merged)
    return merged
# Manual testing: call __main__(results) with a sample list of result dicts
# (each optionally carrying "cluster_size") and inspect the merged output.