Data Version Pipeline¶
Elevate Data empowers organizations to effectively manage their data lifecycle by enabling seamless integration of diverse data sources on a large scale, while also providing tools for enhanced governance and efficiency.
Elevate Data manages connections to various data sources (currently S3, Snowflake, and Databricks), allowing for efficient data extraction.
It also provides a central repository of data elements, imported from different sources, for greater transparency and efficient collaboration.
This notebook outlines how to import data from an external data source to Price-It on a fixed cadence:
- Create a new version of an existing data table to import updated data from the data source.
- Create a new version of an existing dataset that includes this data table and a recipe chain.
- Update the version of the data table in the dataset to the latest.
- Run the recipes on full data.
- Release the dataset version.
- Transfer the dataset to a predefined Price-It target.
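The steps above can be sketched as a small runner that executes each stage in order and stops at the first failure. This is only an outline under stated assumptions: `run_pipeline`, `fail`, and the placeholder steps are hypothetical names, and in practice each step would wrap the SDK calls shown in the rest of this notebook.

```python
def run_pipeline(steps):
    """Run (name, callable) steps in order; stop at the first one that raises."""
    completed = []
    for name, step in steps:
        try:
            step()
        except Exception as exc:
            print(f"Step '{name}' failed: {exc}")
            break
        completed.append(name)
    return completed


def fail():
    """Stand-in for a step that fails (hypothetical, for illustration only)."""
    raise RuntimeError("recipe failed")


# Placeholder steps; each lambda stands in for the matching notebook cell.
steps = [
    ("create data table version", lambda: None),
    ("create dataset version", lambda: None),
    ("update data table node", lambda: None),
    ("run recipes", fail),
    ("release dataset", lambda: None),
    ("export to Price-It", lambda: None),
]
completed = run_pipeline(steps)
print(completed)
```

Stopping at the first failure matters here because later stages depend on earlier ones: a dataset whose recipes failed should be neither released nor exported.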
Install the SDK¶
!pip install --upgrade earnix-elevate
Set your Elevate server and credentials¶
This example authenticates to Elevate through environment variables. Alternatively, you can pass your credentials as Python arguments to each service client, as in the commented-out example.
import os
os.environ.setdefault("E2_SERVER", "YOUR_SERVER")
os.environ.setdefault("E2_CLIENT_ID", "YOUR_CLIENT_ID")
os.environ.setdefault("E2_SECRET_KEY", "YOUR_SECRET_KEY")
# Alternatively:
# DataTableService(
#     server="YOUR_SERVER",
#     client_id="YOUR_CLIENT_ID",
#     secret_key="YOUR_SECRET_KEY"
# )
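Because `os.environ.setdefault` leaves the `YOUR_...` placeholders in place when nothing is configured, a quick check before creating any client can catch a missing setup early. The sketch below is an assumption about how you might do that: `missing_credentials` is a hypothetical helper, not part of the SDK, and it only looks at the three variable names used above.

```python
import os

# Variable names come from the cell above; the placeholder test is an assumption.
REQUIRED_VARS = ("E2_SERVER", "E2_CLIENT_ID", "E2_SECRET_KEY")


def missing_credentials(env=None):
    """Return the required variables that are unset, empty, or still placeholders."""
    env = os.environ if env is None else env
    missing = []
    for name in REQUIRED_VARS:
        value = env.get(name, "")
        if not value or value.startswith("YOUR_"):
            missing.append(name)
    return missing


problems = missing_credentials()
if problems:
    print(f"Set these variables before running the pipeline: {problems}")
```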
Imports and demo preparation¶
from time import sleep
from earnix_elevate import (
    DataTableService,
    DatasetService,
    CreateDatasetVersionRequest,
    UpdateDatasetDataTableNodeRequest,
    ExecuteRecipeRequest,
    CreateExportRequest,
    ExportService,
)
from earnix_elevate.clients.data import JobStatus
from earnix_elevate.clients.imx import JobStatus as ImxJobStatus
Create Additional Data Version¶
You can create a new version of your data table to retrieve updated data from the source. This is useful when the underlying data has changed and you want to refresh your data table.
Create a Data Table service client¶
data_table_service = DataTableService()
Get the Data Table¶
import os
DATA_TABLE_ID = int(os.environ.get("PIPELINE_DATA_TABLE_ID", "29104"))  # replace with your data table's system ID
data_table = data_table_service.get_data_table(DATA_TABLE_ID)
print(f"\n{data_table=}")
Create the Data Table Version¶
data_table_version = data_table_service.create_data_version(data_table.id)
print(f"\n{data_table_version=}")
Follow-up on the import status of the created version¶
data_table_id = data_table_version.data_table_id
data_table_version_number = data_table_version.version_number
MAX_ATTEMPTS = 180 # Wait max 15 minutes (180 * 5 sec)
WAIT_INTERVAL = 5 # seconds
dv = None
for attempt in range(MAX_ATTEMPTS):
    try:
        dv = data_table_service.get_data_version(data_table_id, data_table_version_number)
        execution = dv.execution
        status_code = getattr(execution, "status", None)
        if status_code == JobStatus.SUCCEEDED:
            print("\nImport completed\n")
            break
        elif status_code == JobStatus.FAILED:
            print("\nImport failed\n")
            break
    except Exception as ex:
        print(f"Exception during status polling: {ex}")
    sleep(WAIT_INTERVAL)
    if dv:
        print(f"Waiting import:\n{dv}\n")
else:
    # The loop finished without a break: no terminal status within MAX_ATTEMPTS.
    raise RuntimeError(f"Timed out waiting for data version {data_table_version_number} to complete.")
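The same polling pattern recurs for the import, the recipe run, and the export, so it can be factored into one helper. The following is a minimal sketch, not an SDK feature: `wait_for_status` is a hypothetical function, the string statuses stand in for the SDK's `JobStatus` values, and, unlike the loop above, it raises on failure instead of only printing.

```python
import time
from typing import Callable, Optional


def wait_for_status(
    fetch_status: Callable[[], Optional[str]],
    success: str = "SUCCEEDED",
    failure: str = "FAILED",
    max_attempts: int = 180,
    wait_interval: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll fetch_status until it returns a terminal status.

    Returns the success status, raises RuntimeError on failure and
    TimeoutError when max_attempts is exhausted.
    """
    for _ in range(max_attempts):
        status = fetch_status()
        if status == success:
            return status
        if status == failure:
            raise RuntimeError(f"Job finished with status {failure}")
        sleep(wait_interval)
    raise TimeoutError(f"No terminal status after {max_attempts} attempts")


# Usage with a stand-in status source instead of a real service call.
statuses = iter([None, "RUNNING", "SUCCEEDED"])
result = wait_for_status(lambda: next(statuses), sleep=lambda _: None)
print(result)
```

Passing `sleep` as a parameter keeps the helper testable without real waiting. With the SDK, `fetch_status` would wrap a call such as `get_data_version` and read the execution status from its result.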
Create Additional Dataset Version¶
Each dataset version is represented by a transformation graph made up of nodes. This graph contains data nodes (data tables), recipe nodes (SQL transformations), and shows the relationships between them.
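To make the graph structure concrete, here is a toy model of a transformation graph with one data table feeding a chain of two recipes. It is an illustration only: the `Node` class, its `inputs` field, and the `"RECIPE"` type label are assumptions, not the SDK's actual response model.

```python
from dataclasses import dataclass, field
from graphlib import TopologicalSorter


@dataclass
class Node:
    id: int
    node_type: str  # "TABLE" or "RECIPE" in this sketch
    inputs: list = field(default_factory=list)  # ids of upstream nodes


# Hypothetical graph: one data table feeding a chain of two recipes.
nodes = [
    Node(id=1, node_type="TABLE"),
    Node(id=2, node_type="RECIPE", inputs=[1]),
    Node(id=3, node_type="RECIPE", inputs=[2]),
]

# Map each node to its predecessors and derive a valid execution order.
order = list(TopologicalSorter({n.id: n.inputs for n in nodes}).static_order())
print(order)  # [1, 2, 3]
```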
Create a Dataset service client¶
dataset_service = DatasetService()
Get the Dataset¶
import os
DATASET_ID = int(os.environ.get("PIPELINE_DATASET_ID", "24307"))  # replace with your dataset's system ID
dataset = dataset_service.get_dataset(DATASET_ID)
print(f"\n{dataset=}")
dataset_id = dataset.id
Create the Dataset Version¶
create_new_version_req = CreateDatasetVersionRequest(
    datasetId=dataset_id,
    sourceTransformationGraphVersionNumber=dataset.latest_transformation_graph.version_number
)
dataset_version = dataset_service.create_new_version_to_dataset(
    dataset_id=dataset_id,
    create_dataset_version_request=create_new_version_req
)
print(f"\n{dataset_version=}")
transformation_graph = dataset_version.latest_transformation_graph
print(f"\n{transformation_graph=}")
Update a Data Table version in a Dataset to the latest¶
def get_data_table_node_from_transformation_graph():
    for node in transformation_graph.nodes:
        if node.node_type == "TABLE":
            return node
    return None
dataset_data_table_node = get_data_table_node_from_transformation_graph()
assert dataset_data_table_node
update_dataset_data_table_node_req = UpdateDatasetDataTableNodeRequest(
    data_table_id=data_table_id,
    data_table_version_number=data_table_version_number,
    dataset_id=dataset_id,
    id=dataset_data_table_node.id,
    lock_version=dataset_data_table_node.lock_version,
    node_type="TABLE"
)
dataset_data_table_node_with_updated_version = dataset_service.update_dataset_node_in_transformation_graph(
    dataset_id=dataset_id,
    transformation_graph_version_number=transformation_graph.version_number,
    dataset_node_id=dataset_data_table_node.id,
    update_dataset_node_request=update_dataset_data_table_node_req
)
print(f"\n{dataset_data_table_node_with_updated_version=}")
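The lookup above returns the first `TABLE` node it finds, which is enough when the dataset contains a single data table. If a graph holds several tables, you would likely need to match on the table itself. The sketch below assumes each table node exposes a `data_table_id` attribute, which this notebook does not confirm. `find_table_node` and the stand-in nodes are hypothetical.

```python
from types import SimpleNamespace


def find_table_node(nodes, data_table_id):
    """Return the first TABLE node bound to data_table_id, or None."""
    for node in nodes:
        if node.node_type == "TABLE" and getattr(node, "data_table_id", None) == data_table_id:
            return node
    return None


# Stand-in nodes; real nodes would come from transformation_graph.nodes.
nodes = [
    SimpleNamespace(id=10, node_type="TABLE", data_table_id=111),
    SimpleNamespace(id=11, node_type="TABLE", data_table_id=222),
    SimpleNamespace(id=12, node_type="RECIPE"),
]
match = find_table_node(nodes, 222)
print(match.id)  # 11
```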
Run the Recipes on full data¶
A Recipe runs only if none of its predecessors in the chain has failed.
When several Recipes are pending, running the last Recipe also triggers all preceding Recipes. However, inspection data is available only for the last Recipe, and the earlier Recipes remain in a pending state.
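The first rule, that a failure blocks everything downstream, can be illustrated with a toy simulation of a linear chain. This is a simplified model, not how Elevate is implemented: `run_chain` is hypothetical, the outcome labels are illustrative rather than Elevate's status values, and it does not model the pending state that earlier Recipes keep.

```python
def run_chain(recipes, fails=()):
    """Simulate running a linear recipe chain by triggering its last recipe.

    recipes: recipe names in chain order.
    fails: names of recipes whose execution fails (for illustration).
    Returns a dict mapping each recipe to "SUCCEEDED", "FAILED" or "NOT_RUN".
    """
    results = {}
    upstream_failed = False
    for name in recipes:
        if upstream_failed:
            results[name] = "NOT_RUN"  # a predecessor failed, so this one never runs
        elif name in fails:
            results[name] = "FAILED"
            upstream_failed = True
        else:
            results[name] = "SUCCEEDED"
    return results


outcome = run_chain(["clean", "join", "aggregate"], fails={"join"})
print(outcome)
```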
Run the last Recipe¶
run_req = ExecuteRecipeRequest(
    useSample=False,
)
run_recipe = dataset_service.execute_recipe(
    dataset_id=dataset_id,
    transformation_graph_version_number=transformation_graph.version_number,
    dataset_node_id=transformation_graph.output_dataset_recipe_node_id,
    execute_recipe_request=run_req
)
print(f"\n{run_recipe=}")
Follow-up on the run status of the last Recipe¶
MAX_ATTEMPTS = 180 # Wait max 15 minutes (180 * 5 sec)
WAIT_INTERVAL = 5 # seconds
run_status_code = None
for attempt in range(MAX_ATTEMPTS):
    try:
        run_status_response = dataset_service.get_latest_recipe_status(
            dataset_id,
            transformation_graph.version_number,
            run_recipe.id)
        execution = run_status_response.execution
        run_status_code = getattr(execution, "status", None)
        if run_status_code == JobStatus.SUCCEEDED:
            print("\nRecipe run completed\n")
            break
        elif run_status_code == JobStatus.FAILED:
            print("\nRecipe run failed\n")
            break
    except Exception as ex:
        print(f"Exception during run recipe status polling: {ex}")
    sleep(WAIT_INTERVAL)
    if run_status_code:
        print(f"Waiting run recipe:{run_status_code}\n")
else:
    # The loop finished without a break: no terminal status within MAX_ATTEMPTS.
    raise RuntimeError("Timed out waiting for run recipe to complete.")
Release the Dataset version¶
A Dataset has one of two statuses:
- Draft: under construction (default)
- Released: finished and ready for export to Price-It
NOTE: To release a Dataset, ensure that no Recipe has failed and that the Recipe processed the full data. Only released Datasets can be exported to a Price-It project.
released_dataset = dataset_service.release_transformation_graph(
    dataset_id=dataset_id,
    transformation_graph_version_number=transformation_graph.version_number
)
print(f"\n{released_dataset=}")
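The release conditions described in this section can be checked in your own pipeline code before calling the release step. The sketch below is one possible pre-check under stated assumptions: `release_blockers` and the dictionary shape of `recipe_runs` are hypothetical, and it interprets "the Recipe processed the full data" as applying to the last Recipe in the chain.

```python
def release_blockers(recipe_runs):
    """Return the reasons a dataset version is not ready for release.

    recipe_runs: list of dicts with "name", "status" and "use_sample" keys,
    in chain order. This shape is hypothetical, not the SDK's response model.
    """
    blockers = [f"{run['name']}: failed" for run in recipe_runs if run["status"] == "FAILED"]
    # Assumption: only the last recipe's run must have used the full data.
    if recipe_runs and recipe_runs[-1]["use_sample"]:
        blockers.append(f"{recipe_runs[-1]['name']}: ran on sample data, not full data")
    return blockers


runs = [
    {"name": "clean", "status": "SUCCEEDED", "use_sample": False},
    {"name": "aggregate", "status": "SUCCEEDED", "use_sample": True},
]
print(release_blockers(runs))
```

An empty list means none of the modeled conditions blocks the release.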
Transfer Dataset to predefined Price-It target¶
The export request supports two source types: 'datatable' and 'dataset', representing data tables and datasets, respectively. The source ID should correspond to the identifier of the data table or dataset to be exported.
Create an Export service client¶
export_service = ExportService()
Export the Dataset¶
import os
TARGET_ID = int(os.environ.get("PIPELINE_TARGET_ID", "9102"))  # replace with your target's system ID
export_dataset_request = CreateExportRequest(
    targetId=TARGET_ID,
    sourceType='dataset',
    sourceId=dataset_id,
    sourceVersion=transformation_graph.version_number,
    useSample=False,
    validateToRelease=True,
    castBooleanToInteger=False
)
export_response = export_service.export(export_dataset_request)
print(f"\n{export_response=}")
Follow-up on the export status of the exported Dataset¶
export_id = export_response.export_id
MAX_ATTEMPTS = 180 # Wait max 15 minutes (180 * 5 sec)
WAIT_INTERVAL = 5 # seconds
for attempt in range(MAX_ATTEMPTS):
    try:
        export_response = export_service.get_export(export_id)
        execution = export_response.execution
        status_code = getattr(execution, "status", None)
        if status_code == ImxJobStatus.SUCCEEDED:
            print("\nExport completed\n")
            break
        elif status_code == ImxJobStatus.FAILED:
            print("\nExport failed\n")
            break
    except Exception as ex:
        print(f"Exception during status polling: {ex}")
    sleep(WAIT_INTERVAL)
    print(f"Waiting Export:{export_id}\n")
else:
    # The loop finished without a break: no terminal status within MAX_ATTEMPTS.
    raise RuntimeError(f"Timed out waiting for export {export_id} to complete.")