Performing SQL, Hive and Impala queries
You can use the Python APIs to execute SQL queries on any SQL connection in DSS (including Hive and Impala).
Note
There are three capabilities related to performing SQL queries in Dataiku’s Python APIs:
- dataiku.SQLExecutor2, dataiku.HiveExecutor and dataiku.ImpalaExecutor in the dataiku package. They were initially designed for usage within DSS, in recipes and Jupyter notebooks. They are used to perform queries and retrieve results, either as an iterator or as a pandas dataframe.
- “Partial recipes”. It is possible to execute a “partial recipe” from a Python recipe, to execute a Hive, Impala or SQL query. This allows you to use Python to dynamically generate a SQL (resp. Hive, Pig, Impala) query and have DSS execute it, as if your recipe was a SQL query recipe. This is useful when you need complex business logic to generate the final SQL query and can’t do it with only SQL constructs.
- dataikuapi.DSSClient.sql_query() in the dataikuapi package. This function was initially designed for usage outside of DSS and only supports returning results as an iterator. It does not support pandas dataframes.
We recommend the usage of the dataiku variants.
For more details on the two packages, please see Python APIs.
Executing queries
You can retrieve the results of a SELECT query as a Pandas dataframe.
from dataiku import SQLExecutor2
executor = SQLExecutor2(connection="db-connection") # or dataset="dataset_name"
df = executor.query_to_df("SELECT col1, COUNT(*) as count FROM mytable GROUP BY col1")
# df is a Pandas dataframe with two columns : "col1" and "count"
Alternatively, you can retrieve the results of a query as an iterator.
from dataiku import SQLExecutor2
executor = SQLExecutor2(connection="db-connection")
query_reader = executor.query_to_iter("SELECT * FROM mytable")
query_iterator = query_reader.iter_tuples()
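The iterator form is mostly useful when the result set is too large to hold in a dataframe. As a minimal sketch, the helper below consumes the tuples from query_to_iter() in fixed-size batches. The name iter_batches and the batch_size parameter are hypothetical and not part of the dataiku package; the executor is passed in as an argument and is only assumed to expose query_to_iter() and iter_tuples() as shown above.

```python
from itertools import islice


def iter_batches(executor, query, batch_size=1000):
    """Yield lists of at most batch_size row tuples from a streamed query.

    `executor` is assumed to behave like dataiku.SQLExecutor2: it exposes
    query_to_iter(), whose result exposes iter_tuples().
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    reader = executor.query_to_iter(query)
    # iter() guarantees a single-pass iterator even if a list is returned
    rows = iter(reader.iter_tuples())
    while True:
        batch = list(islice(rows, batch_size))
        if not batch:
            return
        yield batch
```

Inside DSS, this could be called as iter_batches(SQLExecutor2(connection="db-connection"), "SELECT * FROM mytable"), processing each batch before the next one is read.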
Queries with side-effects
For databases supporting commit, the transaction in which the queries are executed is rolled back at the end, as is the default in DSS.
In order to perform queries with side-effects such as INSERT or UPDATE, you need to add post_queries=['COMMIT'] to your query_to_df call.
Depending on your database, DDL queries such as CREATE TABLE may or may not also need a COMMIT.
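One possible way to package this is sketched below. The helper name run_and_commit is hypothetical. It also makes an assumption that the text above does not state: the side-effect statements are passed as pre_queries, and a trivial SELECT serves as the main query so that query_to_df has a result set to return. Whether "SELECT 1" is valid, and whether the statement could be the main query itself, depends on your database.

```python
def run_and_commit(executor, statements, main_query="SELECT 1"):
    """Run side-effect statements, then COMMIT so they are not rolled back.

    ASSUMPTION: the statements are sent as pre_queries and a trivial SELECT
    is used as the main query; adapt main_query to your database.
    `executor` is assumed to behave like dataiku.SQLExecutor2.
    """
    if isinstance(statements, str):
        statements = [statements]
    return executor.query_to_df(
        main_query,
        pre_queries=list(statements),
        # without this, the transaction is rolled back at the end
        post_queries=["COMMIT"],
    )
```

Inside DSS, a call could look like run_and_commit(SQLExecutor2(connection="db-connection"), "UPDATE mytable SET col1 = 'x'").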
Partial recipes
It is possible to execute a “partial recipe” from a Python recipe, to execute a Hive, Pig, Impala or SQL query.
This allows you to use Python to dynamically generate a SQL (resp Hive, Pig, Impala) query and have DSS execute it, as if your recipe was a SQL query recipe.
This is useful when you need complex business logic to generate the final SQL query and can’t do it with only SQL constructs.
Note
Partial recipes are only possible when you are running a Python recipe. They are not available in notebooks or outside of DSS.
The partial recipe behaves like the corresponding SQL (resp. Hive, Impala) recipe with respect to its inputs and outputs. Notably, a Python recipe in which a partial Hive recipe is executed can only have HDFS datasets as inputs and outputs. Likewise, since an Impala or SQL partial recipe has only one output, the output dataset has to be specified for the partial recipe execution.
In the following example, we make a first query in order to dynamically build the larger query that runs as the “main” query of the recipe.
import dataiku
from dataiku import SQLExecutor2
# get the needed data to prepare the query
# for example, load from another table
# ("my_auxiliary_dataset_name" is a placeholder dataset name)
executor = SQLExecutor2(dataset="my_auxiliary_dataset_name")
words = executor.query_to_df(
    "SELECT word FROM word_frequency WHERE frequency > 0.01 AND frequency < 0.99")
# prepare a query dynamically
sql = 'SELECT id '
for word in words['word']:
    # len(word) is an int, so convert it before concatenating
    sql = sql + ", (length(text) - length(regexp_replace(text, '" + word + "', ''))) / " + str(len(word)) + " AS count_" + word
sql = sql + " FROM reviews"
# execute it
# no need to instantiate an executor object, the method is static
my_output_dataset = dataiku.Dataset("my_output_dataset_name")
SQLExecutor2.exec_recipe_fragment(my_output_dataset, sql)
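Because the query above is assembled by string concatenation, a word containing a quote or punctuation would produce invalid SQL. The following sketch isolates the query-building step in a plain function that can be tested without DSS. The function name build_word_count_query and its table and text_column parameters are hypothetical. It doubles single quotes inside the string literal and replaces non-word characters in the column alias. It does not escape regular-expression metacharacters, so it assumes the words are plain text.

```python
import re


def build_word_count_query(words, table="reviews", text_column="text"):
    """Build a SELECT with one occurrence-count column per word."""
    columns = ["id"]
    for word in words:
        if not word:
            continue  # an empty word would lead to a division by zero
        literal = word.replace("'", "''")  # escape quotes for a SQL string literal
        alias = "count_" + re.sub(r"\W", "_", word)  # keep the alias a valid identifier
        columns.append(
            "(length({col}) - length(regexp_replace({col}, '{lit}', ''))) / {n} AS {alias}".format(
                col=text_column, lit=literal, n=len(word), alias=alias
            )
        )
    return "SELECT " + ", ".join(columns) + " FROM " + table
```

The returned string can then be passed to SQLExecutor2.exec_recipe_fragment() in place of the sql variable built in the loop above.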
Executing queries (dataikuapi variant)
Note
We recommend using SQLExecutor2 instead, especially inside DSS.
Running a query against DSS is a 3-step process:
create the query
run it and fetch the data
verify that the streaming of the results wasn’t interrupted
The verification will make DSS release the resources taken for the query’s execution, so the verify() call has to be done once the results have been streamed.
An example of a SQL query on a connection configured in DSS is:
# client is a dataikuapi.DSSClient instance
streamed_query = client.sql_query('select * from train_set', connection='local_postgres', type='sql')
row_count = 0
for row in streamed_query.iter_rows():
    row_count = row_count + 1
streamed_query.verify() # raises an exception in case something went wrong
print('the query returned %i rows' % row_count)
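The three steps can also be wrapped in a small function so that verify() is never forgotten. This is only a sketch: fetch_rows is a hypothetical name, and the client argument is assumed to be a dataikuapi.DSSClient (or any object with the same sql_query() method). It collects all rows in memory, which is only suitable for small result sets.

```python
def fetch_rows(client, query, connection, query_type="sql"):
    """Create the query, stream all of its rows, then verify the stream."""
    # step 1: create the query
    streamed_query = client.sql_query(query, connection=connection, type=query_type)
    # step 2: run it and fetch the data
    rows = [row for row in streamed_query.iter_rows()]
    # step 3: verify() must come after streaming; it raises if the stream
    # was interrupted and lets DSS release the query's resources
    streamed_query.verify()
    return rows
```

For example, fetch_rows(client, 'select * from train_set', 'local_postgres') returns the rows of the query used above as a list.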
Queries against Hive and Impala are also possible. In that case, the type must be set to ‘hive’ or ‘impala’ accordingly, and instead of a connection it is possible to pass a database name:
from dataikuapi import DSSClient
# host is the URL of the DSS instance, apiKey an API key for it
client = DSSClient(host, apiKey)
streamed_query = client.sql_query('select * from train_set', database='test_area', type='impala')
...
In order to run queries before or after the main query, but still in the same session (for example, to set variables in the session), the API provides two parameters, pre_queries and post_queries, which take lists of queries:
streamed_query = client.sql_query('select * from train_set', database='test_area', type='hive', pre_queries=['set hive.execution.engine=tez'])
...
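When several session variables need to be set, the pre_queries list can be generated from a dictionary. The helper below is a hypothetical convenience, not part of dataikuapi. It assumes the "set name=value" syntax of the Hive example above, which other engines may not share.

```python
def session_pre_queries(settings):
    """Turn a {name: value} mapping into a list of 'set name=value' queries."""
    return ["set {}={}".format(name, value) for name, value in settings.items()]
```

The result can be passed directly as the pre_queries argument, for example pre_queries=session_pre_queries({'hive.execution.engine': 'tez'}).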
Reference documentation
- class dataiku.SQLExecutor2(connection=None, dataset=None)
This is a handle to execute SQL statements on a given SQL connection.
The connection is derived from either the connection parameter or the dataset parameter.
- Parameters
connection (string) – name of the SQL connection
dataset – name of a dataset or a dataiku.Dataset object.
- static exec_recipe_fragment(output_dataset, query, pre_queries=[], post_queries=[], overwrite_output_schema=True, drop_partitioned_on_schema_mismatch=False)
Executes a SQL query and stores the results in the output_dataset after dropping its underlying table.
- Parameters
output_dataset (object) – dataiku.Dataset output dataset where to write the result of the query.
query (str) – SQL main query
pre_queries (list) – list of queries to be executed before the main query
post_queries (list) – list of queries to be executed after the main query
overwrite_output_schema (bool) – if True, generates the output schema from the query results. If False, maintains the existing output schema
drop_partitioned_on_schema_mismatch (bool) – for partitioned output datasets. If True, drops all partitions whose schema is inconsistent with that of the dataset. Only relevant when overwrite_output_schema=True
- Returns
None
- query_to_df(query, pre_queries=None, post_queries=None, extra_conf={}, infer_from_schema=False, parse_dates=True, bool_as_str=False, dtypes=None, script_steps=None, script_input_schema=None, script_output_schema=None)
This function returns the result of the main query as a pandas dataframe.
- Parameters
query (str) – SQL main query
pre_queries (list) – list of queries to be executed before the main query
post_queries (list) – list of queries to be executed after the main query
extra_conf – do not use
infer_from_schema (bool) – if True, the resulting pandas dataframe types are set per the SQL query datatypes rather than being inferred by pandas
parse_dates (bool) – if True, SQL datetime columns are set as datetime dtypes in the resulting pandas dataframe. infer_from_schema must be True for this parameter to have an effect
bool_as_str (bool) – whether to cast boolean values as string
dtypes (dict) – dict with column names as keys and numpy.dtype() values
script_steps – do not use
script_input_schema – do not use
script_output_schema – do not use
- Returns
a pandas dataframe with the result of the query.
- query_to_iter(query, pre_queries=None, post_queries=None, extra_conf={}, script_steps=None, script_input_schema=None, script_output_schema=None)
This function returns a QueryReader to iterate on the rows.
- Parameters
query (str) – the main query
pre_queries (list) – list of queries to be executed before the main query
post_queries (list) – list of queries to be executed after the main query
script_steps – do not use
script_input_schema – do not use
script_output_schema – do not use
- Returns
a QueryReader to iterate on the rows.
- class dataiku.HiveExecutor(dataset=None, database=None, connection=None)
- static exec_recipe_fragment(query, pre_queries=[], post_queries=[], overwrite_output_schema=True, drop_partitioned_on_schema_mismatch=False, metastore_handling=None, extra_conf={}, add_dku_udf=False)
- query_to_df(query, pre_queries=None, post_queries=None, extra_conf={}, infer_from_schema=False, parse_dates=True, bool_as_str=False, dtypes=None, script_steps=None, script_input_schema=None, script_output_schema=None)
- query_to_iter(query, pre_queries=None, post_queries=None, extra_conf={}, script_steps=None, script_input_schema=None, script_output_schema=None)
- class dataiku.ImpalaExecutor(dataset=None, database=None, connection=None)
- static exec_recipe_fragment(output_dataset, query, pre_queries=[], post_queries=[], overwrite_output_schema=True, use_stream_mode=True)
- query_to_df(query, pre_queries=None, post_queries=None, connection=None, extra_conf={}, infer_from_schema=False, parse_dates=True, bool_as_str=False, dtypes=None, script_steps=None, script_input_schema=None, script_output_schema=None)
- query_to_iter(query, pre_queries=None, post_queries=None, connection=None, extra_conf={}, script_steps=None, script_input_schema=None, script_output_schema=None)