from __future__ import annotations
"""SynthGen resource for the CorePlexML SDK."""
import time
from coreplexml.exceptions import CorePlexMLError
from coreplexml._http import HTTPClient
class SynthGenResource:
    """Synthetic data generation with deep learning models.

    Train CTGAN, CopulaGAN, TVAE, or Gaussian Copula models on real datasets to
    generate statistically similar synthetic data.
    """

    def __init__(self, http: HTTPClient):
        """Bind the resource to the HTTP client used for every API call.

        Args:
            http: Client exposing ``get``, ``post`` and ``delete`` methods.
        """
        self._http = http

    @staticmethod
    def _normalize_status(raw_status: str | None) -> str | None:
        """Map backend model statuses to SDK-friendly lifecycle states.

        Matching ignores case and surrounding whitespace. Statuses without a
        mapping are returned unchanged (as a string); ``None`` stays ``None``.
        """
        if raw_status is None:
            return None
        mapping = {
            "pending": "queued",
            "training": "running",
            "ready": "succeeded",
            "failed": "failed",
        }
        text = str(raw_status)
        return mapping.get(text.strip().lower(), text)

    def _normalize_model(self, payload: dict) -> dict:
        """Flatten a model response into a single model dictionary.

        The model is taken from ``payload["model"]`` when that is a dict,
        otherwise from the payload itself. The result always carries
        ``raw_status`` (the backend value) and ``status`` (the normalized
        value); ``id`` is filled from ``model_id`` when missing, and a
        top-level ``jobs`` list is carried over.

        Args:
            payload: Decoded response body.

        Returns:
            Normalized copy of the model, or ``{}`` if ``payload`` is not a dict.
        """
        if not isinstance(payload, dict):
            return {}
        model = payload.get("model")
        out = dict(model) if isinstance(model, dict) else dict(payload)
        if "id" not in out and payload.get("model_id"):
            out["id"] = payload["model_id"]
        raw_status = out.get("status")
        out["raw_status"] = raw_status
        out["status"] = self._normalize_status(raw_status)
        jobs = payload.get("jobs")
        if isinstance(jobs, list):
            out["jobs"] = jobs
        return out

    @staticmethod
    def _build_generate_result(job: dict, queued: dict, synth_job_id, requested_rows: int) -> dict:
        """Assemble the dictionary returned by ``generate`` for a completed job.

        Args:
            job: Job dictionary from the final poll.
            queued: Response of the initial generate request.
            synth_job_id: Identifier that was polled.
            requested_rows: Row count requested by the caller (fallback value).

        Returns:
            Copy of the job's ``result`` dict enriched with ``num_rows``,
            ``status``, ``job_id``, ``synthgen_job_id`` and, when present,
            ``output_artifact_id``.
        """
        result = job.get("result")
        out = dict(result) if isinstance(result, dict) else {}
        # Take the first count the backend actually reported. Testing against
        # None (not truthiness) keeps a legitimate count of 0 from being
        # replaced by the requested number of rows.
        count_keys = ("num_rows", "num_rows_generated", "rows_generated")
        reported = next(
            (out[key] for key in count_keys if out.get(key) is not None),
            requested_rows,
        )
        if out.get("num_rows") is None:
            out["num_rows"] = reported
        out["status"] = "completed"
        out["job_id"] = queued.get("job_id")
        out["synthgen_job_id"] = synth_job_id
        if job.get("output_artifact_id"):
            out["output_artifact_id"] = job["output_artifact_id"]
        return out

    def list_models(self, project_id: str | None = None, limit: int = 50, offset: int = 0) -> dict:
        """List synthetic data models.

        Args:
            project_id: Filter by project UUID (optional).
            limit: Maximum results (default 50).
            offset: Pagination offset.

        Returns:
            Dictionary with ``items`` list and ``total`` count.
        """
        params: dict = {"limit": limit, "offset": offset}
        if project_id:
            params["project_id"] = project_id
        return self._http.get("/api/synthgen/models", params=params)

    def create_model(
        self,
        project_id: str,
        dataset_version_id: str,
        name: str,
        model_type: str = "ctgan",
        config: dict | None = None,
    ) -> dict:
        """Train a new synthetic data model.

        Args:
            project_id: UUID of the project.
            dataset_version_id: UUID of the dataset version to train on.
            name: Model name.
            model_type: Model architecture -- ``ctgan``, ``copulagan``,
                ``tvae``, or ``gaussian_copula`` (default ``ctgan``).
            config: Optional training configuration.

        Returns:
            Created model dictionary with ``id`` and ``status``. ``id`` is
            copied from ``model_id`` when the response only carries the
            latter; a non-dict response yields ``{}``.
        """
        body = {
            "project_id": project_id,
            "dataset_version_id": dataset_version_id,
            "name": name,
            "model_type": model_type,
            "config": config or {},
        }
        data = self._http.post("/api/synthgen/models", json=body)
        out = dict(data) if isinstance(data, dict) else {}
        if "id" not in out and out.get("model_id"):
            out["id"] = out["model_id"]
        return out

    def get_model(self, model_id: str) -> dict:
        """Get synthetic data model details.

        Args:
            model_id: UUID of the SynthGen model.

        Returns:
            Model dictionary with a normalized ``status`` and the backend
            value preserved under ``raw_status``.
        """
        data = self._http.get(f"/api/synthgen/models/{model_id}")
        return self._normalize_model(data)

    def generate(
        self,
        model_id: str,
        num_rows: int = 1000,
        seed: int | None = None,
        wait: bool = True,
        interval: float = 2.0,
        timeout: float = 300.0,
    ) -> dict:
        """Generate synthetic data rows.

        Args:
            model_id: UUID of the trained SynthGen model.
            num_rows: Number of synthetic rows to generate (default 1000).
            seed: Random seed for reproducibility (optional).
            wait: Wait for async generation job completion.
            interval: Poll interval in seconds when ``wait=True``.
            timeout: Maximum wait time in seconds when ``wait=True``.

        Returns:
            Generation results dictionary. When ``wait`` is false, or the
            queue response carries no job identifier, the raw queue response
            is returned instead.

        Raises:
            CorePlexMLError: If the job reports ``failed`` or does not
                complete before ``timeout`` elapses.
        """
        body: dict = {"num_rows": num_rows}
        if seed is not None:
            body["seed"] = seed
        queued = self._http.post(f"/api/synthgen/models/{model_id}/generate", json=body)
        # Nothing to poll for if the caller opted out or the response has no
        # usable structure.
        if not wait or not isinstance(queued, dict):
            return queued
        synth_job_id = queued.get("synthgen_job_id") or queued.get("job_id")
        if not synth_job_id:
            return queued

        # Monotonic clock: the deadline must not move when the system clock
        # is adjusted while we wait.
        deadline = time.monotonic() + timeout
        while True:
            # Poll before checking the deadline so the job is inspected at
            # least once, even with a zero timeout.
            job = self._http.get(f"/api/synthgen/jobs/{synth_job_id}")
            if not isinstance(job, dict):
                job = {}
            status = str(job.get("status", "")).lower()
            if status == "completed":
                return self._build_generate_result(job, queued, synth_job_id, num_rows)
            if status == "failed":
                detail = job.get("result") or queued.get("detail") or "unknown error"
                raise CorePlexMLError(f"SynthGen generate failed: {detail}")
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Never sleep past the deadline, and never pass a negative value
            # to time.sleep().
            time.sleep(max(0.0, min(interval, remaining)))
        raise CorePlexMLError(f"SynthGen generation timed out after {timeout}s")

    def delete_model(self, model_id: str) -> dict:
        """Delete a synthetic data model.

        Args:
            model_id: UUID of the SynthGen model.

        Returns:
            Empty dictionary on success.
        """
        return self._http.delete(f"/api/synthgen/models/{model_id}")

    def list_jobs(self, limit: int = 50, offset: int = 0) -> dict:
        """List SynthGen jobs.

        Args:
            limit: Maximum results (default 50).
            offset: Pagination offset.

        Returns:
            Dictionary with ``items`` list and ``total`` count.
        """
        return self._http.get("/api/synthgen/jobs", params={"limit": limit, "offset": offset})

    def get_job(self, job_id: str) -> dict:
        """Get SynthGen job details.

        Args:
            job_id: UUID of the SynthGen job.

        Returns:
            Job dictionary with status, result, and metadata.
        """
        return self._http.get(f"/api/synthgen/jobs/{job_id}")