system/expected_workflows/flow_hybrid_expected.py
admin user 2e1d2795c8
Some checks failed
CI Workflow / Containerize the Flow (push) Has been cancelled
CI Workflow / Testing the Flow (push) Has been cancelled
Upload files to "expected_workflows"
2025-05-23 02:02:49 +00:00

263 lines
13 KiB
Python

import temporalio.workflow
from typing import Any, Dict, List, Callable, Awaitable
import logging
import asyncio
import json
import datetime
import re
import jmespath
from temporalio.workflow import sleep
# Configure logging
logging.basicConfig(level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
@temporalio.workflow.defn
class test_repo_test_branch:
@temporalio.workflow.run
async def run(self, root_inputs: Dict[str, Any]) -> Dict[str, Any]:
await sleep(0)
workflow_info = temporalio.workflow.info()
workflow_output: Dict[str, Any] = {
"workflow_id": workflow_info.workflow_id,
"run_id": workflow_info.run_id,
"name": "test_repo_test_branch",
"status": "in_progress",
"blocks": [],
"root_input": root_inputs
}
try:
# Initialize results
results: Dict[str, Any] = {}
# Define task functions
task_functions: Dict[str, Callable[[], Awaitable[Any]]] = {}
async def task_2():
node_id = "2"
block_name = "addition"
# Prepare inputs
input_params: Dict[str, Any] = {}
try:
jsonpath_expr = jmespath.compile("a")
value = jsonpath_expr.search(root_inputs)
input_params["a"] = value
except Exception as e:
logger.error(f"Error parsing jsonpath 'a' for parameter 'a': {e}")
input_params["a"] = None
try:
jsonpath_expr = jmespath.compile("b")
value = jsonpath_expr.search(root_inputs)
input_params["b"] = value
except Exception as e:
logger.error(f"Error parsing jsonpath 'b' for parameter 'b': {e}")
input_params["b"] = None
logger.info(f"Starting 'addition' activity on task queue 'blocks_transformer_addition_97ec9e0d50' with inputs: %s", input_params)
try:
# Convert timeouts and intervals from milliseconds to seconds
schedule_to_close_timeout_value_ms = 0
start_to_close_timeout_value_ms = 0
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
initial_interval_value_ms = 1000
maximum_interval_value_ms = 100000
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
maximum_attempts_value = 0
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
result = await temporalio.workflow.execute_activity(
"block_main_activity",
input_params,
schedule_to_close_timeout=schedule_to_close_timeout,
start_to_close_timeout=start_to_close_timeout,
task_queue="blocks_transformer_addition_97ec9e0d50",
retry_policy=temporalio.common.RetryPolicy(
maximum_attempts=maximum_attempts,
initial_interval=initial_interval,
backoff_coefficient=2,
maximum_interval=maximum_interval
)
)
logger.info(f"Completed 'addition' activity with result: %s", result)
block_status = "completed"
block_error = None
results[node_id] = result
except Exception as e:
logger.error(f"Activity 'addition' failed with error: {e}")
result = None
block_status = "failed"
block_error = {
"code": type(e).__name__,
"description": str(e),
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
}
workflow_output["status"] = "failed"
# Collect block output
workflow_output["blocks"].append({
"activity_id": node_id,
"name": block_name,
"status": block_status,
"input": input_params,
"result": result,
"error": block_error
})
task_functions["2"] = task_2
async def task_m3aiq7ixuo6du35h8tr():
node_id = "m3aiq7ixuo6du35h8tr"
block_name = "multiply"
# Prepare inputs
input_params: Dict[str, Any] = {}
try:
source_data = results.get("2", {})
jsonpath_expr = jmespath.compile("sum")
value = jsonpath_expr.search(source_data)
input_params["sum"] = value
except Exception as e:
logger.error(f"Error parsing jsonpath 'sum' for parameter 'sum' from node '2': {e}")
input_params["sum"] = None
logger.info(f"Starting 'multiply' activity on task queue 'blocks_transformer_multiply_db086f09c9' with inputs: %s", input_params)
try:
# Convert timeouts and intervals from milliseconds to seconds
schedule_to_close_timeout_value_ms = 0
start_to_close_timeout_value_ms = 0
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
initial_interval_value_ms = 1000
maximum_interval_value_ms = 100000
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
maximum_attempts_value = 0
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
result = await temporalio.workflow.execute_activity(
"block_main_activity",
input_params,
schedule_to_close_timeout=schedule_to_close_timeout,
start_to_close_timeout=start_to_close_timeout,
task_queue="blocks_transformer_multiply_db086f09c9",
retry_policy=temporalio.common.RetryPolicy(
maximum_attempts=maximum_attempts,
initial_interval=initial_interval,
backoff_coefficient=2,
maximum_interval=maximum_interval
)
)
logger.info(f"Completed 'multiply' activity with result: %s", result)
block_status = "completed"
block_error = None
results[node_id] = result
except Exception as e:
logger.error(f"Activity 'multiply' failed with error: {e}")
result = None
block_status = "failed"
block_error = {
"code": type(e).__name__,
"description": str(e),
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
}
workflow_output["status"] = "failed"
# Collect block output
workflow_output["blocks"].append({
"activity_id": node_id,
"name": block_name,
"status": block_status,
"input": input_params,
"result": result,
"error": block_error
})
task_functions["m3aiq7ixuo6du35h8tr"] = task_m3aiq7ixuo6du35h8tr
async def task_m3aiqkrv4k1y6654ymr():
node_id = "m3aiqkrv4k1y6654ymr"
block_name = "power"
# Prepare inputs
input_params: Dict[str, Any] = {}
try:
source_data = results.get("2", {})
jsonpath_expr = jmespath.compile("sum")
value = jsonpath_expr.search(source_data)
input_params["product"] = value
except Exception as e:
logger.error(f"Error parsing jsonpath 'sum' for parameter 'product' from node '2': {e}")
input_params["product"] = None
logger.info(f"Starting 'power' activity on task queue 'blocks_transformer_power_057645be0c' with inputs: %s", input_params)
try:
# Convert timeouts and intervals from milliseconds to seconds
schedule_to_close_timeout_value_ms = 0
start_to_close_timeout_value_ms = 0
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
initial_interval_value_ms = 1000
maximum_interval_value_ms = 100000
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
maximum_attempts_value = 0
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
result = await temporalio.workflow.execute_activity(
"block_main_activity",
input_params,
schedule_to_close_timeout=schedule_to_close_timeout,
start_to_close_timeout=start_to_close_timeout,
task_queue="blocks_transformer_power_057645be0c",
retry_policy=temporalio.common.RetryPolicy(
maximum_attempts=maximum_attempts,
initial_interval=initial_interval,
backoff_coefficient=2,
maximum_interval=maximum_interval
)
)
logger.info(f"Completed 'power' activity with result: %s", result)
block_status = "completed"
block_error = None
results[node_id] = result
except Exception as e:
logger.error(f"Activity 'power' failed with error: {e}")
result = None
block_status = "failed"
block_error = {
"code": type(e).__name__,
"description": str(e),
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
}
workflow_output["status"] = "failed"
# Collect block output
workflow_output["blocks"].append({
"activity_id": node_id,
"name": block_name,
"status": block_status,
"input": input_params,
"result": result,
"error": block_error
})
task_functions["m3aiqkrv4k1y6654ymr"] = task_m3aiqkrv4k1y6654ymr
# Execute tasks according to execution steps
# Execution step 1
tasks = [task_functions[node_id]() for node_id in ['2']]
results_step = await asyncio.gather(*tasks, return_exceptions=True)
for result in results_step:
if isinstance(result, Exception):
logger.error(f"Task failed with exception: {result}")
workflow_output["status"] = "failed"
# Execution step 2
tasks = [task_functions[node_id]() for node_id in ['m3aiq7ixuo6du35h8tr', 'm3aiqkrv4k1y6654ymr']]
results_step = await asyncio.gather(*tasks, return_exceptions=True)
for result in results_step:
if isinstance(result, Exception):
logger.error(f"Task failed with exception: {result}")
workflow_output["status"] = "failed"
# Update workflow status to completed if not failed
if workflow_output["status"] != "failed":
workflow_output["status"] = "completed"
return workflow_output
except Exception as e:
logger.error(f"Workflow failed with error: {e}")
workflow_output["status"] = "failed"
raise