Delete flow_parallel_expected.py
This commit is contained in:
parent d28ab55696
commit 106a2f2f44
@@ -1,252 +0,0 @@
import temporalio.workflow
import temporalio.common  # provides RetryPolicy used below
from typing import Any, Dict, List, Callable, Awaitable
import logging
import asyncio
import json
import datetime
import re
import jmespath

# Configure logging
logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)

@temporalio.workflow.defn
class test_repo_test_branch:
    @temporalio.workflow.run
    async def run(self, root_inputs: Dict[str, Any]) -> Dict[str, Any]:
        workflow_info = temporalio.workflow.info()
        workflow_output: Dict[str, Any] = {
            "workflow_id": workflow_info.workflow_id,
            "run_id": workflow_info.run_id,
            "name": "test_repo_test_branch_1234567890",
            "status": "in_progress",
            "blocks": [],
            "root_input": root_inputs
        }
        try:
            # Initialize results
            results: Dict[str, Any] = {}

            # Define task functions
            task_functions: Dict[str, Callable[[], Awaitable[Any]]] = {}
            async def task_2():
                node_id = "2"
                block_name = "addition"
                # Prepare inputs
                input_params: Dict[str, Any] = {}
                try:
                    jsonpath_expr = jmespath.compile("a")
                    value = jsonpath_expr.search(root_inputs)
                    input_params["a"] = value
                except Exception as e:
                    logger.error(f"Error parsing jsonpath 'a' for parameter 'a': {e}")
                    input_params["a"] = None
                try:
                    jsonpath_expr = jmespath.compile("b")
                    value = jsonpath_expr.search(root_inputs)
                    input_params["b"] = value
                except Exception as e:
                    logger.error(f"Error parsing jsonpath 'b' for parameter 'b': {e}")
                    input_params["b"] = None
                logger.info("Starting 'addition' activity on task queue 'blocks_transformer_addition_97ec9e0d50' with inputs: %s", input_params)
                try:
                    # Convert timeouts and intervals from milliseconds to seconds
                    schedule_to_close_timeout_value_ms = 0
                    start_to_close_timeout_value_ms = 0
                    schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
                    start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
                    initial_interval_value_ms = 1000
                    maximum_interval_value_ms = 100000
                    initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
                    maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
                    maximum_attempts_value = 0
                    maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value

                    result = await temporalio.workflow.execute_activity(
                        "block_main_activity",
                        input_params,
                        schedule_to_close_timeout=schedule_to_close_timeout,
                        start_to_close_timeout=start_to_close_timeout,
                        task_queue="blocks_transformer_addition_97ec9e0d50",
                        retry_policy=temporalio.common.RetryPolicy(
                            maximum_attempts=maximum_attempts,
                            initial_interval=initial_interval,
                            backoff_coefficient=2,
                            maximum_interval=maximum_interval
                        )
                    )
                    logger.info("Completed 'addition' activity with result: %s", result)
                    block_status = "completed"
                    block_error = None
                    results[node_id] = result
                except Exception as e:
                    logger.error(f"Activity 'addition' failed with error: {e}")
                    result = None
                    block_status = "failed"
                    block_error = {
                        "code": type(e).__name__,
                        "description": str(e),
                        "details": {"cause": str(getattr(e, "cause", "No additional details"))}
                    }
                    workflow_output["status"] = "failed"
                # Collect block output
                workflow_output["blocks"].append({
                    "activity_id": node_id,
                    "name": block_name,
                    "status": block_status,
                    "input": input_params,
                    "result": result,
                    "error": block_error
                })

            task_functions["2"] = task_2
            async def task_m3aiq7ixuo6du35h8tr():
                node_id = "m3aiq7ixuo6du35h8tr"
                block_name = "multiply"
                # Prepare inputs
                input_params: Dict[str, Any] = {}
                try:
                    jsonpath_expr = jmespath.compile("sum")
                    value = jsonpath_expr.search(root_inputs)
                    input_params["sum"] = value
                except Exception as e:
                    logger.error(f"Error parsing jsonpath 'sum' for parameter 'sum': {e}")
                    input_params["sum"] = None
                logger.info("Starting 'multiply' activity on task queue 'blocks_transformer_multiply_db086f09c9' with inputs: %s", input_params)
                try:
                    # Convert timeouts and intervals from milliseconds to seconds
                    schedule_to_close_timeout_value_ms = 0
                    start_to_close_timeout_value_ms = 0
                    schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
                    start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
                    initial_interval_value_ms = 1000
                    maximum_interval_value_ms = 100000
                    initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
                    maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
                    maximum_attempts_value = 0
                    maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value

                    result = await temporalio.workflow.execute_activity(
                        "block_main_activity",
                        input_params,
                        schedule_to_close_timeout=schedule_to_close_timeout,
                        start_to_close_timeout=start_to_close_timeout,
                        task_queue="blocks_transformer_multiply_db086f09c9",
                        retry_policy=temporalio.common.RetryPolicy(
                            maximum_attempts=maximum_attempts,
                            initial_interval=initial_interval,
                            backoff_coefficient=2,
                            maximum_interval=maximum_interval
                        )
                    )
                    logger.info("Completed 'multiply' activity with result: %s", result)
                    block_status = "completed"
                    block_error = None
                    results[node_id] = result
                except Exception as e:
                    logger.error(f"Activity 'multiply' failed with error: {e}")
                    result = None
                    block_status = "failed"
                    block_error = {
                        "code": type(e).__name__,
                        "description": str(e),
                        "details": {"cause": str(getattr(e, "cause", "No additional details"))}
                    }
                    workflow_output["status"] = "failed"
                # Collect block output
                workflow_output["blocks"].append({
                    "activity_id": node_id,
                    "name": block_name,
                    "status": block_status,
                    "input": input_params,
                    "result": result,
                    "error": block_error
                })

            task_functions["m3aiq7ixuo6du35h8tr"] = task_m3aiq7ixuo6du35h8tr
            async def task_m3aiqkrv4k1y6654ymr():
                node_id = "m3aiqkrv4k1y6654ymr"
                block_name = "power"
                # Prepare inputs
                input_params: Dict[str, Any] = {}
                try:
                    jsonpath_expr = jmespath.compile("product")
                    value = jsonpath_expr.search(root_inputs)
                    input_params["product"] = value
                except Exception as e:
                    logger.error(f"Error parsing jsonpath 'product' for parameter 'product': {e}")
                    input_params["product"] = None
                logger.info("Starting 'power' activity on task queue 'blocks_transformer_power_057645be0c' with inputs: %s", input_params)
                try:
                    # Convert timeouts and intervals from milliseconds to seconds
                    schedule_to_close_timeout_value_ms = 0
                    start_to_close_timeout_value_ms = 0
                    schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
                    start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
                    initial_interval_value_ms = 1000
                    maximum_interval_value_ms = 100000
                    initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
                    maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
                    maximum_attempts_value = 0
                    maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value

                    result = await temporalio.workflow.execute_activity(
                        "block_main_activity",
                        input_params,
                        schedule_to_close_timeout=schedule_to_close_timeout,
                        start_to_close_timeout=start_to_close_timeout,
                        task_queue="blocks_transformer_power_057645be0c",
                        retry_policy=temporalio.common.RetryPolicy(
                            maximum_attempts=maximum_attempts,
                            initial_interval=initial_interval,
                            backoff_coefficient=2,
                            maximum_interval=maximum_interval
                        )
                    )
                    logger.info("Completed 'power' activity with result: %s", result)
                    block_status = "completed"
                    block_error = None
                    results[node_id] = result
                except Exception as e:
                    logger.error(f"Activity 'power' failed with error: {e}")
                    result = None
                    block_status = "failed"
                    block_error = {
                        "code": type(e).__name__,
                        "description": str(e),
                        "details": {"cause": str(getattr(e, "cause", "No additional details"))}
                    }
                    workflow_output["status"] = "failed"
                # Collect block output
                workflow_output["blocks"].append({
                    "activity_id": node_id,
                    "name": block_name,
                    "status": block_status,
                    "input": input_params,
                    "result": result,
                    "error": block_error
                })

            task_functions["m3aiqkrv4k1y6654ymr"] = task_m3aiqkrv4k1y6654ymr

            # Execute tasks according to execution steps
            # Execution step 1
            tasks = [task_functions[node_id]() for node_id in ['2', 'm3aiq7ixuo6du35h8tr', 'm3aiqkrv4k1y6654ymr']]
            results_step = await asyncio.gather(*tasks, return_exceptions=True)
            for result in results_step:
                if isinstance(result, Exception):
                    logger.error(f"Task failed with exception: {result}")
                    workflow_output["status"] = "failed"

            # Update workflow status to completed if not failed
            if workflow_output["status"] != "failed":
                workflow_output["status"] = "completed"

            return workflow_output

        except Exception as e:
            logger.error(f"Workflow failed with error: {e}")
            workflow_output["status"] = "failed"
            raise
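
For reference, a minimal client-side sketch of how a workflow of this type could be started once a worker serves it; the server address, workflow id, task queue name, and example inputs below are illustrative assumptions, not values taken from this repository.

# Hypothetical invocation harness, not part of the deleted file.
import asyncio

from temporalio.client import Client


async def main() -> None:
    # Assumed local Temporal server address.
    client = await Client.connect("localhost:7233")
    handle = await client.start_workflow(
        "test_repo_test_branch",                   # workflow type name defined by the class above
        {"a": 1, "b": 2, "sum": 3, "product": 8},  # example root_inputs (assumed shape)
        id="flow-parallel-example",                # assumed workflow id
        task_queue="flows_test_repo_test_branch",  # assumed task queue served by a worker
    )
    # The workflow returns the workflow_output dict built in run().
    print(await handle.result())


if __name__ == "__main__":
    asyncio.run(main())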