import json
import time
import requests
from google.oauth2 import service_account
from google.auth.transport import requests as google_requests
from datetime import datetime, timezone
import uuid # For generating unique batch_id
# --- Configuration (UPDATE THESE VALUES) ---
SERVICE_ACCOUNT_KEY_FILE = 'path/to/your/service_account_key.json' # Replace with the actual path
CHRONICLE_CUSTOMER_ID = 'your-chronicle-customer-id-uuid' # Replace with the customer's Chronicle Customer ID (UUID)
CHRONICLE_REGION = 'us' # Replace with 'us', 'europe', 'asia-northeast1', etc. based on customer's region
AUGUR_FEED_NAME = 'Augur Security Threat Feed' # A descriptive name for your IOC feed
# --- API Endpoints ---
# Adjust the base URL based on the CHRONICLE_REGION
CHRONICLE_INGESTION_BASE_URLS = {
'us': 'https://malachiteingestion-pa.googleapis.com',
'europe': 'https://europe-malachiteingestion-pa.googleapis.com',
# Add other regions as needed
# 'asia-northeast1': 'https://asia-northeast1-malachiteingestion-pa.googleapis.com',
}
INGESTION_API_URL = f"{CHRONICLE_INGESTION_BASE_URLS.get(CHRONICLE_REGION, CHRONICLE_INGESTION_BASE_URLS['us'])}/v2/udmevents:batchCreate"
# --- Authentication Scopes ---
SCOPES = ['https://www.googleapis.com/auth/malachite-ingestion']
def get_authorized_session(service_account_file: str) -> google_requests.AuthorizedSession:
"""
Initializes and returns an authorized HTTP session for Chronicle API calls.
"""
try:
credentials = service_account.Credentials.from_service_account_file(
service_account_file,
scopes=SCOPES
)
return google_requests.AuthorizedSession(credentials)
except Exception as e:
print(f"Error initializing authorized session: {e}")
raise
def create_ioc_udm_event(
ioc_value: str,
ioc_type: str, # e.g., "IP_ADDRESS", "DOMAIN_NAME", "FILE_SHA256", "URL"
category: str, # e.g., "MALWARE_C2", "PHISHING", "SPAM"
severity: str, # "CRITICAL", "HIGH", "MEDIUM", "LOW", "INFORMATIONAL"
confidence: str, # "HIGH", "MEDIUM", "LOW"
description: str = "",
source_info: dict = None # Optional: dict for additional custom source details
) -> dict:
"""
Creates a single UDM event dictionary for an IOC.
"""
timestamp = datetime.now(timezone.utc).isoformat(timespec='milliseconds') + 'Z'
udm_event = {
"metadata": {
"event_type": "THREAT_IOC",
"event_timestamp": timestamp,
"log_type": AUGUR_FEED_NAME.replace(" ", "_").upper() # Creates a UDM-compatible log type
},
"security_result": {
"category": category,
"threat": {
"feed_name": AUGUR_FEED_NAME,
"indicator": ioc_value,
"indicator_type": ioc_type,
"summary": description if description else f"IOC from {AUGUR_FEED_NAME}: {ioc_value}",
"severity": severity,
"confidence": confidence
}
}
}
# Map IOC type to UDM principal/target fields
if ioc_type == "IP_ADDRESS":
udm_event["principal"] = {"ip": ioc_value, "asset_type": ioc_type}
elif ioc_type == "DOMAIN_NAME":
udm_event["principal"] = {"domain": ioc_value, "asset_type": ioc_type}
elif ioc_type == "URL":
udm_event["principal"] = {"url": ioc_value, "asset_type": ioc_type}
elif ioc_type in ["FILE_MD5", "FILE_SHA1", "FILE_SHA256"]:
udm_event["principal"] = {"file": {ioc_type.lower().replace("file_", ""): ioc_value}, "asset_type": "FILE"}
else:
# Fallback for unknown types or if you want to put all in principal.url for simplicity
udm_event["principal"] = {"url": ioc_value, "asset_type": "UNKNOWN_IOC"}
print(f"Warning: Unknown IOC type '{ioc_type}'. Mapped to principal.url.")
if source_info:
udm_event["extensions"] = {"augur_source_info": source_info}
return udm_event
def ingest_iocs(
session: google_requests.AuthorizedSession,
customer_id: str,
iocs_data: list[dict], # List of dicts, where each dict has ioc_value, ioc_type, etc.
batch_size: int = 100,
max_retries: int = 5,
retry_delay_seconds: int = 5
) -> None:
"""
Ingests a list of IOCs into Google Chronicle SIEM.
"""
if not iocs_data:
print("No IOCs to ingest.")
return
num_ingested = 0
total_iocs = len(iocs_data)
for i in range(0, total_iocs, batch_size):
batch_iocs = iocs_data[i:i + batch_size]
udm_events_batch = [create_ioc_udm_event(**ioc) for ioc in batch_iocs]
payload = {
"customer_id": customer_id,
"events": udm_events_batch,
"batch_id": str(uuid.uuid4()) # Unique batch ID for deduplication
}
retries = 0
while retries < max_retries:
try:
print(f"Ingesting batch {i // batch_size + 1} of {total_iocs // batch_size + (1 if total_iocs % batch_size > 0 else 0)}...")
# Note: The Google Auth AuthorizedSession handles refreshing tokens automatically
response = session.post(INGESTION_API_URL, json=payload)
response.raise_for_status() # Raises HTTPError for bad responses (4xx or 5xx)
print(f"Batch {i // batch_size + 1} successfully ingested. Status: {response.status_code}")
num_ingested += len(batch_iocs)
break # Exit retry loop on success
except requests.exceptions.HTTPError as e:
if e.response.status_code in [429, 500, 502, 503, 504]:
retries += 1
print(f"Temporary error (HTTP {e.response.status_code}) ingesting batch. Retrying in {retry_delay_seconds} seconds... (Attempt {retries}/{max_retries})")
time.sleep(retry_delay_seconds)
retry_delay_seconds *= 2 # Exponential backoff
else:
print(f"Permanent error (HTTP {e.response.status_code}) ingesting batch: {e.response.text}")
break # Don't retry for client errors
except requests.exceptions.RequestException as e:
retries += 1
print(f"Network error ingesting batch: {e}. Retrying in {retry_delay_seconds} seconds... (Attempt {retries}/{max_retries})")
time.sleep(retry_delay_seconds)
retry_delay_seconds *= 2
except Exception as e:
print(f"An unexpected error occurred: {e}")
break # Break for unexpected errors
else: # This else block executes if the while loop completes without a 'break' (i.e., retries exhausted)
print(f"Failed to ingest batch after {max_retries} retries.")
print(f"\nIngestion complete. Successfully ingested {num_ingested} of {total_iocs} IOCs.")
# --- Example Usage ---
if __name__ == "__main__":
# --- IMPORTANT: Replace with your actual values ---
# Example IOCs from Augur Security
# In a real scenario, this data would come from Augur's internal systems
sample_iocs = [
{
"ioc_value": "198.51.100.10",
"ioc_type": "IP_ADDRESS",
"category": "MALWARE_C2",
"severity": "CRITICAL",
"confidence": "HIGH",
"description": "Known C2 server for Ransomware variant X",
"source_info": {"augur_id": "IOC-00123", "threat_actor": "APT28"}
},
{
"ioc_value": "phishingsite.example.com",
"ioc_type": "DOMAIN_NAME",
"category": "PHISHING",
"severity": "HIGH",
"confidence": "MEDIUM",
"description": "Domain distributing fake login page for O365",
"source_info": {"augur_id": "IOC-00124"}
},
{
"ioc_value": "d41d8cd98f00b204e9800998ecf8427e",
"ioc_type": "FILE_MD5",
"category": "MALWARE_DLL",
"severity": "MEDIUM",
"confidence": "HIGH",
"description": "MD5 hash of suspicious DLL",
},
{
"ioc_value": "https://malicious.cdn.ru/payload.exe",
"ioc_type": "URL",
"category": "EXPLOIT_KIT",
"severity": "CRITICAL",
"confidence": "HIGH",
"description": "URL hosting an exploit kit payload",
},
# Add more IOCs as needed
]
try:
print("Authenticating to Google Chronicle SIEM...")
# Get authorized HTTP session
chronicle_session = get_authorized_session(SERVICE_ACCOUNT_KEY_FILE)
print("Authentication successful.")
print(f"Starting ingestion of {len(sample_iocs)} IOCs...")
ingest_iocs(chronicle_session, CHRONICLE_CUSTOMER_ID, sample_iocs)
except FileNotFoundError:
print(f"Error: Service account key file not found at {SERVICE_ACCOUNT_KEY_FILE}")
print("Please ensure the path is correct and the file exists.")
except Exception as e:
print(f"An unrecoverable error occurred: {e}")