
I'm trying to read a CSV file (in chunks) and then save the result as partitioned Parquet files. The following catalog entry raises a DatasetError:

"{company}.{layer}.transactions":
  type: pandas.ParquetDataset
  filepath: data/{company}/{layer}/transactions
  save_args:
    partition_cols: [year, month]    
The error:
DatasetError: ParquetDataset does not support save argument 'partition_cols'. Please use 'kedro.io.PartitionedDataset' instead.

How am I supposed to do this using PartitionedDataset, and what is the reason for blocking the use of partition_cols in pandas.ParquetDataset? (I'm asking because I could just override it with a custom dataset.)
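
For reference, here is a minimal sketch of the PartitionedDataset route I'm considering (it would write one Parquet file per (year, month) group rather than a true hive-partitioned dataset; the node and key names below are only illustrative):

catalog.yml
"{company}.{layer}.transactions":
  type: partitions.PartitionedDataset
  path: data/{company}/{layer}/transactions
  dataset:
    type: pandas.ParquetDataset
  filename_suffix: ".parquet"

nodes.py
import pandas as pd

def partition_transactions(df: pd.DataFrame) -> dict[str, pd.DataFrame]:
    # PartitionedDataset writes each dict value to <path>/<key>.parquet,
    # so this yields one file per (year, month) group.
    return {
        f"{year}-{month:02d}": group
        for (year, month), group in df.groupby(["year", "month"])
    }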

6 comments

Another question from my side.

I have a node which outputs a dictionary called train_test_dts, which I am saving as a pickle with the joblib backend.
When I then try to run my pipeline with the ParallelRunner like this:

kedro run --pipeline feature_engineering --params env=dev,inference_dt=2025-01-05 --runner ParallelRunner
Then I am getting the following error:
AttributeError: The following datasets cannot be used with multiprocessing: ['train_test_dts']
In order to utilize multiprocessing you need to make sure all datasets are serialisable, i.e. datasets should not make use of lambda functions, nested functions, closures etc.
If you are using custom decorators ensure they are correctly decorated using functools.wraps().

Any idea why that happens and what I could do to fix that?

1 comment

Would Kedro users be opposed to defining nodes with decorators? I have written a simple implementation, but as I've only recently started using Kedro I wonder if I'm missing anything.

The syntax would be:

from kedro.pipeline import Pipeline, node, pipeline

@node(inputs="input_number", outputs="first_sum")
def step1(number):
    return number + 1

@node(inputs="first_sum", outputs="second_sum")
def step2(number):
    return number + 1

@node(inputs="second_sum", outputs="final_result")
def step3(number):
    return number + 2

example_pipeline = pipeline(
    [
        step1,
        step2,
        step3,
    ]
)

The node name could be inferred from the function name.
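
For context, a minimal sketch of what the decorator could look like (assuming it simply returns a Node object, so the decorated functions can be passed straight to pipeline() as above; the name as_node is illustrative):

from kedro.pipeline import node

def as_node(*, inputs=None, outputs=None, name=None, **kwargs):
    # Turns a plain function into a kedro.pipeline.node, defaulting the
    # node name to the function name.
    def decorator(func):
        return node(func, inputs=inputs, outputs=outputs,
                    name=name or func.__name__, **kwargs)
    return decorator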

31 comments

I was trying out Kedro-Databricks on a simple project and the job got stuck for 15 minutes before I killed it. Has this happened to anyone?

3 comments

Stupid question: the Kedro VS Code extension does not work for me. After installing it and its dependencies, I still cannot click through on the catalog items. Is there any standard solution for this?

15 comments

Hello, I've worked on a lot of Kedro pipelines over the past ~year and am a big fan, but there's one detail that I've seen cause some very confusing problems that I'd like help with.

Whenever there's an error loading a pipeline, whether it's a syntax error, a missing import, etc., instead of the Kedro process erroring out, it simply skips that pipeline and continues without it. This is not only confusing but can lead to some pretty big problems in a model without any errors ever surfacing.

I was wondering how I can disable this and force Kedro to raise errors when loading pipelines? I tried googling but couldn't find anything.

Thanks!
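
One workaround, as a sketch (assuming a project package called my_package with two pipelines, data_processing and data_science; all names here are hypothetical): register pipelines explicitly in pipeline_registry.py instead of relying on automatic discovery, so any import or syntax error surfaces immediately.

pipeline_registry.py
from kedro.pipeline import Pipeline

from my_package.pipelines import data_processing, data_science

def register_pipelines() -> dict[str, Pipeline]:
    # Explicit imports mean a broken pipeline module raises at registration
    # time instead of being silently skipped.
    data_processing_pipeline = data_processing.create_pipeline()
    data_science_pipeline = data_science.create_pipeline()
    return {
        "__default__": data_processing_pipeline + data_science_pipeline,
        "data_processing": data_processing_pipeline,
        "data_science": data_science_pipeline,
    }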

15 comments

EDIT: I was able to run the wheel on Databricks.

Hello, is there a way to get Kedro to create folders in a path if they do not exist? For instance, if my data structure is

data:
  outputs:
And I have the catalog entry
data@pandas:
  type: pandas.CSVDataset
  filepath: data/outputs/csv_files/data.csv
It would be nice for Kedro to automatically create csv_files inside data/outputs and store data.csv there.
Or if I'm working with a partitioned dataset, and my node returns a dictionary with the structure {'non_existent_subfolder/file_name': data} to be saved in the catalog entry
data@PartitionedDataset:
  type: partitions.PartitionedDataset
  path: data/outputs
  dataset: 
    type: pandas.ExcelDataset
    save_args:
      sheet_name: Sheet1
    load_args:
      sheet_name: Sheet1
  filename_suffix: ".xlsx"
It would be nice for Kedro to create 'non_existent_subfolder' automatically inside data/outputs. I already tried it, and Kedro does not create folders when they don't exist. Is there a way of changing this default behaviour?

Thank you all in advance :)
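
For what it's worth, here is a rough sketch of one possible workaround (not a built-in Kedro feature, and it relies on the private _filepath attribute that most file-based datasets define, so treat it as an assumption): a hook that creates the parent directory of every catalog entry as soon as the catalog is created. It would not cover subfolders encoded in PartitionedDataset keys.

hooks.py
from pathlib import Path

from kedro.framework.hooks import hook_impl

class CreateMissingDirsHook:
    @hook_impl
    def after_catalog_created(self, catalog):
        # Best effort: make sure the parent folder of each local dataset exists.
        for name in catalog.list():
            dataset = catalog._get_dataset(name)
            filepath = getattr(dataset, "_filepath", None)
            if filepath is not None:
                Path(str(filepath)).parent.mkdir(parents=True, exist_ok=True)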

4 comments

What is the best way to overwrite the log files instead of appending to them? My logging.yml looks like this:
EDIT: just adding mode: w did the trick.
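
For anyone searching later, the relevant handler in logging.yml probably looks something like this (a sketch based on Kedro's default file handler; handler and formatter names may differ in your project):

logging.yml
handlers:
  info_file_handler:
    class: logging.handlers.RotatingFileHandler
    level: INFO
    formatter: simple
    filename: info.log
    mode: w   # overwrite the log file on each run instead of appending
    maxBytes: 10485760
    backupCount: 20
    encoding: utf8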

1 comment

Hello, I would like to know how to manage merge conditions with Kedro for Delta tables. For example: whenMatchedUpdate() ...
https://docs.delta.io/latest/delta-update.html

Hello, what is the proper way to add a current timestamp to the names of catalog entries? Thanks.
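
One approach, sketched under the assumption that you are on the OmegaConfigLoader and can register a custom resolver (the resolver name now and the dataset below are illustrative):

settings.py
from datetime import datetime

CONFIG_LOADER_ARGS = {
    "custom_resolvers": {
        # "${now:%Y%m%d%H%M%S}" in config resolves to the current timestamp
        "now": lambda fmt: datetime.now().strftime(fmt),
    },
}

catalog.yml
daily_report:
  type: pandas.CSVDataset
  filepath: data/08_reporting/report_${now:%Y%m%d%H%M%S}.csv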

11 comments

Hello, I'm facing a memory issue in my Kedro project and I would like to know if there is a Kedro-oriented solution.

I am developing a pipeline for processing large datasets of audio recordings. This involves processing several audio files (large numpy arrays) in a single node and storing them again. I was relying on PartitionedDataset for this, but I'm having memory issues because building the dictionary of numpy arrays is quite heavy and always ends up consuming all of my tiny memory.

Is there a way of storing each processed file as soon as it is done, instead of keeping them all in RAM until the last one is done? Of course this is possible in many ways, but my question is about Kedro: is it possible to save from the body of the function using Kedro and partitioned datasets? Has any of you experienced something like this before?

Best,
Nicolas
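
One Kedro-native option is PartitionedDataset's lazy saving: if the node returns a dictionary of callables instead of the arrays themselves, each callable is only invoked when that partition is written, so only one processed array has to live in memory at a time. A sketch, where process_audio stands in for your actual processing function:

nodes.py
from typing import Any, Callable

def process_recordings(partitioned_input: dict[str, Callable[[], Any]]) -> dict[str, Callable[[], Any]]:
    # PartitionedDataset passes load callables in and accepts save callables out.
    # Each returned callable loads and processes a single recording only at the
    # moment Kedro saves that partition.
    def make_partition(load_func):
        return lambda: process_audio(load_func())  # process_audio: hypothetical
    return {name: make_partition(load) for name, load in partitioned_input.items()}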

6 comments

[Question: Repo separation between ETL and ML apps]

3 comments

Hi all, can I configure the use of ParallelRunner only on the command line or also in parameters.yml?
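
As far as I know the runner is a run-time option rather than something read from parameters.yml; besides the --runner flag it can also be set programmatically, e.g. (a sketch):

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project
from kedro.runner import ParallelRunner

bootstrap_project(".")  # path to the project root
with KedroSession.create(project_path=".") as session:
    session.run(runner=ParallelRunner())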

2 comments

Issue Summary
Confusion with Credential Configuration in Kedro 0.19 vs 0.18
Hello Kedro team,
I have encountered an issue regarding the configuration of credentials for accessing storage via abfss in Kedro 0.19.3, which was not present in version 0.18. Here is a summary of the problem:
In Kedro 0.18, I configured the credentials for accessing storage through Spark configurations with Azure Service Principal, and everything worked fine. However, after upgrading to Kedro 0.19.3, the same setup stopped working. After spending a couple of days troubleshooting, I discovered that adding the credentials as environment variables resolved the issue.
My questions are:

  1. Does Kedro 0.19.3 read these environment variables directly?
  2. Is this behavior managed by Kedro itself or by the abfss library?

Additionally, it seems redundant to add the credentials both in the Spark configuration and as environment variables. This redundancy is confusing and feels like a bug rather than a feature. Could you please clarify if this is the intended behavior?
Execution Environment:
  • This is being executed in Databricks.
  • The Spark configurations to use Azure Service Principal are added to the Databricks cluster used. (The cluster configuration includes credentials for multiple storages.)
  • Only one storage account's credentials can be added as environment variables, but since the Spark config authenticates the Spark session, filling in these values, even if incorrect, still allows access to the storages.

Thank you for your assistance!

1 comment

How do you avoid overdoing DRY ("Don't Repeat Yourself") with Kedro? I find that, given the fairly opinionated syntax and project structure it proposes, it's easy to DRY up bits of code that would be better left repeated (e.g. preprocessing code). I wonder if anyone else has had similar thoughts.

13 comments

Hello, I'm facing an issue visualizing my pipeline with Kedro-Viz.
My node runs correctly, and kedro catalog list shows the right entries, but kedro viz cannot retrieve the credentials defined in my hooks.py. Did anyone face the same issue?

kedro viz
KeyError: "Unable to find credentials 'adls_creds': check your data catalog and credentials configuration. See https://kedro.readthedocs.io/en/stable/kedro.io.DataCatalog.html for an example."

15 comments

Hi Kedro community!! I have encountered an issue when working with Kedro within a marimo notebook (I think the issue would be just the same in a Jupyter notebook). Basically, I was initially working on my notebook by calling it from the command line from the Kedro project root folder, something like marimo edit notebooks/nb.py, where my folder structure is something like:

├── README.md
├── conf
│   ├── base
│   ├── local
├── data ...
├── notebooks
│   ├── nb.py
├── pyproject.toml
├── requirements.txt
├── src ... 
└── tests ...
Within nb.py I have a cell that runs:
import polars as pl

from pathlib import Path

from kedro.config import OmegaConfigLoader
from kedro.framework.project import settings
from kedro.io import DataCatalog

conf_loader = OmegaConfigLoader(
    conf_source=Path(__file__).parent / settings.CONF_SOURCE,
    default_run_env="base",
)

catalog = DataCatalog.from_config(conf_loader["catalog"], credentials=conf_loader["credentials"])

and later...
weekly_sales = pl.from_pandas(
    catalog.load("mytable")
)

The issue is that, within the catalog, all the filepaths are relative and assume that wherever the catalog is used from is the Kedro project root; the same goes for the conf_source argument of the OmegaConfigLoader instance (e.g. conf/base/sql/somequery.sql or data/mydataset.csv). So if I run my notebook from the root of my Kedro project, all is fine, but if I were to run cd notebooks; marimo edit nb.py, then catalog.load will attempt to load the query or dataset from notebooks/conf/base/sql/somequery.sql.

Is it clear?

PS: please don't ask me why there is SQL code inside the conf folder 😅, it's moving soon
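
For what it's worth, one workaround sketch (assuming nb.py always sits exactly one level below the project root): derive the project root from the notebook's own location and chdir there before building the catalog, so both conf_source and the catalog's relative filepaths resolve against the root regardless of where marimo was started.

import os
from pathlib import Path

from kedro.config import OmegaConfigLoader
from kedro.framework.project import settings
from kedro.io import DataCatalog

project_root = Path(__file__).resolve().parent.parent
os.chdir(project_root)  # relative filepaths in the catalog now resolve from the root

conf_loader = OmegaConfigLoader(
    conf_source=str(project_root / settings.CONF_SOURCE),
    default_run_env="base",
)
catalog = DataCatalog.from_config(conf_loader["catalog"], credentials=conf_loader["credentials"])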

10 comments

Hi, does anyone have an example of how to use kedro_datasets.databricks.ManagedTableDataset from YAML to connect to a Databricks managed Delta table on Azure?
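
Not a full answer, but here is a sketch of the kind of YAML entry to expect (all values are placeholders; the exact options depend on your kedro-datasets version and Unity Catalog setup):

catalog.yml
my_managed_table:
  type: databricks.ManagedTableDataset
  catalog: my_unity_catalog   # omit if not using Unity Catalog
  database: my_schema
  table: my_table
  write_mode: overwrite       # or append / upsert
  dataframe_type: spark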

1 comment

Hello Kedro community,

I am currently developing a project where I need to pass a dynamic number of catalog dataset entries as inputs to a node. The number of input datasets for this node depends on the primary input dataset being used, particularly on the number of unique values in one field.

For instance, this node expects three inputs: a column name (fixed, not dynamic), feature datasets, and target datasets. The node basically collates all of these datasets together into one object as its output.

  • The number of feature and target datasets is dynamic. It can be 1 or 20. They all have catalog entries.
  • I tried creating a list of catalog entry strings to be passed for the feature and target datasets, as below:
feature_df_list = [
    f"{group_name_cleaned}.features_with_clusters"
    for group_name_cleaned in groups_cleaned
]

target_df_list = [
    f"{group_name_cleaned}.target_with_clusters"
    for group_name_cleaned in groups_cleaned
]

input_dict = {
    "target_col": "params:target_col",
    "group_list": feature_df_list,
    "target_clusters_with_features": target_df_list,
}


node(
    func=collate_results,
    inputs=input_dict,
    outputs="run_collection",
),
  • But it treats the catalog entries in the list as plain strings and does not load the datasets they refer to.

Please help me understand how best I can pass a dynamic number of inputs to a node in Kedro :)
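
A sketch of one way this is commonly handled (assuming the split between features and targets can be reconstructed inside the node, e.g. from the order of the inputs): pass a flat list of dataset names as the node's inputs and let the function accept them positionally.

def collate_results(target_col, *dfs):
    # With a list of inputs, Kedro loads each catalog entry and passes the
    # dataframes positionally; here the first half are features, the rest targets.
    half = len(dfs) // 2
    feature_dfs, target_dfs = dfs[:half], dfs[half:]
    ...

node(
    func=collate_results,
    inputs=["params:target_col", *feature_df_list, *target_df_list],
    outputs="run_collection",
)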

9 comments

[KEDRO VIZ]

When using pandera decorators, kedro viz shows the code of the decorator instead of the node function itself.

1 comment

Hey Kedro community,
I'm currently working on a project trying to use kedro_mlflow to store kedro_datasets_experimental.netcdf datasets as artifacts. Unfortunately I can't make it work.

The problem seems to be path related:

kedro.io.core.DatasetError: 
Failed while saving data to dataset MlflowNetCDFDataset(filepath=S:/…/data/07_model_output/D2-24-25/idata.nc, load_args={'decode_times': False}, protocol=file, save_args={'mode': w}).
'str' object has no attribute 'as_posix'
I tried to investigate it to the best of my abilities, and it seems to have to do with the initialization of NetCDFDataset. Most datasets inherit from AbstractVersionedDataset and convert the filepath string into a PurePosixPath in __init__.
NetCDFDataset is missing this, so the PurePosixPath is never created. Whether this is the root cause in the end I don't know, but it is the point where other datasets set their path. In the meantime I thought it might be because mlflow isn't capable of tracking datasets which don't inherit from AbstractVersionedDataset, but the kedro-mlflow documentation says MlflowArtifactDataset is a wrapper for all AbstractDatasets.

I tried setting self._filepath = PurePosixPath(filepath) myself in the site-packages, but I get a permission error on saving, and that's where my journey has to end. It would have been too good if that one-liner had done it ^^
Thank you guys for your help

Here is some reduced code for what I'm trying to achieve.

catalog.yml
"{dataset}.idata":
  type: kedro_mlflow.io.artifacts.MlflowArtifactDataset
  dataset:
    type: kedro_datasets_experimental.netcdf.NetCDFDataset
    filepath: data/07_model_output/{dataset}/idata.nc
    save_args:
      mode: a
    load_args:
      decode_times: False
node.py
import arviz as az

def predict(model, x_data):
    idata = model.predict(x_data)
    return az.convert_to_dataset(idata)
pipeline.py
pipeline_inference = pipeline(
    [
        node(
            func=predict,
            inputs={
                "model": f"{dataset}.model",
                "x_data": f"{dataset}.x_data",
            },
            outputs=f"{dataset}.idata",
            name=f"{dataset}.predict_node",
            tags=["training"],
        ),
    ]
)

4 comments

Hi Kedro Community!

I've come up with a solution to my problem that I believe is not quite optimal, but I'm quite lost among the available possibilities and I'd like to check with you for a better one.

Context:
I have 3 tables stored in a DB that I want to query daily and store the raw data, then process it in my pipelines.
The tables are quite large, so I have to query them in chunks.

Current Solution:
I'm using a PartitionedDataset with an underlying pandas.CSVDataset (is there a better data format?).

catalog.yml

_SQLQueryDataset: &SQLquery
  type: pandas.SQLQueryDataset
  credentials: db_espelho
  load_args:
    chunksize: 5  # testing only

"DB.table1":
  <<: *SQLquery
  sql: ...

"01_raw.{dataset}":
  type: partitions.PartitionedDataset
  path: data/01_raw/{dataset}
  dataset:
    type: pandas.CSVDataset
    save_args:
      index: False
  filename_suffix: ".csv"

nodes.py
from typing import Any, Iterator

from pandas import DataFrame

def create_partitions(data_chunks: Iterator[DataFrame]) -> dict[str, Any]:
    return {f"part-{i:02d}": data for i, data in enumerate(data_chunks)}

The problem that I see here is that in the create_partitions function all chunks are loaded into memory. Is there a way to avoid that, so I can save one chunk at a time?

An alternative solution is to use a custom CSVDataset, as in this doc, instead of PartitionedDataset. However, that creates one huge CSV file that I'll have to process further down the line.

I'm open to any suggestions you might have. I prefer using pure Kedro for now, but if there is a plugin for an open-source tool and both of them (plugin and tool) are easy to set up, I'll be glad to try it.

Bonus question:
One of the tables is a transactions table, so I just need to query the previous day's entries. Is it possible to do this with Kedro only?
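
On the create_partitions question above, one partial mitigation is PartitionedDataset's lazy saving: values that are callables are only invoked when their partition is written, so each chunk can be pulled from the SQL iterator at save time instead of being materialised up front. The catch is that the partition keys must exist before iteration starts, so the number of chunks has to be known or bounded in advance (n_chunks below is a stand-in for that knowledge, e.g. derived from a COUNT(*) and the chunksize).

nodes.py
from typing import Callable, Iterator

from pandas import DataFrame

def create_partitions(data_chunks: Iterator[DataFrame], n_chunks: int) -> dict[str, Callable[[], DataFrame]]:
    # Each lambda advances the shared iterator by exactly one chunk, and
    # PartitionedDataset only calls it when writing that partition (partitions
    # are saved in sorted key order, hence the zero-padded keys).
    return {f"part-{i:04d}": (lambda: next(data_chunks)) for i in range(n_chunks)}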

1 comment

Hi everyone,

I have a question about the ParallelRunner. I have a modular pipeline setup with lots of pipelines that I want to run in parallel. At the end of each modular pipeline, the computations are saved to a PostgreSQL database. What I am noticing is that even though some pipelines are completing, the memory utilization never goes down. A little more info on my setup: I am running 32 workers on a 72-core machine and I have thousands of modular pipelines that I want to run in parallel. My question is this: does the ParallelRunner hold on to the Python objects until ALL of the modular pipelines are complete?

21 comments

When switching to OmegaConfigLoader, it seems ISO-formatted dates (e.g. 2025-01-08) are no longer automatically converted to date objects. I thought an easy fix would be to create a custom resolver to do the conversion for me, but then I got an error stating that a date object was not a valid primitive type when creating OmegaConf objects. The issue seems to be that when soft-merging params, resolved dict objects are converted back into OmegaConf objects, which causes the error. Is this a bug?
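
For reproducibility, this is roughly the resolver setup I mean (a sketch; the resolver name to_date is arbitrary), and roughly the setup that triggers the "not a valid primitive type" error described above once the resolved date is merged back into an OmegaConf object:

settings.py
from datetime import date

CONFIG_LOADER_ARGS = {
    "custom_resolvers": {
        "to_date": lambda s: date.fromisoformat(s),
    },
}

parameters.yml
inference_dt: ${to_date:"2025-01-08"}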

9 comments