Join the Kedro community

Updated 2 weeks ago

Performing Sql Merge In A Kedro Pipeline

Hello,
Is it possible to do a sql merge statement in a “kedro way”? Here is my issue:
I have a pipeline that starts by pulling historical data from an API. It then does some cleaning etc. and then uploads it to our internal database. To upload it, I use pandas.SQLTableDataset.
But the data from the API, being historical, doesn’t change much – if I run the pipeline after a few days, there will be new rows of data and maybe a few correction to old values.
So ideally, I want to be able to only pull about 1 month of data from the API and then use sql-merge to merge that data into the existing database table.
I don’t think there is a kedro dataset that can do this for me. So my initial thoughts is to create a custom dataset that does two steps:
1. uses panda.SQLTableDataset to save the data to a db table (say tbl_data_temp)
2. uses sqlalchemy to execute a merge to merge tbl_data_temp into tbl_data_main
Does this seem like the best approach? I generally dislike doing anything kind of processing inside of custom datasets because, for transparency, I like all the processing to be done in the pipeline/nodes but I can’t think of a better solution.
Thank you

Explanation of Sql Merge: https://www.sqlshack.com/understanding-the-sql-merge-statement/

1
N
d
m
9 comments

Ibis is best way to do Pythonic (let alone Kedro) SQL

Upserts / Merge in general however, don't currently fit into Kedro's mental model well since reproducibility goes against the ideas of stateful merge

there are ways to achieve it, but it's something we're looking to make better
https://github.com/kedro-org/kedro/issues/3578

thank you both for your prompt reply. I'll give Ibis a go.

any thoughts on what you're trying to achieve also appreciated on that github issue

Unfortunately, merge/upsert is not natively supported in Ibis yet. People have worked around it, and it is something that Ibis would like to have, but nobody has contributed an implementation yet.

For more details, including some of the workarounds people used: https://github.com/ibis-project/ibis/issues/5391

@datajoely, in terms of reproducibility, while I think that changing values in old rows is bad, being able to add a new row of data when an old value is corrected can enhance reproducibility generally. If I add the new row with the current timestamp in the ingestion_date field, I can then always re-create the dataset as it was any time in the past (incorrect data and all) with the appropriate sql query.

yeah you're absolutely right, it's just a non trivial problem!

Add a reply
Sign up and join the conversation on Slack