Code Examples
This page provides practical code examples to help you get started with the NordStellar Dark Web API. These examples demonstrate common API operations and integration patterns that you can adapt for your specific use cases.
Table of Contents
- Subscription Service Listeners
- Email Lookup
- Email Domain Lookup
- Zero Knowledge Password Lookup
- Fetching AWS Credentials from Malware Logs
- Fetching AWS Secrets from Malware Logs
Subscription Service Listeners
This example demonstrates how to set up a webhook server to receive subscription notifications when NordStellar detects new breaches affecting your monitored assets.
from fastapi import FastAPI, Request, HTTPException, Depends, Security
from fastapi.security import HTTPBasic, HTTPBasicCredentials
import secrets
import uvicorn
from pydantic import BaseModel
from typing import List, Optional
app = FastAPI()
security = HTTPBasic()
# Configure these with your actual credentials
WEBHOOK_USERNAME = "your_webhook_username"
WEBHOOK_PASSWORD = "your_webhook_password"
# Model for the subscription webhook payload
class SubscriptionEvent(BaseModel):
    # Request body accepted by the /subscription-webhook endpoint.

    subscription_id: str

    # One of: email-sha256, phone-sha256, domain, cc-argon2id, nin-argon2id
    document_type: str

    # Value echoed in the handler's log line for email/phone/domain events.
    # NOTE(review): presumably the hashed or plain monitored asset - confirm
    # against the API reference.
    document_identifier: str

    # Breach sources the document was found in; the handler reports the
    # number of entries in each list.
    database_ids: List[str]
    malware_log_ids: List[str]
    credential_list_ids: List[str]
def verify_credentials(credentials: HTTPBasicCredentials = Depends(security)):
    """Validate the HTTP Basic credentials sent with a webhook call.

    Args:
        credentials: Username/password pair resolved by the ``security``
            (``HTTPBasic``) dependency.

    Returns:
        The same ``credentials`` object when both fields match
        ``WEBHOOK_USERNAME`` and ``WEBHOOK_PASSWORD``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Basic`` header when
            either field does not match.
    """
    # Compare as UTF-8 bytes: secrets.compare_digest() raises TypeError when
    # given str arguments containing non-ASCII characters, which would turn a
    # bad login attempt into a 500 instead of a 401.
    correct_username = secrets.compare_digest(
        credentials.username.encode("utf-8"),
        WEBHOOK_USERNAME.encode("utf-8"),
    )
    correct_password = secrets.compare_digest(
        credentials.password.encode("utf-8"),
        WEBHOOK_PASSWORD.encode("utf-8"),
    )
    # Both comparisons run before branching so the response does not reveal
    # which of the two fields was wrong.
    if not (correct_username and correct_password):
        raise HTTPException(
            status_code=401,
            detail="Invalid credentials",
            headers={"WWW-Authenticate": "Basic"},
        )
    return credentials
@app.post("/subscription-webhook")
async def subscription_webhook(
    event: SubscriptionEvent,
    credentials: HTTPBasicCredentials = Security(verify_credentials)
):
    """
    Endpoint to receive NordStellar subscription events.
    This will be called whenever new breaches are detected for your monitored assets.
    """
    # One log line per known document type; unknown types log nothing here.
    sensitive_notice = f"New breach detected for sensitive data: {event.document_type}"
    notices = {
        "email-sha256": f"New breach detected for email: {event.document_identifier}",
        "phone-sha256": f"New breach detected for phone number: {event.document_identifier}",
        "domain": f"New breach detected for domain: {event.document_identifier}",
        "cc-argon2id": sensitive_notice,
        "nin-argon2id": sensitive_notice,
    }
    notice = notices.get(event.document_type)
    if notice is not None:
        print(notice)

    # Report how many sources of each kind contained the document
    if event.database_ids:
        print(f"Found in {len(event.database_ids)} database breaches")
    if event.malware_log_ids:
        print(f"Found in {len(event.malware_log_ids)} malware logs")
    if event.credential_list_ids:
        print(f"Found in {len(event.credential_list_ids)} credential lists")

    # This is the place to:
    # 1. Store the event in your database
    # 2. Trigger your internal processes (notifications, remediation, etc.)
    # 3. Update your risk scoring systems

    # Acknowledge receipt so the sender sees a successful delivery
    return {"status": "success", "message": "Event processed successfully"}
