Is it possible to do a SQL MERGE statement the “Kedro way”? Here is my issue:
I have a pipeline that starts by pulling historical data from an API. It then does some cleaning etc. and then uploads it to our internal database. To upload it, I use pandas.SQLTableDataset.
But because the API data is historical, it doesn’t change much: if I run the pipeline again after a few days, there will be some new rows and maybe a few corrections to old values.
So ideally, I want to pull only about one month of data from the API and then use a SQL MERGE to merge that data into the existing database table.
I don’t think there is a Kedro dataset that can do this for me, so my initial thought is to create a custom dataset that does two steps:
1. use pandas.SQLTableDataset to save the data to a staging table (say tbl_data_temp)
2. use SQLAlchemy to execute a MERGE from tbl_data_temp into tbl_data_main
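The two steps above can be sketched with Python's built-in sqlite3 module, purely to show the mechanics. SQLite has no MERGE statement, so step 2 here swaps in SQLite's `INSERT ... ON CONFLICT DO UPDATE` upsert, which covers the same "insert new rows, update changed ones" case. The table names come from the question; the `id`/`value` columns and the `save_and_merge` helper are hypothetical. In a custom Kedro dataset, this logic would presumably sit in the dataset's save logic.

```python
import sqlite3


def save_and_merge(conn: sqlite3.Connection, rows: list[tuple[int, float]]) -> None:
    """Hypothetical helper: stage `rows` in tbl_data_temp, then upsert them into tbl_data_main."""
    cur = conn.cursor()
    # Step 1: replace the staging table with the fresh (roughly one-month) API pull
    cur.execute("DROP TABLE IF EXISTS tbl_data_temp")
    cur.execute("CREATE TABLE tbl_data_temp (id INTEGER PRIMARY KEY, value REAL)")
    cur.executemany("INSERT INTO tbl_data_temp (id, value) VALUES (?, ?)", rows)
    # Step 2: merge staging into main. SQLite's upsert stands in for MERGE here;
    # the "WHERE true" is required by SQLite to parse INSERT ... SELECT ... ON CONFLICT.
    cur.execute(
        """
        INSERT INTO tbl_data_main (id, value)
        SELECT id, value FROM tbl_data_temp WHERE true
        ON CONFLICT(id) DO UPDATE SET value = excluded.value
        """
    )
    conn.commit()


# Usage: id 2 gets a corrected value, id 3 is a new row, id 1 is untouched
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tbl_data_main (id INTEGER PRIMARY KEY, value REAL)")
conn.executemany("INSERT INTO tbl_data_main VALUES (?, ?)", [(1, 10.0), (2, 20.0)])
save_and_merge(conn, [(2, 21.0), (3, 30.0)])
print(conn.execute("SELECT id, value FROM tbl_data_main ORDER BY id").fetchall())
# [(1, 10.0), (2, 21.0), (3, 30.0)]
```

Staging first keeps the merge a single set-based statement inside the database, rather than comparing old and new rows in pandas.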
Does this seem like the best approach? I generally dislike doing any kind of processing inside custom datasets because, for transparency, I like all the processing to happen in the pipeline/nodes, but I can’t think of a better solution.
Thank you
Explanation of SQL MERGE: https://www.sqlshack.com/understanding-the-sql-merge-statement/
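For databases that do support MERGE (SQL Server, as in the linked article, and PostgreSQL 15+), the statement executed in step 2 could look like the output of this small builder. It is a sketch: the key column `id` and value column `value` are hypothetical, and other dialects differ (Oracle, for example, wants no `AS` on table aliases and the ON condition in parentheses).

```python
def build_merge_sql(target: str, source: str, keys: list[str], values: list[str]) -> str:
    """Return a MERGE that updates matched rows and inserts unmatched ones.

    Table and column names are interpolated as-is, so only pass trusted identifiers.
    """
    on_clause = " AND ".join(f"t.{k} = s.{k}" for k in keys)
    # Target columns in SET are left unqualified, which SQL Server and PostgreSQL both accept
    set_clause = ", ".join(f"{v} = s.{v}" for v in values)
    columns = keys + values
    return (
        f"MERGE INTO {target} AS t\n"
        f"USING {source} AS s\n"
        f"ON {on_clause}\n"
        f"WHEN MATCHED THEN UPDATE SET {set_clause}\n"
        f"WHEN NOT MATCHED THEN INSERT ({', '.join(columns)}) "
        f"VALUES ({', '.join('s.' + c for c in columns)});"
    )


print(build_merge_sql("tbl_data_main", "tbl_data_temp", ["id"], ["value"]))
# MERGE INTO tbl_data_main AS t
# USING tbl_data_temp AS s
# ON t.id = s.id
# WHEN MATCHED THEN UPDATE SET value = s.value
# WHEN NOT MATCHED THEN INSERT (id, value) VALUES (s.id, s.value);
```

The resulting string could then be executed with SQLAlchemy's `text()` inside an `engine.begin()` block. If rows have no single natural key, `keys` would need every column that identifies a row (for example a date plus an ID).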
Hello, I want to use a namespaced pipeline and data catalog to get a series of dataframes, do some manipulations, and then save them all to one Excel workbook, each in its own sheet. I thought something like this would work in the catalog:
"{namespace}.spreadsheet_data": type: pandas.ExcelDataset filepath: data/03_primary/all_data_sources.xlsx save_args: sheet_name: "{namespace}_data"but this doesn't work. I just end up with a spreadsheet with one sheet - with the name of whatever namespace ran last. I.e. it must be overwriting it each time.
Hello all,
I have a question about datasets and namespaces. I am not even sure if what I am asking for is possible.
Here is a simplified version of the issue:
I have a PostgreSQL database that is updated daily with data (predictions from some other models, but that is beside the point).
One of the columns in results_table is called "run_date", so if I want today's results I can do this:
(in catalog.yml)
oneday_of_data:
  type: pandas.SQLQueryDataset
  credentials: db_credentials
  sql: "select * from results_table where run_date = %(run_date)s"
  load_args:
    params:
      run_date: 2024-11-01
This dataset, combined with this one-node pipeline, lets me get the data from the database onto my local drive:
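(in pipeline.py)
from kedro.pipeline import node, pipeline
# "oneday_of_data_local" is a placeholder name for the local copy of the data
pipeline([node(func=lambda x: x, inputs="oneday_of_data", outputs="oneday_of_data_local")])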
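As far as I know, `pandas.SQLQueryDataset` passes its `load_args` through to `pandas.read_sql_query`, so `run_date` is bound by the database driver as a query parameter rather than pasted into the SQL text. The same binding can be sketched with Python's built-in sqlite3 module. Note that sqlite3 uses `:name` placeholders, whereas psycopg2 (the usual PostgreSQL driver) expects `%(run_date)s`, trailing `s` included. The `prediction` column, the sample rows, and `load_one_day` are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE results_table (run_date TEXT, prediction REAL)")
conn.executemany(
    "INSERT INTO results_table VALUES (?, ?)",
    [("2024-11-01", 0.7), ("2024-10-30", 0.4), ("2024-11-01", 0.9)],
)


def load_one_day(conn: sqlite3.Connection, run_date: str) -> list[tuple[str, float]]:
    """Run the daily query with run_date bound as a named parameter."""
    # sqlite3 spells the placeholder :run_date; with psycopg2 it would be %(run_date)s
    sql = "select * from results_table where run_date = :run_date order by prediction"
    return conn.execute(sql, {"run_date": run_date}).fetchall()


print(load_one_day(conn, "2024-11-01"))  # [('2024-11-01', 0.7), ('2024-11-01', 0.9)]
```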
Now, if I want more than one day's data as separate datasets, this seems like a great candidate for namespacing, because nothing changes except the run date. Something like this:
(in catalog.yml)
_run_dates:
  run_1: 2024-11-01 # today
  run_2: 2024-10-30 # yesterday
  run_3: 2024-10-25 # a week ago

"{run_name}.oneday_of_data":
  type: pandas.SQLQueryDataset
  credentials: db_credentials
  sql: "select * from results_table where run_date = %(run_date)s"
  load_args:
    params:
      run_date: ${_run_dates.{run_name}}
But no matter what I try, I can't get this to work. I know I can use {run_name} in the filepath field (for a CSV dataset, say), but is it possible to use it inside a templated/variable-interpolated field like this?
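Why this fails can be illustrated with a toy model of two stages. As far as I understand Kedro's config handling, `${...}` interpolations are resolved by the OmegaConf-based config loader when catalog.yml is read, while `{run_name}` placeholders in dataset factory patterns are only filled in later, when a concrete name such as `run_1.oneday_of_data` is matched. At interpolation time, the lookup key is therefore still the literal text `{run_name}`. The functions below are illustrative stand-ins, not Kedro or OmegaConf code, and the real error message will differ.

```python
RUN_DATES = {"run_1": "2024-11-01", "run_2": "2024-10-30", "run_3": "2024-10-25"}

# Inside "${_run_dates.{run_name}}", the lookup key is still this literal text
LITERAL_KEY = "{run_name}"


def resolve_at_load_time(key: str) -> str:
    """Stage 1 stand-in: interpolation happens before any dataset name is matched."""
    return RUN_DATES[key]


def resolve_factory_placeholder(template: str, run_name: str) -> str:
    """Stage 2 stand-in: the placeholder is filled once e.g. run_1.oneday_of_data is requested."""
    return template.format(run_name=run_name)


# Actual order: the load-time lookup sees "{run_name}" and fails
try:
    resolve_at_load_time(LITERAL_KEY)
except KeyError as missing:
    print("load-time lookup failed for key", missing)

# The order this config would need: fill the placeholder first, then look up
print(resolve_at_load_time(resolve_factory_placeholder(LITERAL_KEY, "run_1")))  # 2024-11-01
```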
I have tried writing my own custom resolver (called "picker") defined as:
(in settings.py)
def pick_from_rundates(dict1, key):
    return dict1[key]

CONFIG_LOADER_ARGS = {
    "base_env": "base",
    "default_run_env": "local",
    "custom_resolvers": {"picker": lambda x, y: pick_from_rundates(x, y)},
}
and then tried this, which also failed:
(in catalog.yml)
"{run_name}.oneday_of_data":
  type: pandas.SQLQueryDataset
  credentials: db_credentials
  sql: "select * from results_table where run_date = %(run_date)s"
  load_args:
    params:
      run_date: ${picker:${_run_dates},"{run_name}"}
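A custom resolver is called by the config loader at the same load-time stage, so as far as I can tell `picker` receives the literal string `"{run_name}"` and the dictionary lookup fails in the same way. Because the date lookup has to happen once the run names are known, one alternative sketch skips the factory and expands the dates into explicit catalog entries in plain Python. `TEMPLATE` and `expand_run_entries` are hypothetical names; the entry layout mirrors the catalog YAML from the question, with `run_date` under `load_args.params` on the assumption that the dataset forwards `load_args` to `pandas.read_sql_query`.

```python
from copy import deepcopy
from typing import Any

# Hypothetical template mirroring the catalog entry in the question
TEMPLATE: dict[str, Any] = {
    "type": "pandas.SQLQueryDataset",
    "credentials": "db_credentials",
    "sql": "select * from results_table where run_date = %(run_date)s",
}


def expand_run_entries(
    run_dates: dict[str, str], template: dict[str, Any]
) -> dict[str, dict[str, Any]]:
    """Build one explicit catalog entry per run, with its date already filled in."""
    entries = {}
    for run_name, run_date in run_dates.items():
        entry = deepcopy(template)  # avoid sharing nested dicts between entries
        entry["load_args"] = {"params": {"run_date": run_date}}
        entries[f"{run_name}.oneday_of_data"] = entry
    return entries


catalog_entries = expand_run_entries(
    {"run_1": "2024-11-01", "run_2": "2024-10-30", "run_3": "2024-10-25"}, TEMPLATE
)
print(catalog_entries["run_2.oneday_of_data"]["load_args"])
# {'params': {'run_date': '2024-10-30'}}
```

The output has the same shape as parsed catalog YAML, so it could, for example, be merged into the loaded catalog config before the catalog is built (`DataCatalog.from_config` takes such a dict plus credentials in the Kedro versions I know of), and the dates could come from globals.yml rather than a literal. Where exactly to hook this in depends on the Kedro version, so treat that wiring as an assumption to check.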
So am I missing something simple here, or is this fundamentally not allowed? Ideally the run dates would be specified in globals.yml instead of directly in catalog.yml, but I am trying to walk before I run here.
I will be grateful for any advice here.