Hugo Barreto

Guys, I'm having trouble trying to run Kedro on Azure Functions.

The error that I'm getting is: No module named recsys

Does anyone know how to make sure the package wheel is installed when publishing the function to Azure?

I'm executing the following command to publish it from local to Azure:
func azure functionapp publish FUNC_APP_NAME


Further Info:

Here is my app folder:

.
├── __pycache__
│   └── function_app.cpython-311.pyc
├── dist
│   ├── conf-recsys.tar.gz
│   └── recsys-0.1-py3-none-any.whl
├── function_app.py
├── host.json
├── local.settings.json
├── pyproject.toml
└── requirements.txt

The following is the function_app code:
import logging
import subprocess

import azure.functions as func

app = func.FunctionApp()

@app.route(route="DataPipeline", auth_level=func.AuthLevel.ANONYMOUS)
def DataPipeline(
    req: func.HttpRequest,
) -> func.HttpResponse:
    try:
        subprocess.run(
            [
                "python",
                "-m",
                "recsys",
                "-r",
                "ThreadRunner",
                "--conf-source=dist/conf-recsys.tar.gz",
            ],
            check=True,
            capture_output=True,
        )

        logging.info("Data successfully saved to Blob Storage.")

    except subprocess.CalledProcessError as e:
        # Only CalledProcessError carries the captured stderr of the failed
        # run; a bare Exception would raise AttributeError on e.stderr below.
        logging.error(f"Error processing data: {e}")
        return func.HttpResponse(
            f"{e}\n{e.stderr.decode(errors='replace')}",
            status_code=500,
        )

    return func.HttpResponse("DB Extraction Succeeded")

And requirements.txt:
--find-links dist
azure-functions
pyodbc
sqlalchemy
pandas
recsys 
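
One thing worth checking here (a hedged note, not something confirmed in this thread): func azure functionapp publish typically triggers a remote build that pip-installs requirements.txt on the Azure side, so --find-links dist only works if the dist folder is actually part of the uploaded package (in particular, not excluded by .funcignore). pip also accepts a direct path to a wheel in requirements.txt, which makes the build fail fast if the file was not uploaded:

azure-functions
pyodbc
sqlalchemy
pandas
# direct path to the local wheel instead of --find-links + bare name
dist/recsys-0.1-py3-none-any.whl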

4 comments

I'm trying to read a CSV file (in chunks) and then save the result as partitioned Parquet files. The following catalog entry raises a DatasetError:

"{company}.{layer}.transactions":
  type: pandas.ParquetDataset
  filepath: data/{company}/{layer}/transactions
  save_args:
    partition_cols: [year, month]

The error:
DatasetError: ParquetDataset does not support save argument 'partition_cols'. Please use 'kedro.io.PartitionedDataset' instead.

How am I supposed to do this using PartitionedDataset, and what is the reason for blocking partition_cols in pandas.ParquetDataset? (I'm asking because I could just override it with a custom dataset.)
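
For reference, a minimal sketch of the PartitionedDataset route (the catalog key and node below are illustrative, not from this thread). The dict keys returned by the node become relative paths under path, so the node has to build the year=/month= layout itself:

catalog.yml

"{company}.{layer}.transactions":
  type: partitions.PartitionedDataset
  path: data/{company}/{layer}/transactions
  dataset: pandas.ParquetDataset
  filename_suffix: ".parquet"

nodes.py

import pandas as pd

def partition_by_month(df: pd.DataFrame) -> dict[str, pd.DataFrame]:
    # Each key becomes a relative path under `path`; together with
    # filename_suffix this writes year=YYYY/month=MM/part.parquet,
    # i.e. a hive-style layout that pyarrow/Spark can read back.
    return {
        f"year={y}/month={m:02d}/part": part
        for (y, m), part in df.groupby(["year", "month"])
    }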

6 comments

Hi Kedro Community!

I've come up with a solution to my problem that I believe is not quite optimal, but I'm a bit lost among the available options and I'd like to check with you for a better one.

Context:
I have 3 tables stored in a DB that I want to query daily, storing the raw data and then processing it in my pipelines.
The tables are quite large, so I have to query them in chunks.

Current Solution:
I'm using PartitionedDataset with an underlying pandas.CSVDataset (is there a better data format?)

catalog.yml

_SQLQueryDataset: &SQLquery
  type: pandas.SQLQueryDataset
  credentials: db_espelho
  load_args:
    chunksize: 5  # testing only

"DB.table1":
  <<: *SQLquery
  sql: ...

"01_raw.{dataset}":
  type: partitions.PartitionedDataset
  path: data/01_raw/{dataset}
  dataset:
    type: pandas.CSVDataset
    save_args:
      index: False
  filename_suffix: ".csv"

nodes.py
def create_partitions(data_chunks: Iterator[DataFrame]) -> dict[str, Any]:
    return {f"part-{i:02d}": data for i, data in enumerate(data_chunks)}

The problem that I see here is that in the create_partitions function all chunks are loaded into memory. Is there a way to avoid that, so I can save one chunk at a time?
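
One pattern worth knowing here, as a sketch rather than a full fix: PartitionedDataset supports lazy saving, meaning any partition value that is a callable is invoked only when that partition is written. The caveat is in the comment below: building the dict still consumes the iterator, so this alone defers writing rather than loading; truly flat memory would need callables that fetch their own slice on demand (e.g. by re-issuing a windowed query).

from typing import Callable, Iterator

from pandas import DataFrame

def create_partitions(
    data_chunks: Iterator[DataFrame],
) -> dict[str, Callable[[], DataFrame]]:
    # PartitionedDataset calls each value at save time (lazy saving).
    # CAVEAT: enumerate() still consumes the iterator up front and each
    # closure keeps its DataFrame alive, so peak memory is unchanged here.
    return {
        f"part-{i:02d}": (lambda chunk=chunk: chunk)
        for i, chunk in enumerate(data_chunks)
    }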

An alternative solution is to use a custom CSVDataset, as in this doc, instead of PartitionedDataset. However, that creates one huge CSV file that I'll have to process down the line.

I'm open to any suggestions you might have. I'd prefer to use pure Kedro for now, but if there is a plugin for an open-source tool and both of them (plugin and tool) are easy to set up, I'll be glad to try it.

Bonus question:
One of the tables is a transactions table, so I just need to query the previous day's entries. Is it possible to do that with Kedro only?
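
On the bonus question, a possible pure-Kedro sketch, assuming a Kedro version whose OmegaConfigLoader accepts custom resolvers (the resolver name yesterday and the created_at column are made up for illustration): register the resolver in settings.py and interpolate it inside the SQL string in the catalog.

settings.py

from datetime import date, timedelta

from kedro.config import OmegaConfigLoader

CONFIG_LOADER_CLASS = OmegaConfigLoader
CONFIG_LOADER_ARGS = {
    "custom_resolvers": {
        # returns yesterday's date as an ISO string, e.g. "2024-12-02"
        "yesterday": lambda: (date.today() - timedelta(days=1)).isoformat(),
    }
}

catalog.yml

"DB.transactions":
  <<: *SQLquery
  sql: SELECT * FROM transactions WHERE CAST(created_at AS DATE) = '${yesterday:}'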

1 comment

Guys, it may be a simple question, but I've noticed that git is tracking the session_store.db file. Shouldn't it be ignored, or is it recommended to track it in the repo?
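
For what it's worth, session_store.db is a local SQLite session store (typically created by Kedro-Viz's SQLiteStore), so ignoring it is the usual choice. A one-line sketch; the path below is illustrative, so match it to wherever git actually reports the file:

# .gitignore
data/session_store.db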

1 comment