Source code for coreplexml.synthgen

from __future__ import annotations

"""SynthGen resource for the CorePlexML SDK."""
import time

from coreplexml.exceptions import CorePlexMLError
from coreplexml._http import HTTPClient


class SynthGenResource:
    """Synthetic data generation with deep learning models.

    Train CTGAN, CopulaGAN, TVAE, or Gaussian Copula models on real datasets
    to generate statistically similar synthetic data.
    """

    # Backend model status -> SDK lifecycle state. Statuses that are not
    # listed here are passed through unchanged by ``_normalize_status``.
    _STATUS_MAP = {
        "pending": "queued",
        "training": "running",
        "ready": "succeeded",
        "failed": "failed",
    }

    def __init__(self, http: HTTPClient):
        """Bind the resource to the HTTP client used for every request."""
        self._http = http

    @staticmethod
    def _normalize_status(raw_status: str | None) -> str | None:
        """Map backend model statuses to SDK-friendly lifecycle states.

        The lookup is case-insensitive. A status without a mapping is
        returned as its original string form; ``None`` stays ``None``.
        """
        if raw_status is None:
            return None
        text = str(raw_status)
        return SynthGenResource._STATUS_MAP.get(text.lower(), text)

    def _normalize_model(self, payload: dict) -> dict:
        """Flatten a model response into a single dictionary.

        If ``payload`` carries a nested ``model`` dict, that dict is used as
        the base; otherwise the payload itself is copied. The result gets an
        ``id`` (taken from ``model_id`` when missing), the backend status
        under ``raw_status``, the normalized status under ``status`` and,
        when the payload has a ``jobs`` list, that list under ``jobs``.

        Returns an empty dict when ``payload`` is not a dict.
        """
        if not isinstance(payload, dict):
            return {}
        model = payload.get("model")
        out = dict(model) if isinstance(model, dict) else dict(payload)
        if "id" not in out and payload.get("model_id"):
            out["id"] = payload["model_id"]
        raw_status = out.get("status")
        out["raw_status"] = raw_status
        out["status"] = self._normalize_status(raw_status)
        jobs = payload.get("jobs")
        if isinstance(jobs, list):
            out["jobs"] = jobs
        return out

    def list_models(
        self,
        project_id: str | None = None,
        limit: int = 50,
        offset: int = 0,
    ) -> dict:
        """List synthetic data models.

        Args:
            project_id: Filter by project UUID (optional).
            limit: Maximum results (default 50).
            offset: Pagination offset.

        Returns:
            Dictionary with ``items`` list and ``total`` count.
        """
        params: dict = {"limit": limit, "offset": offset}
        if project_id:
            params["project_id"] = project_id
        return self._http.get("/api/synthgen/models", params=params)

    def create_model(
        self,
        project_id: str,
        dataset_version_id: str,
        name: str,
        model_type: str = "ctgan",
        config: dict | None = None,
    ) -> dict:
        """Train a new synthetic data model.

        Args:
            project_id: UUID of the project.
            dataset_version_id: UUID of the dataset version to train on.
            name: Model name.
            model_type: Model architecture -- ``ctgan``, ``copulagan``,
                ``tvae``, or ``gaussian_copula`` (default ``ctgan``).
            config: Optional training configuration.

        Returns:
            Created model dictionary with ``id`` and ``status``. The
            response is returned as received (status is not normalized),
            except that ``id`` is filled from ``model_id`` when absent.
            A non-dict response yields an empty dict.
        """
        body = {
            "project_id": project_id,
            "dataset_version_id": dataset_version_id,
            "name": name,
            "model_type": model_type,
            "config": config or {},
        }
        data = self._http.post("/api/synthgen/models", json=body)
        out = dict(data) if isinstance(data, dict) else {}
        if "id" not in out and out.get("model_id"):
            out["id"] = out["model_id"]
        return out

    def get_model(self, model_id: str) -> dict:
        """Get synthetic data model details.

        Args:
            model_id: UUID of the SynthGen model.

        Returns:
            Model dictionary with ``status`` normalized to the SDK lifecycle
            states and the backend value kept under ``raw_status``.
        """
        data = self._http.get(f"/api/synthgen/models/{model_id}")
        return self._normalize_model(data)

    def generate(
        self,
        model_id: str,
        num_rows: int = 1000,
        seed: int | None = None,
        wait: bool = True,
        interval: float = 2.0,
        timeout: float = 300.0,
    ) -> dict:
        """Generate synthetic data rows.

        Args:
            model_id: UUID of the trained SynthGen model.
            num_rows: Number of synthetic rows to generate (default 1000).
            seed: Random seed for reproducibility (optional).
            wait: Wait for async generation job completion.
            interval: Poll interval in seconds when ``wait=True``.
            timeout: Maximum wait time in seconds when ``wait=True``.

        Returns:
            Generation results dictionary. When ``wait`` is false, or the
            queue response carries no job id, the queue response is returned
            unchanged. Otherwise the completed job's ``result`` is returned,
            extended with ``num_rows``, ``status``, ``job_id``,
            ``synthgen_job_id`` and, when present, ``output_artifact_id``.

        Raises:
            CorePlexMLError: If the job reports ``failed`` or does not
                complete before ``timeout`` elapses.
        """
        body: dict = {"num_rows": num_rows}
        if seed is not None:
            body["seed"] = seed
        queued = self._http.post(f"/api/synthgen/models/{model_id}/generate", json=body)
        # A non-dict response cannot carry a job id, so there is nothing to poll.
        if not wait or not isinstance(queued, dict):
            return queued
        synth_job_id = queued.get("synthgen_job_id") or queued.get("job_id")
        if not synth_job_id:
            return queued

        # Monotonic clock: the timeout must not be affected by wall-clock
        # adjustments while we are polling.
        deadline = time.monotonic() + timeout
        while True:
            # Poll first, check the deadline afterwards: the job is always
            # checked at least once, and once more after the final sleep.
            job = self._http.get(f"/api/synthgen/jobs/{synth_job_id}")
            if not isinstance(job, dict):
                job = {}  # unusable payload: treat as "not finished yet"
            status = str(job.get("status", "")).lower()
            if status == "completed":
                return self._build_generation_result(job, queued, synth_job_id, num_rows)
            if status == "failed":
                detail = job.get("result") or queued.get("detail") or "unknown error"
                raise CorePlexMLError(f"SynthGen generate failed: {detail}")
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Never sleep past the deadline.
            time.sleep(min(interval, remaining))
        raise CorePlexMLError(f"SynthGen generation timed out after {timeout}s")

    @staticmethod
    def _build_generation_result(job: dict, queued: dict, synth_job_id, num_rows: int) -> dict:
        """Assemble the dictionary returned for a completed generation job."""
        result = job.get("result") or {}
        out = dict(result) if isinstance(result, dict) else {}
        # Backfill ``num_rows`` when it is missing or null; an explicit value
        # (including 0) reported by the backend is kept as-is.
        if out.get("num_rows") is None:
            out["num_rows"] = (
                out.get("num_rows_generated")
                or out.get("rows_generated")
                or num_rows
            )
        out["status"] = "completed"
        out["job_id"] = queued.get("job_id")
        out["synthgen_job_id"] = synth_job_id
        if job.get("output_artifact_id"):
            out["output_artifact_id"] = job["output_artifact_id"]
        return out

    def delete_model(self, model_id: str) -> dict:
        """Delete a synthetic data model.

        Args:
            model_id: UUID of the SynthGen model.

        Returns:
            Empty dictionary on success.
        """
        return self._http.delete(f"/api/synthgen/models/{model_id}")

    def list_jobs(self, limit: int = 50, offset: int = 0) -> dict:
        """List SynthGen jobs.

        Args:
            limit: Maximum results (default 50).
            offset: Pagination offset.

        Returns:
            Dictionary with ``items`` list and ``total`` count.
        """
        return self._http.get("/api/synthgen/jobs", params={"limit": limit, "offset": offset})

    def get_job(self, job_id: str) -> dict:
        """Get SynthGen job details.

        Args:
            job_id: UUID of the SynthGen job.

        Returns:
            Job dictionary with status, result, and metadata.
        """
        return self._http.get(f"/api/synthgen/jobs/{job_id}")