Kedro pipeline scheduling BigQuery queries in order

Hi kedroids :kedro:

We have a use case in which we are scheduling BigQuery queries to run in a specific order using a Kedro pipeline.

We use the BigQuery client simply to trigger the SQL query on BigQuery as follows:

from google.cloud import bigquery


def trigger_query_on_bigquery(
    query: str,
) -> bool:
    # Run the query and block until it finishes on BigQuery.
    client = bigquery.Client()
    client.query_and_wait(query)

    return True

The Kedro DAG to schedule multiple queries in order looks as follows:

from kedro.pipeline import Pipeline, node, pipeline


def create_retail_data_primary_pipeline() -> Pipeline:
    nodes = [
        node(
            func=trigger_prm_customer_on_big_query,
            inputs=None,
            outputs="prm_customer@status",
        ),
        node(
            func=trigger_prm_transaction_detail_ecom_on_big_query,
            inputs=["prm_product_hierarchy@status"],
            outputs="prm_transaction_detail_ecom@status",
        ),
        node(
            func=trigger_prm_transaction_detail_retail_on_big_query,
            inputs=["prm_product_hierarchy@status"],
            outputs="prm_transaction_detail_retail@status",
        ),
        node(
            func=trigger_prm_transaction_detail_on_big_query,
            inputs=[
                "prm_transaction_detail_ecom@status",
                "prm_transaction_detail_retail@status",
                "prm_product_hierarchy@status",
                "prm_customer@status",
            ],
            outputs="prm_transaction_detail@status",
        ),
        node(
            func=trigger_prm_incident_on_big_query,
            inputs=None,
            outputs="prm_incident@status",
        ),
        node(
            func=trigger_prm_product_hierarchy_on_big_query,
            inputs=None,
            outputs="prm_product_hierarchy@status",
        ),
    ]

    return pipeline(nodes)

Since the node can't output the dataframe itself, we output a transcoded @status entry (which is just True), and then use the actual BigQuery spark.SparkDataset transcoded versions of these datasets in downstream pipelines to enforce the order.

So I will use the prm_product_hierarchy@bigquery dataset in a downstream node, just so that Kedro runs the node that triggers the BigQuery query first.
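For context, the two transcoded entries look roughly like this when written out as a minimal sketch defined programmatically (the project, dataset and connector options below are placeholders, not our actual config):

from kedro.io import DataCatalog, MemoryDataset
from kedro_datasets.spark import SparkDataset

catalog = DataCatalog(
    {
        # Boolean flag returned by the trigger node, used only to enforce ordering.
        "prm_product_hierarchy@status": MemoryDataset(),
        # The same logical table, read back through the spark-bigquery connector.
        "prm_product_hierarchy@bigquery": SparkDataset(
            filepath="my-project.retail.prm_product_hierarchy",
            file_format="bigquery",
        ),
    }
)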

Is there a better way to do this?

14 comments

Yeah, so I don't typically like this pattern since it's an 'out of Kedro DAG' flow: you're essentially relying on side effects outside of Kedro.


I think these days I'd use our Ibis dataset, which will let you keep dataframes in your flow but, importantly, delegate execution to BQ.

It will also mean you don't need to use Spark.
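Roughly what I mean, as a sketch only (the connection details and dataset options here are assumptions to illustrate; check the kedro-datasets docs for your version):

import ibis
from kedro_datasets.ibis import TableDataset

# Catalog entry: node outputs saved here get materialized as BigQuery tables.
prm_customer = TableDataset(
    table_name="prm_customer",
    connection={"backend": "bigquery", "project_id": "my-project", "dataset_id": "retail"},
    save_args={"materialized": "table"},
)


def build_prm_customer(raw_customer: ibis.Table) -> ibis.Table:
    # Pure expression building: nothing runs here; BigQuery executes the
    # query when Kedro saves the node output through the dataset above.
    return raw_customer.filter(raw_customer.is_active)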

I'm also super interested in seeing where this library goes as you won't even have to change the syntax

https://github.com/eakmanrq/sqlframe

Yes, we have implemented ibis for other use cases, and I am 100% with the decision. The only thing is, we already have 15 SQL queries written, and converting them to ibis isn't possible in the timeframe.

So I think you can still do ibis.sql() which will at least give you objects to pass between nodes

https://ibis-project.org/how-to/extending/sql
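Something along these lines, i.e. wrap the existing SQL string and get a lazy table expression back (sketch only; the connection arguments and query are made up):

import ibis

con = ibis.bigquery.connect(project_id="my-project", dataset_id="retail")

# Reuse the existing SQL as-is; the result is a lazy ibis.Table expression,
# not data pulled back into memory.
prm_customer = con.sql(
    """
    SELECT customer_id, segment, signup_date
    FROM raw.customer
    WHERE is_active
    """
)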

Ah, thanks for this. The thing is, we really do not want the result of the query to be returned in memory, either as an ibis.Table or any other object. Let me check if .sql() supports just creating a table and returning a "reference" to the table.

I think that's what it does, but it may not do the create table - just a temp one.

The thing is, we really do not want the result of the query to be returned in memory either as ibis.Table or any other object.
Ibis will not materialize it in memory; it will just be a reference, unless you .execute() or something similar.
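To make that concrete, a sketch (connection details assumed, and create_table behaviour may vary slightly between ibis versions):

import ibis

con = ibis.bigquery.connect(project_id="my-project", dataset_id="retail")

# Still lazy: just a reference to a query, nothing has run yet.
expr = con.sql("SELECT * FROM raw.transactions WHERE amount > 0")

# Results only come back into memory if you explicitly ask for them:
df = expr.execute()  # pandas DataFrame

# ...otherwise keep everything in BigQuery by materializing the expression:
con.create_table("prm_transaction_detail", expr, overwrite=True)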

Got it. So that means I can return an ibis.Table as an output (kedro_datasets.ibis.TableDataset) from my node, and it will create the table in BigQuery, and then I can use the same catalog entry to load from BigQuery for other nodes. Although I want to load it as a Spark dataframe downstream, so I would have to build a custom dataset on top of it.

Or create my_dataset@ibis, my_dataset@spark entries.
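Something like this for the two entries, as a sketch (dataset options and table IDs are assumptions):

from kedro.io import DataCatalog
from kedro_datasets.ibis import TableDataset
from kedro_datasets.spark import SparkDataset

catalog = DataCatalog(
    {
        # Written by the ibis node; BigQuery materializes the table on save.
        "my_dataset@ibis": TableDataset(
            table_name="my_dataset",
            connection={"backend": "bigquery", "project_id": "my-project", "dataset_id": "retail"},
            save_args={"materialized": "table"},
        ),
        # Read by downstream PySpark nodes via the spark-bigquery connector.
        "my_dataset@spark": SparkDataset(
            filepath="my-project.retail.my_dataset",
            file_format="bigquery",
        ),
    }
)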

  • You could hand over as Parquet
  • You could in fact do more in BQ without Spark because of Ibis

Downstream nodes are already heavily PySpark-API dependent (though ibis can handle it too), so we're minimizing any additional changes. Also, I don't wanna do Parquet, since I did a lot of performance scaling tests with different partition strategies, and the Spark BigQuery connector on GCP is a tad faster than Parquet on GCS.

makes a lot of sense

The other vote of confidence is that BQ built this with Ibis behind the scenes: https://cloud.google.com/python/docs/reference/bigframes/latest

Yes, ibis is now the way to go. The only thing is, legacy code is still half SQL, half PySpark, but if ibis promises to be maintained, then it will become a no-brainer for sure 💯