if __name__ == "__main__":
# Run the webhook server
uvicorn.run(app, host="0.0.0.0", port=8000)
To register this webhook with NordStellar:
import requests
# Configuration
API_KEY = "YOUR_API_KEY"
BASE_URL = "https://enterprise-data-api.nordstellar.com/api/v3/data"
# The webhook URL that NordStellar will call
WEBHOOK_URL = "https://your-server.com/subscription-webhook"
WEBHOOK_USERNAME = "your_webhook_username"
WEBHOOK_PASSWORD = "your_webhook_password"
# Set up headers with authentication
headers = {
"X-API-KEY": API_KEY,
"Content-Type": "application/json"
}
# Prepare the webhook registration payload
payload = {
"url": WEBHOOK_URL,
"username": WEBHOOK_USERNAME,
"password": WEBHOOK_PASSWORD
}
# Register the webhook endpoint
response = requests.post(
f"{BASE_URL}/user/subscription-endpoint",
headers=headers,
json=payload
)
# Check response
if response.status_code == 200:
print("Webhook registered successfully")
else:
print(f"Error registering webhook: {response.status_code}")
print(response.text)
Email Lookup
This example demonstrates how to check if an email address has been involved in any data breaches:
import requests
import hashlib
import json
# Configuration
API_KEY = "YOUR_API_KEY"
BASE_URL = "https://enterprise-data-api.nordstellar.com/api/v3/data"
def check_email_breaches(email, timeout=30):
    """
    Check if an email address has been involved in any data breaches.

    The address itself is never sent: it is stripped of surrounding
    whitespace, lower-cased and hashed with SHA-256, and only the hex digest
    is placed in the request URL. A short summary is printed to stdout.

    Args:
        email: The email address to check
        timeout: Seconds to wait for the API before giving up (default 30)

    Returns:
        The decoded JSON response on HTTP 200; None when the API answers 404
        (no breach data) or any other non-200 status

    Raises:
        requests.exceptions.RequestException: If the request cannot be
            completed (connection failure, timeout, ...)
    """
    # Normalise before hashing so " User@Example.com " and "user@example.com"
    # produce the same digest.
    email_hash = hashlib.sha256(email.strip().lower().encode()).hexdigest()

    headers = {
        "X-API-KEY": API_KEY,
        "Content-Type": "application/json"
    }

    # Without an explicit timeout, requests waits indefinitely on a stalled
    # connection.
    response = requests.get(
        f"{BASE_URL}/email/{email_hash}",
        headers=headers,
        timeout=timeout
    )

    if response.status_code == 404:
        print(f"No breach data found for {email}")
        return None
    if response.status_code != 200:
        print(f"Error: {response.status_code}")
        print(response.text)
        return None

    data = response.json()

    # Print a summary of the breach data
    print(f"\nEmail: {email}")

    databases = data.get('databases')
    if databases:
        print(f"Found in {len(databases)} database breaches:")
        for idx, breach in enumerate(databases, 1):
            breach_id = breach.get('id', 'Unknown')
            breach_domain = breach.get('identifier', 'Unknown')
            print(f" {idx}. ID: {breach_id}, Domain: {breach_domain}")
            # Print the data fields exposed in this breach; entries without a
            # 'key' are shown as 'Unknown' instead of raising KeyError.
            if 'data' in breach:
                data_fields = [str(item.get('key', 'Unknown')) for item in breach['data']]
                print(f" Exposed data: {', '.join(data_fields)}")

    malware_logs = data.get('malware_logs')
    if malware_logs:
        print(f"Found in {len(malware_logs)} malware logs")

    credential_lists = data.get('credential_lists')
    if credential_lists:
        print(f"Found in {len(credential_lists)} credential lists")

    return data
