Hugo Barreto

I'm trying to read a CSV file (in chunks) and then save the result as partitioned Parquet files. The following catalog entry raises a DatasetError:

"{company}.{layer}.transactions":
  type: pandas.ParquetDataset
  filepath: data/{company}/{layer}/transactions
  save_args:
    partition_cols: [year, month]    
The error:
DatasetError: ParquetDataset does not support save argument 'partition_cols'. Please use 'kedro.io.PartitionedDataset' instead.

How am I supposed to do this with PartitionedDataset, and what is the reason for blocking partition_cols in pandas.ParquetDataset? (I'm asking because I could just override it with a custom dataset.)
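
For context on the override route mentioned above, here is a minimal sketch of what lifting the restriction might look like, assuming a local filesystem; the restriction presumably exists because the stock dataset saves through a single fsspec file buffer, while partition_cols makes pandas/pyarrow write a directory tree rather than one file. The class name is hypothetical, not part of Kedro:

import pandas as pd
from kedro_datasets.pandas import ParquetDataset

class LocalPartitionedParquetDataset(ParquetDataset):
    # Hypothetical subclass: skips the parent's single-file buffer and lets
    # pandas/pyarrow write one sub-directory per partition value. Local
    # filesystem only; remote protocols would need fsspec-aware handling.
    def _save(self, data: pd.DataFrame) -> None:
        save_path = str(self._get_save_path())  # path resolved from the catalog entry
        data.to_parquet(save_path, **self._save_args)

The catalog entry would then point type: at this class's import path and keep partition_cols in save_args.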

6 comments

Hi Kedro Community!

I've come up with a solution that I believe is not optimal for my problem, but I'm a bit lost among the available options and I'd like to check with you for a better one.

Context:
I have 3 tables stored in a DB that I want to query daily, storing the raw data before processing it in my pipelines.
The tables are quite large, so I have to query them in chunks.

Current Solution:
I'm using PartitionedDataset with an underlying pandas.CSVDataset (is there a better data format? see the Parquet variant sketched after the catalog below).

catalog.yml

_SQLQueryDataset: &SQLquery
  type: pandas.SQLQueryDataset
  credentials: db_espelho
  load_args:
    chunksize: 5  # testing only

"DB.table1":
  <<: *SQLquery
  sql: ...

"01_raw.{dataset}":
  type: partitions.PartitionedDataset
  path: data/01_raw/{dataset}
  dataset:
    type: pandas.CSVDataset
    save_args:
      index: False
  filename_suffix: ".csv"
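
On the data-format question: for tabular chunks that later pipelines re-read, a Parquet-backed variant of the same entry is a common choice. A minimal sketch, changing only the underlying dataset:

"01_raw.{dataset}":
  type: partitions.PartitionedDataset
  path: data/01_raw/{dataset}
  dataset:
    type: pandas.ParquetDataset  # columnar and compressed; preserves dtypes
  filename_suffix: ".parquet"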

nodes.py
from typing import Any, Iterator

from pandas import DataFrame

def create_partitions(data_chunks: Iterator[DataFrame]) -> dict[str, Any]:
    # NOTE: consuming the iterator here materialises every chunk at once
    return {f"part-{i:02d}": data for i, data in enumerate(data_chunks)}

The problem I see here is that in the create_partitions function all chunks are loaded into memory. Is there a way to avoid that, so I can save one chunk at a time?
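
For reference, PartitionedDataset supports lazy saving: if the dictionary values are zero-argument callables, each one is invoked only when its partition is written. A sketch of one way to exploit that, under two loud assumptions: that you can bound the chunk count up front (MAX_CHUNKS below is hypothetical), and that partitions are saved in sorted key order, which is worth verifying for your Kedro version:

from functools import partial
from typing import Callable, Iterator

from pandas import DataFrame

def create_partitions(data_chunks: Iterator[DataFrame]) -> dict[str, Callable[[], DataFrame]]:
    # Lazy saving: each callable pulls the next chunk from the shared
    # iterator at write time, so only one chunk is materialised at a time.
    # NOTE: assumes key-ordered saving; raises StopIteration at save time
    # if the iterator yields fewer than MAX_CHUNKS chunks.
    MAX_CHUNKS = 50  # hypothetical upper bound on the number of chunks
    return {f"part-{i:02d}": partial(next, data_chunks) for i in range(MAX_CHUNKS)}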

An alternative solution is to use a custom CSVDataset, as in this doc, instead of PartitionedDataset. However, that creates one huge CSV file that I'll have to process down the line.

I'm open to any suggestions you might have. I prefer using pure Kedro for now, but if there is a plugin for an open-source tool and both of them (plugin and tool) are easy to set up, I'll be glad to try it.

Bonus question:
One of the tables is a transactions table, so I just need to query the previous day's entries. Is it possible to do this with Kedro only?
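
For reference, one pure-Kedro route is an OmegaConf custom resolver registered in settings.py and interpolated in the catalog; the resolver name and the entry_date column below are illustrative assumptions:

settings.py

from datetime import date, timedelta

CONFIG_LOADER_ARGS = {
    "custom_resolvers": {
        # "yesterday" is a hypothetical resolver name
        "yesterday": lambda: (date.today() - timedelta(days=1)).isoformat(),
    },
}

catalog.yml

"DB.transactions":
  <<: *SQLquery
  sql: SELECT * FROM transactions WHERE entry_date = '${yesterday:}'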

1 comment

Guys, it may be a simple question, but I've noticed that git is able to track the session_store.db file. Shouldn't it be ignored, or is it recommended to track it in the repo?

1 comment