# Advanced Usage
This page covers advanced SDK patterns including job polling, batch predictions, dataset versions, Privacy Suite, SynthGen, What-If Analysis, error handling, and timeout configuration.
## Job Polling Patterns
Several CorePlexML operations run as background jobs: AutoML training,
report generation, and synthetic data model training. The SDK provides
built-in wait methods that poll until completion.
### Experiment Polling
```python
exp = client.experiments.create(
    project_id=project_id,
    dataset_version_id=version_id,
    target_column="target",
    problem_type="classification",
    config={"max_models": 10},
)

# Block until complete (up to 2 hours, polling every 10 seconds)
result = client.experiments.wait(
    exp["id"],
    interval=10.0,
    timeout=7200.0,
)

if result["status"] == "succeeded":
    print("Training completed")
elif result["status"] in ("failed", "error"):
    print(f"Training failed: {result.get('error', 'unknown')}")
```
### Report Polling
```python
report = client.reports.create(
    project_id=project_id,
    kind="experiment",
    entity_id=experiment_id,
)

# Reports typically finish in under a minute
status = client.reports.wait(report["id"], interval=2.0, timeout=120.0)
if status["report"]["status"] == "succeeded":
    client.reports.download(report["id"], "report.pdf")
```
### Custom Polling Loop
For finer control (e.g., progress logging), write your own loop:
```python
import time

from coreplexml import CorePlexMLError

def wait_with_progress(client, experiment_id, timeout=3600.0):
    """Poll an experiment and print progress updates."""
    start = time.time()
    last_status = None
    while time.time() - start < timeout:
        data = client.experiments.get(experiment_id)
        exp = data.get("experiment", {})
        status = exp.get("status", "unknown")
        if status != last_status:
            elapsed = int(time.time() - start)
            print(f"  [{elapsed}s] Status: {status}")
            last_status = status
        if status in ("succeeded", "failed", "error"):
            return exp
        time.sleep(5.0)
    raise CorePlexMLError(f"Timeout after {timeout}s")

result = wait_with_progress(client, exp["id"])
```
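A single dropped connection inside a long polling loop should not abort the whole wait. A generic retry wrapper can shield individual status calls; this is a plain-Python sketch, not an SDK feature, and `with_retries` is a hypothetical helper name:

```python
import time

def with_retries(fn, attempts=3, base_delay=1.0):
    """Call fn(), retrying with exponential backoff on any exception.

    Retries up to attempts - 1 times, sleeping base_delay * 2**attempt
    between tries, then re-raises the last error.
    """
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))
```

Inside a polling loop, wrap the status call as `with_retries(lambda: client.experiments.get(experiment_id))` so transient network errors are absorbed while genuine failures still propagate.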
## Batch Predictions
Both `models.predict` and `deployments.predict` accept a list of dicts for
batch inference.
```python
import csv

# Load rows from a CSV file
rows = []
with open("new_customers.csv") as f:
    reader = csv.DictReader(f)
    for row in reader:
        rows.append(row)

# Send batch (the SDK serializes the list to JSON)
results = client.deployments.predict(deployment_id, inputs=rows)

# Process results
for i, pred in enumerate(results["predictions"]):
    print(f"Row {i}: {pred['prediction']} (confidence={pred.get('probability', 'N/A')})")
```
For very large datasets, process in chunks to stay within server payload limits:
```python
def chunked_predict(client, deployment_id, rows, chunk_size=500):
    """Predict in batches to avoid payload limits."""
    all_predictions = []
    for i in range(0, len(rows), chunk_size):
        chunk = rows[i : i + chunk_size]
        result = client.deployments.predict(deployment_id, inputs=chunk)
        all_predictions.extend(result["predictions"])
        print(f"  Processed {min(i + chunk_size, len(rows))}/{len(rows)} rows")
    return all_predictions

predictions = chunked_predict(client, deployment_id, rows)
```
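A common follow-up is persisting batch results next to their inputs. The sketch below assumes each prediction is a dict with a `"prediction"` key and optional `"probability"`, matching the response shape shown above; `write_predictions_csv` is a hypothetical helper, not part of the SDK:

```python
import csv

def write_predictions_csv(rows, predictions, path):
    """Write each input row alongside its prediction to a CSV file.

    Assumes each prediction is a dict with a "prediction" key and an
    optional "probability", as in the batch example above.
    """
    if not rows:
        return
    fieldnames = list(rows[0].keys()) + ["prediction", "probability"]
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for row, pred in zip(rows, predictions):
            out = dict(row)
            out["prediction"] = pred.get("prediction")
            out["probability"] = pred.get("probability", "")
            writer.writerow(out)
```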
## Dataset Versions Workflow
Datasets in CorePlexML can contain multiple versions. The SDK upload helper
creates a new dataset with an initial version; additional versions can be
created via the REST endpoint `POST /api/datasets/{dataset_id}/versions`.
You can list versions and train experiments on a specific version ID.
```python
# Upload initial dataset
ds = client.datasets.upload(
    project_id=project_id,
    file_path="data_v1.csv",
    name="Sales Data",
)
dataset_id = ds["id"]
v1_id = ds["version_id"]

# (Optional) Create an additional version via the REST endpoint.
# This endpoint is not wrapped yet by a high-level SDK resource method.
v2 = client._http.upload(f"/api/datasets/{dataset_id}/versions", "data_v2.csv")
v2_id = v2["version_id"]

# List all versions
versions = client.datasets.versions(dataset_id)
for v in versions["items"]:
    print(f"  Version {v['version']}: {v['row_count']} rows")

# Train on the latest version
exp = client.experiments.create(
    project_id=project_id,
    dataset_version_id=v2_id,
    target_column="revenue",
    problem_type="regression",
)
```
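When you always want the newest version without tracking IDs by hand, a small helper can pick it out of the listing. This sketch assumes the listing shape used above (`{"items": [...]}` with `version` numbers) and that each item also carries an `id` field; `latest_version_id` is a hypothetical helper name:

```python
def latest_version_id(versions_response):
    """Return the ID of the highest-numbered version, or None.

    Assumes the listing shape shown above:
    {"items": [{"id": ..., "version": ...}, ...]}.
    """
    items = versions_response.get("items", [])
    if not items:
        return None
    return max(items, key=lambda v: v["version"])["id"]
```

The result can be passed directly as `dataset_version_id` when creating an experiment.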
## Privacy Suite Workflow
The Privacy Suite detects PII in datasets and applies configurable transformations (masking, hashing, redaction, generalization, etc.) to produce anonymized data that meets compliance requirements.
### Full Workflow
```python
# 1. Create a HIPAA compliance policy
policy = client.privacy.create_policy(
    project_id=project_id,
    name="Patient Data HIPAA Policy",
    profile="hipaa",
    description="Scan and transform PHI in patient records",
)
policy_id = policy["id"]

# 2. Create a session linking the policy to a dataset
session = client.privacy.create_session(
    policy_id=policy_id,
    dataset_id=dataset_id,
)
session_id = session["id"]

# 3. Run PII detection
detection = client.privacy.detect(session_id)
print(f"Found {len(detection.get('findings', []))} PII columns:")
for finding in detection.get("findings", []):
    print(f"  {finding['column']}: {finding['pii_type']} ({finding['count']} occurrences)")

# 4. Apply transformations
transform_result = client.privacy.transform(session_id)
print(f"Transformations applied: {transform_result.get('transformations_applied', 0)}")

# 5. Get full results
results = client.privacy.results(session_id)
print(f"Session status: {results.get('status')}")
```
### Compliance Profiles
CorePlexML supports four built-in compliance profiles, each pre-configured with rules for the relevant PII types:
| Profile | Description | PII Types |
|---|---|---|
| `hipaa` | HIPAA Safe Harbor | Names, SSN, MRN, dates, addresses, phone, email, etc. |
| `gdpr` | EU General Data Protection Regulation | Personal identifiers, IP addresses, biometric data, etc. |
| `pci` | Payment Card Industry Data Security Standard | Credit card numbers, CVVs, cardholder names, etc. |
| `ccpa` | California Consumer Privacy Act | Broad personal information categories |
## SynthGen Workflow
SynthGen trains deep generative models on real datasets to produce statistically similar synthetic data. This is useful for privacy-preserving data sharing, test data generation, and data augmentation.
```python
import time

# 1. Train a CTGAN model
synth = client.synthgen.create_model(
    project_id=project_id,
    dataset_version_id=version_id,
    name="Customer Data Generator",
    model_type="ctgan",
    config={"epochs": 300, "batch_size": 500},
)
synth_id = synth["id"]
print(f"SynthGen model created: {synth_id}")

# 2. Wait for training to complete (poll manually, with a deadline
# so the loop cannot spin forever if the job stalls)
deadline = time.time() + 7200  # give up after 2 hours
while time.time() < deadline:
    status = client.synthgen.get_model(synth_id)
    if status.get("status") == "succeeded":
        print("SynthGen model training complete")
        break
    elif status.get("status") == "failed":
        print(f"Training failed: {status.get('error')}")
        break
    print(f"  Status: {status.get('status')}...")
    time.sleep(10)

# 3. Generate synthetic data
result = client.synthgen.generate(
    model_id=synth_id,
    num_rows=5000,
    seed=42,
)
print(f"Generated {result.get('num_rows', 0)} synthetic rows")
```
### Model Types
| Model Type | Description | Best For |
|---|---|---|
| `ctgan` | Conditional Tabular GAN | General-purpose tabular data with mixed types |
| `copulagan` | Copula-based GAN | Data with complex multivariate relationships |
| `tvae` | Tabular Variational Autoencoder | Faster training, simpler distributions |
| `gaussian_copula` | Gaussian Copula synthesizer | Fast baseline for mostly continuous tabular data |
## What-If Analysis (Studio) Workflow
ML Studio allows you to explore model behavior by defining scenarios with different input values and comparing the resulting predictions.
```python
# 1. Create a session with a baseline input
session = client.studio.create_session(
    project_id=project_id,
    deployment_id=deployment_id,
    baseline_input={
        "age": 35,
        "income": 75000,
        "credit_score": 680,
        "loan_amount": 250000,
        "employment_years": 5,
    },
)
session_id = session["id"]

# 2. Define alternative scenarios
scenarios = [
    ("Higher Credit Score", {"credit_score": 780}),
    ("Lower Income", {"income": 45000}),
    ("Larger Loan", {"loan_amount": 500000}),
    ("Senior Applicant", {"age": 55, "employment_years": 25}),
]
for name, changes in scenarios:
    s = client.studio.create_scenario(session_id, name=name, changes=changes)
    # Execute the scenario
    client.studio.run_scenario(s["id"])

# 3. Compare all scenarios
comparison = client.studio.compare(session_id)
print("Scenario Comparison:")
for sc in comparison.get("scenarios", []):
    print(f"  {sc['name']}: prediction={sc['prediction']}")
```
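For numeric models it is often more informative to look at how far each scenario moves the prediction from the baseline than at raw values. The sketch below assumes the comparison shape used in the loop above (each scenario carries `"name"` and a numeric `"prediction"`); `prediction_deltas` is a hypothetical helper, and the baseline prediction is obtained separately (e.g., by predicting on the unmodified baseline input):

```python
def prediction_deltas(baseline_prediction, comparison):
    """Return (name, delta-from-baseline) pairs for numeric predictions.

    Assumes each entry in comparison["scenarios"] has "name" and a
    numeric "prediction", as in the comparison loop above.
    """
    return [
        (sc["name"], sc["prediction"] - baseline_prediction)
        for sc in comparison.get("scenarios", [])
    ]
```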
## Error Handling Patterns
### Structured Error Inspection
Every exception carries `message`, `status_code`, and `detail`:
```python
from coreplexml import CorePlexMLError

try:
    client.datasets.upload(project_id, "missing.csv", "test")
except CorePlexMLError as e:
    print(f"HTTP {e.status_code}")
    print(f"Message: {e.message}")
    print(f"Detail: {e.detail}")
```
### Safe Resource Fetching
Use `NotFoundError` for safe lookups:
```python
from coreplexml import NotFoundError

def get_project_or_none(client, project_id):
    """Return project dict or None if not found."""
    try:
        return client.projects.get(project_id)
    except NotFoundError:
        return None

project = get_project_or_none(client, some_id)
if project is None:
    print("Project not found, creating...")
    project = client.projects.create("New Project")
```
## Timeout Configuration
The `timeout` parameter on the client controls the HTTP request timeout
(how long to wait for the server to respond). This is separate from job
polling timeouts.
```python
# Short timeout for fast endpoints
client = CorePlexMLClient(
    base_url="https://ml.example.com",
    api_key="your-key",
    timeout=10,  # 10 seconds
)

# Longer timeout for large file uploads
upload_client = CorePlexMLClient(
    base_url="https://ml.example.com",
    api_key="your-key",
    timeout=300,  # 5 minutes
)
```
For long-running operations, adjust the polling timeout separately:
```python
# Default: poll every 5s, give up after 1 hour
result = client.experiments.wait(exp_id, interval=5.0, timeout=3600.0)

# Large training: poll every 30s, give up after 6 hours
result = client.experiments.wait(exp_id, interval=30.0, timeout=21600.0)
```
## Working with Large Datasets
### Upload Performance
For large CSV files, increase the client timeout to accommodate upload time:
```python
client = CorePlexMLClient(
    base_url="https://ml.example.com",
    api_key="your-key",
    timeout=600,  # 10 minutes for large uploads
)

ds = client.datasets.upload(
    project_id=project_id,
    file_path="large_dataset.csv",  # e.g., 500 MB
    name="Large Training Data",
)
```
### Streaming Downloads
Dataset and report downloads stream data to disk in 8 KB chunks, so they work with files of any size without loading the entire file into memory:
```python
# Download large dataset
client.datasets.download(dataset_id, "/data/export.csv")

# Download report
client.reports.download(report_id, "/reports/analysis.pdf")
```
## Logging
The SDK uses Python’s standard `logging` module under the `coreplexml`
logger. Enable debug logging to see HTTP requests and responses:
```python
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("coreplexml")
logger.setLevel(logging.DEBUG)

# All SDK HTTP calls will now be logged
client = CorePlexMLClient(base_url="https://ml.example.com", api_key="your-key")
client.projects.list()
```
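To keep debug output out of the console while still capturing it for later inspection, you can attach a file handler to the `coreplexml` logger. This is plain stdlib `logging` setup, not an SDK feature; `log_to_file` is a hypothetical helper name:

```python
import logging

def log_to_file(path, level=logging.DEBUG):
    """Attach a file handler to the coreplexml logger.

    Standard logging setup: writes SDK log records to the given file
    without changing any existing console handlers.
    """
    handler = logging.FileHandler(path)
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
    )
    logger = logging.getLogger("coreplexml")
    logger.setLevel(level)
    logger.addHandler(handler)
    return handler
```

Call `log_to_file("coreplexml.log")` once at startup; every SDK HTTP request and response will then be appended to that file.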