Compare commits

...

6 Commits

Author SHA1 Message Date
c3b127d5f4 Update block_wrapper.py
All checks were successful
CI Workflow / Testing the Block (push) Successful in 9s
CI Workflow / Containerize the Block (push) Successful in 46s
2025-11-27 00:37:14 +00:00
d3762e483b Update requirements.txt
All checks were successful
CI Workflow / Testing the Block (push) Successful in 7s
CI Workflow / Containerize the Block (push) Successful in 46s
2025-05-23 15:49:32 +00:00
175fca6e03 Update block_wrapper.py
All checks were successful
CI Workflow / Testing the Block (push) Successful in 6s
CI Workflow / Containerize the Block (push) Successful in 38s
2025-04-07 18:17:30 +00:00
f2d318832f Update block_wrapper.py
All checks were successful
CI Workflow / Testing the Block (push) Successful in 8s
CI Workflow / Containerize the Block (push) Successful in 48s
2025-04-07 17:55:23 +00:00
d6bd8e6abc Add .gitea/workflows/main.yaml
All checks were successful
CI Workflow / Testing the Block (push) Successful in 1m52s
CI Workflow / Containerize the Block (push) Successful in 2m28s
2025-03-11 00:44:30 +00:00
5eb04354a1 Upload files to "/" 2025-03-11 00:43:40 +00:00
6 changed files with 568 additions and 1 deletions

View File

@@ -0,0 +1,65 @@
# Gitea Actions pipeline: test the block, then build and push its container
# image to the in-cluster registry.
name: CI Workflow
on:
  push:
    branches:
      - '*'  # run on every push to any branch
jobs:
  test:
    name: Testing the Block
    runs-on: ubuntu-latest
    steps:
      - name: Checkout Code
        uses: actions/checkout@v4
      - name: Set Up Python Environment
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      # NOTE(review): the dependency-install and unit-test steps are commented
      # out, so this job currently only checks out code and installs Python —
      # confirm whether the tests should be re-enabled.
      # - name: Install Python Dependencies
      #   run: |
      #     python -m pip install --upgrade pip
      #     pip install -r requirements.txt
      # - name: Execute Unit Tests
      #   run: python -m unittest discover -s . -p 'test_*.py'
  build_and_push:
    name: Containerize the Block
    runs-on: ubuntu-latest
    needs: test  # containerize only after the test job succeeds
    steps:
      - name: Extract Repository Name
        id: extract_repo
        # Strip the "owner/" prefix from GITHUB_REPOSITORY for the image path.
        run: echo "repo_name=${GITHUB_REPOSITORY##*/}" >> $GITHUB_OUTPUT
      - name: Checkout Codebase
        uses: actions/checkout@v4
        with:
          fetch-depth: 1  # shallow clone; history not needed to build
      - name: Configure Docker Buildx
        uses: docker/setup-buildx-action@v3
        with:
          # The in-cluster registry speaks plain HTTP, so buildkit must be
          # told to treat it as insecure.
          config-inline: |
            [registry."centurion-version-control.default.svc.cluster.local:3000"]
            http = true
            insecure = true
      - name: Log In to Container Registry
        uses: docker/login-action@v2
        with:
          registry: centurion-version-control.default.svc.cluster.local:3000
          username: ${{ secrets.CI_USER }}
          password: ${{ secrets.CI_USER_TOKEN }}
      - name: Build and Push Docker Image to Registry
        uses: docker/build-push-action@v4
        with:
          context: .
          push: true
          # Tag with the exact commit SHA plus a moving "latest" per branch.
          tags: |
            centurion-version-control.default.svc.cluster.local:3000/centurion/${{ steps.extract_repo.outputs.repo_name }}/${{ github.ref_name }}:${{ github.sha }}
            centurion-version-control.default.svc.cluster.local:3000/centurion/${{ steps.extract_repo.outputs.repo_name }}/${{ github.ref_name }}:latest

17
Dockerfile Normal file
View File

