{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "3e1a76a4-dfbe-4b0c-939f-87e0cb2ca6e0",
   "metadata": {},
   "source": "# Data Version Pipeline\n\nElevate Data helps organizations manage their data lifecycle. It integrates diverse data sources at scale and provides tools for governance and efficiency.\n\nElevate Data manages connections to external data sources (currently S3, Snowflake, and Databricks) for efficient data extraction.\n\nIt also provides a central repository of data elements imported from these sources, which improves transparency and collaboration.\n\nThis notebook shows how to import data from an external data source into Price-It on a fixed cadence:\n\n1. Create a new version of an existing data table to import updated data from the data source.\n2. Create a new version of an existing dataset that includes this data table and a recipe chain.\n3. Update the data table version in the dataset to the latest one.\n4. Run the recipes on the full data.\n5. Release the dataset version.\n6. Transfer the dataset to a predefined Price-It target."
  },
  {
   "cell_type": "markdown",
   "id": "a344b11a-7b49-45de-a86c-52f6fd1a2598",
   "metadata": {},
   "source": [
    "## Install the SDK"
   ]
  },
  {
   "cell_type": "code",
   "id": "e47c2a23-0cd9-461f-9597-55c7b320d3a6",
   "metadata": {
    "tags": [
     "skip-execution"
    ]
   },
   "source": "!pip install --upgrade earnix-elevate",
   "outputs": [],
   "execution_count": null
  },
  {
   "cell_type": "markdown",
   "id": "83e2ea7f-29d5-49f4-a234-8ca67562a6cd",
   "metadata": {},
   "source": "## Set your Elevate server and credentials\n\nIn this example, we authenticate to Elevate using environment variables. Alternatively, you can pass your credentials as arguments to each service client's constructor, as shown in the commented example."
  },
  {
   "cell_type": "code",
   "id": "d7cf4160-db73-4924-8e2d-a61cb511208a",
   "metadata": {},
   "source": "import os\nos.environ.setdefault(\"E2_SERVER\", \"YOUR_SERVER\")\nos.environ.setdefault(\"E2_CLIENT_ID\", \"YOUR_CLIENT_ID\")\nos.environ.setdefault(\"E2_SECRET_KEY\", \"YOUR_SECRET_KEY\")\n# Alternatively:\n# DataTableService(\n#     server=\"YOUR_SERVER\",\n#     client_id=\"YOUR_CLIENT_ID\",\n#     secret_key=\"YOUR_SECRET_KEY\"\n# )",
   "outputs": [],
   "execution_count": null
  },
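  {
   "cell_type": "markdown",
   "id": "sketch-check-credentials",
   "metadata": {},
   "source": [
    "Before calling any service, it can help to fail fast if a placeholder was never replaced. The sketch below is a minimal, hypothetical helper (`check_elevate_env` is not part of the SDK): it only reads the three environment variables set above and assumes that unreplaced placeholders start with `YOUR_`.\n",
    "\n",
    "```python\n",
    "import os\n",
    "\n",
    "REQUIRED_VARS = ('E2_SERVER', 'E2_CLIENT_ID', 'E2_SECRET_KEY')\n",
    "\n",
    "\n",
    "def check_elevate_env(env=None):\n",
    "    # Hypothetical helper: returns the names of required variables that are\n",
    "    # missing or still hold a YOUR_ placeholder. No server call is made.\n",
    "    env = os.environ if env is None else env\n",
    "    problems = []\n",
    "    for name in REQUIRED_VARS:\n",
    "        value = env.get(name, '')\n",
    "        if not value or value.startswith('YOUR_'):\n",
    "            problems.append(name)\n",
    "    return problems\n",
    "\n",
    "\n",
    "missing = check_elevate_env()\n",
    "if missing:\n",
    "    print('Set these variables before continuing:', ', '.join(missing))\n",
    "```\n",
    "\n",
    "Because the cell above uses `os.environ.setdefault`, running it without exporting real values leaves the placeholders in place, which is exactly the case this check reports."
   ]
  },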
  {
   "cell_type": "markdown",
   "id": "b6b60e2d-354c-430e-809e-45cb1dc89126",
   "metadata": {},
   "source": "## Imports and demo preparation"
  },
  {
   "cell_type": "code",
   "id": "72923393-52af-43f9-b7fe-8a4d24fff0d9",
   "metadata": {},
   "source": "from time import sleep\n\nfrom earnix_elevate import (\n    DataTableService,\n    DatasetService,\n    CreateDatasetVersionRequest,\n    UpdateDatasetDataTableNodeRequest,\n    ExecuteRecipeRequest,\n    CreateExportRequest,\n    ExportService\n)\nfrom earnix_elevate.clients.data import JobStatus\nfrom earnix_elevate.clients.imx import JobStatus as ImxJobStatus",
   "outputs": [],
   "execution_count": null
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "create-additional-versions-header",
   "metadata": {},
   "source": "## Create an Additional Data Version\n\nYou can create a new version of your data table to retrieve updated data from the source. This is useful when the underlying data has changed and you want to refresh your data table."
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "e4a03f2f-e92d-4eb2-8a6d-2b7c214812d3",
   "metadata": {},
   "source": [
    "### Create a Data Table service client"
   ]
  },
  {
   "cell_type": "code",
   "id": "a4a429bf-5443-4505-b5ae-7763b41c346e",
   "metadata": {},
   "source": [
    "data_table_service = DataTableService()"
   ],
   "outputs": [],
   "execution_count": null
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "7d10ed48-4b9e-40e5-865b-0f74626e540f",
   "metadata": {},
   "source": [
    "### Get the Data Table"
   ]
  },
  {
   "cell_type": "code",
   "id": "2ff6d331-1cce-4f9e-bb31-bfbbdc6b4636",
   "metadata": {},
   "source": "import os\nDATA_TABLE_ID = int(os.environ.get(\"PIPELINE_DATA_TABLE_ID\", \"29104\"))  # replace with your data table's system ID\n\ndata_table = data_table_service.get_data_table(DATA_TABLE_ID)\nprint(f\"\\n{data_table=}\")",
   "outputs": [],
   "execution_count": null
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "c92ea34a-4579-43fd-9f89-df1a1397314b",
   "metadata": {},
   "source": [
    "### Create the Data Table Version"
   ]
  },
  {
   "cell_type": "code",
   "id": "create-new-version-code",
   "metadata": {},
   "source": [
    "data_table_version = data_table_service.create_data_version(data_table.id)\n",
    "print(f\"\\n{data_table_version=}\")"
   ],
   "outputs": [],
   "execution_count": null
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "a4702c25-2441-4734-a9c2-222dc88aacb2",
   "metadata": {},
   "source": [
    "### Follow up on the import status of the created version"
   ]
  },
  {
   "cell_type": "code",
   "id": "8c9f9d8e-374c-4633-83e2-ab55e6e1998c",
   "metadata": {},
   "source": "data_table_id = data_table_version.data_table_id\ndata_table_version_number = data_table_version.version_number\n\nMAX_ATTEMPTS = 180  # Wait max 15 minutes (180 * 5 sec)\nWAIT_INTERVAL = 5  # seconds\ndv = None\nstatus_code = None\n\nfor attempt in range(MAX_ATTEMPTS):\n    try:\n        dv = data_table_service.get_data_version(data_table_id, data_table_version_number)\n\n        execution = dv.execution\n        status_code = getattr(execution, \"status\", None)\n        if status_code == JobStatus.SUCCEEDED:\n            print(\"\\nImport completed\\n\")\n            break\n        elif status_code == JobStatus.FAILED:\n            print(\"\\nImport failed\\n\")\n            break\n    except Exception as ex:\n        print(f\"Exception during status polling: {ex}\")\n    sleep(WAIT_INTERVAL)\n    if dv:\n        print(f\"Waiting for import:\\n{dv}\\n\")\nelse:\n    raise RuntimeError(f\"Timed out waiting for data version {data_table_version_number} to complete.\")\n\n# Stop here so that later cells do not run against a failed import\nif status_code == JobStatus.FAILED:\n    raise RuntimeError(f\"Import of data version {data_table_version_number} failed.\")",
   "outputs": [],
   "execution_count": null
  },
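  {
   "cell_type": "markdown",
   "id": "sketch-generic-polling",
   "metadata": {},
   "source": [
    "The import, recipe-run, and export cells in this notebook all follow the same poll-until-done pattern. The sketch below factors that pattern into one reusable function. `wait_for_status` is hypothetical and not part of the SDK; to stay self-contained, the example polls a fake status source. With the SDK, `fetch` could wrap `data_table_service.get_data_version(data_table_id, data_table_version_number)` and return the execution's `status`, with `JobStatus.SUCCEEDED` and `JobStatus.FAILED` as the terminal values.\n",
    "\n",
    "```python\n",
    "import time\n",
    "\n",
    "\n",
    "def wait_for_status(fetch, succeeded, failed, max_attempts=180, interval=5.0):\n",
    "    # Call fetch() until it returns a terminal status.\n",
    "    # Returns the success status; raises on failure or timeout.\n",
    "    for _ in range(max_attempts):\n",
    "        try:\n",
    "            status = fetch()\n",
    "        except Exception as ex:  # treat errors as transient and keep polling\n",
    "            print(f'Exception during status polling: {ex}')\n",
    "            status = None\n",
    "        if status == succeeded:\n",
    "            return status\n",
    "        if status == failed:\n",
    "            raise RuntimeError(f'Job finished with status {status}')\n",
    "        time.sleep(interval)\n",
    "    raise TimeoutError(f'No terminal status after {max_attempts} attempts')\n",
    "\n",
    "\n",
    "# Usage with a fake job that succeeds on the third poll\n",
    "fake_statuses = iter(['RUNNING', 'RUNNING', 'SUCCEEDED'])\n",
    "result = wait_for_status(lambda: next(fake_statuses), 'SUCCEEDED', 'FAILED', interval=0)\n",
    "print(result)  # SUCCEEDED\n",
    "```\n",
    "\n",
    "Raising on failure, rather than only printing, prevents later steps from running against a failed job."
   ]
  },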
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "1073caa0-b1c2-42fc-8cc3-2c148c635282",
   "metadata": {},
   "source": [
    "## Create an Additional Dataset Version\n",
    "\n",
    "Each dataset version is represented by a transformation graph made up of nodes: data nodes (data tables) and recipe nodes (SQL transformations). The graph also captures the dependencies between these nodes."
   ]
  },
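  {
   "cell_type": "markdown",
   "id": "sketch-transformation-graph-model",
   "metadata": {},
   "source": [
    "To make the structure concrete, the sketch below models a transformation graph with plain dataclasses. `Node`, `Graph`, and the `edges` list are illustrative assumptions rather than the SDK's models; the `TABLE` node type mirrors the value used later in this notebook, while `RECIPE` is assumed.\n",
    "\n",
    "```python\n",
    "from dataclasses import dataclass, field\n",
    "\n",
    "\n",
    "@dataclass\n",
    "class Node:\n",
    "    id: int\n",
    "    node_type: str  # 'TABLE' for data nodes; 'RECIPE' is assumed for recipe nodes\n",
    "    name: str\n",
    "\n",
    "\n",
    "@dataclass\n",
    "class Graph:\n",
    "    version_number: int\n",
    "    nodes: list\n",
    "    edges: list = field(default_factory=list)  # (upstream_id, downstream_id) pairs\n",
    "\n",
    "    def upstream_of(self, node_id):\n",
    "        # IDs of the nodes whose output feeds directly into node_id\n",
    "        return [src for src, dst in self.edges if dst == node_id]\n",
    "\n",
    "\n",
    "graph = Graph(\n",
    "    version_number=3,\n",
    "    nodes=[\n",
    "        Node(1, 'TABLE', 'policies'),\n",
    "        Node(2, 'RECIPE', 'clean_policies'),\n",
    "        Node(3, 'RECIPE', 'add_features'),\n",
    "    ],\n",
    "    edges=[(1, 2), (2, 3)],\n",
    ")\n",
    "print(graph.upstream_of(3))  # [2]\n",
    "```\n",
    "\n",
    "In this model, a new dataset version corresponds to a new `Graph` with a higher `version_number`, and swapping in a newer data table version only touches the `TABLE` node."
   ]
  },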
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "1292d178-544e-4795-91b4-94ed26739f95",
   "metadata": {},
   "source": [
    "### Create a Dataset service client"
   ]
  },
  {
   "cell_type": "code",
   "id": "56114f71-b488-41ee-a8a8-35a1ad0b90ec",
   "metadata": {},
   "source": [
    "dataset_service = DatasetService()"
   ],
   "outputs": [],
   "execution_count": null
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "e7a74eb3-dfb6-4439-bae5-2e0103e73852",
   "metadata": {},
   "source": [
    "### Get the Dataset"
   ]
  },
  {
   "cell_type": "code",
   "id": "eb21b963-41db-4434-9837-d6ce8a1de07d",
   "metadata": {},
   "source": "import os\nDATASET_ID = int(os.environ.get(\"PIPELINE_DATASET_ID\", \"24307\"))  # replace with your dataset's system ID\n\ndataset = dataset_service.get_dataset(DATASET_ID)\nprint(f\"\\n{dataset=}\")\n\ndataset_id = dataset.id",
   "outputs": [],
   "execution_count": null
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "6d219fde-7a68-4af8-b2ad-3c994f33115f",
   "metadata": {},
   "source": [
    "### Create the Dataset Version"
   ]
  },
  {
   "cell_type": "code",
   "id": "e9256a8d-bc40-49c0-ac91-566e59c209d0",
   "metadata": {},
   "source": "create_new_version_req = CreateDatasetVersionRequest(\n    datasetId=dataset_id,\n    sourceTransformationGraphVersionNumber=dataset.latest_transformation_graph.version_number\n)\n\ndataset_version = dataset_service.create_new_version_to_dataset(\n    dataset_id=dataset_id,\n    create_dataset_version_request=create_new_version_req\n)\n\nprint(f\"\\n{dataset_version=}\")\n\ntransformation_graph = dataset_version.latest_transformation_graph\nprint(f\"\\n{transformation_graph=}\")",
   "outputs": [],
   "execution_count": null
  },
  {
   "cell_type": "markdown",
   "id": "e2492341-ca5c-4cb3-a489-84c1118ec708",
   "metadata": {},
   "source": [
    "## Update the Data Table Version in the Dataset to the Latest"
   ]
  },
  {
   "cell_type": "code",
   "id": "67c7d20d-5260-4da3-afaa-823faa6b6aa4",
   "metadata": {},
   "source": "def get_data_table_node_from_transformation_graph(graph):\n    # Returns the first data table (TABLE) node; assumes the dataset has a single data table.\n    # If your dataset combines several data tables, select the node to update explicitly.\n    for node in graph.nodes:\n        if node.node_type == \"TABLE\":\n            return node\n    return None\n\n\ndataset_data_table_node = get_data_table_node_from_transformation_graph(transformation_graph)\nif dataset_data_table_node is None:\n    raise RuntimeError(\"No data table node found in the transformation graph.\")\n\nupdate_dataset_data_table_node_req = UpdateDatasetDataTableNodeRequest(\n    data_table_id=data_table_id,\n    data_table_version_number=data_table_version_number,\n    dataset_id=dataset_id,\n    id=dataset_data_table_node.id,\n    lock_version=dataset_data_table_node.lock_version,\n    node_type=\"TABLE\"\n)\n\ndataset_data_table_node_with_updated_version = dataset_service.update_dataset_node_in_transformation_graph(\n    dataset_id=dataset_id,\n    transformation_graph_version_number=transformation_graph.version_number,\n    dataset_node_id=dataset_data_table_node.id,\n    update_dataset_node_request=update_dataset_data_table_node_req\n)\n\nprint(f\"\\n{dataset_data_table_node_with_updated_version=}\")",
   "outputs": [],
   "execution_count": null
  },
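  {
   "cell_type": "markdown",
   "id": "sketch-optimistic-locking",
   "metadata": {},
   "source": [
    "The update request carries the node's `lock_version`. The name suggests optimistic concurrency control: the server accepts an update only if the version the client read is still current, so a concurrent change is never silently overwritten. The sketch below illustrates that general technique with an in-memory store; whether Elevate behaves exactly this way is an assumption, and `NodeStore` and `StaleLockVersionError` are hypothetical.\n",
    "\n",
    "```python\n",
    "class StaleLockVersionError(Exception):\n",
    "    pass\n",
    "\n",
    "\n",
    "class NodeStore:\n",
    "    # Hypothetical in-memory stand-in for a server that uses optimistic locking\n",
    "    def __init__(self):\n",
    "        self.nodes = {}\n",
    "\n",
    "    def put(self, node_id, data):\n",
    "        self.nodes[node_id] = {'data': data, 'lock_version': 0}\n",
    "\n",
    "    def update(self, node_id, data, lock_version):\n",
    "        current = self.nodes[node_id]\n",
    "        expected = current['lock_version']\n",
    "        if lock_version != expected:\n",
    "            # The node changed after the client read it: reject instead of overwriting\n",
    "            raise StaleLockVersionError(f'expected lock_version {expected}, got {lock_version}')\n",
    "        current['data'] = data\n",
    "        current['lock_version'] = expected + 1\n",
    "        return current\n",
    "\n",
    "\n",
    "store = NodeStore()\n",
    "store.put(42, {'data_table_version_number': 1})\n",
    "snapshot = dict(store.nodes[42])  # the client reads the node, including lock_version 0\n",
    "store.update(42, {'data_table_version_number': 2}, snapshot['lock_version'])\n",
    "try:\n",
    "    # Reusing the now-stale lock_version is rejected\n",
    "    store.update(42, {'data_table_version_number': 3}, snapshot['lock_version'])\n",
    "except StaleLockVersionError as ex:\n",
    "    print('Rejected:', ex)\n",
    "```\n",
    "\n",
    "If Elevate follows this pattern, a rejected update should be retried after re-reading the node and its current `lock_version`, not with the old value."
   ]
  },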
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "e6e4efd3-f012-4758-8437-6d9196ae8e19",
   "metadata": {},
   "source": [
    "## Run the Recipes on the full data\n",
    "\n",
    "* A Recipe runs only if none of its predecessors in the chain failed.\n",
    "\n",
    "* When several Recipes are pending, running the last Recipe also executes all preceding Recipes. However, inspection data is available only for the last Recipe, and the earlier Recipes remain in a pending state."
   ]
  },
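  {
   "cell_type": "markdown",
   "id": "sketch-recipe-chain-semantics",
   "metadata": {},
   "source": [
    "The two rules above can be illustrated with a small simulation of a linear recipe chain. This is a hypothetical model, not the Elevate scheduler: `run_chain` runs every recipe up to and including the requested one, in order, and skips every recipe after the first failure.\n",
    "\n",
    "```python\n",
    "def run_chain(chain, target):\n",
    "    # chain: ordered list of (name, step) pairs, where step is a zero-argument callable\n",
    "    # target: name of the recipe to run; all recipes before it run first\n",
    "    names = [name for name, _ in chain]\n",
    "    last = names.index(target)\n",
    "    outcomes = {}\n",
    "    failed = False\n",
    "    for name, step in chain[:last + 1]:\n",
    "        if failed:\n",
    "            outcomes[name] = 'NOT_RUN'  # a predecessor failed, so this recipe is skipped\n",
    "            continue\n",
    "        try:\n",
    "            step()\n",
    "            outcomes[name] = 'SUCCEEDED'\n",
    "        except Exception:\n",
    "            outcomes[name] = 'FAILED'\n",
    "            failed = True\n",
    "    return outcomes\n",
    "\n",
    "\n",
    "def broken_recipe():\n",
    "    raise ValueError('bad SQL')\n",
    "\n",
    "\n",
    "chain = [('clean', lambda: None), ('join', broken_recipe), ('features', lambda: None)]\n",
    "print(run_chain(chain, 'features'))\n",
    "# {'clean': 'SUCCEEDED', 'join': 'FAILED', 'features': 'NOT_RUN'}\n",
    "```\n",
    "\n",
    "Requesting only the last recipe is enough to execute the whole chain, which is why this notebook runs `output_dataset_recipe_node_id` alone."
   ]
  },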
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "2c939e0c-06c2-4bc2-9dfa-df2e608f636d",
   "metadata": {},
   "source": [
    "### Run the last Recipe\n"
   ]
  },
  {
   "cell_type": "code",
   "id": "70dea334-4df5-403a-bf0e-889b29abcf24",
   "metadata": {},
   "source": "run_req = ExecuteRecipeRequest(\n    useSample=False,\n)\n\nrun_recipe = dataset_service.execute_recipe(\n    dataset_id=dataset_id,\n    transformation_graph_version_number=transformation_graph.version_number,\n    dataset_node_id=transformation_graph.output_dataset_recipe_node_id,\n    execute_recipe_request=run_req\n)\n\nprint(f\"\\n{run_recipe=}\")",
   "outputs": [],
   "execution_count": null
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "bb689b86-f2a9-4cd5-9f33-73b0aa7dfe9b",
   "metadata": {},
   "source": [
    "### Follow up on the run status of the last Recipe"
   ]
  },
  {
   "cell_type": "code",
   "id": "81fadcd9-30db-4e91-b463-187a8a26b4dc",
   "metadata": {},
   "source": "MAX_ATTEMPTS = 180  # Wait max 15 minutes (180 * 5 sec)\nWAIT_INTERVAL = 5  # seconds\nrun_status_code = None\n\nfor attempt in range(MAX_ATTEMPTS):\n    try:\n        run_status_response = dataset_service.get_latest_recipe_status(\n            dataset_id,\n            transformation_graph.version_number,\n            run_recipe.id)\n\n        execution = run_status_response.execution\n        run_status_code = getattr(execution, \"status\", None)\n        if run_status_code == JobStatus.SUCCEEDED:\n            print(\"\\nRecipe run completed\\n\")\n            break\n        elif run_status_code == JobStatus.FAILED:\n            print(\"\\nRecipe run failed\\n\")\n            break\n    except Exception as ex:\n        print(f\"Exception during recipe run status polling: {ex}\")\n    sleep(WAIT_INTERVAL)\n    if run_status_code:\n        print(f\"Waiting for recipe run: {run_status_code}\\n\")\nelse:\n    raise RuntimeError(\"Timed out waiting for the recipe run to complete.\")\n\n# A failed run cannot be released, so stop the pipeline here\nif run_status_code == JobStatus.FAILED:\n    raise RuntimeError(\"Recipe run failed; fix the failing Recipe before releasing the Dataset.\")",
   "outputs": [],
   "execution_count": null
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "5a0c3189-24d2-4fe9-87c9-a81a6852dc11",
   "metadata": {},
   "source": "## Release the Dataset version\n\nA Dataset can have one of two statuses:\n\n* Draft: under construction (default)\n\n* Released: finished and ready for export to Price-It\n\n**Note:** Before releasing a Dataset, make sure that no Recipes have failed and that the Recipes were run on the full data rather than a sample. Only released Datasets can be exported to a Price-It project."
  },
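  {
   "cell_type": "markdown",
   "id": "sketch-release-preflight",
   "metadata": {},
   "source": [
    "The release preconditions in the note can be expressed as a simple pre-flight check. The sketch below is hypothetical: it assumes you have collected each recipe's last run as a `(status, used_sample)` pair, which is not an SDK structure, and it returns the reasons a release should not proceed. Pending recipes are deliberately not treated as blockers, since earlier Recipes can stay pending after the last one runs.\n",
    "\n",
    "```python\n",
    "def release_blockers(recipe_runs):\n",
    "    # recipe_runs: {recipe_name: (status, used_sample)}; a hypothetical structure\n",
    "    blockers = []\n",
    "    for name, (status, used_sample) in recipe_runs.items():\n",
    "        if status == 'FAILED':\n",
    "            blockers.append(f'{name}: recipe failed')\n",
    "        elif used_sample:\n",
    "            blockers.append(f'{name}: ran on a sample, not the full data')\n",
    "    return blockers\n",
    "\n",
    "\n",
    "runs = {'clean': ('SUCCEEDED', False), 'features': ('SUCCEEDED', True)}\n",
    "print(release_blockers(runs))  # ['features: ran on a sample, not the full data']\n",
    "```"
   ]
  },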
  {
   "cell_type": "code",
   "id": "c3e7ced9-5c7a-41c3-a9ac-680562f8e3a0",
   "metadata": {},
   "source": "released_dataset = dataset_service.release_transformation_graph(\n    dataset_id=dataset_id,\n    transformation_graph_version_number=transformation_graph.version_number\n)\n\nprint(f\"\\n{released_dataset=}\")",
   "outputs": [],
   "execution_count": null
  },
  {
   "cell_type": "markdown",
   "id": "bfd433e8-a7b0-419b-9f08-938f68147f12",
   "metadata": {},
   "source": "## Transfer the Dataset to a predefined Price-It target\n\nThe export request supports two source types: `datatable` and `dataset`, for data tables and datasets, respectively.\nThe source ID must be the system ID of the data table or dataset to export."
  },
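  {
   "cell_type": "markdown",
   "id": "sketch-export-fields",
   "metadata": {},
   "source": [
    "Because only two source types are accepted, it can be worth validating the request fields before calling the export service. The sketch below builds a plain dictionary with the same field names as the `CreateExportRequest` call later in this notebook; `build_export_fields` is hypothetical, and treating `sourceVersion` as optional is an assumption.\n",
    "\n",
    "```python\n",
    "VALID_SOURCE_TYPES = ('datatable', 'dataset')\n",
    "\n",
    "\n",
    "def build_export_fields(target_id, source_type, source_id, source_version=None):\n",
    "    # Hypothetical helper: validates inputs and returns keyword arguments\n",
    "    # shaped like the CreateExportRequest call used in this notebook\n",
    "    if source_type not in VALID_SOURCE_TYPES:\n",
    "        raise ValueError(f'source_type must be one of {VALID_SOURCE_TYPES}, got {source_type!r}')\n",
    "    fields = {\n",
    "        'targetId': int(target_id),\n",
    "        'sourceType': source_type,\n",
    "        'sourceId': int(source_id),\n",
    "        'useSample': False,  # export the full data, as the notebook does\n",
    "    }\n",
    "    if source_version is not None:\n",
    "        fields['sourceVersion'] = int(source_version)\n",
    "    return fields\n",
    "\n",
    "\n",
    "print(build_export_fields(9102, 'dataset', 24307, source_version=2))\n",
    "```\n",
    "\n",
    "The result could then be passed as `CreateExportRequest(**fields, ...)` together with any other fields your export needs, assuming the model accepts these camelCase names as the notebook's own call does."
   ]
  },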
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "3ba93b9d-0f94-4bd1-af53-4f8a77e10069",
   "metadata": {},
   "source": [
    "### Create an Export service client"
   ]
  },
  {
   "cell_type": "code",
   "id": "6eee8abd-f173-4f9f-992e-b806cf0104da",
   "metadata": {},
   "source": [
    "export_service = ExportService()"
   ],
   "outputs": [],
   "execution_count": null
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "10377e2d-d6da-4552-8c2d-83bcfc0dd7ec",
   "metadata": {},
   "source": [
    "### Export the Dataset"
   ]
  },
  {
   "cell_type": "code",
   "id": "cc557d58-e49d-4e20-8bda-640ea51b9bf8",
   "metadata": {},
   "source": "import os\nTARGET_ID = int(os.environ.get(\"PIPELINE_TARGET_ID\", \"9102\"))  # replace with your target's system ID\n\nexport_dataset_request = CreateExportRequest(\n    targetId=TARGET_ID,\n    sourceType='dataset',\n    sourceId=dataset_id,\n    sourceVersion=transformation_graph.version_number,\n    useSample=False,\n    validateToRelease=True,\n    castBooleanToInteger=False\n)\n\nexport_response = export_service.export(export_dataset_request)\nprint(f\"\\n{export_response=}\")",
   "outputs": [],
   "execution_count": null
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "a75e20d5-92aa-451b-af47-7572abb9b800",
   "metadata": {},
   "source": [
    "### Follow up on the export status of the Dataset"
   ]
  },
  {
   "cell_type": "code",
   "id": "630eee71-5201-4dfa-a81c-8ac6d5e101da",
   "metadata": {},
   "source": "export_id = export_response.export_id\n\nMAX_ATTEMPTS = 180  # Wait max 15 minutes (180 * 5 sec)\nWAIT_INTERVAL = 5  # seconds\nstatus_code = None\n\nfor attempt in range(MAX_ATTEMPTS):\n    try:\n        export_response = export_service.get_export(export_id)\n\n        execution = export_response.execution\n        status_code = getattr(execution, \"status\", None)\n        if status_code == ImxJobStatus.SUCCEEDED:\n            print(\"\\nExport completed\\n\")\n            break\n        elif status_code == ImxJobStatus.FAILED:\n            print(\"\\nExport failed\\n\")\n            break\n    except Exception as ex:\n        print(f\"Exception during status polling: {ex}\")\n    sleep(WAIT_INTERVAL)\n    print(f\"Waiting for export {export_id}\\n\")\nelse:\n    raise RuntimeError(f\"Timed out waiting for export {export_id} to complete.\")\n\n# Surface a failed export as an error so scheduled runs do not report success\nif status_code == ImxJobStatus.FAILED:\n    raise RuntimeError(f\"Export {export_id} failed.\")",
   "outputs": [],
   "execution_count": null
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}