Import from Snowflake¶
Elevate Data helps organizations manage their data lifecycle: it integrates diverse data sources at scale and provides tools for governance and efficiency.
Elevate Data manages connections to external data sources (currently S3, Snowflake, and Databricks) and uses them to extract data.
It also provides a central repository of data elements imported from these sources, which improves transparency and collaboration.
This notebook outlines how to:
- Create a Connection: A Connection provides secure access to an external data source, such as a Snowflake host, a Databricks Delta Sharing share, or an Amazon S3 bucket.
- Retrieve and list connections to manage your data sources.
- Create a Data Source: A Data Source defines the Snowflake query and connection used to extract data.
- Create a Data Table: A Data Table references a Data Source and triggers the import of the first version automatically.
- Retrieve and list data tables to manage your data assets.
- Follow up on the import status of the version.
- Preview the first 100 rows of the version.
- Create additional data versions to refresh your data.
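The resources in this list form a chain: a Data Source points at a Connection, a Data Table points at a Data Source, and each import adds a data version to the table. The sketch below models only these references with plain dataclasses. The class and field names are hypothetical simplifications, not the earnix_elevate SDK types, and numbering versions from 1 is an assumption.

```python
from dataclasses import dataclass, field
from typing import List

# Hypothetical stand-ins that model how the resources reference each other.
# These are NOT the earnix_elevate SDK classes.

@dataclass
class Connection:
    id: str
    name: str

@dataclass
class DataSource:
    id: str
    connection_id: str  # the Connection the query runs through
    query: str          # the Snowflake query that extracts the data

@dataclass
class DataTable:
    id: str
    data_source_id: str  # the Data Source this table imports from
    versions: List[int] = field(default_factory=list)

    def __post_init__(self) -> None:
        # Creating a Data Table triggers the import of its first version.
        self.refresh()

    def refresh(self) -> int:
        # Each refresh adds a data version; numbering from 1 is an assumption.
        number = len(self.versions) + 1
        self.versions.append(number)
        return number

conn = Connection(id="c1", name="demo")
ds = DataSource(id="s1", connection_id=conn.id, query="select 1")
dt = DataTable(id="t1", data_source_id=ds.id)  # first version on creation
dt.refresh()                                   # a later refresh
print(dt.versions)  # [1, 2]
```

Because each resource points at the one created before it, the cleanup at the end of this notebook deletes them in reverse order.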
In this example, we are going to create a Connection to a Snowflake host and import data from it using a Data Table.
Install the SDK¶
%pip install --upgrade earnix-elevate
Set your Elevate server and credentials¶
In this example we use environment variables to authenticate to Elevate, but you can also pass your credentials as arguments to each service client's constructor, as in the commented example.
import os
os.environ.setdefault("E2_SERVER", "YOUR_SERVER")
os.environ.setdefault("E2_CLIENT_ID", "YOUR_CLIENT_ID")
os.environ.setdefault("E2_SECRET_KEY", "YOUR_SECRET_KEY")
# Alternatively:
# ConnectionService(
#     server="YOUR_SERVER",
#     client_id="YOUR_CLIENT_ID",
#     secret_key="YOUR_SECRET_KEY"
# )
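Because `os.environ.setdefault` only assigns a value when the variable is not already defined, variables exported before the notebook starts take precedence over the placeholders above. A small check like the following can catch placeholders that were never replaced before any service client is created. `check_elevate_settings` is a hypothetical helper, not part of the SDK.

```python
import os

REQUIRED_VARS = ("E2_SERVER", "E2_CLIENT_ID", "E2_SECRET_KEY")

def check_elevate_settings(env=os.environ):
    # Hypothetical helper: flag variables that are missing, empty, or still
    # set to one of the "YOUR_..." placeholders used in this notebook.
    missing = [
        name for name in REQUIRED_VARS
        if not env.get(name) or env[name].startswith("YOUR_")
    ]
    if missing:
        raise ValueError(f"Set real values for: {', '.join(missing)}")
    return {name: env[name] for name in REQUIRED_VARS}
```

Calling `check_elevate_settings()` right after the cell above raises a `ValueError` that names every variable still holding a placeholder.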
Imports and demo preparation¶
from datetime import datetime
from time import sleep
from earnix_elevate import (
    ConnectionService,
    CreateDataTableRequest,
    CreateRDBMSDataSourceRequest,
    CreateSnowflakeConnectionRequest,
    DataSourceService,
    DataTableService,
    SnowflakeClientCredentialsAuthRequest,
    SnowflakePasswordAuthRequest,
)
from earnix_elevate.clients.data import JobStatus
DEMO_SUFFIX = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
connection_service = ConnectionService()
data_source_service = DataSourceService()
data_table_service = DataTableService()
Create a Connection¶
To create a Snowflake connection, first choose an authentication method (OAuth2 client credentials or username and password), then submit a connection request that uses it.
Create the Connection¶
import os
# Option 1: OAuth2 Client Credentials
# auth = SnowflakeClientCredentialsAuthRequest(
#     authType="snowflake_client_credentials",
#     clientId="0ebd7557-328f-XXXX-XXXX-026bf502770b",
#     clientSecret="sye8Q~XXXX45Mtfks1YaRueRW.kima-u",
#     host="your-account.region.snowflakecomputing.com",
#     tokenUrl="https://login.microsoftonline.com/5e9f6a19-afc7-XXXX-8ad7-XXXX/oauth2/v2.0/token",
#     scope="api://c2aefe1e-8dee-XXXX-XXXX-a4145e13856d/.default",
# )
# Option 2: Password Authentication
auth = SnowflakePasswordAuthRequest(
    authType="snowflake_password",
    username=os.environ.get("SNOWFLAKE_USERNAME", "YOUR_USERNAME"),
    host=os.environ.get("SNOWFLAKE_HOST", "YOUR_HOST.REGION.snowflakecomputing.com"),
    password=os.environ.get("SNOWFLAKE_PASSWORD", "YOUR_PASSWORD"),
)
new_conn = CreateSnowflakeConnectionRequest(
    type="snowflake",
    name="Bee-Insurance Demo Conn " + DEMO_SUFFIX,
    auth=auth,
    warehouse=os.environ.get("SNOWFLAKE_WAREHOUSE", "YOUR_WAREHOUSE"),
)
create_conn = connection_service.create_connection(new_conn)
print(f"\n{create_conn=}")
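If you switch between the two authentication options, one approach is to pick the option from whichever environment variables are present. The helper below is a hypothetical sketch: the `SNOWFLAKE_CLIENT_ID`, `SNOWFLAKE_CLIENT_SECRET`, `SNOWFLAKE_TOKEN_URL`, and `SNOWFLAKE_SCOPE` variable names are assumptions, and it only returns keyword arguments whose names mirror the ones used in the cell above.

```python
import os

def snowflake_auth_kwargs(env=os.environ):
    # Hypothetical helper: choose an auth option based on the environment.
    host = env.get("SNOWFLAKE_HOST", "YOUR_HOST.REGION.snowflakecomputing.com")
    if env.get("SNOWFLAKE_CLIENT_ID"):
        # Option 1: OAuth2 client credentials (variable names are assumptions)
        return "client_credentials", {
            "authType": "snowflake_client_credentials",
            "clientId": env["SNOWFLAKE_CLIENT_ID"],
            "clientSecret": env.get("SNOWFLAKE_CLIENT_SECRET", ""),
            "host": host,
            "tokenUrl": env.get("SNOWFLAKE_TOKEN_URL", ""),
            "scope": env.get("SNOWFLAKE_SCOPE", ""),
        }
    # Option 2: username and password, as in the cell above
    return "password", {
        "authType": "snowflake_password",
        "username": env.get("SNOWFLAKE_USERNAME", "YOUR_USERNAME"),
        "host": host,
        "password": env.get("SNOWFLAKE_PASSWORD", "YOUR_PASSWORD"),
    }
```

The returned dictionary could then be unpacked into the matching request class, for example `SnowflakePasswordAuthRequest(**kwargs)` when `kind` is `"password"`.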
Retrieve Connection Details¶
After creating a connection, you can retrieve its details using the connection ID.
retrieved_conn = connection_service.get_connection(create_conn.id)
print(f"\n{retrieved_conn=}")
List All Connections¶
You can also list all available connections in your environment.
all_connections = connection_service.list_connections().items
print(f"\nFound {len(all_connections)} connections:")
for conn in all_connections:
    print(f" - {conn.name} (ID: {conn.id})")
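Because every demo run adds a timestamped connection, it can help to filter the listing by name, for example to find leftovers from earlier runs. The hypothetical helper below relies only on the `name` and `id` attributes printed in the loop above; `SimpleNamespace` objects stand in for the SDK's connection objects.

```python
from types import SimpleNamespace

def find_by_prefix(items, prefix):
    # Hypothetical helper: keep (name, id) pairs whose name starts with prefix.
    return [(item.name, item.id) for item in items if item.name.startswith(prefix)]

# Stand-in objects; in the notebook you would pass all_connections instead.
sample = [
    SimpleNamespace(name="Bee-Insurance Demo Conn 2024-01-01 10:00:00", id="c-1"),
    SimpleNamespace(name="Production warehouse", id="c-2"),
]
print(find_by_prefix(sample, "Bee-Insurance Demo Conn"))
# [('Bee-Insurance Demo Conn 2024-01-01 10:00:00', 'c-1')]
```

The same helper works on `all_data_tables`, since the data table loop uses the same two attributes.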
Create a Data Table¶
Creating a Data Table takes two steps: first create a Data Source that defines the Snowflake query and the connection to run it through, then create the Data Table that references that Data Source. Once the Data Table is created, the import of its first version starts automatically.
Create the Data Source¶
When creating the Data Source, provide the ID of the connection to use and the Snowflake query that retrieves the data. Optionally, you can also specify the Snowflake warehouse that runs the query.
new_ds = CreateRDBMSDataSourceRequest(
    type="rdbms",
    name="Bee-Insurance Demo DS " + DEMO_SUFFIX,
    connectionId=create_conn.id,
    query="select * from BEE_INSURANCE.HOME.HOME_POLICIES",
    warehouse=os.environ.get("SNOWFLAKE_WAREHOUSE", "YOUR_WAREHOUSE"),
)
create_ds = data_source_service.create_data_source(new_ds)
print(f"\n{create_ds=}")
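The `query` is plain Snowflake SQL. If you assemble it from database, schema, and table names, quoting each identifier avoids surprises with special characters. The sketch below is a hypothetical helper, not part of the SDK. It follows Snowflake's rule that a double-quoted identifier is case-sensitive and escapes an embedded double quote by doubling it. Because the names in this demo are upper case, the generated query is equivalent to the unquoted one in the cell above.

```python
def quote_ident(name: str) -> str:
    # Snowflake quoted identifier: wrap in double quotes, double any embedded quote.
    return '"' + name.replace('"', '""') + '"'

def build_select(database, schema, table, columns=None, limit=None):
    # Hypothetical helper: build "select <cols> from <db>.<schema>.<table> [limit n]".
    cols = ", ".join(quote_ident(c) for c in columns) if columns else "*"
    fqn = ".".join(quote_ident(part) for part in (database, schema, table))
    sql = f"select {cols} from {fqn}"
    if limit is not None:
        sql += f" limit {int(limit)}"  # int() keeps arbitrary text out of the query
    return sql

print(build_select("BEE_INSURANCE", "HOME", "HOME_POLICIES"))
# select * from "BEE_INSURANCE"."HOME"."HOME_POLICIES"
```

Passing `columns` or a `limit` narrows what each import extracts; remember that quoted names must match the case they were created with.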
Create the Data Table¶
new_dt = CreateDataTableRequest(
    name="Bee-Insurance Demo DT " + DEMO_SUFFIX,
    dataSourceId=create_ds.id,
)
create_dt = data_table_service.create_data_table(new_dt)
print(f"\n{create_dt=}")
Retrieve Data Table Details¶
After creating a data table, you can retrieve its details using the data table ID.
retrieved_dt = data_table_service.get_data_table(create_dt.id)
print(f"\n{retrieved_dt=}")
List All Data Tables¶
You can also list all available data tables in your environment.
all_data_tables = data_table_service.list_data_tables().items
print(f"\nFound {len(all_data_tables)} data tables:")
for dt in all_data_tables:
    print(f" - {dt.name} (ID: {dt.id})")
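Follow up on the import status of the created version¶
The import runs asynchronously, so the next cell polls the data version every 5 seconds until its execution status is SUCCEEDED or FAILED, and gives up after 15 minutes.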
data_version_number = create_dt.latest_data_version.version_number
MAX_ATTEMPTS = 180 # Wait max 15 minutes (180 * 5 sec)
WAIT_INTERVAL = 5 # seconds
dv = None
for attempt in range(MAX_ATTEMPTS):
    try:
        dv = data_table_service.get_data_version(create_dt.id, data_version_number)
        status_code = dv.execution.status if dv.execution else None
        if status_code == JobStatus.SUCCEEDED:
            print("\nImport completed\n")
            break
        elif status_code == JobStatus.FAILED:
            print("\nImport failed\n")
            break
    except Exception as ex:
        # Report polling errors and keep trying until MAX_ATTEMPTS is reached
        print(f"Exception during status polling: {ex}")
    sleep(WAIT_INTERVAL)
    if dv:
        print(f"Waiting:\n{dv}")
else:
    # A for loop's else branch runs only when no `break` occurred,
    # i.e. the import neither succeeded nor failed in time
    raise RuntimeError(f"Timed out waiting for data version {data_version_number} to complete.")
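The polling loop above can be factored into a reusable helper. The following is a sketch, not an SDK function: `poll_until`, `fetch`, and `is_done` are hypothetical names, and injecting `sleep` lets the demo run without actually waiting.

```python
import time

def poll_until(fetch, is_done, interval=5.0, max_attempts=180, sleep=time.sleep):
    # Call fetch() until is_done(result) is true; raise TimeoutError after
    # max_attempts calls. Exceptions from fetch() are reported and retried.
    for attempt in range(1, max_attempts + 1):
        try:
            result = fetch()
            if is_done(result):
                return result
        except Exception as ex:
            print(f"Attempt {attempt}: {ex}")
        if attempt < max_attempts:
            sleep(interval)  # no pointless wait after the last attempt
    raise TimeoutError(f"Not done after {max_attempts} attempts")

# Demo: a fake job that reaches a final state on the third call.
states = iter(["RUNNING", "RUNNING", "SUCCEEDED"])
print(poll_until(lambda: next(states), lambda s: s in ("SUCCEEDED", "FAILED"), sleep=lambda s: None))
# SUCCEEDED
```

In this notebook, `fetch` would wrap `data_table_service.get_data_version(create_dt.id, data_version_number)`, and `is_done` would check whether `dv.execution.status` is `JobStatus.SUCCEEDED` or `JobStatus.FAILED`. As in the cell above, a failed import also ends the wait, so check the status afterward.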
Preview¶
Once the version has been successfully imported, you can preview the first 100 rows of the imported data.
dvt = data_table_service.get_data_version_preview(create_dt.id, data_version_number)
print(f"\n{dvt=}")
Create Additional Data Versions¶
You can create new versions of your data table to retrieve updated data from the source. This is useful when the underlying data in Snowflake has changed and you want to refresh your data table.
new_version = data_table_service.create_data_version(create_dt.id)
print(f"\n{new_version=}")
Cleanup¶
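This step is optional. It deletes all resources created in this notebook, starting with the data table and ending with the connection.
# Delete resources in reverse order of creation: data table, data source, connection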
try:
    data_table_service.delete_data_table(data_table_id=create_dt.id)
    print(f"Deleted data table {create_dt.id}")
except Exception as e:
    print(f"Failed to delete data table: {e}")
try:
    data_source_service.delete_data_source(data_source_id=create_ds.id)
    print(f"Deleted data source {create_ds.id}")
except Exception as e:
    print(f"Failed to delete data source: {e}")
try:
    connection_service.delete_connection(connection_id=create_conn.id)
    print(f"Deleted connection {create_conn.id}")
except Exception as e:
    print(f"Failed to delete connection: {e}")
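The three try/except blocks above share one pattern: attempt each deletion, report any failure, and keep going. The sketch below expresses that pattern as a hypothetical `run_cleanup` helper that also returns the failures; fake deletion functions stand in for the service calls.

```python
def run_cleanup(steps):
    # Run each (label, delete_fn) step in order, continuing past failures,
    # and return the list of (label, exception) pairs that failed.
    failures = []
    for label, delete in steps:
        try:
            delete()
            print(f"Deleted {label}")
        except Exception as ex:
            print(f"Failed to delete {label}: {ex}")
            failures.append((label, ex))
    return failures

# Demo with fake deletions; the second one fails.
deleted = []

def fail():
    raise RuntimeError("still referenced")

failures = run_cleanup([
    ("data table", lambda: deleted.append("dt")),
    ("data source", fail),
    ("connection", lambda: deleted.append("conn")),
])
print(deleted, [label for label, _ in failures])
# ['dt', 'conn'] ['data source']
```

In the notebook, the steps would be, for example, `("data table", lambda: data_table_service.delete_data_table(data_table_id=create_dt.id))`, followed by the data source and the connection, in that order.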