diff --git a/flow_hybrid_expected.py b/flow_hybrid_expected.py deleted file mode 100644 index 83f49ec..0000000 --- a/flow_hybrid_expected.py +++ /dev/null @@ -1,261 +0,0 @@ -import temporalio.workflow -from typing import Any, Dict, List, Callable, Awaitable -import logging -import asyncio -import json -import datetime -import re -import jmespath - -# Configure logging -logging.basicConfig(level=logging.INFO, - format="%(asctime)s [%(levelname)s] %(message)s") -logger = logging.getLogger(__name__) - -@temporalio.workflow.defn -class test_repo_test_branch: - @temporalio.workflow.run - async def run(self, root_inputs: Dict[str, Any]) -> Dict[str, Any]: - workflow_info = temporalio.workflow.info() - workflow_output: Dict[str, Any] = { - "workflow_id": workflow_info.workflow_id, - "run_id": workflow_info.run_id, - "name": "test_repo_test_branch_1234567890", - "status": "in_progress", - "blocks": [], - "root_input": root_inputs - } - try: - # Initialize results - results: Dict[str, Any] = {} - - # Define task functions - task_functions: Dict[str, Callable[[], Awaitable[Any]]] = {} - async def task_2(): - node_id = "2" - block_name = "addition" - # Prepare inputs - input_params: Dict[str, Any] = {} - try: - jsonpath_expr = jmespath.compile("a") - value = jsonpath_expr.search(root_inputs) - input_params["a"] = value - except Exception as e: - logger.error(f"Error parsing jsonpath 'a' for parameter 'a': {e}") - input_params["a"] = None - try: - jsonpath_expr = jmespath.compile("b") - value = jsonpath_expr.search(root_inputs) - input_params["b"] = value - except Exception as e: - logger.error(f"Error parsing jsonpath 'b' for parameter 'b': {e}") - input_params["b"] = None - logger.info(f"Starting 'addition' activity on task queue 'blocks_transformer_addition_97ec9e0d50' with inputs: %s", input_params) - try: - # Convert timeouts and intervals from milliseconds to seconds - schedule_to_close_timeout_value_ms = 0 - start_to_close_timeout_value_ms = 0 - schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0) - start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0) - initial_interval_value_ms = 1000 - maximum_interval_value_ms = 100000 - initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0) - maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0) - maximum_attempts_value = 0 - maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value - - result = await temporalio.workflow.execute_activity( - "block_main_activity", - input_params, - schedule_to_close_timeout=schedule_to_close_timeout, - start_to_close_timeout=start_to_close_timeout, - task_queue="blocks_transformer_addition_97ec9e0d50", - retry_policy=temporalio.common.RetryPolicy( - maximum_attempts=maximum_attempts, - initial_interval=initial_interval, - backoff_coefficient=2, - maximum_interval=maximum_interval - ) - ) - logger.info(f"Completed 'addition' activity with result: %s", result) - block_status = "completed" - block_error = None - results[node_id] = result - except Exception as e: - logger.error(f"Activity 'addition' failed with error: {e}") - result = None - block_status = "failed" - block_error = { - "code": type(e).__name__, - "description": str(e), - "details": {"cause": str(getattr(e, "cause", "No additional details"))} - } - workflow_output["status"] = "failed" - # Collect block output - workflow_output["blocks"].append({ - "activity_id": node_id, - "name": block_name, - "status": block_status, - "input": input_params, - "result": result, - "error": block_error - }) - - task_functions["2"] = task_2 - async def task_m3aiq7ixuo6du35h8tr(): - node_id = "m3aiq7ixuo6du35h8tr" - block_name = "multiply" - # Prepare inputs - input_params: Dict[str, Any] = {} - try: - source_data = results.get("2", {}) - jsonpath_expr = jmespath.compile("sum") - value = jsonpath_expr.search(source_data) - input_params["sum"] = value - except Exception as e: - logger.error(f"Error parsing jsonpath 'sum' for parameter 'sum' from node '2': {e}") - input_params["sum"] = None - logger.info(f"Starting 'multiply' activity on task queue 'blocks_transformer_multiply_db086f09c9' with inputs: %s", input_params) - try: - # Convert timeouts and intervals from milliseconds to seconds - schedule_to_close_timeout_value_ms = 0 - start_to_close_timeout_value_ms = 0 - schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0) - start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0) - initial_interval_value_ms = 1000 - maximum_interval_value_ms = 100000 - initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0) - maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0) - maximum_attempts_value = 0 - maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value - - result = await temporalio.workflow.execute_activity( - "block_main_activity", - input_params, - schedule_to_close_timeout=schedule_to_close_timeout, - start_to_close_timeout=start_to_close_timeout, - task_queue="blocks_transformer_multiply_db086f09c9", - retry_policy=temporalio.common.RetryPolicy( - maximum_attempts=maximum_attempts, - initial_interval=initial_interval, - backoff_coefficient=2, - maximum_interval=maximum_interval - ) - ) - logger.info(f"Completed 'multiply' activity with result: %s", result) - block_status = "completed" - block_error = None - results[node_id] = result - except Exception as e: - logger.error(f"Activity 'multiply' failed with error: {e}") - result = None - block_status = "failed" - block_error = { - "code": type(e).__name__, - "description": str(e), - "details": {"cause": str(getattr(e, "cause", "No additional details"))} - } - workflow_output["status"] = "failed" - # Collect block output - workflow_output["blocks"].append({ - "activity_id": node_id, - "name": block_name, - "status": block_status, - "input": input_params, - "result": result, - "error": block_error - }) - - task_functions["m3aiq7ixuo6du35h8tr"] = task_m3aiq7ixuo6du35h8tr - async def task_m3aiqkrv4k1y6654ymr(): - node_id = "m3aiqkrv4k1y6654ymr" - block_name = "power" - # Prepare inputs - input_params: Dict[str, Any] = {} - try: - source_data = results.get("2", {}) - jsonpath_expr = jmespath.compile("sum") - value = jsonpath_expr.search(source_data) - input_params["product"] = value - except Exception as e: - logger.error(f"Error parsing jsonpath 'sum' for parameter 'product' from node '2': {e}") - input_params["product"] = None - logger.info(f"Starting 'power' activity on task queue 'blocks_transformer_power_057645be0c' with inputs: %s", input_params) - try: - # Convert timeouts and intervals from milliseconds to seconds - schedule_to_close_timeout_value_ms = 0 - start_to_close_timeout_value_ms = 0 - schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0) - start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0) - initial_interval_value_ms = 1000 - maximum_interval_value_ms = 100000 - initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0) - maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0) - maximum_attempts_value = 0 - maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value - - result = await temporalio.workflow.execute_activity( - "block_main_activity", - input_params, - schedule_to_close_timeout=schedule_to_close_timeout, - start_to_close_timeout=start_to_close_timeout, - task_queue="blocks_transformer_power_057645be0c", - retry_policy=temporalio.common.RetryPolicy( - maximum_attempts=maximum_attempts, - initial_interval=initial_interval, - backoff_coefficient=2, - maximum_interval=maximum_interval - ) - ) - logger.info(f"Completed 'power' activity with result: %s", result) - block_status = "completed" - block_error = None - results[node_id] = result - except Exception as e: - logger.error(f"Activity 'power' failed with error: {e}") - result = None - block_status = "failed" - block_error = { - "code": type(e).__name__, - "description": str(e), - "details": {"cause": str(getattr(e, "cause", "No additional details"))} - } - workflow_output["status"] = "failed" - # Collect block output - workflow_output["blocks"].append({ - "activity_id": node_id, - "name": block_name, - "status": block_status, - "input": input_params, - "result": result, - "error": block_error - }) - - task_functions["m3aiqkrv4k1y6654ymr"] = task_m3aiqkrv4k1y6654ymr - - # Execute tasks according to execution steps - # Execution step 1 - tasks = [task_functions[node_id]() for node_id in ['2']] - results_step = await asyncio.gather(*tasks, return_exceptions=True) - for result in results_step: - if isinstance(result, Exception): - logger.error(f"Task failed with exception: {result}") - workflow_output["status"] = "failed" - # Execution step 2 - tasks = [task_functions[node_id]() for node_id in ['m3aiq7ixuo6du35h8tr', 'm3aiqkrv4k1y6654ymr']] - results_step = await asyncio.gather(*tasks, return_exceptions=True) - for result in results_step: - if isinstance(result, Exception): - logger.error(f"Task failed with exception: {result}") - workflow_output["status"] = "failed" - - # Update workflow status to completed if not failed - if workflow_output["status"] != "failed": - workflow_output["status"] = "completed" - - return workflow_output - - except Exception as e: - logger.error(f"Workflow failed with error: {e}") - workflow_output["status"] = "failed" - raise \ No newline at end of file