Hello,
Is it possible to run a SQL MERGE statement in a “kedro way”? Here is my issue:
I have a pipeline that starts by pulling historical data from an API. It then does some cleaning etc. and then uploads it to our internal database. To upload it, I use pandas.SQLTableDataset.
But the data from the API, being historical, doesn’t change much – if I run the pipeline after a few days, there will be new rows of data and maybe a few corrections to old values.
So ideally, I want to be able to only pull about 1 month of data from the API and then use sql-merge to merge that data into the existing database table.
I don’t think there is a kedro dataset that can do this for me. So my initial thought is to create a custom dataset that does two steps:
1. uses pandas.SQLTableDataset to save the data to a db table (say tbl_data_temp)
2. uses sqlalchemy to execute a merge to merge tbl_data_temp into tbl_data_main
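For what it's worth, the merge in step 2 can be sketched in plain SQL against a staging table. A minimal, self-contained illustration of the pattern below, using stdlib sqlite3 (SQLite has no MERGE statement, so its upsert form, INSERT ... ON CONFLICT DO UPDATE, plays the same role; the column names are made up, only the table names come from the post):

```python
import sqlite3

# In-memory DB standing in for the internal database.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tbl_data_main (id INTEGER PRIMARY KEY, value REAL)")
conn.execute("CREATE TABLE tbl_data_temp (id INTEGER PRIMARY KEY, value REAL)")

# Existing historical data, plus the freshly pulled ~1 month slice
# (id 2 is a correction to an old value, id 3 is a new row).
conn.executemany("INSERT INTO tbl_data_main VALUES (?, ?)", [(1, 10.0), (2, 20.0)])
conn.executemany("INSERT INTO tbl_data_temp VALUES (?, ?)", [(2, 21.5), (3, 30.0)])

# Step 2: merge tbl_data_temp into tbl_data_main.
# An upsert does the same job as MERGE here: update matched rows,
# insert unmatched ones. (The "WHERE true" is required by SQLite to
# disambiguate the upsert clause after a SELECT source.)
conn.execute("""
    INSERT INTO tbl_data_main (id, value)
    SELECT id, value FROM tbl_data_temp WHERE true
    ON CONFLICT(id) DO UPDATE SET value = excluded.value
""")
conn.execute("DELETE FROM tbl_data_temp")  # clean up the staging table
conn.commit()

print(conn.execute("SELECT * FROM tbl_data_main ORDER BY id").fetchall())
# → [(1, 10.0), (2, 21.5), (3, 30.0)]
```

In the custom dataset, the same statement (in your database's dialect) would be issued via sqlalchemy after the SQLTableDataset save completes.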
Does this seem like the best approach? I generally dislike doing any kind of processing inside custom datasets because, for transparency, I like all the processing to be done in the pipeline/nodes, but I can’t think of a better solution.
Thank you
Explanation of Sql Merge: https://www.sqlshack.com/understanding-the-sql-merge-statement/
Upserts/merges in general, however, don't currently fit Kedro's mental model well, since the statefulness of a merge works against reproducibility.
There are ways to achieve it, but it's something we're looking to improve:
https://github.com/kedro-org/kedro/issues/3578
Unfortunately, merge/upsert is not natively supported in Ibis yet. People have worked around it, and it is something that Ibis would like to have, but nobody has contributed an implementation yet.
For more details, including some of the workarounds people used: https://github.com/ibis-project/ibis/issues/5391
@datajoely, in terms of reproducibility: while I agree that changing values in old rows is bad, appending a new row whenever an old value is corrected can actually enhance reproducibility. If I add the new row with the current timestamp in the ingestion_date field, I can always re-create the dataset as it was at any point in the past (incorrect data and all) with the appropriate SQL query.
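That append-only "as of" query can be sketched like this. A minimal illustration with stdlib sqlite3 and an invented schema (obs_date as the key, ingestion_date as the timestamp described above) — one way to write it, per-key latest-row-ingested-by-the-cutoff via a window function:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical append-only table: corrections are new rows, never updates.
conn.execute(
    "CREATE TABLE tbl_data (obs_date TEXT, value REAL, ingestion_date TEXT)"
)
conn.executemany(
    "INSERT INTO tbl_data VALUES (?, ?, ?)",
    [
        ("2024-01-05", 10.0, "2024-01-06"),  # original value
        ("2024-01-05", 10.7, "2024-02-03"),  # later correction, appended
    ],
)

def as_of(conn, cutoff):
    """Dataset as it looked on `cutoff`: latest row per key ingested by then."""
    return conn.execute("""
        SELECT obs_date, value FROM (
            SELECT obs_date, value,
                   ROW_NUMBER() OVER (
                       PARTITION BY obs_date
                       ORDER BY ingestion_date DESC
                   ) AS rn
            FROM tbl_data
            WHERE ingestion_date <= ?
        ) WHERE rn = 1
    """, (cutoff,)).fetchall()

print(as_of(conn, "2024-01-31"))  # [('2024-01-05', 10.0)]  pre-correction
print(as_of(conn, "2024-02-28"))  # [('2024-01-05', 10.7)]  post-correction
```

Querying with a cutoff before the correction reproduces the dataset exactly as it stood then, incorrect value included.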