blocks-transformer/post_processing.py
admin user 6d94849e2d
All checks were successful
Build and Push Docker Image / test (push) Successful in 10s
Build and Push Docker Image / build_and_push (push) Successful in 20s
Graph lookup block
2025-03-12 16:14:28 +00:00

78 lines
2.5 KiB
Python

import requests
import logging
# Configure the root logger at import time: INFO and above, each record
# rendered as "<timestamp> [<LEVEL>] <logger name> - <message>".
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
    level=logging.INFO,
)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def fetch_data(url, record_id, timeout=30):
    """Fetch the JSON payload for ``record_id`` from the API.

    Issues ``GET {url}/{record_id}`` and returns the decoded JSON body.
    Every failure is logged and reported as ``None`` rather than raised.

    Args:
        url: Base endpoint; ``record_id`` is appended as a path segment.
        record_id: Identifier of the record to fetch.
        timeout: Seconds to wait for the server, passed straight to
            ``requests.get``. ``None`` waits indefinitely, which was the
            behaviour before this parameter existed.

    Returns:
        The decoded JSON body when the server answers with HTTP 200;
        ``None`` on a non-200 status, a transport error or timeout, or a
        body that is not valid JSON.
    """
    # Keep the try body down to the call that can actually raise.
    try:
        response = requests.get(
            f"{url}/{record_id}",
            headers={"Content-Type": "application/json"},
            timeout=timeout,  # without this a stalled server blocks forever
        )
    except requests.RequestException as e:
        logger.error("Error fetching %s: %s", record_id, e)
        return None

    if response.status_code != 200:
        logger.error("Failed to fetch %s: %s", record_id, response.status_code)
        return None

    try:
        return response.json()
    except ValueError as e:
        # A 200 with a non-JSON body. Depending on the requests version the
        # decode error is not always a RequestException, but it is always a
        # ValueError, so catch that to keep the "log and return None" contract.
        logger.error("Error fetching %s: %s", record_id, e)
        return None
def post_processing_duplicate(data, url):
    """Collect application fields for a record and its connected records.

    Reads ``record_id`` and ``connected_records`` from ``data``, fetches
    each record through ``fetch_data`` and extracts the fields found under
    the payload's ``'data'`` key.

    Args:
        data: Mapping with a ``'record_id'`` entry and an optional
            ``'connected_records'`` iterable of record ids.
        url: Base endpoint handed to ``fetch_data``.

    Returns:
        A dict with:
          * ``application_key`` / ``hd_score_m1`` - values from the main
            record, or ``None`` when it could not be fetched;
          * ``connected_application_keys`` - the truthy ``application_key``
            of every connected record that could be fetched, in input order.

    Raises:
        KeyError: if a fetched payload has no ``'data'`` entry.
    """
    record_id = data.get('record_id')
    # "or []" also covers an explicit ``"connected_records": None``.
    connected_records = data.get('connected_records') or []

    # Main record.
    application_key = None
    hd_score_m1 = None
    record_data = fetch_data(url, record_id)
    if record_data:
        application_key = record_data['data'].get('application_key')
        hd_score_m1 = record_data['data'].get('hd_score_m1')

    # Connected records: fetch each one exactly once. Fetching inside both
    # the filter and the result expression tripled the HTTP traffic and
    # could crash when a later call failed after an earlier one succeeded.
    connected_keys = []
    for rec_id in connected_records:
        connected = fetch_data(url, rec_id)
        if not connected:
            continue
        key = connected['data'].get('application_key')
        if key:
            connected_keys.append(key)

    return {
        "application_key": application_key,
        "hd_score_m1": hd_score_m1,
        "connected_application_keys": connected_keys,
    }
def post_processing(data, url):
    """Collect application fields for a record and its matches.

    Reads ``hd_key`` and ``matches`` from ``data``, fetches each record
    through ``fetch_data`` and extracts the fields found under the
    payload's ``'data'`` key.

    Args:
        data: Mapping with an ``'hd_key'`` entry (the main record id) and
            an optional ``'matches'`` iterable of record ids.
        url: Base endpoint handed to ``fetch_data``.

    Returns:
        A dict with:
          * ``application_key`` / ``hd_score_m1`` - values from the main
            record, or ``None`` when it could not be fetched;
          * ``connected_application_keys`` - the truthy ``application_key``
            of every match that could be fetched, in input order, followed
            by the main record's own key when it has one.

    Raises:
        KeyError: if a fetched payload has no ``'data'`` entry.
    """
    record_id = data.get('hd_key')
    # Accept a missing or None 'matches' and any iterable, not only a list.
    matches = list(data.get('matches') or [])

    # Main record.
    application_key = None
    hd_score_m1 = None
    record_data = fetch_data(url, record_id)
    if record_data:
        application_key = record_data['data'].get('application_key')
        hd_score_m1 = record_data['data'].get('hd_score_m1')

    # Matches: fetch each one exactly once. Fetching inside both the filter
    # and the result expression tripled the HTTP traffic and could crash
    # when a later call failed after an earlier one succeeded.
    connected_keys = []
    for rec_id in matches:
        matched = fetch_data(url, rec_id)
        if not matched:
            continue
        key = matched['data'].get('application_key')
        if key:
            connected_keys.append(key)

    # The main record closes the list (matches + hd_key). Its payload was
    # already fetched above, so reuse that result instead of requesting it again.
    if application_key:
        connected_keys.append(application_key)

    return {
        "application_key": application_key,
        "hd_score_m1": hd_score_m1,
        "connected_application_keys": connected_keys,
    }