# Example usage
if __name__ == "__main__":
emails_to_check = [
"user1@example.com",
"user2@example.com"
]
for email in emails_to_check:
breach_data = check_email_breaches(email)
# Optionally save the full response to a file for detailed analysis
if breach_data:
with open(f"{email.replace('@', '_at_')}_breaches.json", 'w') as f:
json.dump(breach_data, f, indent=2)
Email Domain Lookup
This example shows how to check for breached email addresses within a specific domain:
import requests
import json
# Configuration
API_KEY = "YOUR_API_KEY"
BASE_URL = "https://enterprise-data-api.nordstellar.com/api/v3/data"
def check_domain_breaches(domain, limit=100, offset=0, timeout=30):
    """
    Check for breached email addresses within a specific domain.

    Prints a per-address summary to stdout and a pagination hint when the
    page came back full.

    Args:
        domain: The domain to check (e.g., example.com)
        limit: Maximum number of results to return
        offset: Offset for pagination
        timeout: Seconds to wait for the API before giving up (default 30)

    Returns:
        The decoded JSON response on HTTP 200 (the breached addresses are
        under its 'data' key); None when the API answers 404 or any other
        non-200 status

    Raises:
        requests.exceptions.RequestException: If the request cannot be
            completed (connection failure, timeout, ...)
    """
    headers = {
        "X-API-KEY": API_KEY,
        "Content-Type": "application/json"
    }
    params = {
        "limit": limit,
        "offset": offset
    }

    # Without an explicit timeout, requests waits indefinitely on a stalled
    # connection.
    response = requests.get(
        f"{BASE_URL}/email/domain/{domain}",
        headers=headers,
        params=params,
        timeout=timeout
    )

    if response.status_code == 404:
        print(f"No breach data found for domain {domain}")
        return None
    if response.status_code != 200:
        print(f"Error: {response.status_code}")
        print(response.text)
        return None

    data = response.json()

    # Print a summary of the breach data
    print(f"\nDomain: {domain}")

    entries = data.get('data')
    if entries:
        print(f"Found {len(entries)} breached email addresses")
        source_keys = ('databases', 'malware_logs', 'credential_lists')
        for idx, email_data in enumerate(entries, 1):
            email = email_data.get('email', 'Unknown')
            # A missing or null source list counts as zero breaches.
            total_breaches = sum(len(email_data.get(key) or []) for key in source_keys)
            print(f" {idx}. {email}: Found in {total_breaches} breaches")

        # A full page suggests more results are waiting behind it
        if len(entries) >= limit:
            print(f"There may be more results. Current offset: {offset}, limit: {limit}")
            print(f"To see more results, increase offset to {offset + limit}")

    return data
# Example usage
if __name__ == "__main__":
domains_to_check = [
"example.com",
"nordstellar.com"
]
for domain in domains_to_check:
domain_breach_data = check_domain_breaches(domain)
# Optionally save the full response to a file for detailed analysis
if domain_breach_data:
with open(f"{domain}_breaches.json", 'w') as f:
json.dump(domain_breach_data, f, indent=2)
# If there are many results, paginate through them
if domain_breach_data and 'data' in domain_breach_data and len(domain_breach_data['data']) >= 100:
offset = 100
more_data = check_domain_breaches(domain, limit=100, offset=offset)
# Continue with pagination as needed
Zero Knowledge Password Lookup
This example demonstrates how to check if a password has been compromised without sending the actual password to the API:
import requests
import hashlib
# Configuration
API_KEY = "YOUR_API_KEY"
BASE_URL = "https://enterprise-data-api.nordstellar.com/api/v3/data"
def check_password_breach(password, timeout=30):
    """
    Check if a password has been seen in data breaches using the zero-knowledge approach.

    Only the first 6 hex characters of the password's SHA-256 digest are sent
    to the API; the full digest is compared locally against the returned
    candidates.

    Args:
        password: The password to check
        timeout: Seconds to wait for the API before giving up (default 30)

    Returns:
        True if the password has been found in breaches, False if it has not,
        and None when the API answers with a non-200 status

    Raises:
        requests.exceptions.RequestException: If the request cannot be
            completed (connection failure, timeout, ...)
    """
    password_hash = hashlib.sha256(password.encode()).hexdigest()
    hash_prefix = password_hash[:6]

    headers = {
        "X-API-KEY": API_KEY,
        "Content-Type": "application/json"
    }

    # Without an explicit timeout, requests waits indefinitely on a stalled
    # connection.
    response = requests.get(
        f"{BASE_URL}/zero-knowledge/password/sha256/{hash_prefix}",
        headers=headers,
        timeout=timeout
    )

    if response.status_code != 200:
        print(f"Error checking password: {response.status_code}")
        print(response.text)
        return None

    data = response.json()
    for hash_data in data.get('data', []):
        candidate = hash_data.get('hash')
        # hexdigest() is lower-case; normalise the candidate so an upper-case
        # hex string from the API still matches.
        if isinstance(candidate, str) and candidate.lower() == password_hash:
            print(f"Password has been found in {hash_data.get('count', 0)} breaches!")
            return True

    print("Password not found in any known breaches.")
    return False
