Hi kedroids :kedro:
We have a use case in which we schedule BigQuery queries to run in a specific order using a Kedro pipeline.
We use the BigQuery client simply to trigger each SQL query on BigQuery, as follows:
```python
from google.cloud import bigquery


def trigger_query_on_bigquery(query: str) -> bool:
    client = bigquery.Client()
    client.query_and_wait(query)  # runs the query and blocks until it completes
    return True
```
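(Not shown in the thread, but presumably each trigger_*_on_big_query node function just wraps this helper with its own SQL. A purely hypothetical sketch, with made-up table and column names:)

```python
# Hypothetical example of one of the per-table trigger functions referenced
# in the pipeline below; the actual SQL lives elsewhere in the project.
def trigger_prm_customer_on_big_query() -> bool:
    query = """
        CREATE OR REPLACE TABLE prm.customer AS
        SELECT customer_id, first_name, last_name
        FROM raw.customer
    """
    return trigger_query_on_bigquery(query)
```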
```python
from kedro.pipeline import Pipeline, node, pipeline


def create_retail_data_primary_pipeline() -> Pipeline:
    nodes = [
        node(
            func=trigger_prm_customer_on_big_query,
            inputs=None,
            outputs="prm_customer@status",
        ),
        node(
            func=trigger_prm_transaction_detail_ecom_on_big_query,
            inputs=["prm_product_hierarchy@status"],
            outputs="prm_transaction_detail_ecom@status",
        ),
        node(
            func=trigger_prm_transaction_detail_retail_on_big_query,
            inputs=["prm_product_hierarchy@status"],
            outputs="prm_transaction_detail_retail@status",
        ),
        node(
            func=trigger_prm_transaction_detail_on_big_query,
            inputs=[
                "prm_transaction_detail_ecom@status",
                "prm_transaction_detail_retail@status",
                "prm_product_hierarchy@status",
                "prm_customer@status",
            ],
            outputs="prm_transaction_detail@status",
        ),
        node(
            func=trigger_prm_incident_on_big_query,
            inputs=None,
            outputs="prm_incident@status",
        ),
        node(
            func=trigger_prm_product_hierarchy_on_big_query,
            inputs=None,
            outputs="prm_product_hierarchy@status",
        ),
    ]
    return pipeline(nodes)
```
Each node returns a dummy @status output (which is just True), and then we use the actual BigQuery spark.SparkDataset transcoded entry versions of these datasets in downstream pipelines to enforce the order. For example, we pass the prm_product_hierarchy@bigquery dataset into a downstream node, just so that Kedro runs the node that triggers the BigQuery query first.
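(To illustrate the pattern — all names below are made up — a downstream node only needs the spark-transcoded entry as its input; because it shares the base name with the @status entry, Kedro schedules the trigger node first:)

```python
from pyspark.sql import DataFrame

# Hypothetical downstream node. Its input is wired to the
# prm_product_hierarchy@bigquery catalog entry (a spark.SparkDataset), and
# because that entry is transcoded with prm_product_hierarchy@status, Kedro
# runs the BigQuery trigger node before this one.
def build_product_features(prm_product_hierarchy: DataFrame) -> DataFrame:
    return prm_product_hierarchy.dropDuplicates(["product_id"])
```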
Yeah, so I don't typically like this pattern since it's an 'out of Kedro DAG' flow; you're essentially relying on side effects outside of Kedro.
I think these days I'd use our Ibis dataset, which will let you keep dataframes in your flow but, importantly, delegate execution to BQ.
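(A minimal sketch of that idea, with made-up names: node functions build lazy Ibis expressions, and execution only happens in BigQuery when the output is saved, e.g. through kedro_datasets.ibis.TableDataset:)

```python
import ibis

# Sketch only: the node receives and returns lazy ibis.Table expressions.
# Nothing is pulled into memory here; the work runs in BigQuery when the
# output is saved (e.g. via kedro_datasets.ibis.TableDataset).
def filter_valid_transactions(transactions: ibis.Table) -> ibis.Table:
    return transactions.filter(transactions.amount > 0)
```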
I'm also super interested in seeing where this library goes as you won't even have to change the syntax
https://github.com/eakmanrq/sqlframe
Yes, we have implemented ibis for other use cases, and I am 100% with the decision. The only thing is, we already have 15 SQL queries written, and converting them to ibis isn't possible in the timeframe.
So I think you can still do ibis.sql() which will at least give you objects to pass between nodes
https://ibis-project.org/how-to/extending/sql
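(Something along these lines; project, dataset, table, and column names are placeholders. The backend's .sql() wraps an existing SQL string into a lazy ibis.Table expression that can be passed between nodes, and nothing is materialized unless you call .execute():)

```python
import ibis

# Placeholder connection details.
con = ibis.bigquery.connect(project_id="my-gcp-project", dataset_id="retail")

# Wraps the existing SQL in a lazy ibis.Table expression; the query is not run here.
prm_customer = con.sql("SELECT customer_id, email FROM raw.customer")

# Passing `prm_customer` between nodes stays lazy; only .execute() (or saving
# it to a backend table) actually runs anything.
df = prm_customer.limit(5).execute()  # materializes a small pandas DataFrame
```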
Ah, thanks for this. The thing is, we really do not want the result of the query to be returned in memory either, as an ibis.Table or any other object. Let me check whether .sql supports just creating a table and returning a "reference" to it.
> The thing is, we really do not want the result of the query to be returned in memory either, as an ibis.Table or any other object.
Ibis will not materialize it in memory; it will just be a reference, unless you .execute() or something similar.

Got it. So that means I can return an ibis.Table as an output (kedro_datasets.ibis.TableDataset) from my node, and it will create the table in BigQuery, and then I can use the same catalog entry to load from BigQuery for other nodes. Although I want to load it as a Spark DataFrame downstream, so I would have to build a custom dataset on top of it.
Or create my_dataset@ibis and my_dataset@spark entries.
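(A rough sketch of that hand-off, assuming a catalog where my_dataset@ibis is a kedro_datasets.ibis.TableDataset and my_dataset@spark is a spark.SparkDataset pointing at the same BigQuery table; all names here are illustrative:)

```python
import ibis
from pyspark.sql import DataFrame, functions as F

# Upstream node: returns a lazy ibis expression; saving it through the
# my_dataset@ibis entry lets BigQuery create and materialize the table,
# without pulling anything into driver memory.
def build_my_dataset(prm_transaction_detail: ibis.Table) -> ibis.Table:
    return prm_transaction_detail.filter(prm_transaction_detail.amount > 0)

# Downstream node: reads the same table back through my_dataset@spark
# (Spark + BigQuery connector), so the existing PySpark logic stays untouched.
def enrich_my_dataset(my_dataset: DataFrame) -> DataFrame:
    return my_dataset.withColumn("loaded_at", F.current_timestamp())
```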
Downstream nodes are already heavily dependent on the PySpark API (though ibis can handle that too), so we're minimizing any additional changes. Also, I don't want to go with Parquet: I did a lot of performance scaling tests with different partition strategies, and the Spark BigQuery connector on GCP is a tad faster than Parquet on GCS.
The other vote of confidence is that BQ built this with Ibis behind the scenes: https://cloud.google.com/python/docs/reference/bigframes/latest
Yes, ibis is now the way to go. The only thing is, legacy code is still half SQL, half PySpark, but if ibis promises to be maintained, then it will become a no-brainer for sure 💯