Kedro community — posts by Vishal Pandey (joined September 13, 2024)

Hey Everyone

Interested to know which orchestration service you prefer for running Kedro in production environments, and how your experience has been so far.

Recently I have been trying to run Kedro on Kubeflow and have been facing multiple issues.

11 comments

I am trying to use custom resolvers to provide credentials in catalog.yml

document_classification:
  type: ibis.TableDataset
  table_name: document_classification
  connection:
    backend: ${oc.env:BACKEND}
    host: ${oc.env:HOST}
    port: ${oc.env:PORT}
    database: ${oc.env:DATABASE}
    user: ${oc.env:USER}
    password: ${oc.env:PASSWORD}


from omegaconf.resolvers import oc  # provides the oc.env resolver function

CONFIG_LOADER_ARGS = {
    "base_env": "base",
    "default_run_env": "local",
    "custom_resolvers": {
        "oc.env": oc.env,
    },
}

Is this the right way to do it?

34 comments

Hey Everyone

I am getting the errors below while the pipeline is trying to push some data to S3. Any heads-up?

ClientError: An error occurred (400) when calling the HeadObject operation: Bad Request
The above exception was the direct cause of the following exception:

DatasetError: Failed while saving data to data set CSVDataset(filepath=ml-datawarehouse/warehouse/extraction/doc_table_insert.csv, load_args={},
protocol=s3, save_args={'index': False}, version=Version(load=None, save='2024-10-15T15.35.46.341Z')).
[Errno 22] Bad Request

3 comments

Hi everyone

I have been exploring Ibis for some time. I just wanted to understand whether there is a better way to write the code below in a more optimised fashion.

import ibis
import ibis.expr.types as ir

con = ibis.connect(POSTGRES_CONNECTION_STRING)  # connection string defined elsewhere
training_meta_table: ir.Table = con.table("training_metadata")

filters = {
    "customer_ids": [59],
    "queue_names": ["General Lit - Misclassifications", "MoveDocs-MR"],
    "start_date": "2024-09-5 00:00:00",
    "end_date": "2024-09-11 00:00:00",
    "doc_types": [],
    "fields": ["patientFirstName", "patientLastName", "Service Date", "Doctor"],
}

field_conditions = (
    training_meta_table.fields_present.contains(filters["fields"][0])
    | training_meta_table.fields_present.contains(filters["fields"][1])
    | training_meta_table.fields_present.contains(filters["fields"][2])
    | training_meta_table.fields_present.contains(filters["fields"][3])
)

So there are many OR conditions we would like to join together dynamically to create one final condition based on the input filters.
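A possibly tidier version, as a sketch: chain the conditions with functools.reduce and operator.or_ (this assumes the same table and filters dict as above, and that filters["fields"] is non-empty).

import operator
from functools import reduce

# build the OR chain dynamically from the input filters; reduce() would
# raise on an empty sequence, so guard or skip the filter in that case
field_conditions = reduce(
    operator.or_,
    (training_meta_table.fields_present.contains(f) for f in filters["fields"]),
)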

5 comments

Hey Everyone, interested to know how you manage your requirements.txt file to reproduce the same environment. What tools do you prefer to keep the requirements.txt file up to date?

60 comments

Hello everyone

Just wanted to know whether there is a way to access the values of command line arguments like --env in our Kedro pipeline source code.
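One way that should work, as a sketch: register a hook and read the environment from the context object Kedro passes to after_context_created; context.env reflects whatever --env resolved to. The class name is illustrative, and you would register an instance in HOOKS in settings.py.

from kedro.framework.hooks import hook_impl


class EnvCaptureHooks:
    @hook_impl
    def after_context_created(self, context):
        # context.env is the resolved run environment (--env or the default)
        self.env = context.env
        print(f"Kedro run environment: {context.env}")

Nodes themselves don't see the context, so if a node needs the value you would pass it along explicitly, for example by adding it to the catalog or to parameters in another hook.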

1 comment

Hey Folks, I am looking for a way to mount an AWS EFS volume to my Kedro pipeline, which will be executed by Kubeflow. I am using the Kubeflow plugin.
The config has the two options below for volumes, and I am not sure which one serves what purpose (a sketch of an EFS mount follows the snippet).

  volume:

    # Storage class - use null (or no value) to use the default storage
    # class deployed on the Kubernetes cluster
    storageclass: # default

    # The size of the volume that is created. Applicable for some storage
    # classes
    size: 1Gi

    # Access mode of the volume used to exchange data. ReadWriteMany is
    # preferred, but it is not supported on some environments (like GKE)
    # Default value: ReadWriteOnce
    #access_modes: [ReadWriteMany]

    # Flag indicating if the data-volume-init step (copying raw data to the
    # fresh volume) should be skipped
    skip_init: False

    # Allows to specify user executing pipelines within containers
    # Default: root user (to avoid issues with volumes in GKE)
    owner: 0

    # Flag indicating if volume for inter-node data exchange should be
    # kept after the pipeline is deleted
    keep: False
  # Optional section to allow mounting additional volumes (such as EmptyDir)
  # to specific nodes
  extra_volumes:
    tensorflow_step:
    - mount_path: /dev/shm
      volume:
        name: shared_memory
        empty_dir:
          cls: V1EmptyDirVolumeSource
          params:
            medium: Memory
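Going by the comments in the snippet itself, option 1 (volume:) provisions a fresh volume from a storage class for exchanging data between nodes, while option 2 (extra_volumes:) mounts additional, pre-existing volumes onto specific nodes. For EFS, the usual Kubernetes route is a PersistentVolumeClaim backed by the EFS CSI driver, so by analogy with the empty_dir example above it might look like the sketch below. This is only a guess at how the plugin maps these fields onto the Kubernetes client models; the node name and claim name are hypothetical.

  extra_volumes:
    my_training_node:
    - mount_path: /mnt/efs
      volume:
        name: efs_data
        persistent_volume_claim:
          cls: V1PersistentVolumeClaimVolumeSource
          params:
            claim_name: efs-claim   # hypothetical PVC bound to your EFS storage class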

8 comments

Hello Team

Can someone help me use Kubernetes Secrets within Kedro pipelines to connect with S3 or any other external service?
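One pattern that should work, as a sketch: expose the Secret to the pod as environment variables (for example via envFrom or secretKeyRef in the pod spec), then reference those variables in credentials.yml with the oc.env resolver, which the OmegaConfigLoader allows for credentials. The variable names below are just examples.

dev_s3:
  client_kwargs:
    aws_access_key_id: ${oc.env:AWS_ACCESS_KEY_ID}
    aws_secret_access_key: ${oc.env:AWS_SECRET_ACCESS_KEY}

Datasets that point at S3 can then use credentials: dev_s3, as in the other catalog entries in this thread.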

2 comments

Hello folks,
Wanted to hear from all of you: how do you document your data pipelines?
I recently started building a data pipeline and it feels like there are many things that should be documented. Is there any tool / template to achieve that in a more structured manner?

4 comments

Hello everyone

Any idea how to resolve this validation issue?

6 comments

Hey Everyone

I wanted to know more about the Kedro CLI. There are arguments like --env, --nodes and --pipeline which we pass to the kedro run command.

So for any deployment-related plugin, like Airflow or Kubeflow, how can we supply these arguments?
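It depends on each plugin's own CLI and config, but for context, those flags all map onto the programmatic API that plugins ultimately drive, roughly as in the sketch below (the pipeline, node and parameter names are made up). Kedro also reads the KEDRO_ENV environment variable as an alternative to --env, which is often the easiest thing to set on a container.

from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

bootstrap_project(Path.cwd())
with KedroSession.create(env="prod", extra_params={"run_date": "2024-10-15"}) as session:
    # roughly: kedro run --env=prod --pipeline=data_processing --nodes=train_model_node
    session.run(pipeline_name="data_processing", node_names=["train_model_node"])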

4 comments

Is there a way to store a Python object and make it available throughout the entire Kedro lifecycle, or at least a single run?

So I want some way to generate a UUID before the pipeline starts, which can be done with the help of hooks, but how do I make it available throughout all the hook implementations and probably the nodes as well?
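A minimal sketch of one way to do it, assuming hooks are registered as a single instance in settings.py: because every @hook_impl method lives on the same object, an instance attribute set in one hook is visible in all the others, and you can hand the value to nodes by registering it as an in-memory dataset (MemoryDataset in recent Kedro, MemoryDataSet in older releases).

import uuid

from kedro.framework.hooks import hook_impl
from kedro.io import MemoryDataset


class RunIDHooks:
    run_id = None

    @hook_impl
    def after_context_created(self, context):
        # generated once per session; every other hook method on this
        # instance can read self.run_id afterwards
        self.run_id = str(uuid.uuid4())

    @hook_impl
    def after_catalog_created(self, catalog):
        # expose it to nodes: any node can now declare "run_id" as an input
        catalog.add("run_id", MemoryDataset(self.run_id))

Registration in settings.py would be HOOKS = (RunIDHooks(),).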

4 comments

Hello everyone

This is regarding the Kubeflow plugin.

I wanted to gain some information about how Kedro nodes are executed by Kubeflow.

Does Kubeflow run each node in a separate container, or in separate pods, or are all of the nodes executed in the same container?

95 comments

The OmegaConfigLoader scans conf/ for subdirectories like base/ and local/. Suppose we have a catalog.yml file in both base/ and local/.

Can we have the same top-level keys in catalog.yml for both base/ and local/?
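For what it's worth, my understanding is that duplicate top-level keys are only an error within the same environment; across environments the run environment (local by default) takes precedence over base, key by key. A tiny sketch with a made-up dataset:

# conf/base/catalog.yml
companies:
  type: pandas.CSVDataset
  filepath: data/01_raw/companies.csv

# conf/local/catalog.yml -- same top-level key, wins when running with the local env
companies:
  type: pandas.CSVDataset
  filepath: /tmp/companies_sample.csv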

7 comments

Hello everyone

Can we access params and runtime params in the catalog?
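Runtime params (the values passed with --params) at least can be interpolated into the catalog with the runtime_params resolver that the OmegaConfigLoader provides; the dataset and parameter names below are made up.

model_input:
  type: pandas.CSVDataset
  filepath: data/01_raw/${runtime_params:input_file}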

4 comments

A quick question: I would like to register pipelines by making an entry in a database with some keys like pipeline_id, name and description whenever a pipeline is executed for the first time in production. This simply requires creating a DB connection and running an insert query. I would like to better understand the following:

  1. Where can I store pipeline-specific metadata in the Kedro project? Let's say we have 3 pipelines defined in a project: data_extraction, data_processing, model_training.
  2. How can we read all this metadata, then create a DB connection and finally execute the insert operation?
  3. Lastly, what is the best place to achieve such tasks in a Kedro project? Is it hooks? Like, we could run this logic after the context is created (see the sketch below).
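On point 3, a hook does seem like a natural fit. A rough sketch, with the metadata kept in a plain dict and sqlite standing in for the real warehouse; the table, names and SQL are all hypothetical.

import sqlite3  # stand-in for your actual DB driver

from kedro.framework.hooks import hook_impl

# 1. pipeline-specific metadata kept in code (it could equally live in a
#    YAML file under conf/ and be read through the config loader)
PIPELINE_METADATA = {
    "data_extraction": {"description": "Pull raw documents"},
    "data_processing": {"description": "Clean and join tables"},
    "model_training": {"description": "Train the classifier"},
}


class PipelineRegistryHooks:
    @hook_impl
    def before_pipeline_run(self, run_params, pipeline, catalog):
        # 2. look up the metadata for the pipeline being run and insert it;
        #    assumes a pipelines table exists with a UNIQUE constraint on name
        name = run_params.get("pipeline_name") or "__default__"
        meta = PIPELINE_METADATA.get(name, {})
        with sqlite3.connect("registry.db") as conn:
            conn.execute(
                "INSERT OR IGNORE INTO pipelines (name, description) VALUES (?, ?)",
                (name, meta.get("description", "")),
            )

Registered via HOOKS = (PipelineRegistryHooks(),) in settings.py.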

45 comments

Hey Team, a very naive question: how can we know if a pipeline ran successfully? Basically, I would like to record the status of each pipeline run as success or failure, and I wanted to know the best way to do that in a Kedro project.
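The hook pair after_pipeline_run / on_pipeline_error is probably the simplest place for this, something like the sketch below; the _record helper is hypothetical and could write to a DB table, a metrics store, or just the logs.

from kedro.framework.hooks import hook_impl


class RunStatusHooks:
    @hook_impl
    def after_pipeline_run(self, run_params, pipeline, catalog):
        self._record(run_params.get("pipeline_name"), "success")

    @hook_impl
    def on_pipeline_error(self, error, run_params, pipeline, catalog):
        self._record(run_params.get("pipeline_name"), "failure")

    def _record(self, name, status):
        # hypothetical sink; replace with your DB insert / logger of choice
        print(f"pipeline={name or '__default__'} status={status}")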

4 comments

Hey team, I want to push files to S3 but with dynamic names, e.g. appending a timestamp to the file name, just to store multiple copies of the file.

dummy_csv:
  type: pandas.CSVDataset
  filepath: s3://ml-datawarehouse/warehouse/test.csv
  credentials: dev_s3

Right now, test.csv is being overwritten in S3. Is a config resolver the answer to this? I tried a resolver like the one below, but no success yet:

dummy_csv:
  type: pandas.CSVDataset
  filepath: s3://ml-datawarehouse/warehouse/test_"{$today:}".csv
  credentials: dev_s3


from datetime import date


CONFIG_LOADER_ARGS = {
    "custom_resolvers": {
        "today": lambda: date.today(),
    }
}
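Two things that would probably unblock the attempt above: the interpolation syntax in the catalog should be ${today:} rather than "{$today:}", and the resolver is safer returning a string. A sketch of the same dataset with those changes; note that Kedro's built-in versioned: true on a dataset is another way to keep timestamped copies.

dummy_csv:
  type: pandas.CSVDataset
  filepath: s3://ml-datawarehouse/warehouse/test_${today:}.csv
  credentials: dev_s3

from datetime import date

CONFIG_LOADER_ARGS = {
    "custom_resolvers": {
        # return a string so it interpolates cleanly into the filepath
        "today": lambda: date.today().isoformat(),
    }
}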

5 comments

Is there a better way of defining the credentials for ibis.TableDataset? It looks like ibis.TableDataset doesn't support credentials defined in credentials.yml.

4 comments

Hello everyone

Is there a way to load a flat file from S3 based on some condition, like pulling the latest file from a given bucket?
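Not out of the box as far as I know, but a node (or a custom dataset) can do the listing itself with fsspec/s3fs and pick the newest object; the bucket and prefix below are made up.

import fsspec
import pandas as pd


def load_latest_csv(prefix: str = "s3://my-bucket/incoming/") -> pd.DataFrame:
    fs = fsspec.filesystem("s3")
    # detail=True returns per-object metadata, including LastModified
    objects = [o for o in fs.ls(prefix, detail=True) if o["name"].endswith(".csv")]
    latest = max(objects, key=lambda o: o["LastModified"])
    with fs.open(latest["name"]) as f:
        return pd.read_csv(f)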

19 comments

Hey Everyone, I am writing a basic ETL pipeline using Kedro. My data sources are PostgreSQL and AWS EFS, and the destination storage is again a SQL backend.
As per some of the discussions in the forum, I found Ibis to be the best choice for a data connector, as it can connect to many backends and thus provides flexibility in switching backends, and of course a Pythonic way of writing complex queries.

This is actually regarding Ibis. I have used Ibis to extract data from a Postgres backend. For extraction I used simple functions like .select(), .filter() and joins.

I did some transformations and pushed the transformed data as a CSV file to S3.

Now one of the nodes needs to read data from S3 and push it into our warehouse, which is again a SQL backend for this use case. How do we do that? Can I somehow utilise Ibis again?

Any heads-up, guys?
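One route, as a sketch: if the warehouse backend doesn't expose a direct CSV reader, load the file from S3 with pandas (s3fs must be installed) and hand the DataFrame to the Ibis backend with create_table or insert. The connection string and table name are illustrative; the filepath reuses the one from the earlier error message.

import ibis
import pandas as pd

con = ibis.connect(WAREHOUSE_CONNECTION_STRING)  # connection string defined elsewhere

# pandas + s3fs handle the S3 read
df = pd.read_csv("s3://ml-datawarehouse/warehouse/extraction/doc_table_insert.csv")

# hand the DataFrame to the backend; create_table accepts in-memory data
con.create_table("doc_table", df, overwrite=True)
# or, to append into an existing table:
# con.insert("doc_table", df)

Within a Kedro pipeline, the ibis.TableDataset from earlier in this thread could also handle the save side, so the node would just return the data.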

33 comments

Why is the Rich handler inconsistent in formatting the logs? I don't understand why it doesn't print timestamps for the nodes.py module. Any heads-up?
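If it's the timestamp column that disappears on some lines, that's likely rich's RichHandler default omit_repeated_times=True, which blanks the time when it matches the previous log line, rather than anything specific to nodes.py. Assuming Kedro's default conf/logging.yml with the rich handler, passing the flag through should make it print every time:

handlers:
  rich:
    class: kedro.logging.RichHandler
    rich_tracebacks: true
    omit_repeated_times: false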

4 comments