Introduction to Datasets¶
A Dataset in Elevate Data is a flexible entity for combining, transforming, and preparing data to support modeling and analytics in Earnix. Datasets let users proactively assess data quality, merge data from multiple tables, clean and prepare data, and manage schema evolution efficiently, all within a unified workspace.
A Dataset consists of data elements (in the form of table versions) and recipes (reusable SQL transformations), linked together for a specific modeling purpose.
Schema analysis is performed automatically after each transformation to show its impact on the data, and can be accessed through the Inspect Data functionality.
Each Dataset can evolve over time through Dataset Versions.
This notebook outlines how to:
- Create a dataset
- Add a data table to the dataset
- Add a transformation recipe to the dataset
- Add another transformation recipe to the dataset
- Execute the 2nd recipe
- Change the data table version
- Execute the 2nd recipe again
- Remove the first recipe from the chain
- Run the recipe on sample
- Add an existing recipe to a dataset
- Create a new recipe version
- Run the recipe on the full data
- Get Overview, Preview, Schema Analysis, and Alerts
- Release the dataset
- Create a new dataset version
- Delete a dataset
Install the SDK¶
!pip install --upgrade earnix-elevate
Set your Elevate server and credentials¶
This example authenticates to Elevate through environment variables. You can also pass your credentials as Python arguments to each Service's client, as in the commented-out example below.
import os
os.environ.setdefault("E2_SERVER", "YOUR_SERVER")
os.environ.setdefault("E2_CLIENT_ID", "YOUR_CLIENT_ID")
os.environ.setdefault("E2_SECRET_KEY", "YOUR_SECRET_KEY")
# Alternatively:
# DatasetService(
#     server="YOUR_SERVER",
#     client_id="YOUR_CLIENT_ID",
#     secret_key="YOUR_SECRET_KEY",
# )
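Because `os.environ.setdefault` only writes a variable that is not already set, credentials exported in your shell take precedence over the placeholders above. The following is a minimal sketch of a pre-flight check for that situation. The helper `unset_credentials` and the `YOUR_` placeholder convention are assumptions made for this illustration and are not part of the SDK.

```python
# Variable names are taken from the cell above; the helper itself is hypothetical.
REQUIRED_VARS = ("E2_SERVER", "E2_CLIENT_ID", "E2_SECRET_KEY")


def unset_credentials(env):
    """Return the required variables that are missing or still hold a placeholder."""
    return [
        name
        for name in REQUIRED_VARS
        if not env.get(name) or env[name].startswith("YOUR_")
    ]


# setdefault keeps an existing value and only fills in a missing one.
demo_env = {"E2_SERVER": "elevate.example.com"}  # made-up server name
demo_env.setdefault("E2_SERVER", "YOUR_SERVER")        # existing value is kept
demo_env.setdefault("E2_CLIENT_ID", "YOUR_CLIENT_ID")  # placeholder is inserted

print(unset_credentials(demo_env))  # ['E2_CLIENT_ID', 'E2_SECRET_KEY']
```

In the notebook, the same check could be run against the real environment with `unset_credentials(os.environ)` before creating any client.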
Imports and demo preparation¶
from datetime import datetime
from time import sleep
from earnix_elevate import (
    AddDatasetDataTableNodeRequest,
    AddDatasetRecipeNodeRequest,
    CreateDatasetRequest,
    CreateDatasetVersionRequest,
    CreateRecipeRequest,
    CreateRecipeVersionRequest,
    DatasetService,
    ExecuteRecipeRequest,
    RecipeService,
    UpdateDatasetDataTableNodeRequest,
)
from earnix_elevate.clients.data import JobStatus, RecipeVersion, SourceNodeDefinition
DEMO_SUFFIX = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
Creating a Dataset¶
Each dataset version is represented by a transformation graph made up of nodes. This graph contains data nodes (data tables), recipe nodes (SQL transformations), and shows the relationships between them.
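To make the graph structure concrete, here is a minimal sketch that models a transformation graph as a plain dictionary and derives a valid execution order with the standard library (`graphlib`, Python 3.9+). The node ids and the dictionary layout are hypothetical and chosen only for illustration; they are not the SDK's data model.

```python
from graphlib import TopologicalSorter

# node id -> ids of the upstream nodes it reads from (hypothetical ids)
graph = {
    "table_1": [],
    "recipe_a": ["table_1"],
    "recipe_b": ["recipe_a"],
}

# Upstream nodes always come before the nodes that depend on them.
order = list(TopologicalSorter(graph).static_order())

# Nodes that no other node reads from, i.e. the current end of each chain.
upstream_ids = {source for sources in graph.values() for source in sources}
end_nodes = [node for node in graph if node not in upstream_ids]

print(order)      # ['table_1', 'recipe_a', 'recipe_b']
print(end_nodes)  # ['recipe_b']
```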
There are several steps to follow in order to create a dataset.
Create the Dataset and Recipe Service Clients¶
dataset_service = DatasetService()
recipe_service = RecipeService()
Create the Dataset¶
import os
dataset_name = os.environ.get(
    "DATASETS_DATASET_NAME",
    "Bee-Insurance Demo dataset_" + DEMO_SUFFIX,
)
create_dataset_req = CreateDatasetRequest(
    description="desc",
    name=dataset_name,
)
create_dataset = dataset_service.create_dataset(create_dataset_req)
print(f"\n{create_dataset=}")
Adding a Data Table to the Dataset¶
After creating a dataset, you can add a data table to it.
import os
DATA_TABLE_ID = int(os.environ.get("DATASETS_DATA_TABLE_ID", "28606"))  # use your data table system id
DATA_TABLE_VERSION = int(os.environ.get("DATASETS_DATA_TABLE_VERSION", "2"))  # use your data table version number
add_table_req = AddDatasetDataTableNodeRequest(
    datasetId=create_dataset.id,
    dataTableId=DATA_TABLE_ID,
    dataTableVersionNumber=DATA_TABLE_VERSION,
    nodeType="TABLE",
)
add_ds_with_data_table = dataset_service.add_dataset_node_to_transformation_graph(
    dataset_id=create_dataset.id,
    transformation_graph_version_number=1,
    add_dataset_node_request=add_table_req,
)
print(f"\n{add_ds_with_data_table=}")
Introduction to Recipes¶
A Recipe in Elevate Data is a reusable SQL transformation that allows users to filter, clean, merge, and shape Data Tables to their specific analytical needs within a Dataset. Users write SQL code to specify the desired transformation—such as filtering for certain records, joining tables, or aggregating fields.
Recipes ensure data processing is transparent, reproducible, and seamlessly integrated into workflow automation.
Multiple Recipes can be created and run sequentially within a Dataset.
Before a Recipe runs, it is checked for errors and schema compatibility.
Transformations can be previewed on sample data, then executed on the full Dataset after validation.
Schema analysis is performed automatically to show changes to the data structure.
There are two types of Recipes:
- Transformation Recipes: perform transformations on a single data element
- Structure Recipes: connect different data elements together (coming soon)
NOTES
- Always begin Transformation Recipes with a SELECT statement.
- To support reuse, refer to the data element being transformed as $CurrentTable.
- Recipes are written in Apache Spark SQL. For details, see the Spark SQL Reference.
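The reason a placeholder supports reuse can be shown with a small sketch: the same SQL text works against any input once the placeholder is replaced by a concrete name. The example uses Python's `string.Template`, which happens to share the `$name` syntax. This only illustrates the idea and is not how Elevate resolves aliases internally; the table names and the `starts_with_select` helper are hypothetical.

```python
from string import Template

# Recipe SQL as used in this notebook; $CurrentTable is the placeholder.
recipe_sql = Template("SELECT * FROM $CurrentTable")

# Hypothetical physical names standing in for two data table versions.
for physical_name in ("policies_v1", "policies_v2"):
    print(recipe_sql.substitute(CurrentTable=physical_name))


def starts_with_select(sql):
    """Local check for the 'begin with SELECT' note (hypothetical helper)."""
    return sql.lstrip().upper().startswith("SELECT")


print(starts_with_select(recipe_sql.template))  # True
```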
Aliases and Source Node Bindings¶
When adding a recipe node to a dataset, you must specify source node bindings — a list that pairs each upstream source node (a data table or another recipe) with an alias. Aliases are named placeholders used within the recipe's SQL code to reference input data sources (e.g., SELECT * FROM $CurrentTable). Each binding tells the system which upstream data source should be substituted for which alias at execution time.
For single-source recipes, the convention is to use CurrentTable as the alias, but it must still be explicitly provided in the request and must match the alias referenced in the recipe's SQL.
For multi-source recipes — recipes whose SQL references two or more input aliases — every alias must be specified, and each must map to exactly one source node.
Multi-source recipes carry additional constraints:
- Each source node can only be bound once
- Aliases must be unique (case-insensitive)
- All bound source nodes must be free nodes — meaning they have no existing downstream connections in the transformation graph
- The set of aliases you provide must match the recipe's declared aliases exactly (no extras, no omissions)
These validations ensure that the transformation graph remains a well-formed DAG and that every alias referenced in the SQL is unambiguously connected to a data source. If any validation fails, the API returns a descriptive error indicating which constraint was violated.
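The constraints listed above can be sketched as a local pre-check. This is only an illustration under stated assumptions: the real validation happens on the server, the function `validate_bindings` and its arguments are hypothetical, and the aliases `Policies` and `Claims` are made-up examples of a two-source recipe.

```python
def validate_bindings(declared_aliases, bindings, nodes_with_downstream):
    """Return the violated constraints; an empty list means the bindings pass.

    declared_aliases: aliases declared on the recipe
    bindings: list of (source_node_id, alias) pairs
    nodes_with_downstream: ids of nodes that already feed another node
    """
    errors = []
    node_ids = [node_id for node_id, _ in bindings]
    aliases = [alias for _, alias in bindings]
    folded = [alias.lower() for alias in aliases]

    if len(set(node_ids)) != len(node_ids):
        errors.append("a source node is bound more than once")
    if len(set(folded)) != len(folded):
        errors.append("aliases are not unique (case-insensitive)")
    if any(node_id in nodes_with_downstream for node_id in node_ids):
        errors.append("a bound source node is not a free node")
    if set(aliases) != set(declared_aliases):
        errors.append("aliases do not match the recipe's declared aliases")
    return errors


declared = ["Policies", "Claims"]
print(validate_bindings(declared, [(10, "Policies"), (11, "Claims")], set()))  # []
print(len(validate_bindings(declared, [(10, "Policies"), (10, "policies")], {10})))  # 4
```

The second call violates all four constraints at once: node 10 is bound twice, the two aliases differ only by case, node 10 already has a downstream connection, and `Claims` is missing.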
Adding a Recipe to the Dataset¶
After creating a dataset and adding a data table to it, you can then add a recipe to the dataset.
dataset_data_table_node = add_ds_with_data_table
recipe = recipe_service.create_recipe(CreateRecipeRequest(
    name="newRecipe__" + DEMO_SUFFIX,
    language="SQL",
    aliases=["CurrentTable"],
    recipeVersion=RecipeVersion(code="SELECT * FROM $CurrentTable"),
))
add_recipe_req = AddDatasetRecipeNodeRequest(
    nodeType="RECIPE",
    datasetId=create_dataset.id,
    recipeId=recipe.id,
    recipeVersionNumber=recipe.latest_version.version_number,
    sourceNodeDefinitions=[
        SourceNodeDefinition(sourceNodeId=dataset_data_table_node.id, alias="CurrentTable"),
    ],
)
add_ds_with_recipe = dataset_service.add_dataset_node_to_transformation_graph(
    dataset_id=create_dataset.id,
    transformation_graph_version_number=1,
    add_dataset_node_request=add_recipe_req,
)
print(f"\n{add_ds_with_recipe=}")
Recipe Chains¶
Recipe Chains enable users to build, manage, and execute a sequence of data transformation Recipes inside a Dataset. They are designed to simplify complex data preparation workflows by allowing modular, reusable, and editable transformation steps. With a Recipe Chain, users can:
- Chain multiple Recipes together to form a transformation pipeline inside a single Dataset version.
- Add, edit, or delete Recipes at any point in the chain.
- Change Data Table versions within the chain without breaking dependencies.
Within Recipe Chains, the following rules apply:
- A Recipe is executed only if none of its predecessors in the chain has failed during execution.
- When multiple Recipes are pending, running the final Recipe triggers the execution of all preceding Recipes. However, inspection data is available only for the last Recipe, and earlier Recipes remain in a pending state.
- If a Recipe is inserted, modified, or removed within the chain, all subsequent Recipes revert to pending status. Likewise, when a user selects a different version of the Data Table, all Recipes in the chain are set back to "Pending run" status.
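The status rules above can be pictured with a toy model of a chain as a list of status labels. This is a sketch under assumptions: the labels `SUCCEEDED`, `FAILED`, and `PENDING` and both helper functions are hypothetical, and treating the changed Recipe itself as pending (not only its successors) is an assumption of this example.

```python
def invalidate_from(statuses, changed_index):
    """Reset the changed recipe and every recipe after it to PENDING."""
    return [
        status if position < changed_index else "PENDING"
        for position, status in enumerate(statuses)
    ]


def can_run(statuses, index):
    """A recipe may run only if no predecessor in the chain has failed."""
    return "FAILED" not in statuses[:index]


chain = ["SUCCEEDED", "SUCCEEDED", "SUCCEEDED"]
print(invalidate_from(chain, 1))  # ['SUCCEEDED', 'PENDING', 'PENDING']
print(invalidate_from(chain, 0))  # data table version changed: all PENDING
print(can_run(["SUCCEEDED", "FAILED", "PENDING"], 2))  # False
```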
Add Another Recipe to the Dataset¶
recipe2 = recipe_service.create_recipe(CreateRecipeRequest(
    name="newRecipe2__" + DEMO_SUFFIX,
    language="SQL",
    aliases=["CurrentTable"],
    recipeVersion=RecipeVersion(code="SELECT * FROM $CurrentTable"),
))
add_recipe2_req = AddDatasetRecipeNodeRequest(
    nodeType="RECIPE",
    datasetId=create_dataset.id,
    recipeId=recipe2.id,
    recipeVersionNumber=recipe2.latest_version.version_number,
    sourceNodeDefinitions=[
        SourceNodeDefinition(sourceNodeId=add_ds_with_recipe.id, alias="CurrentTable"),
    ],
)
add_ds_with_2recipes = dataset_service.add_dataset_node_to_transformation_graph(
    dataset_id=create_dataset.id,
    transformation_graph_version_number=1,
    add_dataset_node_request=add_recipe2_req,
)
print(f"\n{add_ds_with_2recipes=}")
Running a Recipe¶
To apply the SQL transformation defined in a Recipe to a data element, run the Recipe.
Run the 2nd Recipe¶
run_req = ExecuteRecipeRequest(
    useSample=True,
)
run_2nd_recipe = dataset_service.execute_recipe(
    dataset_id=create_dataset.id,
    transformation_graph_version_number=1,
    dataset_node_id=add_ds_with_2recipes.id,
    execute_recipe_request=run_req,
)
print(f"\n{run_2nd_recipe=}")
Wait for Recipe Execution to Complete¶
MAX_ATTEMPTS = 180
WAIT_INTERVAL = 5
for attempt in range(MAX_ATTEMPTS):
    try:
        run_status = dataset_service.get_latest_recipe_status(
            create_dataset.id, 1, run_2nd_recipe.id)
        status_code = getattr(run_status.execution, 'status', None)
        if status_code == JobStatus.SUCCEEDED:
            print('Recipe run completed')
            break
        elif status_code in (JobStatus.FAILED, JobStatus.CANCELLED):
            print(f'Recipe run failed: {status_code}')
            break
    except Exception as ex:
        print(f'Exception during status polling: {ex}')
    sleep(WAIT_INTERVAL)
else:
    raise RuntimeError('Timed out waiting for recipe execution')
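The same polling pattern recurs several times in this notebook, so it can be factored into one helper. The following is a minimal sketch, not part of the SDK: `wait_for` and its parameters are hypothetical, and the status source is a stand-in iterator so that the example runs without a server. Unlike the loop above, this version does not swallow exceptions raised by the status call.

```python
import time


def wait_for(fetch_status, done, failed, max_attempts=180, interval=5.0, sleep=time.sleep):
    """Poll fetch_status() until it returns a status found in done or failed.

    Returns the final status; raises TimeoutError when attempts run out.
    """
    for _ in range(max_attempts):
        status = fetch_status()
        if status in done or status in failed:
            return status
        sleep(interval)
    raise TimeoutError(f"no terminal status after {max_attempts} attempts")


# Stand-in for a status call: reports RUNNING twice, then SUCCEEDED.
fake_statuses = iter(["RUNNING", "RUNNING", "SUCCEEDED"])
result = wait_for(
    lambda: next(fake_statuses),
    done={"SUCCEEDED"},
    failed={"FAILED", "CANCELLED"},
    sleep=lambda seconds: None,  # skip real waiting in this demo
)
print(result)  # SUCCEEDED
```

In the notebook, `fetch_status` could wrap the `get_latest_recipe_status` call shown above and return `run_status.execution.status`, with the `JobStatus` members as the `done` and `failed` sets.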
Changing a Data Table Version in a Dataset¶
When adding a Data Table to a Dataset, the latest version is selected by default, but users can select a previous version.
transformation_graph = dataset_service.get_transformation_graph_by_version(
    dataset_id=create_dataset.id,
    transformation_graph_version_number=1,
)
# Find the data table node in the graph
dataset_data_table_node = next(
    node for node in transformation_graph.latest_transformation_graph.nodes
    if node.node_type == "TABLE"
)
update_dataset_data_table_req = UpdateDatasetDataTableNodeRequest(
    data_table_id=dataset_data_table_node.data_table_id,
    data_table_version_number=1,  # switch to an earlier version of the data table
    dataset_id=create_dataset.id,
    id=dataset_data_table_node.id,
    lock_version=dataset_data_table_node.lock_version,
    node_type="TABLE",
)
dataset_data_table_node_with_other_version = (
    dataset_service.update_dataset_node_in_transformation_graph(
        dataset_id=create_dataset.id,
        transformation_graph_version_number=1,
        dataset_node_id=dataset_data_table_node.id,
        update_dataset_node_request=update_dataset_data_table_req,
    )
)
print(f"\n{dataset_data_table_node_with_other_version=}")
Run the 2nd Recipe Again¶
run_req = ExecuteRecipeRequest(
    useSample=True,
)
run_2nd_recipe = dataset_service.execute_recipe(
    dataset_id=create_dataset.id,
    transformation_graph_version_number=1,
    dataset_node_id=add_ds_with_2recipes.id,
    execute_recipe_request=run_req,
)
print(f"\n{run_2nd_recipe=}")
Wait for Recipe Execution to Complete¶
MAX_ATTEMPTS = 180
WAIT_INTERVAL = 5
for attempt in range(MAX_ATTEMPTS):
    try:
        run_status = dataset_service.get_latest_recipe_status(
            create_dataset.id, 1, run_2nd_recipe.id)
        status_code = getattr(run_status.execution, 'status', None)
        if status_code == JobStatus.SUCCEEDED:
            print('Recipe run completed')
            break
        elif status_code in (JobStatus.FAILED, JobStatus.CANCELLED):
            print(f'Recipe run failed: {status_code}')
            break
    except Exception as ex:
        print(f'Exception during status polling: {ex}')
    sleep(WAIT_INTERVAL)
else:
    raise RuntimeError('Timed out waiting for recipe execution')
Remove the First Recipe from the Chain¶
dataset_service.delete_dataset_node_from_transformation_graph(
    dataset_id=create_dataset.id,
    transformation_graph_version_number=1,
    dataset_node_id=add_ds_with_recipe.id,
)
Run the Recipe on Sample¶
dataset_recipe_node = add_ds_with_2recipes
run_req = ExecuteRecipeRequest(
    useSample=True,
)
run_2nd_recipe = dataset_service.execute_recipe(
    dataset_id=create_dataset.id,
    transformation_graph_version_number=1,
    dataset_node_id=dataset_recipe_node.id,
    execute_recipe_request=run_req,
)
print(f"\n{run_2nd_recipe=}")
Wait for Recipe Execution to Complete¶
MAX_ATTEMPTS = 180
WAIT_INTERVAL = 5
for attempt in range(MAX_ATTEMPTS):
    try:
        run_status = dataset_service.get_latest_recipe_status(
            create_dataset.id, 1, run_2nd_recipe.id)
        status_code = getattr(run_status.execution, 'status', None)
        if status_code == JobStatus.SUCCEEDED:
            print('Recipe run completed')
            break
        elif status_code in (JobStatus.FAILED, JobStatus.CANCELLED):
            print(f'Recipe run failed: {status_code}')
            break
    except Exception as ex:
        print(f'Exception during status polling: {ex}')
    sleep(WAIT_INTERVAL)
else:
    raise RuntimeError('Timed out waiting for recipe execution')
Adding a Recipe to a Dataset & Creating a New Version¶
An existing recipe can be added to a dataset by selecting the recipe and a specific version, then assigning it to the target dataset. Once assigned, the recipe version becomes available for execution within that dataset. To modify a recipe, the user creates a new version rather than altering the original. The new version can be initialized in one of the following ways:
- As a copy of an existing recipe version
- From user-provided code
- From a default query: SELECT * FROM $CurrentTable
Previous versions remain accessible and can be edited, as long as they are not in use by a released dataset. Once a recipe version is associated with a released dataset, it becomes locked and can no longer be modified.
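The versioning rules above can be summarized in a small in-memory model. This is only a sketch to make the rules concrete: the classes `RecipeModel` and `RecipeVersionModel`, the `locked` flag, and the error type are hypothetical and do not mirror the SDK's classes or the server's behavior.

```python
from dataclasses import dataclass, field


@dataclass
class RecipeVersionModel:
    number: int
    code: str
    locked: bool = False  # set once a released dataset uses this version


@dataclass
class RecipeModel:
    versions: list = field(default_factory=list)

    def new_version(self, code=None, copy_of=None):
        """Create a version from given code, a copy of a version, or the default query."""
        if code is None and copy_of is not None:
            code = self.versions[copy_of - 1].code
        if code is None:
            code = "SELECT * FROM $CurrentTable"
        version = RecipeVersionModel(number=len(self.versions) + 1, code=code)
        self.versions.append(version)
        return version

    def edit(self, number, code):
        """Change a version's code unless it is locked."""
        version = self.versions[number - 1]
        if version.locked:
            raise PermissionError(f"version {number} is used by a released dataset")
        version.code = code


recipe_model = RecipeModel()
v1 = recipe_model.new_version()           # default query
v2 = recipe_model.new_version(copy_of=1)  # copy of version 1
v1.locked = True                          # pretend a released dataset uses v1
recipe_model.edit(2, "SELECT DISTINCT * FROM $CurrentTable")  # allowed
try:
    recipe_model.edit(1, "SELECT 1")
except PermissionError as error:
    print(error)  # version 1 is used by a released dataset
```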
Add an Existing Recipe to a Dataset¶
add_existing_recipe_req = AddDatasetRecipeNodeRequest(
    nodeType="RECIPE",
    datasetId=create_dataset.id,
    recipeId=recipe2.id,
    recipeVersionNumber=recipe2.latest_version.version_number,
    sourceNodeDefinitions=[
        SourceNodeDefinition(sourceNodeId=dataset_recipe_node.id, alias="CurrentTable"),
    ],
)
add_ds_with_recipe = dataset_service.add_dataset_node_to_transformation_graph(
    dataset_id=create_dataset.id,
    transformation_graph_version_number=1,
    add_dataset_node_request=add_existing_recipe_req,
)
print(f"\n{add_ds_with_recipe=}")
Create New Recipe Version¶
recipe_version2 = CreateRecipeVersionRequest(
    code="SELECT * FROM $CurrentTable",
)
new_version = recipe_service.create_recipe_version(
    recipe_id=recipe2.id,
    create_recipe_version_request=recipe_version2,
)
print(f"\n{new_version=}")
Run the Recipe on Full Data¶
run_req = ExecuteRecipeRequest(
    useSample=False,
)
run_2nd_recipe = dataset_service.execute_recipe(
    dataset_id=create_dataset.id,
    transformation_graph_version_number=1,
    dataset_node_id=add_ds_with_recipe.id,
    execute_recipe_request=run_req,
)
print(f"\n{run_2nd_recipe=}")
Wait for Full-Data Execution to Complete¶
MAX_ATTEMPTS = 180
WAIT_INTERVAL = 5
for attempt in range(MAX_ATTEMPTS):
    try:
        run_status = dataset_service.get_latest_recipe_status(
            create_dataset.id, 1, run_2nd_recipe.id)
        status_code = getattr(run_status.execution, 'status', None)
        if status_code == JobStatus.SUCCEEDED:
            print('Recipe run completed')
            break
        elif status_code in (JobStatus.FAILED, JobStatus.CANCELLED):
            print(f'Recipe run failed: {status_code}')
            break
    except Exception as ex:
        print(f'Exception during status polling: {ex}')
    sleep(WAIT_INTERVAL)
else:
    raise RuntimeError('Timed out waiting for recipe execution')
Inspect Data¶
The Inspect Data capability empowers users to explore, validate, and understand data elements used in modeling and analytics workflows. It provides intuitive tools for examining data structure, identifying anomalies, and ensuring data quality before transferring it to other Earnix Solutions such as Price-It and Underwrite-It.
Through Inspect Data, users can:
- Proactively assess the quality of data imported from external sources.
- Focus on columns that might require attention (alerts).
- Preview the output of each Recipe step.
- Validate transformations before applying them.
- Debug issues in data preparation pipelines.
NOTE
The Inspect Data feature is available exclusively for Data Tables that have been successfully imported and for Recipes that have executed without errors.
Inspect Data offers four views of a Dataset: Overview, Preview, Schema Analysis, and Alerts.
Wait for Schema Analysis to Complete¶
MAX_ATTEMPTS = 60
WAIT_INTERVAL = 5
for attempt in range(MAX_ATTEMPTS):
    try:
        sa_status = dataset_service.get_recipe_node_schema_analysis_status(
            dataset_id=create_dataset.id,
            transformation_graph_version_number=1,
            dataset_node_id=add_ds_with_recipe.id,
            analysis_type="FULL",
        )
        status_code = (
            sa_status.execution.status
            if sa_status and hasattr(sa_status, 'execution') and sa_status.execution
            else None
        )
        if status_code == JobStatus.SUCCEEDED:
            print('Schema analysis completed')
            break
        elif status_code in (JobStatus.FAILED, JobStatus.CANCELLED):
            print(f'Schema analysis ended with status: {status_code}')
            break
        print(f'Attempt {attempt+1}: schema analysis status = {status_code}')
    except Exception as ex:
        print(f'Attempt {attempt+1}: error checking schema analysis status: {ex}')
    sleep(WAIT_INTERVAL)
else:
    raise TimeoutError('Schema analysis did not complete within timeout')
Overview¶
The Overview summarizes structure and quality, showing metadata (column names, types, sample values) plus quick insights on missing data and distinct values. It also displays column type distribution and a summary of alerts.
schema_overview = dataset_service.get_recipe_node_overview(
    dataset_id=create_dataset.id,
    transformation_graph_version_number=1,
    dataset_node_id=add_ds_with_recipe.id,
    analysis_type="FULL",
)
print(f"\n{schema_overview=}")
Preview¶
Preview shows up to 100 rows of data.
preview = dataset_service.get_dataset_recipe_node_preview(
    dataset_id=create_dataset.id,
    transformation_graph_version_number=1,
    dataset_node_id=add_ds_with_recipe.id,
)
print(f"\n{preview=}")
Schema Analysis¶
Schema Analysis gives detailed schema information, including statistics per column. Distribution graphs for categorical and numerical columns help spot patterns, outliers, and skewness.
NOTE
If the column is categorical and has unique values, no distribution graph will be presented.
COLUMN_NAME = list(preview[0].keys())[0] if preview else "POLICY_END_DATE" # use your column name
schema_analysis = dataset_service.get_recipe_node_column_analysis(
    dataset_id=create_dataset.id,
    transformation_graph_version_number=1,
    dataset_node_id=add_ds_with_recipe.id,
    column_name=COLUMN_NAME,
    analysis_type="FULL",
)
print(f"\n{schema_analysis=}")
Alerts¶
Alerts flag columns with potential issues like missing values, high cardinality, or imbalanced distribution.
alerts = dataset_service.get_recipe_node_schema_alerts(
    dataset_id=create_dataset.id,
    transformation_graph_version_number=1,
    dataset_node_id=add_ds_with_recipe.id,
    analysis_type="FULL",
)
print(f"\n{alerts=}")
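To give a feel for the kinds of checks behind such alerts, here is a minimal sketch that flags the three issues named above for a single column. It is not Elevate's implementation: the function `column_alerts`, the alert labels, and all three thresholds are assumptions chosen for illustration.

```python
from collections import Counter


def column_alerts(values, max_missing=0.2, max_distinct_ratio=0.9, max_top_share=0.95):
    """Return alert labels for one column; thresholds are illustrative guesses."""
    total = len(values)
    alerts = []
    if total == 0:
        return alerts
    present = [value for value in values if value is not None]
    if (total - len(present)) / total > max_missing:
        alerts.append("missing values")
    if present:
        counts = Counter(present)
        # Share of distinct values among the non-missing ones.
        if len(counts) / len(present) > max_distinct_ratio:
            alerts.append("high cardinality")
        # Share taken by the single most frequent value.
        if counts.most_common(1)[0][1] / len(present) > max_top_share:
            alerts.append("imbalanced distribution")
    return alerts


print(column_alerts(["A", "A", "B", None, None]))       # ['missing values']
print(column_alerts([f"id{i}" for i in range(10)]))     # ['high cardinality']
print(column_alerts(["Y"] * 99 + ["N"]))                # ['imbalanced distribution']
```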
Release the Dataset¶
A Dataset can have one of two statuses:
- Draft: under construction (default)
- Released: finished and ready for export to Price-It
NOTE
To release a Dataset, ensure that no Recipe has failed and that the Recipe has been run on the full data rather than on a sample. Only released Datasets can be exported to a Price-It project.
released_dataset = dataset_service.release_transformation_graph(
    dataset_id=create_dataset.id,
    transformation_graph_version_number=1,
)
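Before calling the release endpoint, the two conditions from the note can be checked locally if you track your own runs. The sketch below assumes a hypothetical record format of `(recipe name, status label, ran on sample)` tuples; neither the helper `release_blockers` nor the status labels come from the SDK.

```python
def release_blockers(recipe_runs):
    """Return reasons that would block a release, given (name, status, used_sample) tuples."""
    blockers = []
    for name, status, used_sample in recipe_runs:
        if status == "FAILED":
            blockers.append(f"{name}: failed")
        elif used_sample:
            blockers.append(f"{name}: ran on a sample only")
    return blockers


runs = [
    ("clean", "SUCCEEDED", False),
    ("filter", "SUCCEEDED", True),
]
print(release_blockers(runs))  # ['filter: ran on a sample only']
```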
Dataset Version Management¶
Each Dataset can evolve over time through Dataset Versions. A version is a snapshot of the Dataset at a point in time, capturing its structure and transformations. Users decide when to create a new version.
Create a New Dataset Version¶
create_new_version_req = CreateDatasetVersionRequest(
    datasetId=create_dataset.id,
    sourceTransformationGraphVersionNumber=1,
)
data_set_version_2 = dataset_service.create_new_version_to_dataset(
    dataset_id=create_dataset.id,
    create_dataset_version_request=create_new_version_req,
)
print(f"\n{data_set_version_2=}")
Delete a Dataset¶
dataset_service.delete_dataset(dataset_id=create_dataset.id)