def check_bulk_passwords(passwords, timeout=30):
    """
    Check multiple passwords at once using the bulk zero-knowledge endpoint.

    Only the 6-character SHA-256 prefixes are sent; full digests are matched
    locally against the returned candidates.

    Args:
        passwords: List of passwords to check
        timeout: Seconds to wait for the API before giving up (default 30)

    Returns:
        Dictionary mapping passwords to breach counts. On a successful
        response every submitted password is present (0 when the API did not
        return its hash). Empty when no passwords were given or the API
        answered with a non-200 status.

    Raises:
        requests.exceptions.RequestException: If the request cannot be
            completed (connection failure, timeout, ...)
    """
    # Full digest -> original password, used to recognise our own hashes in
    # the response.
    password_hashes = {
        hashlib.sha256(password.encode()).hexdigest(): password
        for password in passwords
    }
    if not password_hashes:
        # Nothing to look up; skip the API call.
        return {}

    # dict.fromkeys() drops duplicate prefixes while keeping first-seen order.
    hash_prefixes = list(dict.fromkeys(full_hash[:6] for full_hash in password_hashes))

    headers = {
        "X-API-KEY": API_KEY,
        "Content-Type": "application/json"
    }
    payload = {
        "hash_ranges": hash_prefixes,
        "hash_type": "sha256"
    }

    # Without an explicit timeout, requests waits indefinitely on a stalled
    # connection.
    response = requests.post(
        f"{BASE_URL}/zero-knowledge/password",
        headers=headers,
        json=payload,
        timeout=timeout
    )

    if response.status_code != 200:
        print(f"Error checking passwords: {response.status_code}")
        print(response.text)
        return {}

    data = response.json()

    # Start every password at zero so ones absent from the response are still
    # reported instead of silently dropped.
    results = {password: 0 for password in password_hashes.values()}
    for hash_data in data.get('data', []):
        hash_value = hash_data.get('hash')
        if isinstance(hash_value, str):
            hash_value = hash_value.lower()
        if hash_value in password_hashes:
            results[password_hashes[hash_value]] = hash_data.get('count', 0)

    # NOTE: these messages echo the plaintext password to stdout; replace
    # them before using this outside of a demo.
    for original_password, breach_count in results.items():
        if breach_count > 0:
            print(f"Password '{original_password}' has been found in {breach_count} breaches!")
        else:
            print(f"Password '{original_password}' not found in any known breaches.")

    return results
# Example usage
if __name__ == "__main__":
# Check a single password
check_password_breach("Password123")
# Check multiple passwords at once
passwords_to_check = [
"qwerty123",
"Secure_P@ssw0rd!",
"letmein"
]
breach_counts = check_bulk_passwords(passwords_to_check)
Fetching AWS Credentials from Malware Logs
This example demonstrates how to retrieve AWS credentials that have been leaked through malware infections:
import asyncio
import aiohttp
import csv
from typing import List, Dict, Optional, Any
from pydantic import BaseModel
import datetime
# Configuration - replace with your values
API_KEY = "YOUR_API_KEY"
TARGET_URL = "us-east-1.console.aws.amazon.com" # URL to search for in credentials
BASE_URL = "https://enterprise-data-api.nordstellar.com/api/v3/data"
LIMIT = 500 # Number of malware logs to retrieve
OUTPUT_FILE = f"nordstellar_aws_credentials_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
TIMEOUT = 120 # 2 minutes timeout in seconds
CREDENTIAL_FETCH_BATCH_SIZE = 50 # Batch size for concurrent credential fetching
class MalwareLogIds(BaseModel):
    """Model for malware log IDs response"""

    credentials_url: str

    # IDs that main() later passes, in batches, to the per-log credentials call
    malware_log_ids: List[str]

    # NOTE(review): presumably the paging values echoed back by the API - confirm
    limit: int
    offset: int
class Credential(BaseModel):
    """Model for individual credential data"""

    # Every field is optional and defaults to None; main() writes an empty
    # string to the CSV for any field that is missing.
    url: Optional[str] = None
    cleaned_url: Optional[str] = None
    username: Optional[str] = None
    email: Optional[str] = None
    password: Optional[str] = None
    application: Optional[str] = None
