I'm trying to read a CSV file (in chunks) and then save the result as partitioned Parquet files. The following catalog entry raises a DatasetError:

```yaml
"{company}.{layer}.transactions":
  type: pandas.ParquetDataset
  filepath: data/{company}/{layer}/transactions
  save_args:
    partition_cols: [year, month]
```

The error:

```
DatasetError: ParquetDataset does not support save argument 'partition_cols'. Please use 'kedro.io.PartitionedDataset' instead.
```
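Following the error message, one option is to wrap the Parquet output in a PartitionedDataset and have the node return one DataFrame per partition. A minimal sketch, with the partition key built from `year` and `month` inside the node (key format and names are illustrative):

```yaml
"{company}.{layer}.transactions":
  type: partitions.PartitionedDataset
  path: data/{company}/{layer}/transactions
  dataset:
    type: pandas.ParquetDataset
  filename_suffix: ".parquet"
```

```python
# Node sketch: split the DataFrame by (year, month); the dict keys become
# relative partition paths, e.g. "year=2025/month=01".
def split_by_period(df):
    return {
        f"year={year}/month={month:02d}": group
        for (year, month), group in df.groupby(["year", "month"])
    }
```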
Another question from my side: I have a node which outputs a dictionary called `train_test_dts`, which I am saving as a pickle with the `joblib` backend.

When I then try to run my pipeline with the ParallelRunner like this:

```
kedro run --pipeline feature_engineering --params env=dev,inference_dt=2025-01-05 --runner ParallelRunner
```

I get the following error:

```
AttributeError: The following datasets cannot be used with multiprocessing: ['train_test_dts'] In order to utilize multiprocessing you need to make sure all datasets are serialisable, i.e. datasets should not make use of lambda functions, nested functions, closures etc. If you are using custom decorators ensure they are correctly decorated using functools.wraps().
```
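For reference, the catalog entry being described would look roughly like this (the filepath is illustrative, not taken from the actual project):

```yaml
train_test_dts:
  type: pickle.PickleDataset
  filepath: data/05_model_input/train_test_dts.pkl  # illustrative path
  backend: joblib
```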
Would Kedro users be opposed to defining nodes with decorators? I have written a simple implementation, but as I've only recently started using Kedro, I wonder if I'm missing anything.
The syntax would be:
```python
from kedro.pipeline import Pipeline, node, pipeline


@node(inputs=1, outputs="first_sum")
def step1(number):
    return number + 1


@node(inputs="first_sum", outputs="second_sum")
def step2(number):
    return number + 1


@node(inputs="second_sum", outputs="final_result")
def step3(number):
    return number + 2


pipeline = pipeline(
    [
        step1,
        step2,
        step3,
    ]
)
```
I was trying out Kedro-Databricks on a simple project and the job got stuck for 15 minutes before I killed it. Has this happened to anyone?
Stupid question: the Kedro VSCode plugin does not work for me. After installing it and the dependencies I still cannot click on the catalog items. Is there a standard solution for this?
Hello, I've worked on a lot of Kedro pipelines over the past ~year and am a big fan, but there's one detail that I've seen cause some very confusing problems that I'd like help with.
Whenever there's an error loading a pipeline (a syntax error, a missing import, etc.), instead of the Kedro process erroring out, it silently skips that pipeline and continues without it. This is not only confusing, it can lead to some pretty big problems in a model without any errors ever being raised.
How do I disable this and force Kedro to raise errors when loading pipelines? I tried googling but couldn't find anything.
Thanks!
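One workaround that avoids the silent skip is to register pipelines explicitly instead of relying on autodiscovery, so any import or syntax error surfaces immediately at startup. A minimal sketch (the package and pipeline module names are hypothetical):

```python
# src/my_project/pipeline_registry.py (sketch)
from kedro.pipeline import Pipeline

# Importing the pipeline modules directly means a syntax error or missing
# import raises right here instead of being swallowed during autodiscovery.
from my_project.pipelines import feature_engineering, training


def register_pipelines() -> dict[str, Pipeline]:
    feature_pipeline = feature_engineering.create_pipeline()
    training_pipeline = training.create_pipeline()
    return {
        "feature_engineering": feature_pipeline,
        "training": training_pipeline,
        "__default__": feature_pipeline + training_pipeline,
    }
```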
EDIT: I was able to run the wheel on Databricks.
Hello, is there a way to get Kedro to create folders in paths if they do not exist? For instance, if my data structure is

```
data:
  outputs:
```

and I have the catalog entry

```yaml
data@pandas:
  type: pandas.CSVDataset
  filepath: data/outputs/csv_files/data.csv
```

it would be nice for Kedro to automatically create `csv_files` inside `data/outputs` and store `data.csv` there afterwards. Similarly, when a node returns `{'non_existent_subfolder/file_name': data}` to be saved via the catalog entry

```yaml
data@PartitionedDataset:
  type: partitions.PartitionedDataset
  path: data/outputs
  dataset:
    type: pandas.ExcelDataset
    save_args:
      sheet_name: Sheet1
    load_args:
      sheet_name: Sheet1
  filename_suffix: ".xlsx"
```

it would be nice for Kedro to create `non_existent_subfolder` automatically inside `data/outputs`. I already tried it and Kedro does not create folders when they don't exist. Is there a way of changing this default behaviour?
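One workaround is to create the expected folders in a hook before the run starts. A minimal sketch, assuming a local filesystem and reusing the folder names from the question:

```python
# hooks.py (sketch)
from pathlib import Path

from kedro.framework.hooks import hook_impl


class CreateOutputDirsHook:
    @hook_impl
    def before_pipeline_run(self, run_params, pipeline, catalog):
        # Create the folders the catalog entries above expect to exist.
        for folder in ("data/outputs/csv_files", "data/outputs/non_existent_subfolder"):
            Path(folder).mkdir(parents=True, exist_ok=True)
```

The hook would then be registered in settings.py via `HOOKS = (CreateOutputDirsHook(),)`.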
What is the best way to overwrite the log files instead of appending to them? My `logging.yml` looks like this:
EDIT: just adding `mode: w` did the trick.
Hello, I would like to know how to manage merge conditions with Kedro for Delta tables, for example `whenMatchedUpdate()` ...
https://docs.delta.io/latest/delta-update.html
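In case it helps frame the question: with the delta-spark API linked above, the merge itself is usually expressed inside a node, roughly like the sketch below (the table path, join keys, and function name are illustrative):

```python
# Sketch of a node that performs a Delta MERGE using delta-spark.
from delta.tables import DeltaTable
from pyspark.sql import DataFrame, SparkSession


def upsert_transactions(updates: DataFrame) -> None:
    spark = SparkSession.builder.getOrCreate()
    target = DeltaTable.forPath(spark, "data/03_primary/transactions_delta")  # illustrative path
    (
        target.alias("t")
        .merge(updates.alias("s"), "t.transaction_id = s.transaction_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
```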
Hello, what is the proper way to add a current timestamp to the names of catalog entries? Thanks.
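One approach, assuming the project uses the OmegaConfigLoader and the goal is to timestamp the file paths of catalog entries, is to register a custom resolver in settings.py and reference it in the catalog. The resolver name, format, and filepath below are illustrative, and the exact interpolation syntax may need tweaking for your Kedro/OmegaConf versions:

```python
# settings.py (sketch)
from datetime import datetime

CONFIG_LOADER_ARGS = {
    "custom_resolvers": {
        # ${now:} resolves to a timestamp string when the config is loaded.
        "now": lambda *args: datetime.now().strftime("%Y%m%d_%H%M%S"),
    }
}
```

```yaml
# catalog.yml (sketch)
model_metrics:
  type: pandas.CSVDataset
  filepath: data/08_reporting/metrics_${now:}.csv
```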
Hello, I'm facing a memory issue in my Kedro project and I would like to know if there is a Kedro-oriented solution.
I am developing a pipeline for processing large datasets of audio recordings. This involves processing several audio files (large NumPy arrays) in a single node and storing them again. I was relying on PartitionedDatasets to do so, but I'm having memory issues because building the dictionary of NumPy arrays is quite heavy and always ends up consuming all of my tiny memory.
Is there a way of storing each processed file as soon as it is done, instead of holding them all in RAM until the last one is done? Of course this is possible in many ways, but my question is specifically about Kedro: is it possible to save from within the body of the function using Kedro and partitioned datasets? Has any of you experienced something like this before?
Best,
Nicolas
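PartitionedDataset supports lazy saving: if the values of the returned dictionary are callables, each one is invoked only when its partition is written, so only one processed array needs to be materialised at a time. A minimal sketch, assuming the node both reads from and writes to partitioned datasets (the `denoise` step is a placeholder for the actual processing):

```python
def process_recordings(partitioned_audio: dict) -> dict:
    """partitioned_audio maps partition id -> load callable (lazy loading)."""

    def make_saver(load_func):
        # The returned callable runs at save time for that partition only,
        # so the raw array is loaded and processed one partition at a time.
        return lambda: denoise(load_func())  # denoise() is a placeholder

    return {pid: make_saver(load) for pid, load in partitioned_audio.items()}
```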
[Question: Repo separation between ETL and ML apps]
Hi all, can I configure the use of ParallelRunner only on the command line or also in parameters.yml?
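For context, besides `--runner` on the command line, the runner can also be chosen programmatically when running a session, which is sometimes a useful middle ground. A sketch (not driven by parameters.yml; assumes it is run from the project root):

```python
# Sketch: picking the runner in code via a KedroSession.
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project
from kedro.runner import ParallelRunner

bootstrap_project(".")  # assumes the current directory is the project root
with KedroSession.create(project_path=".") as session:
    session.run(pipeline_name="feature_engineering", runner=ParallelRunner())
```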
Issue Summary
Confusion with Credential Configuration in Kedro 0.19 vs 0.18
Hello Kedro team,
I have encountered an issue regarding the configuration of credentials for accessing storage via `abfss` in Kedro 0.19.3, which was not present in version 0.18. Here is a summary of the problem:
In Kedro 0.18, I configured the credentials for accessing storage through Spark configurations with Azure Service Principal, and everything worked fine. However, after upgrading to Kedro 0.19.3, the same setup stopped working. After spending a couple of days troubleshooting, I discovered that adding the credentials as environment variables resolved the issue.
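For reference, the catalog-driven alternative to environment variables would be an fsspec-style credentials block; this only covers fsspec-backed (non-Spark) datasets, assumes adlfs as the abfss backend, and all values below are placeholders:

```yaml
# conf/local/credentials.yml (sketch)
azure_sp_creds:
  account_name: "<storage-account-name>"
  tenant_id: "<tenant-id>"
  client_id: "<service-principal-client-id>"
  client_secret: "<service-principal-secret>"
```

```yaml
# conf/base/catalog.yml (sketch)
my_dataset:
  type: pandas.ParquetDataset
  filepath: abfss://container@account.dfs.core.windows.net/path/to/data.parquet
  credentials: azure_sp_creds
```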
My questions are:
How do you avoid over-applying DRY ("Don't Repeat Yourself") with Kedro? I find that, given the fairly opinionated syntax and project structure it proposes, it's easy to DRY out bits of code that would be better left repeated (e.g. preprocessing code). I wonder if anyone else has had similar thoughts.
Hello, I'm facing an issue visualizing my pipeline with `kedro viz`.
My node runs correctly, and `kedro catalog list` shows the right entries, but `kedro viz` cannot retrieve credentials defined in my hooks.py. Did anyone face the same issue?

```
kedro viz KeyError: "Unable to find credentials 'adls_creds': check your data catalog and credentials configuration. See https://kedro.readthedocs.io/en/stable/kedro.io.DataCatalog.html for an example."
```
Hi Kedro community!! I have encountered an issue when working with Kedro within a marimo notebook (I think the issue would be just the same in a Jupyter notebook). Basically, I was initially working on my notebook by calling it from the command line from the Kedro project root folder, something like:

```
marimo edit notebooks/nb.py
```

where my folder structure is something like:

```
├── README.md
├── conf
│   ├── base
│   ├── local
├── data ...
├── notebooks
│   ├── nb.py
├── pyproject.toml
├── requirements.txt
├── src ...
└── tests ...
```

Within `nb.py` I have a cell that runs:

```python
from kedro.io import DataCatalog
from kedro.config import OmegaConfigLoader
from kedro.framework.project import settings
from pathlib import Path

conf_loader = OmegaConfigLoader(
    conf_source=Path(__file__).parent / settings.CONF_SOURCE,
    default_run_env="base",
)
catalog = DataCatalog.from_config(conf_loader["catalog"], credentials=conf_loader["credentials"])
```

and another cell that runs:

```python
weekly_sales = pl.from_pandas(catalog.load("mytable"))
```

In the catalog, all the filepaths are relative and assume that wherever the catalog is being used from is the Kedro project root level; the `conf_source` argument of the `OmegaConfigLoader` instance resolves the same way (entries point to e.g. `conf/base/sql/somequery.sql` or `data/mydataset.csv`). So if I run my notebook from the root of my Kedro project, all is fine, but if I were to run:

```
cd notebooks; marimo edit nb.py
```

then `catalog.load` will attempt to load the query or dataset from `notebooks/conf/base/sql/somequery.sql`.
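One way to make the notebook independent of the current working directory is to let Kedro locate the project and build the catalog itself, instead of constructing the config loader by hand. A sketch, assuming the notebook lives one level below the project root:

```python
# Sketch: build the catalog via a KedroSession so paths resolve from the
# project root, regardless of where the notebook process was started.
from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

project_root = Path(__file__).resolve().parents[1]  # notebooks/ -> project root
bootstrap_project(project_root)

with KedroSession.create(project_path=project_root) as session:
    context = session.load_context()
    catalog = context.catalog

weekly_sales = catalog.load("mytable")
```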
Hi, does anyone have an example of how to use `kedro_datasets.databricks.ManagedTableDataset` from YAML to connect to a Databricks managed Delta table on Azure?
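Not a verified answer, but a catalog entry for that dataset generally looks something like the sketch below; the catalog/schema/table names are placeholders, and the exact argument names should be checked against the kedro-datasets version in use:

```yaml
my_managed_table:
  type: databricks.ManagedTableDataset
  catalog: my_unity_catalog   # Unity Catalog name (placeholder)
  database: my_schema         # schema/database name (placeholder)
  table: my_table
  write_mode: overwrite
  dataframe_type: spark
```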
Hello Kedro community,
I am currently developing a project where I need to pass a dynamic number of catalog dataset entries as inputs to a node. The number of input datasets to this node depends on the primary input dataset being used, particularly the number of unique values in one field.
For instance, this node expects three inputs: a column name (this is fixed and not dynamic), feature datasets, and target datasets. The node basically collates all these datasets together into one object as the output of the node:

```python
feature_df_list = [
    f"{group_name_cleaned}.features_with_clusters"
    for group_name_cleaned in groups_cleaned
]
target_df_list = [
    f"{group_name_cleaned}.target_with_clusters"
    for group_name_cleaned in groups_cleaned
]
input_dict = {
    "target_col": "params:target_col",
    "group_list": feature_df_list,
    "target_clusters_with_features": target_df_list,
}

node(
    func=collate_results,
    inputs=input_dict,
    outputs="run_collection",
),
```
[KEDRO VIZ]
When using Pandera decorators, kedro viz shows the code for the decorator instead of the node itself.
Hey Kedro community,
I'm currently working on a project trying to use kedro-mlflow to store `kedro_datasets_experimental.netcdf` datasets as artifacts. Unfortunately I can't make it work.
The problem seems to be path related:

```
kedro.io.core.DatasetError: Failed while saving data to dataset MlflowNetCDFDataset(filepath=S:/…/data/07_model_output/D2-24-25/idata.nc, load_args={'decode_times': False}, protocol=file, save_args={'mode': w}).
'str' object has no attribute 'as_posix'
```

I tried to investigate it to the best of my abilities and it seems to have to do with the initialization of `NetCDFDataset`. Most datasets inherit from `AbstractVersionedDataset` and call its `__init__` with their filepath as a str; `NetCDFDataset` is missing this, so the `PurePosixPath` is never created. Whether this is the actual problem in the end I don't know, but it is the point where other datasets have their path set. In the meantime I thought it might be because mlflow isn't capable of tracking datasets which don't inherit from `AbstractVersionedDataset`, but the kedro-mlflow documentation says `MlflowArtifactDataset` is a wrapper for all `AbstractDataset`s. I even tried adding `self._filepath = PurePosixPath(filepath)` myself in the site-packages, but I'm getting a permission error on saving, and that's where my journey has to end. Would have been too good if that one-liner had done it ^^

catalog.yml:

```yaml
"{dataset}.idata":
  type: kedro_mlflow.io.artifacts.MlflowArtifactDataset
  dataset:
    type: kedro_datasets_experimental.netcdf.NetCDFDataset
    filepath: data/07_model_output/{dataset}/idata.nc
    save_args:
      mode: a
    load_args:
      decode_times: False
```

node.py:

```python
def predict(model, x_data):
    idata = model.predict(x_data)
    return az.convert_to_dataset(idata)
```

pipeline.py:

```python
pipeline_inference = pipeline(
    [
        node(
            func=predict,
            inputs={
                "model": f"{dataset}.model",
                "x_data": f"{dataset}.x_data",
            },
            outputs=f"{dataset}.idata",
            name=f"{dataset}.predict_node",
            tags=["training"],
        ),
    ]
)
```
Hi Kedro Community!
I've come up with a solution to my problem that I believe is not quite optimal, but I'm a bit lost among the available options and I'd like to check with you for a better one.
Context:
I have 3 tables stored in a DB that I want to query daily and store the raw data, then proceed to process it in my pipelines.
The tables are quite large, so I have to query them in chunks.
Current Solution:
I'm using PartitionedDataset with an underlying pandas.CSVDataset (is there a better data format?)
catalog.yml

```yaml
_SQLQueryDataset: &SQLquery
  type: pandas.SQLQueryDataset
  credentials: db_espelho
  load_args:
    chunksize: 5  # testing only

"DB.table1":
  <<: *SQLquery
  sql: ...

"01_raw.{dataset}":
  type: partitions.PartitionedDataset
  path: data/01_raw/{dataset}
  dataset:
    type: pandas.CSVDataset
    save_args:
      index: False
  filename_suffix: ".csv"
```

node.py

```python
def create_partitions(data_chunks: Iterator[DataFrame]) -> dict[str, Any]:
    return {f"part-{i:02d}": data for i, data in enumerate(data_chunks)}
```

The problem is that inside the `create_partitions` function all chunks are loaded into memory. Is there a way to avoid that, so I can save one chunk at a time?
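One possible workaround, sketched below, is to bypass the single returned dictionary and write each chunk directly from the node body by instantiating the underlying dataset per chunk. This is arguably less idiomatic Kedro, and the path and partition naming are illustrative:

```python
# Sketch: stream chunks to disk one at a time instead of materialising them all.
from typing import Iterator

from kedro_datasets.pandas import CSVDataset
from pandas import DataFrame


def save_chunks_incrementally(data_chunks: Iterator[DataFrame]) -> list[str]:
    saved_parts = []
    for i, chunk in enumerate(data_chunks):
        part_name = f"part-{i:02d}"
        # Each chunk is written as soon as it is pulled from the DB cursor,
        # so only one chunk lives in memory at any point.
        CSVDataset(
            filepath=f"data/01_raw/table1/{part_name}.csv",  # illustrative path
            save_args={"index": False},
        ).save(chunk)
        saved_parts.append(part_name)
    return saved_parts  # small marker output so downstream nodes can depend on it
```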
Hi everyone, I have a question about the ParallelRunner. I have a modular pipeline setup with lots of pipelines that I want to run in parallel. At the end of each modular pipeline the computations are saved to a PostgreSQL database. What I am noticing is that even though some pipelines are completing, the memory utilization never goes down. A little more info on my setup: I am running 32 workers on a 72-core machine and I have thousands of modular pipelines that I want to run in parallel. My question is this: does the ParallelRunner hold on to the Python objects until ALL of the modular pipelines are complete?
When switching to the OmegaConfigLoader, it seems ISO-formatted dates (e.g. 2025-01-08) are no longer automatically converted to `date` objects. I thought an easy fix would be to create a custom resolver to do the conversion for me, but then I got an error stating that a `date` object is not a valid primitive type when creating OmegaConf objects. The issue seems to be that when soft-merging params, the resolved dict objects are converted back into OmegaConf objects, which causes the error. Is this a bug?
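A sketch of a workaround while this is unresolved: keep the parameter as a plain ISO string in the config and convert it at the point of use, which avoids ever putting a `date` object into an OmegaConf container (the parameter and function names are illustrative):

```python
# Node-side conversion: params stay strings in YAML, become dates in Python.
from datetime import date


def make_cutoff(inference_dt: str) -> date:
    # "2025-01-08" -> datetime.date(2025, 1, 8); the YAML value stays a plain string.
    return date.fromisoformat(inference_dt)
```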