Enterprise APIs
Dark Web API
Code Examples

Code Examples

This page provides practical code examples to help you get started with the NordStellar Dark Web API. These examples demonstrate common API operations and integration patterns that you can adapt for your specific use cases.

Table of Contents

Subscription Service Listeners

This example demonstrates how to set up a webhook server to receive subscription notifications when NordStellar detects new breaches affecting your monitored assets.

from fastapi import FastAPI, Request, HTTPException, Depends, Security
from fastapi.security import HTTPBasic, HTTPBasicCredentials
import secrets
import uvicorn
from pydantic import BaseModel
from typing import List, Optional
 
# FastAPI application and the HTTP Basic scheme that extracts the caller's credentials
app = FastAPI()
security = HTTPBasic()
 
# Configure these with your actual credentials.
# verify_credentials() compares every incoming request against these two values.
# NOTE(review): placeholders only — load real values from the environment or a
# secrets store rather than committing them to source.
WEBHOOK_USERNAME = "your_webhook_username"
WEBHOOK_PASSWORD = "your_webhook_password"
 
# Model for the subscription webhook payload.
# FastAPI parses the JSON request body of /subscription-webhook into this model.
class SubscriptionEvent(BaseModel):
    # presumably the ID of the subscription that produced the event — confirm against the API docs
    subscription_id: str
    document_type: str  # email-sha256, phone-sha256, domain, cc-argon2id, or nin-argon2id
    # Value identifying the monitored asset; the handler prints it for email, phone and domain events
    document_identifier: str
    # The three lists below are reported by the handler as "database breaches",
    # "malware logs" and "credential lists" respectively; all three are required fields.
    database_ids: List[str]
    malware_log_ids: List[str]
    credential_list_ids: List[str]
 
def verify_credentials(credentials: HTTPBasicCredentials = Depends(security)):
    """
    Validate the HTTP Basic credentials sent with a webhook call.

    Args:
        credentials: Username/password pair extracted by the HTTPBasic scheme.

    Returns:
        The same credentials object when both username and password match.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Basic`` header when the
            username or the password does not match.
    """
    # compare_digest() accepts str arguments only when they are pure ASCII and
    # raises TypeError otherwise, which would turn a bad login into a 500.
    # Comparing the UTF-8 bytes keeps the constant-time comparison and makes
    # any non-ASCII input simply fail authentication.
    correct_username = secrets.compare_digest(
        credentials.username.encode("utf-8"),
        WEBHOOK_USERNAME.encode("utf-8"),
    )
    correct_password = secrets.compare_digest(
        credentials.password.encode("utf-8"),
        WEBHOOK_PASSWORD.encode("utf-8"),
    )

    # Both comparisons are evaluated before branching so that the check does
    # not short-circuit on a wrong username.
    if not (correct_username and correct_password):
        raise HTTPException(
            status_code=401,
            detail="Invalid credentials",
            headers={"WWW-Authenticate": "Basic"},
        )
    return credentials
 
@app.post("/subscription-webhook")
async def subscription_webhook(
    event: SubscriptionEvent,
    credentials: HTTPBasicCredentials = Security(verify_credentials)
):
    """
    Receive one NordStellar subscription event and acknowledge it.

    The request must pass verify_credentials (HTTP Basic). The handler prints a
    one-line summary for the event's document type, then one line for each
    non-empty breach-source list, and finally returns a success payload.
    """
    # Document types whose identifier is printed, mapped to the wording used in the message
    identifier_labels = {
        "email-sha256": "email",
        "phone-sha256": "phone number",
        "domain": "domain",
    }

    doc_type = event.document_type
    if doc_type in identifier_labels:
        print(f"New breach detected for {identifier_labels[doc_type]}: {event.document_identifier}")
    elif doc_type in ("cc-argon2id", "nin-argon2id"):
        # For card / national-ID events only the type is printed, never the identifier
        print(f"New breach detected for sensitive data: {doc_type}")

    # Report how many sources of each kind the asset was found in (empty lists are skipped)
    breach_sources = (
        (event.database_ids, "database breaches"),
        (event.malware_log_ids, "malware logs"),
        (event.credential_list_ids, "credential lists"),
    )
    for source_ids, description in breach_sources:
        if source_ids:
            print(f"Found in {len(source_ids)} {description}")

    # Here you would typically:
    # 1. Store the event in your database
    # 2. Trigger your internal processes (notifications, remediation, etc.)
    # 3. Update your risk scoring systems

    # Acknowledge receipt with a success response
    return {"status": "success", "message": "Event processed successfully"}
 
if __name__ == "__main__":
    # Run the webhook server with uvicorn when this file is executed directly.
    # host="0.0.0.0" makes it listen on all network interfaces, on port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)

To register this webhook with NordStellar:

import requests
 
# Configuration
API_KEY = "YOUR_API_KEY"
BASE_URL = "https://enterprise-data-api.nordstellar.com/api/v3/data"

# The webhook URL that NordStellar will call, and the HTTP Basic credentials
# it should present when calling it
WEBHOOK_URL = "https://your-server.com/subscription-webhook"
WEBHOOK_USERNAME = "your_webhook_username"
WEBHOOK_PASSWORD = "your_webhook_password"

# Upper bound (seconds) on how long the registration request may take.
# Without a timeout, requests waits indefinitely on an unresponsive server.
REQUEST_TIMEOUT_SECONDS = 30

# Set up headers with authentication
headers = {
    "X-API-KEY": API_KEY,
    "Content-Type": "application/json"
}

# Prepare the webhook registration payload
payload = {
    "url": WEBHOOK_URL,
    "username": WEBHOOK_USERNAME,
    "password": WEBHOOK_PASSWORD
}

# Register the webhook endpoint
response = requests.post(
    f"{BASE_URL}/user/subscription-endpoint",
    headers=headers,
    json=payload,
    timeout=REQUEST_TIMEOUT_SECONDS,
)

# Check response: only a 200 is treated as success; anything else is printed for diagnosis
if response.status_code == 200:
    print("Webhook registered successfully")
else:
    print(f"Error registering webhook: {response.status_code}")
    print(response.text)

Email Lookup

This example demonstrates how to check if an email address has been involved in any data breaches:

import requests
import hashlib
import json
 
# Configuration
# API_KEY is sent in the X-API-KEY header of every request; replace the placeholder.
# BASE_URL is the root that the email lookup path is appended to.
API_KEY = "YOUR_API_KEY"
BASE_URL = "https://enterprise-data-api.nordstellar.com/api/v3/data"
 
def check_email_breaches(email):
    """
    Check if an email address has been involved in any data breaches.

    The address is trimmed, lower-cased and SHA-256 hashed locally; only the
    hash is placed in the request URL. A summary of the result is printed.

    Args:
        email: The email address to check

    Returns:
        The parsed JSON response (dict) when the API answers 200.
        None when the API answers 404 (no breach data) or any other status.

    Raises:
        requests.exceptions.RequestException: on a network failure or when the
            request exceeds the timeout.
    """
    # Normalise before hashing: surrounding whitespace or different letter case
    # would otherwise produce a different digest for the same address.
    email_hash = hashlib.sha256(email.strip().lower().encode()).hexdigest()

    # Set up headers with authentication
    headers = {
        "X-API-KEY": API_KEY,
        "Content-Type": "application/json"
    }

    # Make API request; the timeout keeps an unresponsive server from
    # blocking the caller forever.
    response = requests.get(
        f"{BASE_URL}/email/{email_hash}",
        headers=headers,
        timeout=30,
    )

    if response.status_code == 404:
        print(f"No breach data found for {email}")
        return None

    if response.status_code != 200:
        print(f"Error: {response.status_code}")
        print(response.text)
        return None

    data = response.json()

    # Print a summary of the breach data
    print(f"\nEmail: {email}")

    if 'databases' in data and data['databases']:
        print(f"Found in {len(data['databases'])} database breaches:")
        for idx, breach in enumerate(data['databases'], 1):
            breach_id = breach.get('id', 'Unknown')
            breach_domain = breach.get('identifier', 'Unknown')
            print(f"  {idx}. ID: {breach_id}, Domain: {breach_domain}")

            # Print the data fields exposed in this breach
            if 'data' in breach:
                data_fields = [item['key'] for item in breach['data']]
                print(f"     Exposed data: {', '.join(data_fields)}")

    if 'malware_logs' in data and data['malware_logs']:
        print(f"Found in {len(data['malware_logs'])} malware logs")

    if 'credential_lists' in data and data['credential_lists']:
        print(f"Found in {len(data['credential_lists'])} credential lists")

    return data
 
# Example usage
if __name__ == "__main__":
    # Placeholder addresses: replace with the addresses you want to check.
    # They must be distinct, because each result is written to a file named
    # after the address.
    emails_to_check = [
        "user1@example.com",
        "user2@example.com"
    ]

    for email in emails_to_check:
        breach_data = check_email_breaches(email)

        # Optionally save the full response to a file for detailed analysis.
        # '@' is replaced so the address can be used as a file name.
        if breach_data:
            with open(f"{email.replace('@', '_at_')}_breaches.json", 'w', encoding='utf-8') as f:
                json.dump(breach_data, f, indent=2)

Email Domain Lookup

This example shows how to check for breached email addresses within a specific domain:

import requests
import json
 
# Configuration
# API_KEY is sent in the X-API-KEY header of every request; replace the placeholder.
# BASE_URL is the root that the domain lookup path is appended to.
API_KEY = "YOUR_API_KEY"
BASE_URL = "https://enterprise-data-api.nordstellar.com/api/v3/data"
 
def check_domain_breaches(domain, limit=100, offset=0):
    """
    Check for breached email addresses within a specific domain.

    Prints a per-address summary and a pagination hint when a full page
    was returned.

    Args:
        domain: The domain to check (e.g., example.com)
        limit: Maximum number of results to return
        offset: Offset for pagination

    Returns:
        The parsed JSON response (dict) when the API answers 200; the breached
        addresses are read from its 'data' list.
        None when the API answers 404 (no breach data) or any other status.

    Raises:
        requests.exceptions.RequestException: on a network failure or when the
            request exceeds the timeout.
    """
    # Set up headers with authentication
    headers = {
        "X-API-KEY": API_KEY,
        "Content-Type": "application/json"
    }

    # Pagination parameters, sent as the query string
    params = {
        "limit": limit,
        "offset": offset
    }

    # Make API request; the timeout keeps an unresponsive server from
    # blocking the caller forever.
    response = requests.get(
        f"{BASE_URL}/email/domain/{domain}",
        headers=headers,
        params=params,
        timeout=30,
    )

    if response.status_code == 404:
        print(f"No breach data found for domain {domain}")
        return None

    if response.status_code != 200:
        print(f"Error: {response.status_code}")
        print(response.text)
        return None

    data = response.json()

    # Print a summary of the breach data
    print(f"\nDomain: {domain}")

    if 'data' in data and data['data']:
        print(f"Found {len(data['data'])} breached email addresses")

        # Extract and display data about the breached emails
        source_keys = ('databases', 'malware_logs', 'credential_lists')
        for idx, email_data in enumerate(data['data'], 1):
            email = email_data.get('email', 'Unknown')
            # Total number of sources of any kind that this address appears in
            total_breaches = sum(
                len(email_data[key]) for key in source_keys if key in email_data
            )
            print(f"  {idx}. {email}: Found in {total_breaches} breaches")

        # A full page suggests that more results may exist beyond this one
        if len(data['data']) >= limit:
            print(f"There may be more results. Current offset: {offset}, limit: {limit}")
            print(f"To see more results, increase offset to {offset + limit}")

    return data
 
# Example usage
if __name__ == "__main__":
    domains_to_check = [
        "example.com",
        "nordstellar.com"
    ]

    for domain in domains_to_check:
        domain_breach_data = check_domain_breaches(domain)

        # Nothing to save or paginate when the lookup returned no data
        if not domain_breach_data:
            continue

        # Save the full response to a file for detailed analysis
        with open(f"{domain}_breaches.json", 'w') as f:
            json.dump(domain_breach_data, f, indent=2)

        # A full first page (100 entries) means there may be more: fetch the next page
        got_full_page = (
            'data' in domain_breach_data
            and len(domain_breach_data['data']) >= 100
        )
        if got_full_page:
            offset = 100
            more_data = check_domain_breaches(domain, limit=100, offset=offset)
            # Continue with pagination as needed

Zero Knowledge Password Lookup

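This example demonstrates how to check if a password has been compromised without sending the actual password to the API. The password is hashed locally with SHA-256, only the first 6 characters of the hash are sent, and the hashes returned for that prefix are compared against the full hash on your side: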

import requests
import hashlib
 
# Configuration
# API_KEY is sent in the X-API-KEY header of every request; replace the placeholder.
# BASE_URL is the root that the zero-knowledge password paths are appended to.
API_KEY = "YOUR_API_KEY"
BASE_URL = "https://enterprise-data-api.nordstellar.com/api/v3/data"
 
def check_password_breach(password):
    """
    Check if a password has been seen in data breaches using the zero-knowledge approach.

    The password is SHA-256 hashed locally and only the first 6 hex characters
    of the hash are sent; the returned hashes are matched against the full
    hash on the client.

    Args:
        password: The password to check

    Returns:
        True if the password's hash is among the returned hashes,
        False if it is not,
        None if the API answered with a status other than 200.

    Raises:
        requests.exceptions.RequestException: on a network failure or when the
            request exceeds the timeout.
    """
    # Hash the password using SHA-256
    password_hash = hashlib.sha256(password.encode()).hexdigest()

    # Use only the first 6 characters of the hash for the API request
    hash_prefix = password_hash[:6]

    # Set up headers with authentication
    headers = {
        "X-API-KEY": API_KEY,
        "Content-Type": "application/json"
    }

    # Make API request with the hash prefix; the timeout keeps an unresponsive
    # server from blocking the caller forever.
    response = requests.get(
        f"{BASE_URL}/zero-knowledge/password/sha256/{hash_prefix}",
        headers=headers,
        timeout=30,
    )

    if response.status_code != 200:
        print(f"Error checking password: {response.status_code}")
        print(response.text)
        return None

    data = response.json()

    # Check if our full hash is in the returned data
    for hash_data in data.get('data', []):
        if hash_data.get('hash') == password_hash:
            print(f"Password has been found in {hash_data.get('count', 0)} breaches!")
            return True

    print("Password not found in any known breaches.")
    return False
 
def check_bulk_passwords(passwords):
    """
    Check multiple passwords at once using the bulk zero-knowledge endpoint.

    Each password is SHA-256 hashed locally and only the 6-character hash
    prefixes are sent; returned hashes are matched against the full hashes on
    the client.

    Args:
        passwords: List of passwords to check

    Returns:
        Dictionary mapping every submitted password to its breach count. A
        password whose hash is absent from the response is reported with a
        count of 0. The dictionary is empty when the API answered with a
        status other than 200.

    Raises:
        requests.exceptions.RequestException: on a network failure or when the
            request exceeds the timeout.
    """
    # Map each full hash back to its password (this also collapses duplicates)
    password_hashes = {
        hashlib.sha256(password.encode()).hexdigest(): password
        for password in passwords
    }

    # Only the prefixes are sent. dict.fromkeys() drops repeated prefixes while
    # keeping their original order.
    hash_prefixes = list(dict.fromkeys(full_hash[:6] for full_hash in password_hashes))

    # Set up headers with authentication
    headers = {
        "X-API-KEY": API_KEY,
        "Content-Type": "application/json"
    }

    # Prepare the payload for the bulk request
    payload = {
        "hash_ranges": hash_prefixes,
        "hash_type": "sha256"
    }

    # Make API request; the timeout keeps an unresponsive server from
    # blocking the caller forever.
    response = requests.post(
        f"{BASE_URL}/zero-knowledge/password",
        headers=headers,
        json=payload,
        timeout=30,
    )

    if response.status_code != 200:
        print(f"Error checking passwords: {response.status_code}")
        print(response.text)
        return {}

    data = response.json()

    # Start every password at 0 so that passwords missing from the response
    # are still reported as "not found" instead of being silently dropped.
    results = {password: 0 for password in password_hashes.values()}

    # Record the count for each returned hash that belongs to one of our passwords
    for hash_data in data.get('data', []):
        original_password = password_hashes.get(hash_data.get('hash'))
        if original_password is not None:
            results[original_password] = hash_data.get('count', 0)

    # NOTE(review): these messages echo plaintext passwords to stdout, which is
    # acceptable for a demo only — remove or mask them before using on real secrets.
    for original_password, breach_count in results.items():
        if breach_count > 0:
            print(f"Password '{original_password}' has been found in {breach_count} breaches!")
        else:
            print(f"Password '{original_password}' not found in any known breaches.")

    return results
 
# Example usage
if __name__ == "__main__":
    # Check a single password (prints the outcome; the return value is ignored here)
    check_password_breach("Password123")
    
    # Check multiple passwords at once
    passwords_to_check = [
        "qwerty123",
        "Secure_P@ssw0rd!",
        "letmein"
    ]
    
    # breach_counts maps each password reported by check_bulk_passwords to its breach count
    breach_counts = check_bulk_passwords(passwords_to_check)

Fetching AWS Credentials from Malware Logs

This example demonstrates how to retrieve AWS credentials that have been leaked through malware infections:

import asyncio
import aiohttp
import csv
from typing import List, Dict, Optional, Any
from pydantic import BaseModel
import datetime
 
# Configuration - replace with your values
API_KEY = "YOUR_API_KEY"  # Sent in the X-API-KEY header of every request
TARGET_URL = "us-east-1.console.aws.amazon.com"  # URL to search for in credentials
BASE_URL = "https://enterprise-data-api.nordstellar.com/api/v3/data"
LIMIT = 500  # Number of malware logs to retrieve (sent as the `limit` query parameter)
# Output CSV name; the timestamp is taken once, when this module is loaded
OUTPUT_FILE = f"nordstellar_aws_credentials_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
TIMEOUT = 120  # 2 minutes timeout in seconds (used for the session and for each request)
CREDENTIAL_FETCH_BATCH_SIZE = 50  # Batch size for concurrent credential fetching
 
 
class MalwareLogIds(BaseModel):
    """Model for malware log IDs response"""
    # presumably the URL the lookup was performed for — confirm against the API docs
    credentials_url: str
    # IDs that main() passes on, in batches, to fetch the credentials of each log
    malware_log_ids: List[str]
    # presumably the pagination values echoed back by the API — confirm against the API docs
    limit: int
    offset: int
 
 
class Credential(BaseModel):
    """Model for individual credential data"""
    # Every field is optional and defaults to None; main() writes a missing
    # value to the CSV as an empty string.
    url: Optional[str] = None
    cleaned_url: Optional[str] = None
    username: Optional[str] = None
    email: Optional[str] = None
    password: Optional[str] = None
    application: Optional[str] = None
 
 
class MalwareLogCredentials(BaseModel):
    """Model for malware log credentials response"""
    # Written to the CSV as 'malware_identifier' (empty string when missing)
    identifier: Optional[str] = None
    # Required; written to the CSV as 'malware_id'
    id: str
    # Kept as the string the API returned; written to the CSV unchanged
    published_date: Optional[str] = None
    # One CSV row is written per entry in this list
    credentials: List[Credential]
 
 
async def fetch_malware_log_ids(session, url):
    """
    Look up the IDs of malware logs that contain credentials for ``url``.

    Args:
        session: Client session used to perform the GET request
        url: URL to search for; it is inserted into the request path as-is

    Returns:
        A MalwareLogIds instance built from the JSON body on HTTP 200,
        otherwise None (the status code is printed).
    """
    # The request carries its own timeout in addition to the session's
    async with session.get(
        f"{BASE_URL}/data-source/malware-log/credentials/url/{url}/id",
        headers={"X-API-KEY": API_KEY},
        params={"limit": LIMIT},
        timeout=TIMEOUT,
    ) as response:
        if response.status != 200:
            print(f"Error fetching malware log IDs: {response.status}")
            return None

        body = await response.json()
        return MalwareLogIds(**body)
 
 
async def fetch_credentials(session, malware_log_id, url):
    """
    Fetch the credentials that one malware log holds for ``url``.

    Args:
        session: Client session used to perform the GET request
        malware_log_id: ID of the malware log to read
        url: URL whose credentials are requested

    Returns:
        A MalwareLogCredentials instance on HTTP 200; None on 404 or on any
        other status (a message is printed in both cases).
    """
    async with session.get(
        f"{BASE_URL}/data-source/malware-log/{malware_log_id}/credentials/url/{url}",
        headers={"X-API-KEY": API_KEY},
        timeout=TIMEOUT,
    ) as response:
        status = response.status
        if status == 200:
            body = await response.json()
            return MalwareLogCredentials(**body)

        # Both failure paths return None and differ only in the message printed
        if status == 404:
            print(f"No credentials found for log ID {malware_log_id}")
        else:
            print(f"Error fetching credentials for log ID {malware_log_id}: {status}")
        return None
 
 
async def fetch_credentials_batch(session, log_ids: List[str], target_url: str) -> Dict[str, Any]:
    """
    Fetch the credentials of several malware logs concurrently.

    Args:
        session: The session object for making requests
        log_ids: IDs of the malware logs to fetch credentials for
        target_url: The URL whose credentials are requested from each log

    Returns:
        Dictionary mapping each log ID to whatever fetch_credentials returned
        for it (a MalwareLogCredentials instance or None)
    """
    # gather() yields results in the order the coroutines were passed, so they
    # line up with log_ids position by position.
    fetched = await asyncio.gather(
        *(fetch_credentials(session, log_id, target_url) for log_id in log_ids)
    )
    return dict(zip(log_ids, fetched))
 
 
async def main():
    """
    Collect the credentials leaked for TARGET_URL and write them to OUTPUT_FILE.

    The malware log IDs are fetched first; their credentials are then fetched
    in concurrent batches of CREDENTIAL_FETCH_BATCH_SIZE and written to the
    CSV, one row per credential. If no log IDs are found the function returns
    early, leaving a CSV that contains only the header row.
    """
    print(f"Fetching malware logs for URL: {TARGET_URL}")

    # Column order of the output CSV
    fieldnames = [
        'malware_id', 'malware_identifier', 'published_date',
        'url', 'cleaned_url', 'username', 'email', 'password', 'application'
    ]

    with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()

        # All requests share one session whose total timeout is TIMEOUT seconds
        session_timeout = aiohttp.ClientTimeout(total=TIMEOUT)
        async with aiohttp.ClientSession(timeout=session_timeout) as session:
            # Step 1: Get malware log IDs for the target URL
            malware_logs = await fetch_malware_log_ids(session, TARGET_URL)

            if not malware_logs or not malware_logs.malware_log_ids:
                print(f"No malware log IDs found for URL: {TARGET_URL}")
                return

            log_ids = malware_logs.malware_log_ids
            print(f"Found {len(log_ids)} malware log IDs")

            # Step 2: Fetch credentials batch by batch
            total_credentials = 0

            for start in range(0, len(log_ids), CREDENTIAL_FETCH_BATCH_SIZE):
                batch = log_ids[start:start + CREDENTIAL_FETCH_BATCH_SIZE]
                print(f"Fetching credentials for batch of {len(batch)} logs (logs {start+1} to {start+len(batch)})")

                # The logs within one batch are fetched in parallel
                batch_results = await fetch_credentials_batch(session, batch, TARGET_URL)

                for cred_log in batch_results.values():
                    # Skip logs that failed to load or hold no credentials
                    if not (cred_log and cred_log.credentials):
                        continue

                    # One CSV row per credential; missing values become empty strings
                    rows = [
                        {
                            'malware_id': cred_log.id,
                            'malware_identifier': cred_log.identifier or '',
                            'published_date': cred_log.published_date or '',
                            'url': cred.url or '',
                            'cleaned_url': cred.cleaned_url or '',
                            'username': cred.username or '',
                            'email': cred.email or '',
                            'password': cred.password or '',
                            'application': cred.application or ''
                        }
                        for cred in cred_log.credentials
                    ]
                    writer.writerows(rows)
                    total_credentials += len(rows)

            print(f"\nProcessed {len(log_ids)} malware logs with a total of {total_credentials} credentials")
            print(f"Results saved to {OUTPUT_FILE}")
 
 
if __name__ == "__main__":
    # Run the async main() to completion on a new event loop when executed as a script
    asyncio.run(main())

This script can be easily adapted to search for credentials from other services by changing the TARGET_URL variable. For example:

  • "facebook.com" for Facebook credentials
  • "mail.google.com" for Gmail credentials
  • "github.com" for GitHub credentials

Fetching AWS Secrets from Malware Logs

This example demonstrates how to retrieve AWS secrets that have been leaked through malware infections:

import requests
import json
import time
import datetime
 
# --- Configuration ---
API_URL = "https://enterprise-data-api.nordstellar.com/api/v3/data/data-source/malware-log/secrets/feed"
# IMPORTANT: Replace 'YOUR_SECRET_TOKEN' with your actual API key
# (iterate_complete_dataset refuses to run while the placeholder is still in place)
API_KEY = "YOUR_SECRET_TOKEN"
 
# Initial start timestamp for the iteration (Unix timestamp), taken when the script starts.
# The first query window begins at this timestamp; every following window
# begins TIME_WINDOW_SECONDS earlier, so the iteration walks backward in time.
INITIAL_START_TIMESTAMP = int(time.time())
 
TIME_WINDOW_SECONDS = 3600 # Query 1 hour of data at a time
 
# Types of secrets to fetch (sent as the "types" field of each request)
TYPES = ["aws-secret-key"]
 
# Number of records to fetch per API call; also the step by which the offset advances
LIMIT = 100
 
 
def format_timestamp(ts):
    """
    Converts a Unix timestamp to a human-readable UTC string.

    Args:
        ts: Unix timestamp in seconds, or None.

    Returns:
        str: "YYYY-MM-DD HH:MM:SS UTC", or "N/A" when ts is None.
    """
    if ts is None:
        return "N/A"
    # datetime.utcfromtimestamp() is deprecated since Python 3.12; the
    # timezone-aware call below formats to exactly the same text.
    moment = datetime.datetime.fromtimestamp(ts, tz=datetime.timezone.utc)
    return moment.strftime("%Y-%m-%d %H:%M:%S UTC")
 
 
def fetch_data_batch(ts_from, ts_to, offset):
    """
    Fetches a batch of data from the API for a specific time window and offset.
    Includes retry logic for 429 Too Many Requests errors.

    Args:
        ts_from (int): The start Unix timestamp for the query window.
        ts_to (int): The end Unix timestamp for the query window.
        offset (int): The offset for pagination.

    Returns:
        list: The records found under the response's "data" key.
        None: On a critical error (HTTP 401 or 403); the caller should stop.
        Empty list: When there is no data (HTTP 404) and also for every
            non-critical failure: 429 retries exhausted, any other HTTP error,
            connection error, timeout, a body that is not valid JSON, or a
            2xx body that does not have the expected shape. Callers therefore
            cannot tell "no data" apart from a failed batch.
    """
    headers = {"Content-Type": "application/json", "X-API-KEY": API_KEY}
    payload = {
        "published_ts_from": ts_from,
        "published_ts_to": ts_to,
        "types": TYPES,
        "limit": LIMIT,
        "offset": offset,
    }

    # Retry policy for HTTP 429: a fixed delay and a bounded number of retries
    MAX_RETRIES_429 = 3
    RETRY_DELAY_SECONDS_429 = 5
    current_retry_429 = 0

    while True:  # Loop for retrying 429 errors; every other outcome returns
        attempt_number = current_retry_429 + 1
        print(
            f"Fetching data: window [{format_timestamp(ts_from)} to {format_timestamp(ts_to)}], offset: {offset}, limit: {LIMIT}"
            + (f", attempt {attempt_number}" if current_retry_429 > 0 else "")
        )
        # print(f"Payload: {json.dumps(payload, indent=2)}") # Uncomment for debugging payload

        response = None  # Initialize response here to ensure it's available in broader scope if needed for logging
        try:
            start_time = time.time()
            response = requests.post(
                API_URL, headers=headers, json=payload, timeout=30
            )  # 30-second timeout
            end_time = time.time()
            duration = end_time - start_time
            print(f"Request completed in {duration:.2f} seconds.")

            response.raise_for_status()  # Raises HTTPError for 4xx/5xx responses

            # If we're here, status code was 2xx
            data = (
                response.json()
            )  # Can raise JSONDecodeError if 2xx response is not valid JSON

            # Expected shape: {"data": [ ...records... ]}
            if (
                isinstance(data, dict)
                and "data" in data
                and isinstance(data["data"], list)
            ):
                return data["data"]
            else:
                print("Warning: Unexpected API response format (after 200 OK).")
                if isinstance(data, dict):
                    print(
                        "Expected a dictionary with a 'data' key containing a list of records."
                    )
                    print(f"Keys found: {list(data.keys())}")
                    if "data" in data:
                        print(f"Type of 'data' key: {type(data['data'])}")
                    else:
                        print("'data' key not found.")
                else:
                    print(f"Expected a dictionary, but got type: {type(data)}")
                print(f"Response sample: {str(data)[:200]}")
                return []  # Treat as a failed batch for safety

        except requests.exceptions.HTTPError as http_err:
            if http_err.response is not None:
                status_code = http_err.response.status_code

                # Build a human-readable detail string; it is used only in the messages below
                error_message_detail = f"(Status: {status_code}) "
                try:
                    # Try to parse the error response as JSON to get structured error message
                    error_json = http_err.response.json()
                    api_error_body = error_json.get("errors", {}).get("body")
                    if api_error_body:
                        error_message_detail += f"API Message: {api_error_body}"
                    elif (
                        http_err.response.text
                    ):  # Fallback if no structured 'errors.body'
                        error_message_detail += (
                            f"Raw Response: {http_err.response.text[:200]}"
                        )
                    else:
                        error_message_detail += (
                            "(No further details in JSON response body)"
                        )
                except json.JSONDecodeError:  # Error response was not JSON
                    if http_err.response.text:
                        error_message_detail += (
                            f"Raw Response (not JSON): {http_err.response.text[:200]}"
                        )
                    else:
                        error_message_detail += (
                            "(Empty or non-JSON error response body)"
                        )
                except (
                    Exception
                ) as e_extract:  # Catch any other issue during error detail extraction
                    error_message_detail += (
                        f"(Could not fully parse error details: {e_extract})"
                    )

                if status_code == 429:  # Too Many Requests
                    if current_retry_429 < MAX_RETRIES_429:
                        current_retry_429 += 1
                        print(
                            f"Rate limited (429). Retrying in {RETRY_DELAY_SECONDS_429} seconds... (Retry {current_retry_429}/{MAX_RETRIES_429})"
                        )
                        time.sleep(RETRY_DELAY_SECONDS_429)
                        continue  # Retry the request
                    else:
                        print(
                            f"Rate limited (429). Max retries ({MAX_RETRIES_429}) reached. Failing for this batch."
                        )
                        return []  # Exhausted retries for this batch

                elif status_code == 401:  # Unauthorized
                    print(
                        f"Critical Error: Unauthorized. {error_message_detail}. Check your API_KEY."
                    )
                    return None  # Critical failure, stop the script

                elif status_code == 403:  # Forbidden
                    print(
                        f"Critical Error: Forbidden. {error_message_detail}. Check permissions or API_KEY validity."
                    )
                    return None  # Critical failure, stop the script

                elif status_code == 404:  # Not Found
                    # For the /secrets/feed endpoint, 404 means "No data found".
                    # This is an expected outcome for a query window, not a systemic error.
                    # The calling function `iterate_complete_dataset` handles an empty list `batch` correctly.
                    # print(f"Info: No data found for this specific request. {error_message_detail}") # Optional: for verbose logging
                    return []  # Return empty list, indicating no data for this batch

                # For other HTTP errors (e.g., 400 Bad Request, 5xx Server Errors)
                else:
                    print(f"HTTP error occurred: {http_err}. {error_message_detail}")
                    return (
                        []
                    )  # Treat as a failed batch, allow script to continue with next window/offset

            else:  # HTTPError occurred but http_err.response is None (e.g., proxy errors)
                print(f"HTTPError occurred without a response object: {http_err}")
                return []  # Treat as a failed batch

        # NOTE: the order of the handlers below matters — the specific request
        # exceptions are listed before the RequestException catch-all.
        except requests.exceptions.ConnectionError as conn_err:
            print(f"Connection error occurred: {conn_err}")
            return []
        except requests.exceptions.Timeout as timeout_err:
            print(f"Request timed out: {timeout_err}")
            return []
        except (
            json.JSONDecodeError
        ) as json_err:  # Specifically for issues decoding a 2xx response's JSON
            print(f"Error decoding JSON response (expected 2xx success): {json_err}")
            try:
                # 'response' should be defined from the try block if this exception occurs after response = ...
                if response and hasattr(response, "text"):
                    print(
                        f"Response content that failed to parse: {response.text[:500]}"
                    )
                else:
                    print(
                        "Response object not available or has no text attribute to log for JSONDecodeError."
                    )
            except Exception as e_log:  # Catching potential errors in logging the error
                print(f"Could not log response text for JSONDecodeError: {e_log}")
            return []
        except (
            requests.exceptions.RequestException
        ) as req_err:  # Catch-all for other request-related issues
            print(f"An error occurred during the API request: {req_err}")
            return []
 
 
def iterate_complete_dataset():
    """
    Page through the secrets feed one time window after another.

    The first window starts at INITIAL_START_TIMESTAMP; each following window
    starts TIME_WINDOW_SECONDS earlier. Inside a window, batches are requested
    with an increasing offset until an empty or short batch is returned. The
    loop ends when fetch_data_batch reports a critical error (None), on
    Ctrl+C, or on an unexpected exception; a summary is printed in every case.
    """
    # Refuse to run while the placeholder API key is still in place
    if API_KEY == "YOUR_SECRET_TOKEN":
        print(
            "Error: Please replace 'YOUR_SECRET_TOKEN' with your actual API key in the script."
        )
        return

    window_start = INITIAL_START_TIMESTAMP
    session_total = 0

    print(f"Starting data iteration from: {format_timestamp(window_start)}")
    print(
        f"Time window size: {TIME_WINDOW_SECONDS // 3600} hours ({TIME_WINDOW_SECONDS} seconds)"
    )
    print("Press Ctrl+C to stop the script.\n")

    try:
        while True:
            window_end = window_start + TIME_WINDOW_SECONDS - 1
            # Both labels stay the same for the whole window, so format them once
            start_label = format_timestamp(window_start)
            end_label = format_timestamp(window_end)
            window_total = 0
            offset = 0

            print(f"--- Processing time window: {start_label} to {end_label} ---")

            while True:
                batch = fetch_data_batch(window_start, window_end, offset)

                # None signals a critical API error: stop the whole iteration
                if batch is None:
                    print("Stopping due to critical API error.")
                    return

                # An empty batch ends the window; the wording depends on
                # whether anything was fetched before it
                if not batch:
                    outcome = "No records found" if offset == 0 else "End of records"
                    print(f"{outcome} for window {start_label} to {end_label}.")
                    break

                batch_size = len(batch)
                print(
                    f"Retrieved {batch_size} records for window {start_label}-{end_label}, offset {offset}."
                )

                # --- Process your records here ---
                for record in batch:
                    # e.g. print(json.dumps(record, indent=2)), or read record.get('id') / record.get('type')
                    pass  # Replace with your actual processing logic
                # ---------------------------------

                window_total += batch_size
                session_total += batch_size

                # A short batch means the window has no further pages
                if batch_size < LIMIT:
                    print(
                        f"Reached end of data for window {start_label} to {end_label} (less than limit)."
                    )
                    break

                offset += LIMIT

            print(
                f"--- Finished processing window. Total records for this window: {window_total} ---"
            )

            # Step to the start of the previous window (going backward in time)
            window_start -= TIME_WINDOW_SECONDS

    except KeyboardInterrupt:
        print("\nScript interrupted by user.")
    except Exception as e:
        print(f"An unexpected error occurred in the main loop: {e}")
    finally:
        print("\n--- Iteration Summary ---")
        print(f"Last processed window started at: {format_timestamp(window_start)}")
        print(f"Total records processed in this session: {session_total}")
        print("Script finished.")
 
 
if __name__ == "__main__":
    # Start the window-by-window iteration when executed as a script (runs until stopped)
    iterate_complete_dataset()

These examples demonstrate the key operations available through the NordStellar Dark Web API. You can use them as starting points for your own integrations, adapting them to your specific security needs and technical environment.

NordStellar © 2026 · Privacy Policy