class MalwareLogCredentials(BaseModel):
    """Model for malware log credentials response"""

    # Required log ID; written to the 'malware_id' CSV column
    id: str

    # Optional metadata; written as empty strings when absent
    identifier: Optional[str] = None
    published_date: Optional[str] = None

    # One entry per credential found in this log
    credentials: List[Credential]
async def fetch_malware_log_ids(session, url):
    """Fetch up to LIMIT malware log IDs whose credentials match ``url``.

    Returns a MalwareLogIds built from the JSON body on HTTP 200; on any
    other status an error line is printed and None is returned.
    """
    endpoint = f"{BASE_URL}/data-source/malware-log/credentials/url/{url}/id"
    request_options = {
        "headers": {"X-API-KEY": API_KEY},
        "params": {"limit": LIMIT},
        # Per-request timeout, set explicitly from the TIMEOUT constant
        "timeout": TIMEOUT,
    }
    async with session.get(endpoint, **request_options) as response:
        if response.status != 200:
            print(f"Error fetching malware log IDs: {response.status}")
            return None
        body = await response.json()
        return MalwareLogIds(**body)
async def fetch_credentials(session, malware_log_id, url):
    """Fetch the credentials recorded in one malware log for ``url``.

    Returns a MalwareLogCredentials built from the JSON body on HTTP 200.
    A 404 or any other status prints a message and returns None.
    """
    endpoint = f"{BASE_URL}/data-source/malware-log/{malware_log_id}/credentials/url/{url}"
    auth_headers = {"X-API-KEY": API_KEY}
    async with session.get(endpoint, headers=auth_headers, timeout=TIMEOUT) as response:
        if response.status == 200:
            body = await response.json()
            return MalwareLogCredentials(**body)

        # Both failure paths return None; only the message differs
        if response.status == 404:
            print(f"No credentials found for log ID {malware_log_id}")
        else:
            print(f"Error fetching credentials for log ID {malware_log_id}: {response.status}")
        return None
async def fetch_credentials_batch(session, log_ids: List[str], target_url: str) -> Dict[str, Any]:
    """
    Fetch credentials for several malware logs concurrently.

    Args:
        session: The session object for making requests
        log_ids: List of log IDs to fetch credentials for
        target_url: The URL whose credentials are requested from each log

    Returns:
        Dictionary mapping each log ID to the result of fetch_credentials()
        for that ID (which may be None)
    """
    pending = [fetch_credentials(session, log_id, target_url) for log_id in log_ids]
    # gather() yields results in the order the awaitables were passed, so
    # they line up with log_ids.
    fetched = await asyncio.gather(*pending)
    return dict(zip(log_ids, fetched))
async def main():
    """Fetch credentials for TARGET_URL from malware logs and write them to OUTPUT_FILE as CSV."""
    print(f"Fetching malware logs for URL: {TARGET_URL}")

    # CSV columns: three describe the malware log, six the credential
    fieldnames = [
        'malware_id', 'malware_identifier', 'published_date',
        'url', 'cleaned_url', 'username', 'email', 'password', 'application'
    ]

    with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()

        session_timeout = aiohttp.ClientTimeout(total=TIMEOUT)
        async with aiohttp.ClientSession(timeout=session_timeout) as session:
            # Step 1: Get malware log IDs for the target URL
            malware_logs = await fetch_malware_log_ids(session, TARGET_URL)
            if not malware_logs or not malware_logs.malware_log_ids:
                print(f"No malware log IDs found for URL: {TARGET_URL}")
                return

            log_ids = malware_logs.malware_log_ids
            print(f"Found {len(log_ids)} malware log IDs")

            # Step 2: Fetch credentials batch by batch; each batch runs concurrently
            total_credentials = 0
            for i in range(0, len(log_ids), CREDENTIAL_FETCH_BATCH_SIZE):
                batch = log_ids[i:i + CREDENTIAL_FETCH_BATCH_SIZE]
                print(f"Fetching credentials for batch of {len(batch)} logs (logs {i+1} to {i+len(batch)})")
                batch_results = await fetch_credentials_batch(session, batch, TARGET_URL)

                for cred_log in batch_results.values():
                    # Skip failed lookups and logs without credentials
                    if not cred_log or not cred_log.credentials:
                        continue

                    # Log-level columns are shared by every row of this log
                    log_columns = {
                        'malware_id': cred_log.id,
                        'malware_identifier': cred_log.identifier or '',
                        'published_date': cred_log.published_date or '',
                    }
                    for cred in cred_log.credentials:
                        writer.writerow({
                            **log_columns,
                            'url': cred.url or '',
                            'cleaned_url': cred.cleaned_url or '',
                            'username': cred.username or '',
                            'email': cred.email or '',
                            'password': cred.password or '',
                            'application': cred.application or '',
                        })
                        total_credentials += 1

    print(f"\nProcessed {len(log_ids)} malware logs with a total of {total_credentials} credentials")
    print(f"Results saved to {OUTPUT_FILE}")
