Performing SQL, Hive and Impala queries

You can use the Python APIs to execute SQL queries on any SQL connection in DSS (including Hive and Impala).

Note

There are three capabilities related to performing SQL queries in Dataiku’s Python APIs:

  • dataiku.SQLExecutor2, dataiku.HiveExecutor and dataiku.ImpalaExecutor in the dataiku package. These were initially designed for usage within DSS, in recipes and Jupyter notebooks. They are used to perform queries and retrieve results, either as an iterator or as a pandas dataframe.

  • “partial recipes”. It is possible to execute a “partial recipe” from a Python recipe, to execute a Hive, Impala, Pig or SQL query. This allows you to use Python to dynamically generate the query and have DSS execute it, as if your recipe was a SQL query recipe. This is useful when you need complex business logic to generate the final SQL query and can’t do it with only SQL constructs.

  • dataikuapi.DSSClient.sql_query() in the dataikuapi package. This function was initially designed for usage outside of DSS and only supports returning results as an iterator. It does not support returning results as a pandas dataframe.

We recommend the usage of the dataiku variants.

For more details on the two packages, please see Python APIs

Executing queries

You can retrieve the results of a SELECT query as a Pandas dataframe.

from dataiku import SQLExecutor2

executor = SQLExecutor2(connection="db-connection") # or dataset="dataset_name"
df = executor.query_to_df("SELECT col1, COUNT(*) as count FROM mytable")
# df is a pandas dataframe with two columns: "col1" and "count"

Alternatively, you can retrieve the results of a query as an iterator.

from dataiku import SQLExecutor2

executor = SQLExecutor2(connection="db-connection")
query_reader = executor.query_to_iter("SELECT * FROM mytable")
query_iterator = query_reader.iter_tuples()
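
You can then loop over the iterator; each element is a tuple with the values of the columns returned by the query:

for row in query_iterator:
    # row is a tuple, in the order of the query's columns
    print(row)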

Queries with side-effects

For databases that support commit, the transaction in which the queries are executed is rolled back at the end; this is the default behavior in DSS.

In order to perform queries with side-effects such as INSERT or UPDATE, you need to add post_queries=['COMMIT'] to your query_to_df call.

Depending on your database, DDL queries such as CREATE TABLE may also need a COMMIT.
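
For example, a minimal sketch of an INSERT followed by an explicit commit (the connection and table names here are assumptions):

from dataiku import SQLExecutor2

executor = SQLExecutor2(connection="db-connection")
# without the COMMIT post-query, the INSERT would be rolled back at the end
executor.query_to_df("INSERT INTO mytable (col1) VALUES ('some value')",
                     post_queries=['COMMIT'])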

Partial recipes

It is possible to execute a “partial recipe” from a Python recipe, to execute a Hive, Pig, Impala or SQL query.

This allows you to use Python to dynamically generate a SQL (resp Hive, Pig, Impala) query and have DSS execute it, as if your recipe was a SQL query recipe.

This is useful when you need complex business logic to generate the final SQL query and can’t do it with only SQL constructs.

Note

Partial recipes are only possible when you are running a Python recipe. They are not available in notebooks, nor outside of DSS.

The partial recipe behaves like the corresponding SQL (resp. Hive, Impala) recipe with respect to inputs and outputs. Notably, a Python recipe in which a partial Hive recipe is executed can only have HDFS datasets as inputs and outputs. Likewise, since an Impala or SQL partial recipe has a single output, the output dataset has to be specified when executing the partial recipe.

In the following example, we make a first query in order to dynamically build the larger query that runs as the “main” query of the recipe.

import dataiku
from dataiku import SQLExecutor2

# get the needed data to prepare the query
# for example, load from another table
executor = SQLExecutor2(dataset="my_auxiliary_dataset")
words = executor.query_to_df(
    "SELECT word FROM word_frequency WHERE frequency > 0.01 AND frequency < 0.99")

# prepare a query dynamically
sql = 'SELECT id '
for word in words['word']:
    sql = sql + ", (length(text) - length(regexp_replace(text, '" + word + "', ''))) / " + str(len(word)) + " AS count_" + word
sql = sql + " FROM reviews"

# execute it
# no need to instantiate an executor object, the method is static
my_output_dataset = dataiku.Dataset("my_output_dataset_name")
SQLExecutor2.exec_recipe_fragment(my_output_dataset, sql)

Executing queries (dataikuapi variant)

Note

We recommend using SQLExecutor2 instead, especially inside DSS.

Running a query against DSS is a 3-step process:

  • create the query

  • run it and fetch the data

  • verify that the streaming of the results wasn’t interrupted

The verification makes DSS release the resources allocated for the query’s execution, so verify() has to be called once the results have been streamed.

An example of a SQL query on a connection configured in DSS is:

streamed_query = client.sql_query('select * from train_set', connection='local_postgres', type='sql')
row_count = 0
for row in streamed_query.iter_rows():
    row_count = row_count + 1
streamed_query.verify() # raises an exception in case something went wrong
print('the query returned %i rows' % row_count)

Queries against Hive and Impala are also possible. In that case, the type must be set to ‘hive’ or ‘impala’ accordingly, and instead of a connection it is possible to pass a database name:

client = DSSClient(host, apiKey)
streamed_query = client.sql_query('select * from train_set', database='test_area', type='impala')
...

In order to run queries before or after the main query, but still in the same session (for example to set variables in the session), the API provides two parameters, pre_queries and post_queries, which take lists of queries:

streamed_query = client.sql_query('select * from train_set', database='test_area', type='hive', pre_queries=['set hive.execution.engine=tez'])
...

Reference documentation

class dataiku.SQLExecutor2(connection=None, dataset=None)

This is a handle to execute SQL statements on a given SQL connection.

The connection is derived from either the connection parameter or the dataset parameter.

Parameters
  • connection (string) – name of the SQL connection

  • dataset – name of a dataset or a dataiku.Dataset object.
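
For example, the executor can be built either from a named connection or from a dataset (the names here are assumptions):

from dataiku import SQLExecutor2

# from a named SQL connection
executor = SQLExecutor2(connection="db-connection")
# or from a dataset, whose underlying connection will be used
executor = SQLExecutor2(dataset="mydataset")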

static exec_recipe_fragment(output_dataset, query, pre_queries=[], post_queries=[], overwrite_output_schema=True, drop_partitioned_on_schema_mismatch=False)

Executes a SQL query and stores the results in the output_dataset after dropping its underlying table.

Parameters
  • output_dataset (object) – dataiku.Dataset object in which to write the result of the query.

  • query (str) – SQL main query

  • pre_queries (list) – list of queries to be executed before the main query

  • post_queries (list) – list of queries to be executed after the main query

  • overwrite_output_schema (bool) – if True, generates the output schema from the query results. If False, maintains the existing output schema

  • drop_partitioned_on_schema_mismatch (bool) – for partitioned output datasets. If True, drops all partitions whose schema is inconsistent with that of the dataset. Only relevant when overwrite_output_schema=True

Returns

None
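
For example, a minimal sketch from within a Python recipe (the dataset, table and column names here are assumptions):

import dataiku
from dataiku import SQLExecutor2

output_dataset = dataiku.Dataset("my_output_dataset_name")
# overwrite_output_schema=True regenerates the output schema from the query results
SQLExecutor2.exec_recipe_fragment(output_dataset,
                                  "SELECT col1, COUNT(*) AS count FROM mytable GROUP BY col1",
                                  overwrite_output_schema=True)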

query_to_df(query, pre_queries=None, post_queries=None, extra_conf={}, infer_from_schema=False, parse_dates=True, bool_as_str=False, dtypes=None, script_steps=None, script_input_schema=None, script_output_schema=None)

This function returns the result of the main query as a pandas dataframe.

Parameters
  • query (str) – SQL main query

  • pre_queries (list) – list of queries to be executed before the main query

  • post_queries (list) – list of queries to be executed after the main query

  • extra_conf – do not use

  • infer_from_schema (bool) – if True, the resulting pandas dataframe types are set per the SQL query datatypes rather than being inferred by pandas

  • parse_dates (bool) – if True, SQL datetime columns are set as datetime dtypes in the resulting pandas dataframe. infer_from_schema must be True for this parameter to be relevant

  • bool_as_str (bool) – whether to cast boolean values as string

  • dtypes (dict) – dictionary with column names as keys and numpy.dtype() values

  • script_steps – do not use

  • script_input_schema – do not use

  • script_output_schema – do not use

Returns

a pandas dataframe with the result of the query.
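
For example, a minimal sketch that types the dataframe from the SQL schema (the connection and table names here are assumptions):

from dataiku import SQLExecutor2

executor = SQLExecutor2(connection="db-connection")
# dtypes follow the SQL datatypes; datetime columns are parsed as pandas datetimes
df = executor.query_to_df("SELECT * FROM mytable",
                          infer_from_schema=True,
                          parse_dates=True)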

query_to_iter(query, pre_queries=None, post_queries=None, extra_conf={}, script_steps=None, script_input_schema=None, script_output_schema=None)

This function returns a QueryReader to iterate on the rows.

Parameters
  • query (str) – the main query

  • pre_queries (list) – list of queries to be executed before the main query

  • post_queries (list) – list of queries to be executed after the main query

  • script_steps – do not use

  • script_input_schema – do not use

  • script_output_schema – do not use

Returns

a QueryReader to iterate on the rows.

class dataiku.HiveExecutor(dataset=None, database=None, connection=None)

static exec_recipe_fragment(query, pre_queries=[], post_queries=[], overwrite_output_schema=True, drop_partitioned_on_schema_mismatch=False, metastore_handling=None, extra_conf={}, add_dku_udf=False)

query_to_df(query, pre_queries=None, post_queries=None, extra_conf={}, infer_from_schema=False, parse_dates=True, bool_as_str=False, dtypes=None, script_steps=None, script_input_schema=None, script_output_schema=None)

query_to_iter(query, pre_queries=None, post_queries=None, extra_conf={}, script_steps=None, script_input_schema=None, script_output_schema=None)

class dataiku.ImpalaExecutor(dataset=None, database=None, connection=None)

static exec_recipe_fragment(output_dataset, query, pre_queries=[], post_queries=[], overwrite_output_schema=True, use_stream_mode=True)

query_to_df(query, pre_queries=None, post_queries=None, connection=None, extra_conf={}, infer_from_schema=False, parse_dates=True, bool_as_str=False, dtypes=None, script_steps=None, script_input_schema=None, script_output_schema=None)

query_to_iter(query, pre_queries=None, post_queries=None, connection=None, extra_conf={}, script_steps=None, script_input_schema=None, script_output_schema=None)
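
HiveExecutor and ImpalaExecutor are used like SQLExecutor2. For example, a minimal sketch with HiveExecutor (the dataset and table names here are assumptions; the input must be an HDFS dataset):

import dataiku
from dataiku import HiveExecutor

dataset = dataiku.Dataset("my_hdfs_dataset")
executor = HiveExecutor(dataset=dataset)
# table name is an assumption; adapt it to your Hive database
df = executor.query_to_df("SELECT COUNT(*) AS cnt FROM mytable")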