The Daily PostgreSQL Database Update: Accessing Today's Predictions

Hello all,
I have a question about datasets and namespaces. I am not even sure if what I am asking for is possible.
Here is a simplified version of the issue:

I have a PostgreSQL database which is updated daily with new data (predictions from some other models, but that is beside the point).
One of the columns in results_table is called "run_date", so if I want today's results I can do this:

(in catalog.yml):

oneday_of_data:
  type: pandas.SQLQueryDataset
  credentials: db_credentials
  sql: "select * from results_table where run_date = %(run_date)s"
  load_args:
    params:
      run_date: 2024-11-01
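
(As I understand it, SQLQueryDataset forwards load_args straight to pandas, so the load is roughly equivalent to this sketch; the connection string is made up:)

import pandas as pd
from sqlalchemy import create_engine

# Roughly what the catalog entry does on load: the params dict fills
# in the %(run_date)s placeholder in the query.
engine = create_engine("postgresql://user:pass@host/db")  # made-up credentials
df = pd.read_sql_query(
    "select * from results_table where run_date = %(run_date)s",
    con=engine,
    params={"run_date": "2024-11-01"},
)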

This dataset, combined with this one-node pipeline, lets me copy the data from the database onto my local drive.

(in pipeline.py)

pipeline([
    node(
        func=lambda x: x,
        inputs="oneday_of_data",
        outputs="local_data",
        name="sql_to_local_node",
    ),
])

Now, if I wanted more than one day's data as different datasets, this seems like a great candidate for namespacing, because nothing changes except the run date. Like this:

(in catalog.yml)
_run_dates:
  run_1: 2024-11-01 # today
  run_2: 2024-10-30 # yesterday
  run_3: 2024-10-25 # a week ago

"{run_name}.oneday_of_data":
  type: pandas.SQLQueryDataset
  credentials: db_credentials
  sql: "select * from results_table where run_date = %(run_date)s"
  load_args:
    params:
      run_date: ${_run_dates.{run_name}}

But no matter what I try, I can't get this to work. I know I can specify {run_name} in the filepath field (if it were, say, a CSV dataset), but is it possible to use it inside a templated/variable-interpolated field like this?

I have tried writing my own custom resolver (called "picker") defined as:

(in settings.py)
def pick_from_rundates(dict1, key):
    return dict1[key]

CONFIG_LOADER_ARGS = {
    "base_env": "base",
    "default_run_env": "local",
    "custom_resolvers": {"picker": lambda x, y: pick_from_rundates(x, y)},
}
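
For what it's worth, the resolver itself seems fine: it resolves in plain OmegaConf when the key is a literal (a quick sketch outside Kedro, not from my project):

from omegaconf import OmegaConf

# Quick check outside Kedro: the "picker" resolver works when the
# key is a literal string rather than a {run_name} placeholder.
OmegaConf.register_new_resolver("picker", lambda d, k: d[k])
cfg = OmegaConf.create(
    {
        "_run_dates": {"run_1": "2024-11-01"},
        "run_date": "${picker:${_run_dates},run_1}",
    }
)
print(cfg.run_date)  # 2024-11-01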


I then tried this in the catalog, which also failed:

(in catalog.yml)
"{run_name}.oneday_of_data":
type: pandas.SQLQueryDataset
credentials: db_credentials
sql: "select * from results_table where run_date = %(run_date)"
load_args:
params:
run_date: ${picker:${_run_dates},"{run_name}"}

So, am I missing something simple here, or is this fundamentally not allowed? Ideally the run dates would be specified in globals.yml instead of directly in catalog.yml, but I am trying to walk before I run here.

I will be grateful for any advice here.
minesh

7 comments

Not sure if it satisfies your goal, but a workaround could be:

  1. Make your function take a list of dates, instead of a single date
  2. Load all this as a single table from the DB
  3. Partition by date, and save as a PartitionedDataset where something like f"predictions{date}" is the key and the corresponding predictions are the value

So this turns your:

...if I wanted more than one day's data as different datasets...

into a single dataset, but multiple files.

Thank you very much, Yury, for your response. I will give this a go on Monday and report back here.

Hi Yury, just a quick update: this solution worked as long as the dates are hard-coded in the SQL, which (while not ideal) is OK for my needs.


Hmm
I'm wondering why it depends on hard-coded dates

My idea was to do this:

  1. Within your node function, get a dataframe that only contains the dates of your interest.
  2. Then do data_by_date: dict[str, pd.DataFrame] = {str(date): data_for_date for date, data_for_date in data.groupby("date")}
  3. This dictionary is the output of your node function.
  4. And it gets saved as a PartitionedDataset - which doesn't need to know how many dates you have in advance. It can save 5, 10, or 100 dates into separate files, as many as there are keys in that data_by_date dictionary (see the sketch below).
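
Something like this (column and function names are just illustrative):

import pandas as pd

# Filter to the dates of interest, then split by date so a
# PartitionedDataset writes one file per dictionary key.
def split_predictions_by_date(
    data: pd.DataFrame, dates_of_interest: list[str]
) -> dict[str, pd.DataFrame]:
    subset = data[data["run_date"].astype(str).isin(dates_of_interest)]
    return {str(date): group for date, group in subset.groupby("run_date")}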

Hi Yury, it's an issue with that step 1:
"Within your node function, get a dataframe that only contains dates of your interest."

I wanted the "dates of interest" to be a parameterised list in some YAML file somewhere. Then I can query the database for those dates, i.e.

select * from mytable
where run_date in ('2024-11-01', '2024-11-02', ... and all the other dates from the YAML file)
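
For what it's worth, I believe psycopg2 can expand a Python tuple for an IN clause, so something like this untested sketch might let the dates come from parameters instead of being hard-coded:

import pandas as pd
from sqlalchemy import create_engine

# Untested sketch: psycopg2 expands a Python tuple into "(a, b, c)",
# so the whole dates list can be passed as a single query parameter.
dates_of_interest = ["2024-11-01", "2024-11-02"]  # would come from the YAML file
engine = create_engine("postgresql://user:pass@host/db")  # made-up credentials
df = pd.read_sql_query(
    "select * from mytable where run_date in %(run_dates)s",
    con=engine,
    params={"run_dates": tuple(dates_of_interest)},
)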

Once I have that data, you are right, I would be able to create the partitioned dataset. Does that make sense, or have I missed something?

Yeah, it makes sense.
I haven't worked with SQL datasets in Kedro, so I'm not sure how that typically gets parametrized, but I believe those would be the kwargs of the dataset in the catalog.