if __name__ == "__main__":
# Run the async function
asyncio.run(main())
This script can be easily adapted to search for credentials from other services by changing the TARGET_URL variable. For example:
- "facebook.com" for Facebook credentials
- "mail.google.com" for Gmail credentials
- "github.com" for GitHub credentials
Fetching AWS Secrets from Malware Logs
This example demonstrates how to retrieve AWS secrets that have been leaked through malware infections:
import requests
import json
import time
import datetime
# --- Configuration ---
API_URL = "https://enterprise-data-api.nordstellar.com/api/v3/data/data-source/malware-log/secrets/feed"
# IMPORTANT: Replace 'YOUR_SECRET_TOKEN' with your actual API key
API_KEY = "YOUR_SECRET_TOKEN"
# Initial start timestamp for the iteration (Unix timestamp)
# The first query window starts at this timestamp; each subsequent window steps backward in time.
INITIAL_START_TIMESTAMP = int(time.time())
TIME_WINDOW_SECONDS = 3600 # Query 1 hour of data at a time
# Types of secrets to fetch
TYPES = ["aws-secret-key"]
# Number of records to fetch per API call
LIMIT = 100
def format_timestamp(ts):
    """Converts a Unix timestamp to a human-readable UTC string.

    Args:
        ts: Seconds since the Unix epoch, or None.

    Returns:
        "YYYY-MM-DD HH:MM:SS UTC" for a timestamp, or "N/A" when ts is None.
    """
    if ts is None:
        return "N/A"
    # datetime.utcfromtimestamp() is deprecated since Python 3.12 and returns
    # a naive datetime; convert with an explicit UTC timezone instead.
    moment = datetime.datetime.fromtimestamp(ts, tz=datetime.timezone.utc)
    return moment.strftime("%Y-%m-%d %H:%M:%S UTC")
def _describe_http_error(error_response):
    """Summarise a failed HTTP response as '(Status: N) <details>' for log messages."""
    detail = f"(Status: {error_response.status_code}) "
    try:
        # Prefer the structured message under errors.body when the body is JSON
        error_json = error_response.json()
        api_error_body = error_json.get("errors", {}).get("body")
        if api_error_body:
            detail += f"API Message: {api_error_body}"
        elif error_response.text:  # Fallback if no structured 'errors.body'
            detail += f"Raw Response: {error_response.text[:200]}"
        else:
            detail += "(No further details in JSON response body)"
    except ValueError:
        # Error response was not JSON. Every JSON decode error requests can
        # raise derives from ValueError, whichever JSON backend it uses.
        if error_response.text:
            detail += f"Raw Response (not JSON): {error_response.text[:200]}"
        else:
            detail += "(Empty or non-JSON error response body)"
    except Exception as e_extract:  # e.g. the JSON body is not a dictionary
        detail += f"(Could not fully parse error details: {e_extract})"
    return detail


def _report_unexpected_payload(data):
    """Print diagnostics for a 2xx response whose JSON is not {'data': [...]}."""
    print("Warning: Unexpected API response format (after 200 OK).")
    if isinstance(data, dict):
        print("Expected a dictionary with a 'data' key containing a list of records.")
        print(f"Keys found: {list(data.keys())}")
        if "data" in data:
            print(f"Type of 'data' key: {type(data['data'])}")
        else:
            print("'data' key not found.")
    else:
        print(f"Expected a dictionary, but got type: {type(data)}")
    print(f"Response sample: {str(data)[:200]}")