@@ -0,0 +1,17 @@
# Use Python slim image as base
FROM python:3.10-slim AS base
# Set up a directory for the application code
WORKDIR /app
# Copy only the requirements file initially so the dependency layer is cached
# and rebuilt only when requirements.txt changes
COPY requirements.txt .
# Install Workflow SDK and other dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Copy the rest of the application code
COPY . .
# Set entrypoint for the worker (absolute path, independent of WORKDIR)
ENTRYPOINT ["python", "/app/block_wrapper.py"]

View File

@@ -1 +1,7 @@
**Hello world!!!**
# Activity Block Wrapper
### Example Usage with Docker
1. **Build the Base Image**:
```bash
docker build -f Dockerfile.base -t activity_block_wrapper:latest .

280
block_wrapper.py Normal file
View File

@@ -0,0 +1,280 @@
import importlib
import json
import asyncio
import logging
import os
import re
import sys
import requests
from temporalio import activity
from temporalio.exceptions import ApplicationError
from jsonschema import validate, ValidationError
from temporalio.client import Client
from temporalio.worker import Worker
import time
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger(__name__)
# Automatically determine if in a test environment: true whenever the stdlib
# `unittest` module has already been imported (e.g. `python -m unittest`).
IS_TEST_ENVIRONMENT = "unittest" in sys.modules
# Environment variables (injected by the deployment; the CI workflow tags
# images with the commit SHA, which presumably arrives here as VERSION —
# TODO confirm).
REPO_NAME = os.getenv('REPO_NAME')
BRANCH_NAME = os.getenv('BRANCH_NAME')
COMMIT_ID = os.getenv('VERSION')
NAMESPACE = os.getenv('NAMESPACE')
FLOWX_ENGINE_ADDRESS = os.getenv('FLOWX_ENGINE_ADDRESS')
SQLPAD_API_URL = os.getenv('SQLPAD_API_URL')
# NOTE(review): REPO_NAME and SQLPAD_API_URL are not validated here, yet a
# missing REPO_NAME makes the BLOCK_NAME concatenation below raise TypeError
# and a missing SQLPAD_API_URL breaks every SQLPad request — confirm whether
# they should be included in this check.
if not BRANCH_NAME or not COMMIT_ID or not NAMESPACE or not FLOWX_ENGINE_ADDRESS:
    raise ValueError("Missing required environment variables.")
# The first 10 characters of the commit id are enough to identify a build.
COMMIT_ID_SHORT = COMMIT_ID[:10]
def sanitize_name(name):
    """Return *name* reduced to a safe underscore-separated identifier.

    Non-word characters (and the zero-width position before a leading digit)
    become underscores, runs of underscores collapse to a single one, and
    leading/trailing underscores are stripped.
    """
    underscored = re.sub(r'\W|^(?=\d)', '_', name)
    collapsed = re.sub(r'_+', '_', underscored)
    return collapsed.strip('_')
# Block identity "<repo>_<branch>", sanitized so the queue name is a safe
# identifier. NOTE(review): REPO_NAME may be None here because it is not part
# of the required-env check above — verify.
BLOCK_NAME = REPO_NAME + "_" + BRANCH_NAME
block_name_safe = sanitize_name(BLOCK_NAME)
commit_id_safe = sanitize_name(COMMIT_ID_SHORT)
# Construct the task queue name: "<repo>_<branch>_<short-sha>"
TASK_QUEUE = f"{block_name_safe}_{commit_id_safe}"
def load_schema(schema_path):
    """Parse the JSON document at *schema_path* and return it.

    On any failure the error is logged and re-raised — as ValueError when
    running under unittest, otherwise as a Temporal ApplicationError.
    """
    try:
        with open(schema_path, 'r') as fh:
            return json.load(fh)
    except Exception as exc:
        logger.error("Failed to load schema from %s: %s", schema_path, exc)
        message = f"Schema loading failed: {exc}"
        if IS_TEST_ENVIRONMENT:
            raise ValueError(message)
        raise ApplicationError(message)
def validate_input(input_data):
    """Validate *input_data* against /app/request_schema.json.

    Raises ValueError under unittest, ApplicationError otherwise.
    """
    request_schema = load_schema("/app/request_schema.json")
    try:
        validate(instance=input_data, schema=request_schema)
    except ValidationError as e:
        logger.error("Input validation failed: %s", e)
        message = f"Input validation error: {e}"
        if IS_TEST_ENVIRONMENT:
            raise ValueError(message)
        raise ApplicationError(message)
    else:
        logger.info("Input data validated successfully")
def validate_output(output_data):
    """Validate *output_data* against /app/response_schema.json.

    Raises ValueError under unittest, ApplicationError otherwise.
    """
    response_schema = load_schema("/app/response_schema.json")
    try:
        validate(instance=output_data, schema=response_schema)
    except ValidationError as e:
        logger.error("Output validation failed: %s", e)
        message = f"Output validation error: {e}"
        if IS_TEST_ENVIRONMENT:
            raise ValueError(message)
        raise ApplicationError(message)
    else:
        logger.info("Output data validated successfully")
def get_connection_id(namespace):
    """Return the SQLPad connectionId mapped to *namespace* in /app/config.json.

    Raises ValueError when no config entry matches the namespace.
    """
    config_entries = load_schema("/app/config.json")
    match = next(
        (entry for entry in config_entries if entry.get("namespace") == namespace),
        None,
    )
    if match is not None:
        logger.info("Got the connectionID")
        return match.get("connectionId")
    logger.error("Provided Namespace not found.")
    raise ValueError(f"Namespace '{namespace}' not found")
# Read SQL file and replace placeholders
def construct_sql(input_data):
    """Render /app/main.sql by substituting each ``$key`` placeholder.

    Values are encoded for SQL: None -> NULL, bool -> TRUE/FALSE,
    list/tuple -> quoted JSON, str -> single-quoted with quotes escaped,
    anything else -> str(value).

    Raises:
        ApplicationError: when the template cannot be read or rendered.
    """
    try:
        with open("/app/main.sql", "r") as sql_file:
            sql_template = sql_file.read()
        # Substitute longer keys first: otherwise a key that is a prefix of
        # another (e.g. "$id" vs "$id_list") would clobber the longer
        # placeholder before it is ever replaced.
        for key in sorted(input_data, key=len, reverse=True):
            value = input_data[key]
            placeholder = f"${key}"
            # Decide the final text (replacement) for SQL:
            if value is None:
                replacement = "NULL"
            elif isinstance(value, bool):
                # bool must be tested before other types: bool is an int subclass.
                replacement = "TRUE" if value else "FALSE"
            elif isinstance(value, (list, tuple)):
                # Serialize list/tuple safely to avoid quote issues in SQL.
                replacement = f"'{json.dumps(value)}'"
            elif isinstance(value, str):
                # Escape single quotes to keep SQL well-formed.
                sanitized = value.replace("'", "''")
                replacement = f"'{sanitized}'"
            else:
                replacement = str(value)
            # Replace the placeholder with our final replacement string
            sql_template = sql_template.replace(placeholder, replacement)
        logger.info("SQL query constructed.")
        return sql_template.strip()
    except Exception as e:
        logger.error("Error processing SQL template: %s", e)
        raise ApplicationError(f"SQL template error: {e}")
def get_batch_results(batch_id, retry_interval=0.05, max_retries=5):
    """Poll SQLPad until batch *batch_id* reaches "finished" or "error".

    Returns a tuple ``(status, statement_id, error, columns, is_select_query)``
    for the first statement of the batch.

    Raises:
        ApplicationError: on HTTP failure, empty statement list, a reported
            SQL error, a SELECT with no columns, or poll exhaustion.

    NOTE(review): with the defaults the total poll window is only
    5 * 0.05 s = 0.25 s — confirm that is long enough for real queries.
    """
    retries = 0
    while retries < max_retries:
        try:
            response = requests.get(f"{SQLPAD_API_URL}/api/batches/{batch_id}")
            response.raise_for_status()
            batch_status = response.json()
            status = batch_status.get("status")
            if status in ["finished", "error"]:
                statements = batch_status.get("statements", [])
                if not statements:
                    raise ApplicationError("No statements found in batch response.")
                # Only the first statement of the batch is inspected.
                statement = statements[0]
                statement_id = statement.get("id")
                error = statement.get("error")
                columns = statement.get("columns", None)
                sql_text = batch_status.get("batchText", "").strip().lower()
                logger.info(f"statements: {statements}")
                logger.info(f"error from batches result {error}, statement: {statement_id}, columns: {columns}")
                if error:
                    raise ApplicationError(f"SQL execution failed: {error}")
                # Check if query is WITH + SELECT or plain SELECT
                is_select_query = sql_text.startswith("select") or (
                    sql_text.startswith("with") and "select" in sql_text
                )
                # Ensure SELECT queries always return columns
                if is_select_query and not columns:
                    raise ApplicationError("SELECT query did not return columns, cannot process data.")
                return status, statement_id, error, columns, is_select_query
            # Batch still running: wait and poll again.
            time.sleep(retry_interval)
            retries += 1
        except requests.RequestException as e:
            # Only transport errors are caught here; the ApplicationErrors
            # raised above propagate directly to the caller.
            logger.error("Failed to fetch batch results: %s", e)
            raise ApplicationError(f"Failed to fetch batch results: {e}")
    raise ApplicationError("SQLPad batch execution timed out.")
# Execute SQL via SQLPad APIs
def execute_sqlpad_query(connection_id, sql_query):
    """Run *sql_query* on SQLPad connection *connection_id*.

    For SELECT/CTE queries returns ``{"results": [row_dict, ...]}`` with cell
    values cast according to SQLPad's reported column datatypes; for other
    statements returns ``{"status": ..., "error": ...}``.

    Raises:
        ApplicationError: when any SQLPad HTTP call fails or the batch id is
            missing from the response.
    """
    payload = {
        "connectionId": connection_id,
        "name": "",
        "batchText": sql_query,
        "selectedText": ""
    }
    try:
        # Step 1: Create batch
        response = requests.post(f"{SQLPAD_API_URL}/api/batches", json=payload)
        response.raise_for_status()
        batch_response = response.json()
        # Extract batch ID from the first statement of the response
        batch_id = batch_response.get("statements", [{}])[0].get("batchId")
        logger.info(f"Batch ID from the batches API response {batch_id}")
        if not batch_id:
            raise ApplicationError("Batch ID not found in SQLPad response.")
        # Step 2: Poll for completion and determine if it's a SELECT query
        status, statement_id, error, columns, is_select_query = get_batch_results(batch_id)
        # Non-SELECT statements have no result set to fetch.
        if not is_select_query:
            return {"status": status, "error": error}
        # Step 3: Fetch statement results only for SELECT/CTE queries
        result_response = requests.get(f"{SQLPAD_API_URL}/api/statements/{statement_id}/results")
        result_response.raise_for_status()
        result_data = result_response.json()
        # Map SQLPad datatypes onto Python casts; unknown types fall back to str.
        # NOTE(review): bool("false") is True for any non-empty string — if
        # SQLPad ever delivers booleans as strings this cast is wrong; confirm
        # the wire format.
        type_mapping = {
            "number": float,
            "string": str,
            "date": str,
            "boolean": bool,
            "timestamp": str,
        }
        column_names_list = [col["name"] for col in columns]
        column_types_list = [col["datatype"] for col in columns]
        # Rows arrive as positional lists; cast each cell, preserving None.
        converted_data = [
            [
                type_mapping.get(dtype, str)(value) if value is not None else None
                for dtype, value in zip(column_types_list, row)
            ]
            for row in result_data
        ]
        results_dict_list = [dict(zip(column_names_list, row)) for row in converted_data]
        logger.info(f"results_dict_list: {results_dict_list}")
        return {"results": results_dict_list}
    except requests.RequestException as e:
        logger.error("SQLPad API request failed: %s", e)
        raise ApplicationError(f"SQLPad API request failed: {e}")
# Registering activity
@activity.defn
async def block_main_activity(input_data):
    """Temporal activity: validate input, build SQL, execute it, validate output.

    NOTE(review): validate_input runs *before* the try block, so validation
    errors propagate unwrapped (ValueError under unittest) rather than being
    converted to the generic "Error during query execution" below — confirm
    that is intentional.
    """
    # Validate the input
    validate_input(input_data)
    try:
        sql_query = construct_sql(input_data)
        logger.info(f"constructed sql query: {sql_query}")
        connection_id = get_connection_id(NAMESPACE)
        # get_connection_id raises for an unknown namespace, so this branch is
        # False only when the config maps the namespace to an empty/None id.
        if connection_id:
            logger.info(f"connection id exists {connection_id}")
            result = execute_sqlpad_query(connection_id, sql_query)
            validate_output(result)
            logger.info(f"final result for the query: {result}")
            return result
        else:
            logger.error(f"connection id not exists, please add the connection id according to the namespace.")
            raise ApplicationError("connection id not exists, please add the connection id according to the namespace.")
    except Exception as e:
        # Wrap everything from the try block; RuntimeError under unittest so
        # tests need not depend on temporalio's ApplicationError.
        logger.error("Error executing query execution: %s", e)
        if not IS_TEST_ENVIRONMENT:
            raise ApplicationError("Error during query execution") from e
        else:
            raise RuntimeError("Error during query execution") from e
async def main():
    """Connect to the engine and serve the activity on this build's task queue."""
    try:
        client = await Client.connect(FLOWX_ENGINE_ADDRESS, namespace=NAMESPACE)
        worker = Worker(client, task_queue=TASK_QUEUE, activities=[block_main_activity])
        logger.info("Worker starting, listening to task queue: %s", TASK_QUEUE)
        await worker.run()
    except Exception as exc:
        logger.critical("Worker failed to start: %s", exc)
        raise


if __name__ == "__main__":
    asyncio.run(main())

3
requirements.txt Normal file
View File

@@ -0,0 +1,3 @@
temporalio==1.11.1
jsonschema==4.23.0
requests==2.32.3

196
test_block_wrapper.py Normal file
View File

@@ -0,0 +1,196 @@
import unittest
from unittest.mock import patch, MagicMock, mock_open
import json
import asyncio
from jsonschema import ValidationError
import os
# Stub the environment variables block_wrapper reads at import time; the
# import must happen inside this patch context, otherwise the module-level
# required-env check would raise ValueError.
with patch.dict('os.environ', {
    "REPO_NAME": "test_repo",
    "BRANCH_NAME": "test_branch",
    "VERSION": "test_version",
    "NAMESPACE": "test_namespace",
    "FLOWX_ENGINE_ADDRESS": "test_address"
}):
    from block_wrapper import block_main_activity, validate_input, validate_output , construct_sql, get_connection_id
class TestBlockWrapper(unittest.IsolatedAsyncioTestCase):
    """Unit tests for block_wrapper.

    Uses IsolatedAsyncioTestCase so the async activity tests are actually
    awaited — under a plain TestCase an ``async def test_*`` method returns
    an un-awaited coroutine and silently "passes".
    """

    def setUp(self):
        # Mock schemas to use for testing
        self.mock_request_schema = {
            "type": "object",
            "properties": {
                "salary": {"type": "number"},
                "department": {"type": "string"}
            },
            "required": ["salary", "department"]
        }
        self.mock_response_schema = {
            "type": "object",
            "$schema": "http://json-schema.org/draft-07/schema",
            "properties": {
                "id": {"type": "integer"},
                "first_name": {"type": "string"},
                "last_name": {"type": "string"},
                "email": {"type": "string", "format": "email"},
                "phone_number": {"type": "string"},
                "hire_date": {"type": "string", "format": "date-time"},
                "job_title": {"type": "string"},
                "salary": {"type": "number"},
                "department": {"type": "string"}
            },
            "required": ["id", "first_name", "last_name", "email", "phone_number",
                         "hire_date", "job_title", "salary", "department"]
        }
        self.mock_config_schema = [
            {
                "namespace": "staging",
                "connectionId": "8d7341b4-53a5-41b8-8c9d-5133fafb5d7b"
            },
            {
                "namespace": "production",
                "connectionId": "4b1437d8-53a5-41b8-8c9d-5133fafbtyuu"
            },
            {
                # Matches the NAMESPACE env var used when importing
                # block_wrapper, so end-to-end activity tests can resolve it.
                "namespace": "test_namespace",
                "connectionId": "00000000-0000-0000-0000-000000000000"
            }
        ]
        self.mock_main_sql = "SELECT * FROM public.employee WHERE salary=$salary and department=$department;"
        # Canonical row returned by the stubbed SQLPad executor; it satisfies
        # the mock response schema.
        self.expected_row = {
            "id": 4, "first_name": "Bob", "last_name": "Brown",
            "email": "bob.brown@example.com", "phone_number": "444-222-1111",
            "hire_date": "2020-07-25", "job_title": "Marketing Specialist",
            "salary": 60000.00, "department": "Marketing"
        }
        # Mock the contents of the JSON fixtures with dedicated mock_open objects
        self.mock_open_request = mock_open(read_data=json.dumps(self.mock_request_schema))
        self.mock_open_response = mock_open(read_data=json.dumps(self.mock_response_schema))
        self.mock_open_config = mock_open(read_data=json.dumps(self.mock_config_schema))
        self.mock_open_main_sql = mock_open(read_data=self.mock_main_sql)
        self.open_main_sql_patcher = patch("builtins.open", self.mock_open_main_sql)
        self.open_main_sql_patcher.start()
        # Stub the SQLPad executor so no HTTP traffic happens; it returns the
        # row dict directly (previously it returned a MagicMock wrapper, so
        # the success assertion could never hold).
        self.load_block_main_patcher = patch(
            "block_wrapper.execute_sqlpad_query", return_value=self.expected_row
        )
        self.mock_load_block_main = self.load_block_main_patcher.start()

    def tearDown(self):
        # Stop all patches (the open() patch was previously leaked).
        self.load_block_main_patcher.stop()
        self.open_main_sql_patcher.stop()

    @patch("block_wrapper.load_schema")
    def test_validate_input_success(self, mock_load_schema):
        # Set up load_schema to return request schema for validate_input
        mock_load_schema.return_value = self.mock_request_schema
        input_data = {"salary": 20000.0, "department": "Marketing"}
        validate_input(input_data)  # Should pass without errors

    @patch("block_wrapper.load_schema")
    def test_validate_input_failure(self, mock_load_schema):
        mock_load_schema.return_value = self.mock_request_schema
        input_data = {"salary": 20000.00}  # Missing 'department'
        with self.assertRaises(ValueError):
            validate_input(input_data)

    @patch("block_wrapper.load_schema")
    def test_validate_output_success(self, mock_load_schema):
        mock_load_schema.return_value = self.mock_response_schema
        validate_output(dict(self.expected_row))  # Should pass without errors

    @patch("block_wrapper.load_schema")
    def test_validate_output_failure(self, mock_load_schema):
        mock_load_schema.return_value = self.mock_response_schema
        output_data = dict(self.expected_row)
        del output_data["department"]  # Missing 'department'
        with self.assertRaises(ValueError):
            validate_output(output_data)

    @patch("block_wrapper.load_schema")
    async def test_block_main_activity_success(self, mock_load_schema):
        # load_schema is called for the request schema (validate_input), then
        # config.json (get_connection_id), then the response schema
        # (validate_output) — the config call was previously missing here.
        mock_load_schema.side_effect = [
            self.mock_request_schema,
            self.mock_config_schema,
            self.mock_response_schema,
        ]
        input_data = {"salary": 20000.0, "department": "Marketing"}
        result = await block_main_activity(input_data)
        self.assertEqual(result, self.expected_row)

    @patch("block_wrapper.load_schema")
    async def test_block_main_activity_failure(self, mock_load_schema):
        mock_load_schema.side_effect = [
            self.mock_request_schema,
            self.mock_config_schema,
        ]
        # Cause an exception inside the activity's try block; under unittest
        # it is wrapped in RuntimeError.
        self.mock_load_block_main.side_effect = Exception("Unexpected error")
        input_data = {"salary": 20000.0, "department": "Marketing"}
        with self.assertRaises(RuntimeError):
            await block_main_activity(input_data)

    @patch("block_wrapper.load_schema")
    async def test_block_main_activity_input_validation_failure(self, mock_load_schema):
        # validate_input raises ValueError in the test environment and runs
        # before the activity's try block, so it propagates unwrapped.
        with patch("block_wrapper.validate_input", side_effect=ValueError("Invalid input")):
            input_data = {"salary": 20000.00}  # Missing 'department'
            with self.assertRaises(ValueError):
                await block_main_activity(input_data)

    @patch("block_wrapper.load_schema")
    async def test_block_main_activity_output_validation_failure(self, mock_load_schema):
        mock_load_schema.side_effect = [
            self.mock_request_schema,
            self.mock_config_schema,
        ]
        # A ValidationError inside the try block is wrapped in RuntimeError
        # when running under unittest.
        with patch("block_wrapper.validate_output", side_effect=ValidationError("Invalid output")):
            input_data = {"salary": 20000.0, "department": "Marketing"}
            with self.assertRaises(RuntimeError):
                await block_main_activity(input_data)

    @patch.dict(os.environ, {"NAMESPACE": "staging"})
    @patch("block_wrapper.load_schema")
    def test_get_connection_id_staging(self, mock_load_schema):
        """Test fetching connectionId for 'staging' namespace"""
        mock_load_schema.return_value = self.mock_config_schema
        connection_id = get_connection_id(os.environ["NAMESPACE"])
        self.assertEqual(connection_id, "8d7341b4-53a5-41b8-8c9d-5133fafb5d7b")

    @patch.dict(os.environ, {"NAMESPACE": "production"})
    @patch("block_wrapper.load_schema")
    def test_get_connection_id_production(self, mock_load_schema):
        """Test fetching connectionId for 'production' namespace"""
        mock_load_schema.return_value = self.mock_config_schema
        connection_id = get_connection_id(os.environ["NAMESPACE"])
        self.assertEqual(connection_id, "4b1437d8-53a5-41b8-8c9d-5133fafbtyuu")

    @patch("block_wrapper.load_schema")
    def test_get_connection_id_invalid_namespace(self, mock_load_schema):
        """Test handling of invalid namespace"""
        mock_load_schema.return_value = self.mock_config_schema
        with self.assertRaises(ValueError) as context:
            get_connection_id("development")
        self.assertIn("Namespace 'development' not found", str(context.exception))

    @patch("block_wrapper.load_schema")
    def test_valid_sql_replacement(self, mock_load_schema):
        # construct_sql reads main.sql through the patched open() from setUp.
        mock_load_schema.return_value = self.mock_main_sql
        input_data = {"salary": 20000.0, "department": "Marketing"}
        expected_sql = "SELECT * FROM public.employee WHERE salary=20000.0 and department='Marketing';"
        result = construct_sql(input_data)
        self.assertEqual(result, expected_sql)


if __name__ == "__main__":
    unittest.main()