Registering pipeline metadata in a database

A quick question: I would like to register pipelines by making an entry in a database with keys like pipeline_id, name, and description whenever a pipeline is executed for the first time in production. This simply requires creating a DB connection and running an insert query. I would like to better understand the following:

  1. Where can I store pipeline-specific metadata in a Kedro project? Let's say we have three pipelines defined in a project: data_extraction, data_processing, and model_training.
  2. How can we read all this metadata, create a DB connection, and finally execute the insert operation?
  3. Finally, what is the best place to implement such tasks in a Kedro project? Is it hooks? For example, we could run this logic after the context is created.

Hi Vishal, yes, a hook could work. There is an after_context_created hook in which you could implement the logic.
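A minimal sketch of what that hook could look like, assuming a local SQLite file as the registry (the file name, table, and column names below are just placeholders, not anything Kedro provides):

```python
# hooks.py -- a minimal sketch, not an official Kedro feature
import sqlite3

from kedro.framework.hooks import hook_impl


class PipelineRegistrationHook:
    @hook_impl
    def after_context_created(self, context):
        # Hypothetical local SQLite registry; swap in your own DB connection.
        # Assumes a pipeline_details(pipeline_id, pipeline_name, description) table exists.
        conn = sqlite3.connect("pipeline_registry.db")
        # INSERT OR IGNORE so the row is only written the first time the pipeline is seen.
        conn.execute(
            "INSERT OR IGNORE INTO pipeline_details "
            "(pipeline_id, pipeline_name, description) VALUES (?, ?, ?)",
            ("data_processing", "data_processing", "Cleans and joins raw extracts"),
        )
        conn.commit()
        conn.close()
```

The hook would then be registered in your project's settings.py via HOOKS = (PipelineRegistrationHook(),).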

The question is, should you? It seems like you'd like tracking across pipeline runs. Have you considered the MLflow plugin? It could provide a good starting point.

https://github.com/Galileo-Galilei/kedro-mlflow

Yeah, do you think tracking pipelines and their respective runs should go into MLflow?

Until now I was thinking of creating some tables in the DB, like:

pipeline_details

- pipeline_id (PK)
- pipeline_name
- description
- created_date
- created_by

run_details

- run_id (PK)
- triggered_at
- time_taken
- triggered_by
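Spelled out as DDL, roughly (a sketch only; the column types and the pipeline_id link between the two tables are my assumptions, they aren't stated above):

```python
import sqlite3

conn = sqlite3.connect("pipeline_registry.db")
conn.executescript(
    """
    CREATE TABLE IF NOT EXISTS pipeline_details (
        pipeline_id   TEXT PRIMARY KEY,
        pipeline_name TEXT,
        description   TEXT,
        created_date  TEXT DEFAULT CURRENT_TIMESTAMP,
        created_by    TEXT
    );
    CREATE TABLE IF NOT EXISTS run_details (
        run_id       TEXT PRIMARY KEY,
        pipeline_id  TEXT REFERENCES pipeline_details(pipeline_id),  -- assumed link
        triggered_at TEXT,
        time_taken   REAL,
        triggered_by TEXT
    );
    """
)
conn.commit()
conn.close()
```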

The idea is to push logs for each run into the ELK ecosystem as well as track the metrics discussed above.

Context: the reason I'm bringing up MLflow is that we're currently integrating it, and our experience with it so far is positive. I'm afraid your DB approach will hit limits quickly; e.g., imagine you want to start logging parameters as part of the run, or model artifacts, or charts with model convergence, etc. All of this is already provided by MLflow (there may be other alternatives, but we've gone for this one).

I think logging whether the pipeline failed or succeeded is also important, and then the question becomes: how would you track that?

As the pipeline grows, you might want to execute nodes in parallel, and then you can't determine pipeline failure/success from a single kedro run invocation.

Hey, thanks for responding and sharing your experience.

Currently, we are building data pipelines (ETL) which extract data from sources like SQL databases and perform some transformations. The transformed data is stored in flat files like CSV and versioned while being stored on an S3-like service. This versioned data is then loaded into a warehouse as part of the load component.


The reason we want to store pipeline- and run-related information is that this ETL pipeline will be scheduled to run, say, every day or every week. So we would like to know exactly when the pipeline was last triggered, so that the new run knows beforehand what data needs to be pulled since the last run date.
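For that "what to pull since the last run" part, a minimal sketch assuming the run_details table sketched above sits in SQLite (the pipeline_id column is an assumed addition):

```python
import sqlite3

# Sketch: find out when this pipeline was last triggered so the new run can
# pull only the data added since then. Table/column names follow the schema
# discussed above.
conn = sqlite3.connect("pipeline_registry.db")
row = conn.execute(
    "SELECT MAX(triggered_at) FROM run_details WHERE pipeline_id = ?",
    ("data_extraction",),
).fetchone()
conn.close()

last_run_date = row[0]  # None on the very first run -> do a full extract
```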

Can you also share a way of registering the pipeline whenever it is executed for the first time? That is, where exactly can we maintain some basic information about the pipeline, like an identifier, description, etc., in our Kedro project, so that hooks can read this information, check whether the pipeline is already registered with MLflow, register it if not, and then have the pipeline start logging metrics across its various runs?

If you need that, what about considering Airflow? Pipeline scheduling and tracking remains a difficult topic.

If you still want to implement it yourself, you will have to add your own way of identifying pipeline runs. Kedro does not maintain state, so it does not know what it means to "execute for the first time".

I am thinking of maintaining a config file which stores pipeline-related metadata, which the hook implementation can read. The hook can query the MLflow API to find out whether an experiment exists corresponding to a pipeline. If it doesn't exist, it can simply create the experiment.
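A minimal sketch of that check, assuming a hypothetical conf/base/pipeline_metadata.yml and the MlflowClient API:

```python
# hooks.py -- sketch only; the metadata file name and layout are assumptions
import yaml
from kedro.framework.hooks import hook_impl
from mlflow.tracking import MlflowClient


class MlflowRegistrationHook:
    @hook_impl
    def after_context_created(self, context):
        # Hypothetical file: {pipelines: [{name: ..., description: ...}, ...]}
        with open("conf/base/pipeline_metadata.yml") as f:
            metadata = yaml.safe_load(f)

        client = MlflowClient()
        for pipeline in metadata["pipelines"]:
            # Create the experiment only if it does not exist yet.
            if client.get_experiment_by_name(pipeline["name"]) is None:
                client.create_experiment(
                    pipeline["name"],
                    tags={"description": pipeline.get("description", "")},
                )
```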

But we would like to give a unique ID to each run so that we can identify the logs generated by a specific run in the ELK stack, as I mentioned earlier. Is it possible for us to create a run with a custom name?

Haha, you're touching on some interesting territory, and there are some issues here:

  1. MLflow does not allow creating runs with a custom name. Runs have a unique run ID that you should use, but MLflow is the one generating it.
  2. I've therefore implemented functionality to create a run with a specific name, which essentially searches the runs to see if a run with that name exists. If it does not exist, it creates it; otherwise it returns the ID (see the sketch below).
  3. We use Argo Workflows to orchestrate our pipelines. Argo generates a unique run for each pipeline, so we use that run to identify the Kedro run in MLflow. We use Argo Workflows because we run everything on k8s, though Airflow might be a better option if you're interested in tracking runtimes etc.
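Roughly like this (a sketch of the find-or-create idea, not the exact code from that issue):

```python
from mlflow.tracking import MlflowClient


def get_or_create_run(experiment_id: str, run_name: str) -> str:
    """Return the run_id of the run with this name, creating it if needed (sketch)."""
    client = MlflowClient()
    # MLflow stores the display name in the mlflow.runName tag.
    for run in client.search_runs(experiment_ids=[experiment_id]):
        if run.data.tags.get("mlflow.runName") == run_name:
            return run.info.run_id
    new_run = client.create_run(experiment_id, tags={"mlflow.runName": run_name})
    return new_run.info.run_id
```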

In our case, we would like to run our Kedro pipelines on Kubeflow.

Then Kubeflow could generate the identifier, and you could set an env variable and read that with Kedro, I think (I've not used Kubeflow before).

I think I will need more clarity here. We are the ones uploading the Kedro pipeline to Kubeflow using the kedro-kubeflow plugin. They have exposed a couple of commands, mentioned below:

1. kedro-kubeflow init -> creates a config file, as described here: https://kedro-kubeflow.readthedocs.io/en/0.7.4/source/02_installation/02_configuration.html
2. kedro-kubeflow upload_pipeline -> uses the generated config, converts the Kedro DAG into a Kubeflow-compatible DAG, and publishes the pipeline on Kubeflow.


If you look carefully, there are run-related configs present in the generated config file. Any pointers from here, since you already know our use case?

I mean, the use case is simply to track run-level metrics and dump all the logs to the ELK stack; this should not be a problem if we can produce a unique run_id or name. But I see some challenges here:

  1. Once a pipeline is scheduled to run using a command like kedro-kubeflow run, I am not sure how we can get unique run_ids.

I'm not familiar with the kubeflow plugin unfortunately

Based on a quick search:

each pipeline run in Kubeflow gets a KFP_RUN_ID, which is set as an env variable

That should help you, no? Then you can plug into the ID Kubeflow uses.

But how do I fetch this run ID in Kedro?

You mean it must be available in the env variables?

A simple os.environ.get("KFP_RUN_ID") should do the job?

you can use omegaconf

${oc.env:KFP_RUN_ID}

which you can use in the catalog

Will get back to you on this 🫡

Gave you the wrong link.

this way you can literally turn it into a pipeline param or global and have it picked up

(or add it directly to the kubeflow config)

In that case, MLflow still looks like a viable option to track metrics 😊

yep, though I've implemented https://github.com/Galileo-Galilei/kedro-mlflow/issues/579 to ensure I have a stable run name for my runs

because the plugin always generates a new name, so if you re-run a workflow it would be listed as a new run in MLflow

The hook above ensures a stable name is used, so reruns keep working (and write to the same MLflow run).

Need to go through this as well

Any thoughts on such a use case?

Opening this discussion again.

Just to reiterate: you mentioned that a unique ID is generated for each Kubeflow run and that it is luckily stored in an env variable. We can use a resolver like oc.env to access this run_id in the Kedro project, probably set it in globals.yml, and then reuse it anywhere needed, like in parameters.yml or catalog.yml, or we can directly use oc.env wherever it is permitted across the files in conf/.
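For example, in the catalog (a sketch; the dataset name, type, and bucket path are placeholders, the exact dataset class name depends on your kedro-datasets version, and whether oc.env is allowed outside the catalog/credentials depends on your Kedro version and config loader settings):

```yaml
# conf/base/catalog.yml -- pick up the Kubeflow run ID from the environment
transformed_data:
  type: pandas.CSVDataset   # pandas.CSVDataSet in older kedro-datasets versions
  filepath: s3://my-bucket/runs/${oc.env:KFP_RUN_ID,local-dev}/transformed_data.csv
```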
