Hugo Acosta

Hello everyone,

I am encountering some issues with placeholders in the data catalog, and I was hoping you could shed some light on them.
I have the following pipeline:

from kedro.pipeline import Pipeline, node, pipeline

load_date = settings.LOAD_DATE_COMPARISON.get("current")
previous_load_date = settings.LOAD_DATE_COMPARISON.get("previous")


def create_pipeline(**kwargs) -> Pipeline:
    format_data_quality = pipeline(
        [
            node(
                func=compare_id,
                inputs=[
                    f"maestro_indicadores_{load_date}",
                    f"maestro_indicadores_{previous_load_date}",
                ],
                outputs=f"compare_id_{load_date}_{previous_load_date}",
                name="compare_id_node",
                tags="compare_id",
            ),
        ]
    )
    return format_data_quality
With the corresponding catalog entry for the output:

compare_id_{load_date}_{previous_load_date}:
  type: json.JSONDataset
  filepath: reports/{load_date}/id_comparison/id_comparison_{load_date}_{previous_load_date}.json
The issue is that whenever the value of load_date is something like 2024_07_01, it generates a path like:
reports/2024/id_comparison/id_comparison_2024_07_01_2024_05_01.json

Note that the first placeholder is not substituted with the intended value, while the others are.
This only happens when the value of load_date contains underscores; it does not happen with dots or hyphens.
Why does this happen?
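For what it is worth, here is a minimal sketch of what I suspect is going on, assuming Kedro resolves factory patterns with the parse library and its non-greedy {} fields:

# Minimal sketch, assuming Kedro's dataset factories resolve patterns with
# the `parse` library, whose {} fields match non-greedily:
from parse import parse

result = parse(
    "compare_id_{load_date}_{previous_load_date}",
    "compare_id_2024_07_01_2024_05_01",
)
print(result.named)
# {'load_date': '2024', 'previous_load_date': '07_01_2024_05_01'}
# The literal "_" separator in the pattern matches the first underscore
# inside the date, so {load_date} stops at "2024"; dots and hyphens never
# collide with the separator, which would explain why they work.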

12 comments

Hi everyone,

I need some help understanding how to define filters in load_args when loading a ParquetDataset with Dask from the catalog.

My catalog entry would be something like:

data:
  type: dask.ParquetDataset
  filepath: data/
  load_args:
    filters: [('filter_1', '==', 1) or
              ('filter_2', '==', 1) or
              ('filter_3', '==', 1) or
              ('filter_4', '==', 1)]
I tested this exact filter syntax through the Python API, and while it works there, I cannot find a way to make it work from the catalog, since it raises the error:
kedro.io.core.DatasetError: Failed while loading data from data set 
An error occurred while calling the read_parquet method registered to the pandas backend.
Original Message: too many values to unpack (expected 3)
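For reference, here is a hedged sketch of the entry I would try next, assuming PyArrow's list-of-lists (DNF) form for OR conditions, with plain YAML lists standing in for the Python tuples:

data:
  type: dask.ParquetDataset
  filepath: data/
  load_args:
    # Assumption: PyArrow expresses OR as a list of conjunctions (DNF),
    # so [[pred_1], [pred_2]] means pred_1 OR pred_2, and each predicate
    # is a 3-element [column, op, value] list since YAML has no tuples:
    filters:
      - [["filter_1", "==", 1]]
      - [["filter_2", "==", 1]]
      - [["filter_3", "==", 1]]
      - [["filter_4", "==", 1]]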

2 comments

Hello everyone,

I am working on a dynamic pipeline that generates a file for each year in a list, such that the catalog entry would be:

data_{year}:
  type: pandas.ExcelDataset
  filepath: reports/folder/data_{year}.xlsx
  save_args:
    index: False
Then, I have another pipeline that aggregates all the files for processing, loading them as a PartitionedDataset, with the entry:

partitioned_data:
  type: partitions.PartitionedDataset
  path: reports/folder
  dataset:
    type: pandas.ExcelDataset
The main problem with my approach is that even though these two entries refer to the same data, they are in fact different catalog entries, so Kedro sees no dependency between the pipelines and runs the second one before the dynamic one.
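To make the gap concrete, here is a minimal sketch of the explicit wiring that would give Kedro the dependency, assuming the same year list is available in the pipeline code and using a placeholder aggregation function and output name; it works, but it defeats the purpose of the PartitionedDataset:

import pandas as pd
from kedro.pipeline import Pipeline, node, pipeline

YEARS = [2021, 2022, 2023]  # assumption: the same list that drives the dynamic pipeline


def aggregate_reports(*yearly_dfs: pd.DataFrame) -> pd.DataFrame:
    # Placeholder: concatenate the per-year frames into one report
    return pd.concat(yearly_dfs, ignore_index=True)


def create_aggregation_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=aggregate_reports,
                # Listing each data_{year} explicitly puts an edge in the DAG,
                # so the dynamic pipeline is guaranteed to run first:
                inputs=[f"data_{year}" for year in YEARS],
                outputs="aggregated_report",  # hypothetical catalog entry
                name="aggregate_reports_node",
            )
        ]
    )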
I would appreciate your input on this issue,

Thanks a lot!

6 comments