conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg")
conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg")
conn.cursor().execute("USE DATABASE testdb_mg")
conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")
Using the database, schema, and warehouse
Specify the database and schema in which you want to create tables. Also specify the warehouse that will provide
resources for executing DML statements and queries.
For example, to use the database testdb_mg, schema testschema_mg, and warehouse tiny_warehouse_mg (created earlier):
conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg")
conn.cursor().execute("USE DATABASE testdb_mg")
conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")
Creating tables and inserting data
Use the CREATE TABLE command to create tables and the INSERT command to populate the tables with data.
For example, create a table named test_table and insert two rows into the table:
conn.cursor().execute(
    "CREATE OR REPLACE TABLE "
    "test_table(col1 integer, col2 string)")
conn.cursor().execute(
    "INSERT INTO test_table(col1, col2) VALUES " +
    "    (123, 'test string1'), " +
    "    (456, 'test string2')")
Loading data
Instead of inserting data into tables using individual INSERT commands, you can bulk load data from files staged in either an internal or external location.
Copying data from an internal location
To load data from files on your host machine into a table, first use the PUT command to stage the file in an internal location, then use the
COPY INTO <table> command to copy the data in the files into the table.
For example:
# Putting Data
con.cursor().execute("PUT file:///tmp/data/file* @%testtable")
con.cursor().execute("COPY INTO testtable")
Here, your CSV data is stored in a local directory named /tmp/data (in a Linux or macOS environment), and the directory contains files named file0, file1, … file100.
Copying data from an external location
To load data from files already staged in an external location (e.g. an S3 bucket) into a table, use the COPY INTO <table> command.
For example:
# Copying Data
con.cursor().execute("""
COPY INTO testtable FROM s3://<s3_bucket>/data/
STORAGE_INTEGRATION = myint
FILE_FORMAT=(field_delimiter=',')
""".format(
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY))
Where:
s3://<s3_bucket>/data/ specifies the name of your S3 bucket, and the files in the bucket are prefixed with data.
The bucket is accessed using a storage integration created using CREATE STORAGE INTEGRATION by an account administrator (i.e. a user with the ACCOUNTADMIN role) or a role with the global CREATE INTEGRATION privilege. A storage integration allows users to avoid supplying credentials to access a private storage location.
If you compose SQL statements with Python's format() function (or with f-strings) and your environment has a risk of SQL injection attacks, bind values instead of formatting untrusted input into the statement text.
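For reference, an administrator might create a storage integration like myint with a statement along the following lines. This is a sketch only: the IAM role ARN and bucket name are placeholders, not values defined in this topic.
# Sketch: run by a user with the ACCOUNTADMIN role (or a role with the
# global CREATE INTEGRATION privilege). <iam_role_arn> is a placeholder.
conn.cursor().execute("""
CREATE STORAGE INTEGRATION IF NOT EXISTS myint
    TYPE = EXTERNAL_STAGE
    STORAGE_PROVIDER = 'S3'
    ENABLED = TRUE
    STORAGE_AWS_ROLE_ARN = '<iam_role_arn>'
    STORAGE_ALLOWED_LOCATIONS = ('s3://<s3_bucket>/data/')
""")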
With the Snowflake Connector for Python, you can submit:
a synchronous query, which returns control to your application after the query completes.
an asynchronous query, which returns control to your application before the query completes.
After the query has completed, you use the Cursor object to fetch the values in the results. By default, the Snowflake Connector for Python converts the values from Snowflake data types to native Python data types. (Note that you can choose to return the values as strings and perform the type conversions in your application. See Improving query performance by bypassing data conversion.)
By default, values from NUMBER columns are returned as double-precision floating-point values (float64). To return these as decimal values (decimal.Decimal) in the fetch_pandas_all() and fetch_pandas_batches() methods, set the arrow_number_to_decimal parameter in the connect() method to True.
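For example, a connection configured to return NUMBER columns as decimal.Decimal values from the pandas fetch methods might look like the following sketch (the connection parameters are placeholders):
import snowflake.connector

# Sketch: with arrow_number_to_decimal=True, scaled NUMBER columns come back
# as decimal.Decimal from fetch_pandas_all() and fetch_pandas_batches().
conn = snowflake.connector.connect(
    account='<account>',
    user='<user>',
    password='<password>',
    arrow_number_to_decimal=True
)
df = conn.cursor().execute("SELECT 1.23::NUMBER(10,2) AS n").fetch_pandas_all()
print(type(df['N'].iloc[0]))  # decimal.Decimal rather than numpy.float64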
Performing a synchronous query
To perform a synchronous query, call the execute() method in the Cursor object. For example:
conn = snowflake.connector.connect( ... )
cur = conn.cursor()
cur.execute('select * from products')
Use the Cursor object to fetch the values in the results, as explained in Using cursor to fetch values.
Performing an asynchronous query
The Snowflake Connector for Python supports asynchronous queries (i.e. queries that return control to the user before the query
completes). You can submit an asynchronous query and use polling to determine when the query has completed. After the query
completes, you can get the results.
To perform asynchronous queries, you must ensure that the ABORT_DETACHED_QUERY configuration parameter is FALSE (the default value).
If the connection to the client is lost:
For synchronous queries, all in-progress synchronous queries are aborted immediately regardless of the parameter value.
For asynchronous queries:
If ABORT_DETACHED_QUERY is set to FALSE, in-progress asynchronous queries continue to run until they end normally.
If ABORT_DETACHED_QUERY is set to TRUE, Snowflake automatically aborts all in-progress asynchronous queries when a client connection is not re-established after five minutes.
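For example, you can verify (or explicitly set) the parameter for the current session before submitting asynchronous queries; a minimal sketch:
# Sketch: confirm that ABORT_DETACHED_QUERY is FALSE for this session.
cur = conn.cursor()
cur.execute("SHOW PARAMETERS LIKE 'ABORT_DETACHED_QUERY'")
print(cur.fetchall())
# If necessary, set it explicitly for the session.
cur.execute("ALTER SESSION SET ABORT_DETACHED_QUERY = FALSE")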
With this feature, you can submit multiple queries in parallel without waiting for each query to complete. You can also run a
combination of synchronous and asynchronous queries during the same session.
Finally, you can submit an asynchronous query from one connection and check the results from a different connection. For example,
a user can initiate a long-running query from your application, exit the application, and restart the application at a later time
to check the results.
Submitting an asynchronous query
To submit an asynchronous query, call the execute_async() method in the Cursor object. For example:
conn = snowflake.connector.connect( ... )
cur = conn.cursor()
# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
After submitting the query:
To determine if the query is still running, see Checking the status of a query.
To retrieve the results of the query, see Using the query ID to retrieve the results of a query.
For examples of performing asynchronous queries, see Examples of asynchronous queries.
Best practices for asynchronous queries
When submitting an asynchronous query, follow these best practices:
Ensure that you know which queries are dependent upon other queries before you run any queries in parallel. Some queries are interdependent and order sensitive, and are therefore not suitable for parallelizing. For example, an INSERT statement should not start until after the corresponding CREATE TABLE statement has finished.
Ensure that you do not run too many queries for the memory that you have available. Running multiple queries in parallel
typically consumes more memory, especially if more than one set of results is stored in memory at the same time.
When polling, handle the rare cases where a query does not succeed; a minimal sketch follows this list.
Ensure that transaction control statements (BEGIN, COMMIT, and ROLLBACK) do not execute in parallel with other statements.
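For example, a minimal polling sketch that also handles a query that did not succeed (using the status-checking methods described under Checking the status of a query):
import time

# Sketch: poll an asynchronous query and handle the case where it fails.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
query_id = cur.sfqid
while conn.is_still_running(conn.get_query_status(query_id)):
    time.sleep(1)
if conn.is_an_error(conn.get_query_status(query_id)):
    print(f'Query {query_id} did not succeed')
else:
    cur.get_results_from_sfqid(query_id)
    print(cur.fetchall())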
Retrieving the Snowflake query ID
A query ID identifies each query executed by Snowflake. When you use the Snowflake Connector for Python to execute a query, you can access the query ID through the sfqid attribute in the Cursor object:
# Retrieving a Snowflake Query ID
cur = conn.cursor()
cur.execute("SELECT * FROM test_table")
print(cur.sfqid)
You can use the query ID to:
Check the status of the query in the web interface.
In the Classic Console, query IDs are displayed in the History page.
See Using the History Page to Monitor Queries.
Programmatically check the status of the query (e.g. to determine if an asynchronous query has completed).
See Checking the status of a query.
Retrieve the results of an asynchronous query or a previously submitted synchronous query.
See Using the query ID to retrieve the results of a query.
Cancel a running query.
See Canceling a query by query ID.
Checking the status of a query
To check the status of a query:
Get the query ID from the sfqid field in the Cursor object.
Pass the query ID to the get_query_status() method of the Connection object to return the QueryStatus enum constant that represents the status of the query.
By default, get_query_status() does not raise an error if the query resulted in an error. If you want an error raised, call get_query_status_throw_if_error() instead.
Use the QueryStatus enum constant to check the status of the query.
To determine if the query is still running (for example, if this is an asynchronous query), pass the constant to the is_still_running() method of the Connection object.
To determine if an error occurred, pass the constant to the is_an_error() method.
For the full list of enum constants, see QueryStatus.
The following example executes an asynchronous query and checks the status of the query:
import time
# Execute a long-running query asynchronously.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
# Wait for the query to finish running.
query_id = cur.sfqid
while conn.is_still_running(conn.get_query_status(query_id)):
    time.sleep(1)
The following example raises an error if the query has resulted in an error:
from snowflake.connector import ProgrammingError
import time
# Wait for the query to finish running and raise an error
# if a problem occurred with the execution of the query.
try:
    query_id = cur.sfqid
    while conn.is_still_running(conn.get_query_status_throw_if_error(query_id)):
        time.sleep(1)
except ProgrammingError as err:
    print('Programming Error: {0}'.format(err))
Using the query ID to retrieve the results of a query
If you performed a synchronous query by calling the execute() method on a Cursor object, you don't need to use the query ID to retrieve the results. You can just fetch the values from the results, as explained in Using cursor to fetch values.
If you want to retrieve the results of an asynchronous query or a previously submitted synchronous query, follow these steps:
Get the query ID of the query. See Retrieving the Snowflake query ID.
Call the get_results_from_sfqid() method in the Cursor object to retrieve the results.
Use the Cursor object to fetch the values in the results, as explained in Using cursor to fetch values.
Note that if the query is still running, the fetch methods (fetchone(), fetchmany(), fetchall(), etc.) will wait for the query to complete.
For example:
# Get the results from a query.
cur.get_results_from_sfqid(query_id)
results = cur.fetchall()
print(f'{results[0]}')
Using cursor to fetch values
Fetch values from a table using the cursor object iterator method.
For example, to fetch columns named “col1” and “col2” from the table named test_table, which was created earlier (in Creating tables and inserting data), use code similar to the following:
cur = conn.cursor()
try:
    cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1")
    for (col1, col2) in cur:
        print('{0}, {1}'.format(col1, col2))
finally:
    cur.close()
Alternatively, the Snowflake Connector for Python provides a convenient shortcut:
for (col1, col2) in conn.cursor().execute("SELECT col1, col2 FROM test_table"):
    print('{0}, {1}'.format(col1, col2))
If you need to get a single result (i.e. a single row), use the fetchone method:
col1, col2 = conn.cursor().execute("SELECT col1, col2 FROM test_table").fetchone()
print('{0}, {1}'.format(col1, col2))
If you need to get a specified number of rows at a time, use the fetchmany method with the number of rows:
cur = conn.cursor().execute("SELECT col1, col2 FROM test_table")
while True:
    ret = cur.fetchmany(3)
    if not ret:
        break
    print(ret)
Use fetchone or fetchmany if the result set is too large to fit into memory.
If you need to get all results at once:
results = conn.cursor().execute("SELECT col1, col2 FROM test_table").fetchall()
for rec in results:
    print('%s, %s' % (rec[0], rec[1]))
To set a timeout for a query, execute a "begin" command and include a timeout parameter on the query. If the query takes longer than the timeout value, an error is produced and a rollback occurs.
In the following code, error 604 means the query was canceled. The timeout parameter starts a Timer() that cancels the query if it does not finish within the specified time.
conn.cursor().execute("create or replace table testtbl(a int, b string)")
conn.cursor().execute("begin")
try:
conn.cursor().execute("insert into testtbl(a,b) values(3, 'test3'), (4,'test4')", timeout=10) # long query
except ProgrammingError as e:
if e.errno == 604:
print("timeout")
conn.cursor().execute("rollback")
else:
raise e
else:
conn.cursor().execute("commit")
Using DictCursor to fetch values by column name
If you want to fetch a value by column name, create a cursor object of type DictCursor.
For example:
# Querying data by DictCursor
from snowflake.connector import DictCursor
cur = conn.cursor(DictCursor)
try:
    cur.execute("SELECT col1, col2 FROM test_table")
    for rec in cur:
        print('{0}, {1}'.format(rec['COL1'], rec['COL2']))
finally:
    cur.close()
Examples of asynchronous queries
The following is a simple example of an asynchronous query:
from snowflake.connector import ProgrammingError
import time
conn = snowflake.connector.connect( ... )
cur = conn.cursor()
# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
# Get the query ID and retrieve the results.
query_id = cur.sfqid
cur.get_results_from_sfqid(query_id)
results = cur.fetchall()
print(f'{results[0]}')
The next example submits an asynchronous query from one connection and retrieves the results from a different connection:
from snowflake.connector import ProgrammingError
import time
conn = snowflake.connector.connect( ... )
cur = conn.cursor()
# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
# Get the query ID for the asynchronous query.
query_id = cur.sfqid
# Close the cursor and the connection.
cur.close()
conn.close()
# Open a new connection.
new_conn = snowflake.connector.connect( ... )
# Create a new cursor.
new_cur = new_conn.cursor()
# Retrieve the results.
new_cur.get_results_from_sfqid(query_id)
results = new_cur.fetchall()
print(f'{results[0]}')
Canceling a query by query ID
Cancel a query by query ID:
cur = conn.cursor()
try:
    cur.execute(r"SELECT SYSTEM$CANCEL_QUERY('queryID')")
    result = cur.fetchall()
    print(len(result))
    print(result[0])
finally:
    cur.close()
Replace the string “queryID” with the actual query ID. To get the ID for a query, see
Retrieving the Snowflake query ID.
Improving query performance by bypassing data conversion
To improve query performance, use the SnowflakeNoConverterToPython class in the snowflake.connector.converter_null module to bypass data conversions from the Snowflake internal data type to the native Python data type, e.g.:
import snowflake.connector
from snowflake.connector.converter_null import SnowflakeNoConverterToPython

con = snowflake.connector.connect(
    ...,
    converter_class=SnowflakeNoConverterToPython
)
for rec in con.cursor().execute("SELECT * FROM large_table"):
    # rec includes raw Snowflake data
    print(rec)
As a result, all data is represented in string form such that the application is responsible for converting it to the native Python data types. For example, TIMESTAMP_NTZ and TIMESTAMP_LTZ data are the epoch time represented in string form, and TIMESTAMP_TZ data is the epoch time followed by a space followed by the offset to UTC in minutes, represented in string form.
No impact is made to binding data; Python native data can still be bound for updates.
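For example, if the application does need a native value for a particular column, it can perform the conversion itself. A hypothetical sketch for a TIMESTAMP_NTZ value (the raw string below is illustrative, not output from a real query):
from datetime import datetime, timezone

# Hypothetical raw value for a TIMESTAMP_NTZ column fetched with
# SnowflakeNoConverterToPython: the epoch time in string form.
raw = '1614043533.123456'
ts = datetime.fromtimestamp(float(raw), tz=timezone.utc)
print(ts)  # 2021-02-23 01:25:33.123456+00:00 (subject to float precision)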
Binding data
To specify values to be used in a SQL statement, you can include literals in the statement, or you can
bind variables. When you bind variables, you put one or more
placeholders in the text of the SQL statement, and then specify the variable (the value to be used)
for each placeholder.
The following example contrasts the use of literals and binding:
Literals:
con.cursor().execute("INSERT INTO testtable(col1, col2) VALUES(789, 'test string3')")
Binding:
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%s, %s)", (
        789,
        'test string3'
    ))
There is an upper limit to the size of data that you can bind, or that you can combine in a batch. For details, see Limits on Query Text Size.
Snowflake supports the following types of binding:
pyformat and format, which bind data on the client.
qmark and numeric, which bind data on the server.
Each of these is explained below.
pyformat or format binding
Both pyformat binding and format binding bind data on the client side rather than on the server side.
By default, the Snowflake Connector for Python supports both pyformat and format, so you can use %(name)s or %s as the placeholder. For example:
Using %(name)s as the placeholder:
conn.cursor().execute(
    "INSERT INTO test_table(col1, col2) "
    "VALUES(%(col1)s, %(col2)s)", {
        'col1': 789,
        'col2': 'test string3',
    })
Using %s as the placeholder:
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%s, %s)", (
        789,
        'test string3'
    ))
With pyformat and format, you can also use a list object to bind data for the IN operator:
# Binding data for IN operator
con.cursor().execute(
    "SELECT col1, col2 FROM testtable"
    " WHERE col2 IN (%s)", (
        ['test string1', 'test string3'],
    ))
The percent character (“%”) is both a wildcard character for SQL LIKE and a format binding character for Python. If you
use format binding, and if your SQL command contains the percent character, you might need to escape the percent
character. For example, if your SQL statement is:
SELECT col1, col2
FROM test_table
WHERE col2 ILIKE '%York' LIMIT 1; -- Find York, New York, etc.
then your Python code should look like the following (note the extra percent sign to escape the original percent sign):
sql_command = "select col1, col2 from test_table "
sql_command += " where col2 like '%%York' limit %(lim)s"
parameter_dictionary = {'lim': 1 }
cur.execute(sql_command, parameter_dictionary)
qmark or numeric binding
Both qmark binding and numeric binding bind data on the server side rather than on the client side:
For qmark binding, use a question mark character (?) to indicate where in the string you want a variable's value inserted.
For numeric binding, use a colon (:) followed by a number to indicate the position of the variable that you want substituted at that position. For example, :2 specifies the second variable.
Use numeric binding to bind the same value more than once in the same query. For example, if you have a long VARCHAR or BINARY or semi-structured value that you want to use more than once, then numeric binding allows you to send the value to the server once and use it multiple times.
The next sections explain how to use qmark and numeric binding:
Using qmark or numeric binding
Using qmark or numeric binding with datetime objects
Using bind variables with the IN operator
Using qmark or numeric binding
To use qmark or numeric style binding, either execute one of the following statements or set paramstyle as part of the connection parameters when calling connect():
snowflake.connector.paramstyle='qmark'
snowflake.connector.paramstyle='numeric'
If you set paramstyle to qmark or numeric, you must use ? or :N (where N is replaced with a number) as the placeholders, respectively.
For example:
Using ? as the placeholder:
from datetime import datetime
from snowflake.connector import connect

connection_parameters = {
    'account': 'xxxxx',
    'user': 'xxxx',
    'password': 'xxxxxx',
    'host': 'xxxxxx',
    'port': 443,
    'protocol': 'https',
    'warehouse': 'xxx',
    'database': 'xxx',
    'schema': 'xxx',
    'paramstyle': 'qmark'  # note the paramstyle setting here at the connection level
}
con = connect(**connection_parameters)
con.cursor().execute(
    "INSERT INTO testtable2(col1,col2,col3) "
    "VALUES(?,?,?)", (
        987,
        'test string4',
        ("TIMESTAMP_LTZ", datetime.now())
    ))
Using :N as the placeholder:
import snowflake.connector
snowflake.connector.paramstyle='numeric'
con = snowflake.connector.connect(...)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(:1, :2)", (
        789,
        'test string3'
    ))
The following query shows how to use numeric binding to reuse a variable:
con.cursor().execute(
    "INSERT INTO testtable(complete_video, short_sample_of_video) "
    "VALUES(:1, SUBSTRING(:1, :2, :3))", (
        binary_value_that_stores_video,          # variable :1
        starting_offset_in_bytes_of_video_clip,  # variable :2
        length_in_bytes_of_video_clip            # variable :3
    ))
Using qmark or numeric binding with datetime objects
When using qmark or numeric binding to bind data to a Snowflake TIMESTAMP data type, set the bind variable to a tuple that specifies the Snowflake timestamp data type (TIMESTAMP_LTZ or TIMESTAMP_TZ) and the value. For example:
import snowflake.connector
from datetime import datetime

snowflake.connector.paramstyle = 'qmark'
con = snowflake.connector.connect(...)
con.cursor().execute(
    "CREATE OR REPLACE TABLE testtable2 ("
    "    col1 int, "
    "    col2 string, "
    "    col3 timestamp_ltz"
    ")"
)
con.cursor().execute(
    "INSERT INTO testtable2(col1,col2,col3) "
    "VALUES(?,?,?)", (
        987,
        'test string4',
        ("TIMESTAMP_LTZ", datetime.now())
    ))
Unlike client side binding, server side binding requires the Snowflake data type for the column. Most common Python data types already have implicit mappings to Snowflake data types (e.g. int is mapped to FIXED). However, because Python datetime data can be bound to one of multiple Snowflake data types (TIMESTAMP_NTZ, TIMESTAMP_LTZ, or TIMESTAMP_TZ), and the default mapping is TIMESTAMP_NTZ, you must specify the Snowflake data type to use.
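As a hypothetical variation, a timezone-aware datetime can be bound as TIMESTAMP_TZ in the same way (testtable3 and its timestamp_tz column col_tz are assumed here, not created in this topic):
from datetime import datetime, timezone

# Sketch: bind a timezone-aware datetime as TIMESTAMP_TZ.
con.cursor().execute(
    "INSERT INTO testtable3(col_tz) VALUES(?)",
    (("TIMESTAMP_TZ", datetime.now(timezone.utc)),))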
Using bind variables with the IN operator
qmark and numeric binding (server side binding) do not support the use of bind variables with the IN operator. If you need to use bind variables with the IN operator, use client side binding (pyformat or format), as in the sketch below.
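For example (restating the list-binding example from pyformat or format binding):
# Client side (pyformat) binding of a list for the IN operator.
con.cursor().execute(
    "SELECT col1, col2 FROM testtable"
    " WHERE col2 IN (%s)", (
        ['test string1', 'test string3'],
    ))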
Binding parameters to variables for batch inserts
In your application code, you can insert multiple rows in a single batch. To do this, use parameters for values in an INSERT statement. For example, the following statement uses placeholders for qmark binding in an INSERT statement:
insert into grocery (item, quantity) values (?, ?)
Then, to specify the data that should be inserted, define a variable that is a sequence of sequences (for example, a list of tuples):
rows_to_insert = [('milk', 2), ('apple', 3), ('egg', 2)]
As shown in the example above, each item in the list is a tuple that contains the column values for a row to be inserted.
To perform the binding, call the executemany() method, passing the variable as the second argument. For example:
conn = snowflake.connector.connect( ... )  # paramstyle='qmark' must be set for ? placeholders
rows_to_insert = [('milk', 2), ('apple', 3), ('egg', 2)]
conn.cursor().executemany(
    "insert into grocery (item, quantity) values (?, ?)",
    rows_to_insert)
If you are binding data on the server (i.e. by using qmark or numeric binding), the connector can optimize the performance of batch inserts through binding.
When you use this technique to insert a large number of values, the driver can improve performance by streaming the data (without creating files on the local machine) to a temporary stage for ingestion. The driver automatically does this when the number of values exceeds a threshold.
In addition, the current database and schema for the session must be set. If these are not set, the CREATE TEMPORARY STAGE command
executed by the driver can fail with the following error:
CREATE TEMPORARY STAGE SYSTEM$BIND file_format=(type=csv field_optionally_enclosed_by='"')
Cannot perform CREATE STAGE. This session does not have a current schema. Call 'USE SCHEMA', or use a qualified name.
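For example, you might set the database and schema explicitly before calling executemany(); a sketch using the objects created earlier in this topic:
# Sketch: give the session a current database and schema so the driver's
# CREATE TEMPORARY STAGE command can succeed.
conn.cursor().execute("USE DATABASE testdb_mg")
conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")
conn.cursor().executemany(
    "insert into grocery (item, quantity) values (?, ?)",
    rows_to_insert)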
For alternative ways to load data into the Snowflake database (including bulk loading using the COPY command), see
Load Data into Snowflake.
Avoid SQL injection attacks
Avoid binding data using Python's string formatting (e.g. the % operator or format()) because you risk SQL injection. For example:
# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%(col1)d, '%(col2)s')" % {
        'col1': 789,
        'col2': 'test string3'
    })
# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%d, '%s')" % (
        789,
        'test string3'
    ))
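For comparison, the safe equivalent binds the values through the connector instead of composing the string in Python:
# Binding data (SAFE): the connector handles escaping.
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%s, %s)", (
        789,
        'test string3'
    ))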