Advanced Usage

This page covers advanced SDK patterns including job polling, batch predictions, dataset versions, Privacy Suite, SynthGen, What-If Analysis, error handling, and timeout configuration.

Job Polling Patterns

Several CorePlexML operations run as background jobs: AutoML training, report generation, and synthetic data model training. The SDK provides built-in wait methods that poll until completion.

Experiment Polling

exp = client.experiments.create(
    project_id=project_id,
    dataset_version_id=version_id,
    target_column="target",
    problem_type="classification",
    config={"max_models": 10},
)

# Block until complete (up to 2 hours, polling every 10 seconds)
result = client.experiments.wait(
    exp["id"],
    interval=10.0,
    timeout=7200.0,
)

if result["status"] == "succeeded":
    print("Training completed")
elif result["status"] in ("failed", "error"):
    print(f"Training failed: {result.get('error', 'unknown')}")

Report Polling

report = client.reports.create(
    project_id=project_id,
    kind="experiment",
    entity_id=experiment_id,
)

# Reports typically finish in under a minute
status = client.reports.wait(report["id"], interval=2.0, timeout=120.0)
if status["report"]["status"] == "succeeded":
    client.reports.download(report["id"], "report.pdf")

Custom Polling Loop

For finer control (e.g., progress logging), write your own loop:

import time
from coreplexml import CorePlexMLError

def wait_with_progress(client, experiment_id, timeout=3600.0):
    """Poll an experiment and print progress updates."""
    start = time.time()
    last_status = None
    while time.time() - start < timeout:
        data = client.experiments.get(experiment_id)
        exp = data.get("experiment", {})
        status = exp.get("status", "unknown")
        if status != last_status:
            elapsed = int(time.time() - start)
            print(f"  [{elapsed}s] Status: {status}")
            last_status = status
        if status in ("succeeded", "failed", "error"):
            return exp
        time.sleep(5.0)
    raise CorePlexMLError(f"Timeout after {timeout}s")

result = wait_with_progress(client, exp["id"])
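For very long jobs, a fixed interval generates many redundant requests early on. One option is to grow the polling interval over time; the schedule itself is a pure function. This helper is illustrative and not part of the SDK:

```python
def backoff_intervals(initial=5.0, factor=1.5, maximum=60.0):
    """Yield polling intervals that grow geometrically, capped at `maximum`."""
    interval = initial
    while True:
        yield min(interval, maximum)
        interval *= factor

# First few intervals: 5.0, 7.5, 11.25, ... eventually capped at 60.0
schedule = backoff_intervals()
first = [next(schedule) for _ in range(3)]
```

Inside a polling loop like the one above, replace `time.sleep(5.0)` with `time.sleep(next(schedule))`.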

Batch Predictions

Both models.predict and deployments.predict accept a list of dicts for batch inference.

import csv

# Load rows from a CSV file
rows = []
with open("new_customers.csv") as f:
    reader = csv.DictReader(f)
    for row in reader:
        rows.append(row)

# Send batch (the SDK serializes the list to JSON)
results = client.deployments.predict(deployment_id, inputs=rows)

# Process results
for i, pred in enumerate(results["predictions"]):
    print(f"Row {i}: {pred['prediction']} (confidence={pred.get('probability', 'N/A')})")

For very large datasets, process in chunks to stay within server payload limits:

def chunked_predict(client, deployment_id, rows, chunk_size=500):
    """Predict in batches to avoid payload limits."""
    all_predictions = []
    for i in range(0, len(rows), chunk_size):
        chunk = rows[i : i + chunk_size]
        result = client.deployments.predict(deployment_id, inputs=chunk)
        all_predictions.extend(result["predictions"])
        print(f"  Processed {min(i + chunk_size, len(rows))}/{len(rows)} rows")
    return all_predictions

predictions = chunked_predict(client, deployment_id, rows)
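The chunking logic can also be factored into a standalone generator, which makes it easy to test without a live deployment. A small sketch, independent of the SDK:

```python
def chunked(rows, chunk_size=500):
    """Yield successive slices of `rows` with at most `chunk_size` items each."""
    for i in range(0, len(rows), chunk_size):
        yield rows[i : i + chunk_size]

# 1050 rows split into chunks of 500, 500, and 50
chunks = list(chunked(list(range(1050)), chunk_size=500))
```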

Dataset Versions Workflow

Datasets in CorePlexML can contain multiple versions. The SDK upload helper creates a new dataset with an initial version; additional versions can be created via the REST endpoint POST /api/datasets/{dataset_id}/versions. You can list versions and train experiments on a specific version ID.

# Upload initial dataset
ds = client.datasets.upload(
    project_id=project_id,
    file_path="data_v1.csv",
    name="Sales Data",
)
dataset_id = ds["id"]
v1_id = ds["version_id"]

# (Optional) Create an additional version via REST endpoint.
# This endpoint is not wrapped yet by a high-level SDK resource method.
v2 = client._http.upload(f"/api/datasets/{dataset_id}/versions", "data_v2.csv")
v2_id = v2["version_id"]

# List all versions
versions = client.datasets.versions(dataset_id)
for v in versions["items"]:
    print(f"  Version {v['version']}: {v['row_count']} rows")

# Train on the latest version
exp = client.experiments.create(
    project_id=project_id,
    dataset_version_id=v2_id,
    target_column="revenue",
    problem_type="regression",
)

Privacy Suite Workflow

The Privacy Suite detects PII in datasets and applies configurable transformations (masking, hashing, redaction, generalization, etc.) to produce anonymized data that meets compliance requirements.

Full Workflow

# 1. Create a HIPAA compliance policy
policy = client.privacy.create_policy(
    project_id=project_id,
    name="Patient Data HIPAA Policy",
    profile="hipaa",
    description="Scan and transform PHI in patient records",
)
policy_id = policy["id"]

# 2. Create a session linking the policy to a dataset
session = client.privacy.create_session(
    policy_id=policy_id,
    dataset_id=dataset_id,
)
session_id = session["id"]

# 3. Run PII detection
detection = client.privacy.detect(session_id)
print(f"Found {len(detection.get('findings', []))} PII columns:")
for finding in detection.get("findings", []):
    print(f"  {finding['column']}: {finding['pii_type']} ({finding['count']} occurrences)")

# 4. Apply transformations
transform_result = client.privacy.transform(session_id)
print(f"Transformations applied: {transform_result.get('transformations_applied', 0)}")

# 5. Get full results
results = client.privacy.results(session_id)
print(f"Session status: {results.get('status')}")
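To make the transformation types concrete, here is a purely local illustration of what masking and hashing do to a value. This is not SDK code, and the server's actual implementations may differ:

```python
import hashlib

def mask_email(value):
    """Keep the first character and the domain; mask the rest of the local part."""
    local, _, domain = value.partition("@")
    return local[0] + "*" * (len(local) - 1) + "@" + domain

def hash_value(value, salt="demo-salt"):
    """One-way SHA-256 hash, truncated for readability."""
    return hashlib.sha256((salt + value).encode()).hexdigest()[:16]

masked = mask_email("jane.doe@example.com")  # 'j*******@example.com'
hashed = hash_value("123-45-6789")           # deterministic 16-char digest
```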

Compliance Profiles

CorePlexML supports four built-in compliance profiles, each pre-configured with rules for the relevant PII types:

Profile   Description                                    PII Types
hipaa     HIPAA Safe Harbor                              Names, SSN, MRN, dates, addresses, phone, email, etc.
gdpr      EU General Data Protection Regulation          Personal identifiers, IP addresses, biometric data, etc.
pci_dss   Payment Card Industry Data Security Standard   Credit card numbers, CVVs, cardholder names, etc.
ccpa      California Consumer Privacy Act                Broad personal information categories
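For intuition, the detection step can be thought of as pattern matching over column values. A toy regex-based detector (not the Privacy Suite's actual logic) might look like:

```python
import re

# Hypothetical patterns for a few common PII types
PII_PATTERNS = {
    "ssn": re.compile(r"^\d{3}-\d{2}-\d{4}$"),
    "email": re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$"),
    "phone": re.compile(r"^\(?\d{3}\)?[-. ]?\d{3}[-. ]?\d{4}$"),
}

def detect_pii(values):
    """Return the PII type that matches every non-empty value, or None."""
    for pii_type, pattern in PII_PATTERNS.items():
        if all(pattern.match(v) for v in values if v):
            return pii_type
    return None

detect_pii(["123-45-6789", "987-65-4321"])  # 'ssn'
```

Real detection is considerably more sophisticated (context, checksums, name dictionaries), but the column-level classification idea is the same.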

SynthGen Workflow

SynthGen trains deep generative models on real datasets to produce statistically similar synthetic data. This is useful for privacy-preserving data sharing, test data generation, and data augmentation.

# 1. Train a CTGAN model
synth = client.synthgen.create_model(
    project_id=project_id,
    dataset_version_id=version_id,
    name="Customer Data Generator",
    model_type="ctgan",
    config={"epochs": 300, "batch_size": 500},
)
synth_id = synth["id"]
print(f"SynthGen model created: {synth_id}")

# 2. Wait for training to complete (poll manually, with a deadline)
import time
deadline = time.time() + 3600  # give up after 1 hour
while time.time() < deadline:
    status = client.synthgen.get_model(synth_id)
    if status.get("status") == "succeeded":
        print("SynthGen model training complete")
        break
    elif status.get("status") in ("failed", "error"):
        print(f"Training failed: {status.get('error')}")
        break
    print(f"  Status: {status.get('status')}...")
    time.sleep(10)

# 3. Generate synthetic data
result = client.synthgen.generate(
    model_id=synth_id,
    num_rows=5000,
    seed=42,
)
print(f"Generated {result.get('num_rows', 0)} synthetic rows")
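After generation it is worth sanity-checking that the synthetic rows resemble the real ones. A minimal local check is to compare per-column means; the `real` and `synthetic` lists below are hypothetical stand-ins for actual data:

```python
def column_means(rows):
    """Compute the mean of each numeric column across a list of dicts."""
    totals = {}
    for row in rows:
        for col, val in row.items():
            if isinstance(val, (int, float)):
                totals.setdefault(col, []).append(val)
    return {col: sum(vals) / len(vals) for col, vals in totals.items()}

real = [{"age": 30, "income": 50000}, {"age": 40, "income": 70000}]
synthetic = [{"age": 33, "income": 52000}, {"age": 39, "income": 66000}]

# Absolute difference in means per column, real vs. synthetic
drift = {
    col: abs(column_means(real)[col] - column_means(synthetic)[col])
    for col in column_means(real)
}
```

A full evaluation would also compare variances, correlations, and categorical frequencies, but mean drift is a cheap first signal.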

Model Types

Model Type        Description                       Best For
ctgan             Conditional Tabular GAN           General-purpose tabular data with mixed types
copulagan         Copula-based GAN                  Data with complex multivariate relationships
tvae              Tabular Variational Autoencoder   Faster training, simpler distributions
gaussian_copula   Gaussian Copula synthesizer       Fast baseline for mostly continuous tabular data

What-If Analysis (Studio) Workflow

ML Studio allows you to explore model behavior by defining scenarios with different input values and comparing the resulting predictions.

# 1. Create a session with a baseline input
session = client.studio.create_session(
    project_id=project_id,
    deployment_id=deployment_id,
    baseline_input={
        "age": 35,
        "income": 75000,
        "credit_score": 680,
        "loan_amount": 250000,
        "employment_years": 5,
    },
)
session_id = session["id"]

# 2. Define alternative scenarios
scenarios = [
    ("Higher Credit Score", {"credit_score": 780}),
    ("Lower Income", {"income": 45000}),
    ("Larger Loan", {"loan_amount": 500000}),
    ("Senior Applicant", {"age": 55, "employment_years": 25}),
]

for name, changes in scenarios:
    s = client.studio.create_scenario(session_id, name=name, changes=changes)
    # Execute the scenario
    client.studio.run_scenario(s["id"])

# 3. Compare all scenarios
comparison = client.studio.compare(session_id)
print("Scenario Comparison:")
for sc in comparison.get("scenarios", []):
    print(f"  {sc['name']}: prediction={sc['prediction']}")
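Conceptually, each scenario is just the baseline input with a few fields overridden. The merge itself is a one-liner, shown here locally (not SDK internals):

```python
baseline = {"age": 35, "income": 75000, "credit_score": 680}

def apply_scenario(baseline, changes):
    """Return a new input dict with the scenario's overrides applied."""
    return {**baseline, **changes}

higher_score = apply_scenario(baseline, {"credit_score": 780})
# {'age': 35, 'income': 75000, 'credit_score': 780}
```

The baseline dict is never mutated, so multiple scenarios can be derived from it independently.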

Error Handling Patterns

Structured Error Inspection

Every exception carries message, status_code, and detail:

from coreplexml import CorePlexMLError

try:
    client.datasets.upload(project_id, "missing.csv", "test")
except CorePlexMLError as e:
    print(f"HTTP {e.status_code}")
    print(f"Message: {e.message}")
    print(f"Detail: {e.detail}")
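Transient failures such as 5xx responses are often worth retrying. A generic retry wrapper can be written without any SDK-specific types; the status-code check in the commented usage assumes the exception exposes `status_code` as shown above:

```python
import time

def with_retries(fn, attempts=3, delay=1.0, retryable=lambda e: True):
    """Call `fn`, retrying up to `attempts` times while `retryable(exc)` is true."""
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception as exc:
            if attempt == attempts or not retryable(exc):
                raise
            time.sleep(delay * attempt)  # linear backoff between attempts

# Example: retry only on server-side errors
# result = with_retries(
#     lambda: client.projects.list(),
#     retryable=lambda e: getattr(e, "status_code", 0) >= 500,
# )
```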

Safe Resource Fetching

Use NotFoundError for safe lookups:

from coreplexml import NotFoundError

def get_project_or_none(client, project_id):
    """Return project dict or None if not found."""
    try:
        return client.projects.get(project_id)
    except NotFoundError:
        return None

project = get_project_or_none(client, some_id)
if project is None:
    print("Project not found, creating...")
    project = client.projects.create("New Project")

Timeout Configuration

The timeout parameter on the client controls the HTTP request timeout (how long to wait for the server to respond). This is separate from job polling timeouts.

# Short timeout for fast endpoints
client = CorePlexMLClient(
    base_url="https://ml.example.com",
    api_key="your-key",
    timeout=10,  # 10 seconds
)

# Longer timeout for large file uploads
upload_client = CorePlexMLClient(
    base_url="https://ml.example.com",
    api_key="your-key",
    timeout=300,  # 5 minutes
)

For long-running operations, adjust the polling timeout separately:

# Default: poll every 5s, give up after 1 hour
result = client.experiments.wait(exp_id, interval=5.0, timeout=3600.0)

# Large training: poll every 30s, give up after 6 hours
result = client.experiments.wait(exp_id, interval=30.0, timeout=21600.0)

Working with Large Datasets

Upload Performance

For large CSV files, increase the client timeout to accommodate upload time:

client = CorePlexMLClient(
    base_url="https://ml.example.com",
    api_key="your-key",
    timeout=600,  # 10 minutes for large uploads
)

ds = client.datasets.upload(
    project_id=project_id,
    file_path="large_dataset.csv",  # e.g., 500 MB
    name="Large Training Data",
)

Streaming Downloads

Dataset and report downloads stream data to disk in 8 KB chunks, so they work with files of any size without loading the entire file into memory:

# Download large dataset
client.datasets.download(dataset_id, "/data/export.csv")

# Download report
client.reports.download(report_id, "/reports/analysis.pdf")
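The streaming pattern can be sketched generically: read fixed-size chunks from a source stream and write them out without ever buffering the whole file. This is a simplified illustration using in-memory streams, not the SDK's internals:

```python
import io

def stream_copy(src, dst, chunk_size=8192):
    """Copy `src` to `dst` in fixed-size chunks; return total bytes copied."""
    total = 0
    while True:
        chunk = src.read(chunk_size)
        if not chunk:
            break
        dst.write(chunk)
        total += len(chunk)
    return total

src = io.BytesIO(b"x" * 20000)  # stands in for an HTTP response body
dst = io.BytesIO()              # stands in for a file on disk
copied = stream_copy(src, dst)  # copies 20000 bytes in 8 KB chunks
```

Because only one chunk is held in memory at a time, peak memory stays constant regardless of file size.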

Logging

The SDK uses Python’s standard logging module under the coreplexml logger. Enable debug logging to see HTTP requests and responses:

import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("coreplexml")
logger.setLevel(logging.DEBUG)

# All SDK HTTP calls will now be logged
client = CorePlexMLClient(base_url="https://ml.example.com", api_key="your-key")
client.projects.list()
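To capture SDK logs somewhere other than stderr, attach a handler to the coreplexml logger using the standard logging machinery. A StreamHandler writing to an in-memory buffer is shown for demonstration; swap in logging.FileHandler for a real log file:

```python
import io
import logging

buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(logging.Formatter("%(levelname)s %(name)s %(message)s"))

logger = logging.getLogger("coreplexml")
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)

# Any record emitted under the coreplexml logger now lands in the buffer
logger.debug("GET /api/projects")
captured = buffer.getvalue()
```

Because SDK loggers are children of `coreplexml`, one handler on the parent logger captures all of them.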