def fetch_data_batch(ts_from, ts_to, offset):
    """
    Fetches a batch of data from the API for a specific time window and offset.
    Includes retry logic for 429 Too Many Requests errors.

    Args:
        ts_from (int): The start Unix timestamp for the query window.
        ts_to (int): The end Unix timestamp for the query window.
        offset (int): The offset for pagination.

    Returns:
        list: The records under the response's 'data' key.
        An empty list when there is nothing to return for this batch: HTTP
        404, an unexpected payload shape, exhausted 429 retries, other HTTP
        errors, connection errors, timeouts or undecodable JSON.
        None on a critical error (HTTP 401 or 403), which tells the caller
        to stop.
    """
    headers = {"Content-Type": "application/json", "X-API-KEY": API_KEY}
    payload = {
        "published_ts_from": ts_from,
        "published_ts_to": ts_to,
        "types": TYPES,
        "limit": LIMIT,
        "offset": offset,
    }
    MAX_RETRIES_429 = 3
    RETRY_DELAY_SECONDS_429 = 5
    current_retry_429 = 0

    while True:  # Only a 429 response loops back here; every other path returns
        attempt_number = current_retry_429 + 1
        print(
            f"Fetching data: window [{format_timestamp(ts_from)} to {format_timestamp(ts_to)}], offset: {offset}, limit: {LIMIT}"
            + (f", attempt {attempt_number}" if current_retry_429 > 0 else "")
        )
        # print(f"Payload: {json.dumps(payload, indent=2)}")  # Uncomment for debugging payload

        response = None  # Kept visible to the JSON-decode handler below
        try:
            # A monotonic clock is unaffected by system clock adjustments
            started = time.monotonic()
            response = requests.post(
                API_URL, headers=headers, json=payload, timeout=30
            )  # 30-second timeout
            duration = time.monotonic() - started
            print(f"Request completed in {duration:.2f} seconds.")

            response.raise_for_status()  # Raises HTTPError for 4xx/5xx responses

            # Status was 2xx; json() can still raise if the body is not valid JSON
            data = response.json()
            if isinstance(data, dict) and isinstance(data.get("data"), list):
                return data["data"]

            _report_unexpected_payload(data)
            return []  # Treat as a failed batch for safety

        except requests.exceptions.HTTPError as http_err:
            # Compare with None explicitly: a Response for a 4xx/5xx status is falsy
            if http_err.response is None:
                print(f"HTTPError occurred without a response object: {http_err}")
                return []  # Treat as a failed batch

            status_code = http_err.response.status_code
            error_message_detail = _describe_http_error(http_err.response)

            if status_code == 429:  # Too Many Requests
                if current_retry_429 < MAX_RETRIES_429:
                    current_retry_429 += 1
                    print(
                        f"Rate limited (429). Retrying in {RETRY_DELAY_SECONDS_429} seconds... (Retry {current_retry_429}/{MAX_RETRIES_429})"
                    )
                    time.sleep(RETRY_DELAY_SECONDS_429)
                    continue  # Retry the request
                print(
                    f"Rate limited (429). Max retries ({MAX_RETRIES_429}) reached. Failing for this batch."
                )
                return []  # Exhausted retries for this batch

            if status_code == 401:  # Unauthorized
                print(
                    f"Critical Error: Unauthorized. {error_message_detail}. Check your API_KEY."
                )
                return None  # Critical failure, stop the script

            if status_code == 403:  # Forbidden
                print(
                    f"Critical Error: Forbidden. {error_message_detail}. Check permissions or API_KEY validity."
                )
                return None  # Critical failure, stop the script

            if status_code == 404:  # Not Found
                # For the /secrets/feed endpoint, 404 means "No data found":
                # an expected outcome for a query window, not a systemic error.
                return []

            # Other HTTP errors (e.g. 400 Bad Request, 5xx Server Errors):
            # fail this batch but let the caller continue.
            print(f"HTTP error occurred: {http_err}. {error_message_detail}")
            return []

        except requests.exceptions.ConnectionError as conn_err:
            print(f"Connection error occurred: {conn_err}")
            return []
        except requests.exceptions.Timeout as timeout_err:
            print(f"Request timed out: {timeout_err}")
            return []
        except json.JSONDecodeError as json_err:
            # A 2xx response whose body could not be decoded as JSON
            print(f"Error decoding JSON response (expected 2xx success): {json_err}")
            try:
                if response is not None and hasattr(response, "text"):
                    print(
                        f"Response content that failed to parse: {response.text[:500]}"
                    )
                else:
                    print(
                        "Response object not available or has no text attribute to log for JSONDecodeError."
                    )
            except Exception as e_log:  # Logging the failure must not raise
                print(f"Could not log response text for JSONDecodeError: {e_log}")
            return []
        except requests.exceptions.RequestException as req_err:
            # Catch-all for other request-related issues
            print(f"An error occurred during the API request: {req_err}")
            return []
