In the world of DataOps, ensuring efficient and reliable data processing is crucial. Snowflake, a popular cloud data warehousing platform, offers a variety of features to help manage your data operations effectively. In this article, we'll explore a MATE macro that can be used to cancel any queries left running by a DataOps pipeline job in Snowflake. This macro leverages Snowflake's query tag functionality to identify and terminate queries associated with a specific pipeline.
Understanding the Problem:
Data pipelines often involve multiple queries and processes that run in sequence. Occasionally, queries can get stuck in a "RUNNING" state, consuming resources and potentially causing issues for subsequent jobs. To address this, we need a way to automatically cancel queries related to a particular pipeline that has failed.
The Solution: MATE Macro for Query Cancellation
The MATE macro presented here looks up the queries that are still running under the current pipeline's query tag and cancels each of them.
{% macro cancel_running_queries() %}
  {% if execute %}
    {# Find queries that are still running and tagged with this pipeline's ID #}
    {% set sql -%}
      SELECT QUERY_ID, QUERY_TEXT
      FROM TABLE(SNOWFLAKE.INFORMATION_SCHEMA.QUERY_HISTORY())
      WHERE EXECUTION_STATUS = 'RUNNING'
        AND PARSE_JSON(QUERY_TAG):DataOps_Pipeline_ID::INT = {{ env_var('CI_PIPELINE_ID') }}
    {%- endset %}
    {% set results = run_query(sql) %}
    {% for row in results.rows %}
      {# Skip the lookup query itself, which is also running at this point #}
      {% if row['QUERY_TEXT'] != sql %}
        {{ log('Cancelling query ' ~ row['QUERY_ID'] ~ ' (' ~ row['QUERY_TEXT'] ~ ')', True) }}
        {% set cancel_result = run_query("SELECT SYSTEM$CANCEL_QUERY('" ~ row['QUERY_ID'] ~ "')") %}
        {# Log the status message returned by SYSTEM$CANCEL_QUERY #}
        {{ log(' - ' ~ cancel_result.rows[0][0], True) }}
      {% endif %}
    {% endfor %}
  {% endif %}
{% endmacro %}
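To make the tag filter in the WHERE clause concrete, here is a minimal Python sketch of the same matching logic, run against in-memory rows instead of Snowflake. It assumes the query tag is a JSON object with a DataOps_Pipeline_ID key, as the macro's filter implies. The function names and the sample rows are hypothetical, and treating an empty or non-JSON tag as "no match" is a choice made for this sketch only.

```python
import json
from typing import Dict, List


def belongs_to_pipeline(query_tag: str, pipeline_id: int) -> bool:
    """Return True if the JSON query tag carries the given pipeline ID.

    Mirrors PARSE_JSON(QUERY_TAG):DataOps_Pipeline_ID::INT = <pipeline_id>.
    Empty or non-JSON tags count as "no match" (an assumption of this sketch).
    """
    try:
        tag = json.loads(query_tag)
    except (TypeError, ValueError):
        return False
    if not isinstance(tag, dict):
        return False
    try:
        return int(tag.get("DataOps_Pipeline_ID")) == pipeline_id
    except (TypeError, ValueError):
        return False


def running_query_ids(rows: List[Dict[str, str]], pipeline_id: int) -> List[str]:
    """Return the IDs of RUNNING rows whose tag matches the pipeline."""
    return [
        row["QUERY_ID"]
        for row in rows
        if row["EXECUTION_STATUS"] == "RUNNING"
        and belongs_to_pipeline(row["QUERY_TAG"], pipeline_id)
    ]


# Hypothetical query-history rows; only the DataOps_Pipeline_ID key
# is taken from the macro, everything else is made up for illustration.
sample_rows = [
    {"QUERY_ID": "q-1", "EXECUTION_STATUS": "RUNNING",
     "QUERY_TAG": '{"DataOps_Pipeline_ID": 1234}'},
    {"QUERY_ID": "q-2", "EXECUTION_STATUS": "RUNNING",
     "QUERY_TAG": '{"DataOps_Pipeline_ID": 9999}'},
    {"QUERY_ID": "q-3", "EXECUTION_STATUS": "SUCCESS",
     "QUERY_TAG": '{"DataOps_Pipeline_ID": 1234}'},
    {"QUERY_ID": "q-4", "EXECUTION_STATUS": "RUNNING", "QUERY_TAG": ""},
]

print(running_query_ids(sample_rows, 1234))
```

Only the first row is selected: the second belongs to another pipeline, the third has already finished, and the fourth has no usable tag.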
Explanation of the Macro:
- The cancel_running_queries macro encapsulates the cancellation logic.
- It checks the execute flag before doing anything, so the queries are issued only when the macro is actually run and not while the project is merely being parsed.
- The macro constructs a SQL query that selects all running queries whose query tag carries the current pipeline's ID. The tag includes metadata about the pipeline, making it easy to identify relevant queries.
- It runs this SQL query using the run_query function, obtaining a list of running queries associated with the current pipeline.
- The macro then iterates through the list and cancels each query by executing the SYSTEM$CANCEL_QUERY Snowflake function, skipping the lookup query itself.
- For every cancelled query it logs the query ID and text, followed by the message returned by SYSTEM$CANCEL_QUERY.
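The control flow described above can also be sketched in plain Python, which makes the self-exclusion step easier to see. This is an illustration under stated assumptions, not the macro's implementation: run_query is passed in as a callable, fake_run_query is a hypothetical stand-in for a Snowflake connection, and the lookup text and returned rows are invented for the example.

```python
from typing import Callable, Dict, List

Row = Dict[str, str]


def cancel_running_queries(
    lookup_sql: str,
    run_query: Callable[[str], List[Row]],
) -> List[str]:
    """Run the lookup, then cancel every returned query except the lookup itself."""
    cancelled = []
    for row in run_query(lookup_sql):
        # The macro's guard suggests the lookup can appear in its own
        # results (it is running and tagged too), so it is skipped here.
        if row["QUERY_TEXT"] == lookup_sql:
            continue
        run_query("SELECT SYSTEM$CANCEL_QUERY('" + row["QUERY_ID"] + "')")
        cancelled.append(row["QUERY_ID"])
    return cancelled


# Hypothetical lookup text and stubbed executor, for illustration only.
LOOKUP_SQL = "SELECT QUERY_ID, QUERY_TEXT FROM ... WHERE EXECUTION_STATUS = 'RUNNING'"
issued: List[str] = []


def fake_run_query(sql: str) -> List[Row]:
    """Record each statement and return canned rows instead of calling Snowflake."""
    issued.append(sql)
    if sql == LOOKUP_SQL:
        return [
            {"QUERY_ID": "q-1", "QUERY_TEXT": LOOKUP_SQL},
            {"QUERY_ID": "q-2", "QUERY_TEXT": "INSERT INTO t SELECT * FROM s"},
        ]
    return [{"RESULT": "stubbed cancel response"}]


cancelled_ids = cancel_running_queries(LOOKUP_SQL, fake_run_query)
print(cancelled_ids)
print(issued)
```

Passing the executor in as a parameter keeps the sketch testable without a database; in the macro itself, run_query plays that role.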
Using the MATE Macro:
To use this MATE macro in your DataOps pipeline:
- Include the macro in your pipeline or cleanup job. An example job configuration looks like this:

    Cancel Abandoned Queries:
      extends:
        - .modelling_and_transformation_base
        - .agent_tag
      variables:
        TRANSFORM_ACTION: OPERATION
        TRANSFORM_OPERATION_NAME: cancel_running_queries
        SNOWFLAKE_ROLE: DATAOPS_VAULT(SNOWFLAKE.SOLE.ROLE)
        SNOWFLAKE_USER: DATAOPS_VAULT(SNOWFLAKE.SOLE.USERNAME)
        SNOWFLAKE_WAREHOUSE: SAM
      stage: Clean Up
      script: /dataops
      icon: ${SNOWFLAKE_ICON}
      when: on_failure

  With when: on_failure, the job runs only if a job in an earlier stage has failed, which matches the goal of cleaning up after a failed pipeline.
Conclusion:
Efficiently managing Snowflake queries in DataOps pipelines is crucial for maintaining smooth and reliable data processing. The MATE macro described in this article provides a straightforward way to automatically cancel queries associated with the current pipeline, helping you avoid resource wastage and potential job conflicts. By leveraging Snowflake's query tags generated by DataOps and the power of macros, you can streamline your data operations and ensure the timely completion of your jobs.