From e70a472e6473657c4c622b9ee5b933a57ad993c1 Mon Sep 17 00:00:00 2001 From: admin user Date: Tue, 25 Mar 2025 20:03:16 +0000 Subject: [PATCH] Upload files to "/" --- flow_hybrid_expected.py | 261 +++++++++++++++++++++++++++++++++++ flow_parallel_expected.py | 252 +++++++++++++++++++++++++++++++++ flow_sequential_expected.py | 268 ++++++++++++++++++++++++++++++++++++ 3 files changed, 781 insertions(+) create mode 100644 flow_hybrid_expected.py create mode 100644 flow_parallel_expected.py create mode 100644 flow_sequential_expected.py diff --git a/flow_hybrid_expected.py b/flow_hybrid_expected.py new file mode 100644 index 0000000..83f49ec --- /dev/null +++ b/flow_hybrid_expected.py @@ -0,0 +1,261 @@ +import temporalio.workflow +from typing import Any, Dict, List, Callable, Awaitable +import logging +import asyncio +import json +import datetime +import re +import jmespath + +# Configure logging +logging.basicConfig(level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s") +logger = logging.getLogger(__name__) + +@temporalio.workflow.defn +class test_repo_test_branch: + @temporalio.workflow.run + async def run(self, root_inputs: Dict[str, Any]) -> Dict[str, Any]: + workflow_info = temporalio.workflow.info() + workflow_output: Dict[str, Any] = { + "workflow_id": workflow_info.workflow_id, + "run_id": workflow_info.run_id, + "name": "test_repo_test_branch_1234567890", + "status": "in_progress", + "blocks": [], + "root_input": root_inputs + } + try: + # Initialize results + results: Dict[str, Any] = {} + + # Define task functions + task_functions: Dict[str, Callable[[], Awaitable[Any]]] = {} + async def task_2(): + node_id = "2" + block_name = "addition" + # Prepare inputs + input_params: Dict[str, Any] = {} + try: + jsonpath_expr = jmespath.compile("a") + value = jsonpath_expr.search(root_inputs) + input_params["a"] = value + except Exception as e: + logger.error(f"Error parsing jsonpath 'a' for parameter 'a': {e}") + input_params["a"] = None + try: + jsonpath_expr = jmespath.compile("b") + value = jsonpath_expr.search(root_inputs) + input_params["b"] = value + except Exception as e: + logger.error(f"Error parsing jsonpath 'b' for parameter 'b': {e}") + input_params["b"] = None + logger.info(f"Starting 'addition' activity on task queue 'blocks_transformer_addition_97ec9e0d50' with inputs: %s", input_params) + try: + # Convert timeouts and intervals from milliseconds to seconds + schedule_to_close_timeout_value_ms = 0 + start_to_close_timeout_value_ms = 0 + schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0) + start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0) + initial_interval_value_ms = 1000 + maximum_interval_value_ms = 100000 + initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0) + maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0) + maximum_attempts_value = 0 + maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value + + result = await temporalio.workflow.execute_activity( + "block_main_activity", + input_params, + schedule_to_close_timeout=schedule_to_close_timeout, + start_to_close_timeout=start_to_close_timeout, + task_queue="blocks_transformer_addition_97ec9e0d50", + retry_policy=temporalio.common.RetryPolicy( + maximum_attempts=maximum_attempts, + initial_interval=initial_interval, + backoff_coefficient=2, + maximum_interval=maximum_interval + ) + ) + logger.info(f"Completed 'addition' activity with result: %s", result) + block_status = "completed" + block_error = None + results[node_id] = result + except Exception as e: + logger.error(f"Activity 'addition' failed with error: {e}") + result = None + block_status = "failed" + block_error = { + "code": type(e).__name__, + "description": str(e), + "details": {"cause": str(getattr(e, "cause", "No additional details"))} + } + workflow_output["status"] = "failed" + # Collect block output + workflow_output["blocks"].append({ + "activity_id": node_id, + "name": block_name, + "status": block_status, + "input": input_params, + "result": result, + "error": block_error + }) + + task_functions["2"] = task_2 + async def task_m3aiq7ixuo6du35h8tr(): + node_id = "m3aiq7ixuo6du35h8tr" + block_name = "multiply" + # Prepare inputs + input_params: Dict[str, Any] = {} + try: + source_data = results.get("2", {}) + jsonpath_expr = jmespath.compile("sum") + value = jsonpath_expr.search(source_data) + input_params["sum"] = value + except Exception as e: + logger.error(f"Error parsing jsonpath 'sum' for parameter 'sum' from node '2': {e}") + input_params["sum"] = None + logger.info(f"Starting 'multiply' activity on task queue 'blocks_transformer_multiply_db086f09c9' with inputs: %s", input_params) + try: + # Convert timeouts and intervals from milliseconds to seconds + schedule_to_close_timeout_value_ms = 0 + start_to_close_timeout_value_ms = 0 + schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0) + start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0) + initial_interval_value_ms = 1000 + maximum_interval_value_ms = 100000 + initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0) + maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0) + maximum_attempts_value = 0 + maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value + + result = await temporalio.workflow.execute_activity( + "block_main_activity", + input_params, + schedule_to_close_timeout=schedule_to_close_timeout, + start_to_close_timeout=start_to_close_timeout, + task_queue="blocks_transformer_multiply_db086f09c9", + retry_policy=temporalio.common.RetryPolicy( + maximum_attempts=maximum_attempts, + initial_interval=initial_interval, + backoff_coefficient=2, + maximum_interval=maximum_interval + ) + ) + logger.info(f"Completed 'multiply' activity with result: %s", result) + block_status = "completed" + block_error = None + results[node_id] = result + except Exception as e: + logger.error(f"Activity 'multiply' failed with error: {e}") + result = None + block_status = "failed" + block_error = { + "code": type(e).__name__, + "description": str(e), + "details": {"cause": str(getattr(e, "cause", "No additional details"))} + } + workflow_output["status"] = "failed" + # Collect block output + workflow_output["blocks"].append({ + "activity_id": node_id, + "name": block_name, + "status": block_status, + "input": input_params, + "result": result, + "error": block_error + }) + + task_functions["m3aiq7ixuo6du35h8tr"] = task_m3aiq7ixuo6du35h8tr + async def task_m3aiqkrv4k1y6654ymr(): + node_id = "m3aiqkrv4k1y6654ymr" + block_name = "power" + # Prepare inputs + input_params: Dict[str, Any] = {} + try: + source_data = results.get("2", {}) + jsonpath_expr = jmespath.compile("sum") + value = jsonpath_expr.search(source_data) + input_params["product"] = value + except Exception as e: + logger.error(f"Error parsing jsonpath 'sum' for parameter 'product' from node '2': {e}") + input_params["product"] = None + logger.info(f"Starting 'power' activity on task queue 'blocks_transformer_power_057645be0c' with inputs: %s", input_params) + try: + # Convert timeouts and intervals from milliseconds to seconds + schedule_to_close_timeout_value_ms = 0 + start_to_close_timeout_value_ms = 0 + schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0) + start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0) + initial_interval_value_ms = 1000 + maximum_interval_value_ms = 100000 + initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0) + maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0) + maximum_attempts_value = 0 + maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value + + result = await temporalio.workflow.execute_activity( + "block_main_activity", + input_params, + schedule_to_close_timeout=schedule_to_close_timeout, + start_to_close_timeout=start_to_close_timeout, + task_queue="blocks_transformer_power_057645be0c", + retry_policy=temporalio.common.RetryPolicy( + maximum_attempts=maximum_attempts, + initial_interval=initial_interval, + backoff_coefficient=2, + maximum_interval=maximum_interval + ) + ) + logger.info(f"Completed 'power' activity with result: %s", result) + block_status = "completed" + block_error = None + results[node_id] = result + except Exception as e: + logger.error(f"Activity 'power' failed with error: {e}") + result = None + block_status = "failed" + block_error = { + "code": type(e).__name__, + "description": str(e), + "details": {"cause": str(getattr(e, "cause", "No additional details"))} + } + workflow_output["status"] = "failed" + # Collect block output + workflow_output["blocks"].append({ + "activity_id": node_id, + "name": block_name, + "status": block_status, + "input": input_params, + "result": result, + "error": block_error + }) + + task_functions["m3aiqkrv4k1y6654ymr"] = task_m3aiqkrv4k1y6654ymr + + # Execute tasks according to execution steps + # Execution step 1 + tasks = [task_functions[node_id]() for node_id in ['2']] + results_step = await asyncio.gather(*tasks, return_exceptions=True) + for result in results_step: + if isinstance(result, Exception): + logger.error(f"Task failed with exception: {result}") + workflow_output["status"] = "failed" + # Execution step 2 + tasks = [task_functions[node_id]() for node_id in ['m3aiq7ixuo6du35h8tr', 'm3aiqkrv4k1y6654ymr']] + results_step = await asyncio.gather(*tasks, return_exceptions=True) + for result in results_step: + if isinstance(result, Exception): + logger.error(f"Task failed with exception: {result}") + workflow_output["status"] = "failed" + + # Update workflow status to completed if not failed + if workflow_output["status"] != "failed": + workflow_output["status"] = "completed" + + return workflow_output + + except Exception as e: + logger.error(f"Workflow failed with error: {e}") + workflow_output["status"] = "failed" + raise \ No newline at end of file diff --git a/flow_parallel_expected.py b/flow_parallel_expected.py new file mode 100644 index 0000000..495ba81 --- /dev/null +++ b/flow_parallel_expected.py @@ -0,0 +1,252 @@ +import temporalio.workflow +from typing import Any, Dict, List, Callable, Awaitable +import logging +import asyncio +import json +import datetime +import re +import jmespath + +# Configure logging +logging.basicConfig(level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s") +logger = logging.getLogger(__name__) + +@temporalio.workflow.defn +class test_repo_test_branch: + @temporalio.workflow.run + async def run(self, root_inputs: Dict[str, Any]) -> Dict[str, Any]: + workflow_info = temporalio.workflow.info() + workflow_output: Dict[str, Any] = { + "workflow_id": workflow_info.workflow_id, + "run_id": workflow_info.run_id, + "name": "test_repo_test_branch_1234567890", + "status": "in_progress", + "blocks": [], + "root_input": root_inputs + } + try: + # Initialize results + results: Dict[str, Any] = {} + + # Define task functions + task_functions: Dict[str, Callable[[], Awaitable[Any]]] = {} + async def task_2(): + node_id = "2" + block_name = "addition" + # Prepare inputs + input_params: Dict[str, Any] = {} + try: + jsonpath_expr = jmespath.compile("a") + value = jsonpath_expr.search(root_inputs) + input_params["a"] = value + except Exception as e: + logger.error(f"Error parsing jsonpath 'a' for parameter 'a': {e}") + input_params["a"] = None + try: + jsonpath_expr = jmespath.compile("b") + value = jsonpath_expr.search(root_inputs) + input_params["b"] = value + except Exception as e: + logger.error(f"Error parsing jsonpath 'b' for parameter 'b': {e}") + input_params["b"] = None + logger.info(f"Starting 'addition' activity on task queue 'blocks_transformer_addition_97ec9e0d50' with inputs: %s", input_params) + try: + # Convert timeouts and intervals from milliseconds to seconds + schedule_to_close_timeout_value_ms = 0 + start_to_close_timeout_value_ms = 0 + schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0) + start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0) + initial_interval_value_ms = 1000 + maximum_interval_value_ms = 100000 + initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0) + maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0) + maximum_attempts_value = 0 + maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value + + result = await temporalio.workflow.execute_activity( + "block_main_activity", + input_params, + schedule_to_close_timeout=schedule_to_close_timeout, + start_to_close_timeout=start_to_close_timeout, + task_queue="blocks_transformer_addition_97ec9e0d50", + retry_policy=temporalio.common.RetryPolicy( + maximum_attempts=maximum_attempts, + initial_interval=initial_interval, + backoff_coefficient=2, + maximum_interval=maximum_interval + ) + ) + logger.info(f"Completed 'addition' activity with result: %s", result) + block_status = "completed" + block_error = None + results[node_id] = result + except Exception as e: + logger.error(f"Activity 'addition' failed with error: {e}") + result = None + block_status = "failed" + block_error = { + "code": type(e).__name__, + "description": str(e), + "details": {"cause": str(getattr(e, "cause", "No additional details"))} + } + workflow_output["status"] = "failed" + # Collect block output + workflow_output["blocks"].append({ + "activity_id": node_id, + "name": block_name, + "status": block_status, + "input": input_params, + "result": result, + "error": block_error + }) + + task_functions["2"] = task_2 + async def task_m3aiq7ixuo6du35h8tr(): + node_id = "m3aiq7ixuo6du35h8tr" + block_name = "multiply" + # Prepare inputs + input_params: Dict[str, Any] = {} + try: + jsonpath_expr = jmespath.compile("sum") + value = jsonpath_expr.search(root_inputs) + input_params["sum"] = value + except Exception as e: + logger.error(f"Error parsing jsonpath 'sum' for parameter 'sum': {e}") + input_params["sum"] = None + logger.info(f"Starting 'multiply' activity on task queue 'blocks_transformer_multiply_db086f09c9' with inputs: %s", input_params) + try: + # Convert timeouts and intervals from milliseconds to seconds + schedule_to_close_timeout_value_ms = 0 + start_to_close_timeout_value_ms = 0 + schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0) + start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0) + initial_interval_value_ms = 1000 + maximum_interval_value_ms = 100000 + initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0) + maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0) + maximum_attempts_value = 0 + maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value + + result = await temporalio.workflow.execute_activity( + "block_main_activity", + input_params, + schedule_to_close_timeout=schedule_to_close_timeout, + start_to_close_timeout=start_to_close_timeout, + task_queue="blocks_transformer_multiply_db086f09c9", + retry_policy=temporalio.common.RetryPolicy( + maximum_attempts=maximum_attempts, + initial_interval=initial_interval, + backoff_coefficient=2, + maximum_interval=maximum_interval + ) + ) + logger.info(f"Completed 'multiply' activity with result: %s", result) + block_status = "completed" + block_error = None + results[node_id] = result + except Exception as e: + logger.error(f"Activity 'multiply' failed with error: {e}") + result = None + block_status = "failed" + block_error = { + "code": type(e).__name__, + "description": str(e), + "details": {"cause": str(getattr(e, "cause", "No additional details"))} + } + workflow_output["status"] = "failed" + # Collect block output + workflow_output["blocks"].append({ + "activity_id": node_id, + "name": block_name, + "status": block_status, + "input": input_params, + "result": result, + "error": block_error + }) + + task_functions["m3aiq7ixuo6du35h8tr"] = task_m3aiq7ixuo6du35h8tr + async def task_m3aiqkrv4k1y6654ymr(): + node_id = "m3aiqkrv4k1y6654ymr" + block_name = "power" + # Prepare inputs + input_params: Dict[str, Any] = {} + try: + jsonpath_expr = jmespath.compile("product") + value = jsonpath_expr.search(root_inputs) + input_params["product"] = value + except Exception as e: + logger.error(f"Error parsing jsonpath 'product' for parameter 'product': {e}") + input_params["product"] = None + logger.info(f"Starting 'power' activity on task queue 'blocks_transformer_power_057645be0c' with inputs: %s", input_params) + try: + # Convert timeouts and intervals from milliseconds to seconds + schedule_to_close_timeout_value_ms = 0 + start_to_close_timeout_value_ms = 0 + schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0) + start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0) + initial_interval_value_ms = 1000 + maximum_interval_value_ms = 100000 + initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0) + maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0) + maximum_attempts_value = 0 + maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value + + result = await temporalio.workflow.execute_activity( + "block_main_activity", + input_params, + schedule_to_close_timeout=schedule_to_close_timeout, + start_to_close_timeout=start_to_close_timeout, + task_queue="blocks_transformer_power_057645be0c", + retry_policy=temporalio.common.RetryPolicy( + maximum_attempts=maximum_attempts, + initial_interval=initial_interval, + backoff_coefficient=2, + maximum_interval=maximum_interval + ) + ) + logger.info(f"Completed 'power' activity with result: %s", result) + block_status = "completed" + block_error = None + results[node_id] = result + except Exception as e: + logger.error(f"Activity 'power' failed with error: {e}") + result = None + block_status = "failed" + block_error = { + "code": type(e).__name__, + "description": str(e), + "details": {"cause": str(getattr(e, "cause", "No additional details"))} + } + workflow_output["status"] = "failed" + # Collect block output + workflow_output["blocks"].append({ + "activity_id": node_id, + "name": block_name, + "status": block_status, + "input": input_params, + "result": result, + "error": block_error + }) + + task_functions["m3aiqkrv4k1y6654ymr"] = task_m3aiqkrv4k1y6654ymr + + # Execute tasks according to execution steps + # Execution step 1 + tasks = [task_functions[node_id]() for node_id in ['2', 'm3aiq7ixuo6du35h8tr', 'm3aiqkrv4k1y6654ymr']] + results_step = await asyncio.gather(*tasks, return_exceptions=True) + for result in results_step: + if isinstance(result, Exception): + logger.error(f"Task failed with exception: {result}") + workflow_output["status"] = "failed" + + # Update workflow status to completed if not failed + if workflow_output["status"] != "failed": + workflow_output["status"] = "completed" + + return workflow_output + + except Exception as e: + logger.error(f"Workflow failed with error: {e}") + workflow_output["status"] = "failed" + raise \ No newline at end of file diff --git a/flow_sequential_expected.py b/flow_sequential_expected.py new file mode 100644 index 0000000..66d67e7 --- /dev/null +++ b/flow_sequential_expected.py @@ -0,0 +1,268 @@ +import temporalio.workflow +from typing import Any, Dict, List, Callable, Awaitable +import logging +import asyncio +import json +import datetime +import re +import jmespath + +# Configure logging +logging.basicConfig(level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s") +logger = logging.getLogger(__name__) + +@temporalio.workflow.defn +class test_repo_test_branch: + @temporalio.workflow.run + async def run(self, root_inputs: Dict[str, Any]) -> Dict[str, Any]: + workflow_info = temporalio.workflow.info() + workflow_output: Dict[str, Any] = { + "workflow_id": workflow_info.workflow_id, + "run_id": workflow_info.run_id, + "name": "test_repo_test_branch_1234567890", + "status": "in_progress", + "blocks": [], + "root_input": root_inputs + } + try: + # Initialize results + results: Dict[str, Any] = {} + + # Define task functions + task_functions: Dict[str, Callable[[], Awaitable[Any]]] = {} + async def task_2(): + node_id = "2" + block_name = "addition" + # Prepare inputs + input_params: Dict[str, Any] = {} + try: + jsonpath_expr = jmespath.compile("a") + value = jsonpath_expr.search(root_inputs) + input_params["a"] = value + except Exception as e: + logger.error(f"Error parsing jsonpath 'a' for parameter 'a': {e}") + input_params["a"] = None + try: + jsonpath_expr = jmespath.compile("b") + value = jsonpath_expr.search(root_inputs) + input_params["b"] = value + except Exception as e: + logger.error(f"Error parsing jsonpath 'b' for parameter 'b': {e}") + input_params["b"] = None + logger.info(f"Starting 'addition' activity on task queue 'blocks_transformer_addition_97ec9e0d50' with inputs: %s", input_params) + try: + # Convert timeouts and intervals from milliseconds to seconds + schedule_to_close_timeout_value_ms = 0 + start_to_close_timeout_value_ms = 0 + schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0) + start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0) + initial_interval_value_ms = 1000 + maximum_interval_value_ms = 100000 + initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0) + maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0) + maximum_attempts_value = 0 + maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value + + result = await temporalio.workflow.execute_activity( + "block_main_activity", + input_params, + schedule_to_close_timeout=schedule_to_close_timeout, + start_to_close_timeout=start_to_close_timeout, + task_queue="blocks_transformer_addition_97ec9e0d50", + retry_policy=temporalio.common.RetryPolicy( + maximum_attempts=maximum_attempts, + initial_interval=initial_interval, + backoff_coefficient=2, + maximum_interval=maximum_interval + ) + ) + logger.info(f"Completed 'addition' activity with result: %s", result) + block_status = "completed" + block_error = None + results[node_id] = result + except Exception as e: + logger.error(f"Activity 'addition' failed with error: {e}") + result = None + block_status = "failed" + block_error = { + "code": type(e).__name__, + "description": str(e), + "details": {"cause": str(getattr(e, "cause", "No additional details"))} + } + workflow_output["status"] = "failed" + # Collect block output + workflow_output["blocks"].append({ + "activity_id": node_id, + "name": block_name, + "status": block_status, + "input": input_params, + "result": result, + "error": block_error + }) + + task_functions["2"] = task_2 + async def task_m3aiq7ixuo6du35h8tr(): + node_id = "m3aiq7ixuo6du35h8tr" + block_name = "multiply" + # Prepare inputs + input_params: Dict[str, Any] = {} + try: + source_data = results.get("2", {}) + jsonpath_expr = jmespath.compile("sum") + value = jsonpath_expr.search(source_data) + input_params["sum"] = value + except Exception as e: + logger.error(f"Error parsing jsonpath 'sum' for parameter 'sum' from node '2': {e}") + input_params["sum"] = None + logger.info(f"Starting 'multiply' activity on task queue 'blocks_transformer_multiply_db086f09c9' with inputs: %s", input_params) + try: + # Convert timeouts and intervals from milliseconds to seconds + schedule_to_close_timeout_value_ms = 0 + start_to_close_timeout_value_ms = 0 + schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0) + start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0) + initial_interval_value_ms = 1000 + maximum_interval_value_ms = 100000 + initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0) + maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0) + maximum_attempts_value = 0 + maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value + + result = await temporalio.workflow.execute_activity( + "block_main_activity", + input_params, + schedule_to_close_timeout=schedule_to_close_timeout, + start_to_close_timeout=start_to_close_timeout, + task_queue="blocks_transformer_multiply_db086f09c9", + retry_policy=temporalio.common.RetryPolicy( + maximum_attempts=maximum_attempts, + initial_interval=initial_interval, + backoff_coefficient=2, + maximum_interval=maximum_interval + ) + ) + logger.info(f"Completed 'multiply' activity with result: %s", result) + block_status = "completed" + block_error = None + results[node_id] = result + except Exception as e: + logger.error(f"Activity 'multiply' failed with error: {e}") + result = None + block_status = "failed" + block_error = { + "code": type(e).__name__, + "description": str(e), + "details": {"cause": str(getattr(e, "cause", "No additional details"))} + } + workflow_output["status"] = "failed" + # Collect block output + workflow_output["blocks"].append({ + "activity_id": node_id, + "name": block_name, + "status": block_status, + "input": input_params, + "result": result, + "error": block_error + }) + + task_functions["m3aiq7ixuo6du35h8tr"] = task_m3aiq7ixuo6du35h8tr + async def task_m3aiqkrv4k1y6654ymr(): + node_id = "m3aiqkrv4k1y6654ymr" + block_name = "power" + # Prepare inputs + input_params: Dict[str, Any] = {} + try: + source_data = results.get("m3aiq7ixuo6du35h8tr", {}) + jsonpath_expr = jmespath.compile("product") + value = jsonpath_expr.search(source_data) + input_params["product"] = value + except Exception as e: + logger.error(f"Error parsing jsonpath 'product' for parameter 'product' from node 'm3aiq7ixuo6du35h8tr': {e}") + input_params["product"] = None + logger.info(f"Starting 'power' activity on task queue 'blocks_transformer_power_057645be0c' with inputs: %s", input_params) + try: + # Convert timeouts and intervals from milliseconds to seconds + schedule_to_close_timeout_value_ms = 0 + start_to_close_timeout_value_ms = 0 + schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0) + start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0) + initial_interval_value_ms = 1000 + maximum_interval_value_ms = 100000 + initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0) + maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0) + maximum_attempts_value = 0 + maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value + + result = await temporalio.workflow.execute_activity( + "block_main_activity", + input_params, + schedule_to_close_timeout=schedule_to_close_timeout, + start_to_close_timeout=start_to_close_timeout, + task_queue="blocks_transformer_power_057645be0c", + retry_policy=temporalio.common.RetryPolicy( + maximum_attempts=maximum_attempts, + initial_interval=initial_interval, + backoff_coefficient=2, + maximum_interval=maximum_interval + ) + ) + logger.info(f"Completed 'power' activity with result: %s", result) + block_status = "completed" + block_error = None + results[node_id] = result + except Exception as e: + logger.error(f"Activity 'power' failed with error: {e}") + result = None + block_status = "failed" + block_error = { + "code": type(e).__name__, + "description": str(e), + "details": {"cause": str(getattr(e, "cause", "No additional details"))} + } + workflow_output["status"] = "failed" + # Collect block output + workflow_output["blocks"].append({ + "activity_id": node_id, + "name": block_name, + "status": block_status, + "input": input_params, + "result": result, + "error": block_error + }) + + task_functions["m3aiqkrv4k1y6654ymr"] = task_m3aiqkrv4k1y6654ymr + + # Execute tasks according to execution steps + # Execution step 1 + tasks = [task_functions[node_id]() for node_id in ['2']] + results_step = await asyncio.gather(*tasks, return_exceptions=True) + for result in results_step: + if isinstance(result, Exception): + logger.error(f"Task failed with exception: {result}") + workflow_output["status"] = "failed" + # Execution step 2 + tasks = [task_functions[node_id]() for node_id in ['m3aiq7ixuo6du35h8tr']] + results_step = await asyncio.gather(*tasks, return_exceptions=True) + for result in results_step: + if isinstance(result, Exception): + logger.error(f"Task failed with exception: {result}") + workflow_output["status"] = "failed" + # Execution step 3 + tasks = [task_functions[node_id]() for node_id in ['m3aiqkrv4k1y6654ymr']] + results_step = await asyncio.gather(*tasks, return_exceptions=True) + for result in results_step: + if isinstance(result, Exception): + logger.error(f"Task failed with exception: {result}") + workflow_output["status"] = "failed" + + # Update workflow status to completed if not failed + if workflow_output["status"] != "failed": + workflow_output["status"] = "completed" + + return workflow_output + + except Exception as e: + logger.error(f"Workflow failed with error: {e}") + workflow_output["status"] = "failed" + raise \ No newline at end of file