def iterate_complete_dataset():
    """
    Iterates through the dataset one time window at a time, newest first.

    The first window starts at INITIAL_START_TIMESTAMP and spans
    TIME_WINDOW_SECONDS; every following window covers the preceding
    TIME_WINDOW_SECONDS, so the walk moves backward in time. Inside a window
    the feed is paged with LIMIT/offset until a short or empty batch arrives.

    The loop ends when fetch_data_batch() reports a critical error (None),
    when the next window would start before the Unix epoch, or on Ctrl+C.
    A summary is printed on every exit from the loop.
    """
    if API_KEY == "YOUR_SECRET_TOKEN":
        print(
            "Error: Please replace 'YOUR_SECRET_TOKEN' with your actual API key in the script."
        )
        return

    current_ts_from = INITIAL_START_TIMESTAMP
    # Start of the most recent window that was actually queried. Tracked
    # separately because current_ts_from is moved to the next window as soon
    # as one finishes.
    last_window_start = None
    total_records_processed_session = 0

    print(f"Starting data iteration from: {format_timestamp(current_ts_from)}")
    print(
        f"Time window size: {TIME_WINDOW_SECONDS // 3600} hours ({TIME_WINDOW_SECONDS} seconds)"
    )
    print("Press Ctrl+C to stop the script.\n")

    try:
        # Stop at the epoch instead of walking into negative timestamps forever
        while current_ts_from >= 0:
            current_ts_to = current_ts_from + TIME_WINDOW_SECONDS - 1
            last_window_start = current_ts_from
            offset = 0
            records_in_current_window = 0
            print(
                f"--- Processing time window: {format_timestamp(current_ts_from)} to {format_timestamp(current_ts_to)} ---"
            )

            while True:
                batch = fetch_data_batch(current_ts_from, current_ts_to, offset)

                if batch is None:
                    print("Stopping due to critical API error.")
                    return

                if not batch:
                    if offset == 0:
                        print(
                            f"No records found for window {format_timestamp(current_ts_from)} to {format_timestamp(current_ts_to)}."
                        )
                    else:
                        print(
                            f"End of records for window {format_timestamp(current_ts_from)} to {format_timestamp(current_ts_to)}."
                        )
                    break

                print(
                    f"Retrieved {len(batch)} records for window {format_timestamp(current_ts_from)}-{format_timestamp(current_ts_to)}, offset {offset}."
                )

                # --- Process your records here ---
                for record in batch:
                    # e.g. print(f"ID - {record.get('id', 'N/A')}, Type - {record.get('type', 'N/A')}")
                    pass  # Replace with your actual processing logic
                # ---------------------------------

                records_in_current_window += len(batch)
                total_records_processed_session += len(batch)

                # A short page means the window has no further records
                if len(batch) < LIMIT:
                    print(
                        f"Reached end of data for window {format_timestamp(current_ts_from)} to {format_timestamp(current_ts_to)} (less than limit)."
                    )
                    break

                offset += LIMIT

            print(
                f"--- Finished processing window. Total records for this window: {records_in_current_window} ---"
            )
            # Start of the previous window (going backward)
            current_ts_from -= TIME_WINDOW_SECONDS

        print("Reached the Unix epoch; no earlier windows to query.")

    except KeyboardInterrupt:
        print("\nScript interrupted by user.")
    except Exception as e:
        print(f"An unexpected error occurred in the main loop: {e}")
    finally:
        print("\n--- Iteration Summary ---")
        print(f"Last processed window started at: {format_timestamp(last_window_start)}")
        print(
            f"Total records processed in this session: {total_records_processed_session}"
        )
        print("Script finished.")
if __name__ == "__main__":
iterate_complete_dataset()
These examples demonstrate the key operations available through the NordStellar Dark Web API. You can use them as starting points for your own integrations, adapting them to your specific security needs and technical environment.