
Hi folks,
We have our own MLflow server on internal S3.
Below are the settings I use locally:

os.environ["MLFLOW_TRACKING_URI"] = "<a target="_blank" rel="noopener noreferrer" href="https://xxx.com/mlflow/">https://xxx.com/mlflow/</a>"
os.environ["MLFLOW_S3_ENDPOINT_URL"] = "<a target="_blank" rel="noopener noreferrer" href="http://s3xxx.com">http://s3xxx.com</a>"
os.environ["S3_BUCKET_PATH"] = "<a target="_blank" rel="noopener noreferrer" href="s3://xxx/mlflow">s3://xxx/mlflow</a>"
os.environ["AWS_ACCESS_KEY_ID"] = "xxx"
os.environ["AWS_SECRET_ACCESS_KEY"] = "xxx"
os.environ['MLFLOW_TRACKING_USERNAME'] = 'xxx'
os.environ['MLFLOW_TRACKING_PASSWORD'] = 'xxx'
os.environ["MLFLOW_TRACKING_SERVER_CERT_PATH"] = "C:\\xxx\\ca-bundle.crt"
EXPERIMENT_NAME = "ZeMC012"
To use this in the Kedro framework, I created an mlflow.yml file in the conf/local folder with the following content:
server:
  mlflow_tracking_uri: https://xxx.com/mlflow/
  MLFLOW_S3_ENDPOINT_URL: http://s3xxx.com
  S3_BUCKET_PATH: s3://xxx/mlflow
  AWS_ACCESS_KEY_ID: xxx
  AWS_SECRET_ACCESS_KEY: xxx
  MLFLOW_TRACKING_USERNAME: xxx
  MLFLOW_TRACKING_PASSWORD: xxx
  MLFLOW_EXPERIMENT_NAME: ZeMC012
  MLFLOW_TRACKING_SERVER_CERT_PATH: C:/xxx/ca-bundle.crt
But I got this error: ValidationError: 8 validation errors for KedroMlflowConfig
How should I modify it?
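
For comparison, a hedged sketch of what kedro-mlflow typically expects in mlflow.yml: tracking settings live under server/tracking, credentials are referenced by a key that points into credentials.yml rather than written inline, and the S3/AWS variables are exported as environment variables via that credentials entry. Exact keys depend on your kedro-mlflow version; `kedro mlflow init` generates the commented template to check against.

(in conf/local/mlflow.yml, hypothetical sketch)

server:
  mlflow_tracking_uri: https://xxx.com/mlflow/
  credentials: mlflow_creds        # key in conf/local/credentials.yml

tracking:
  experiment:
    name: ZeMC012

(in conf/local/credentials.yml; each key is exported as an environment variable)

mlflow_creds:
  MLFLOW_TRACKING_USERNAME: xxx
  MLFLOW_TRACKING_PASSWORD: xxx
  MLFLOW_S3_ENDPOINT_URL: http://s3xxx.com
  AWS_ACCESS_KEY_ID: xxx
  AWS_SECRET_ACCESS_KEY: xxx
  MLFLOW_TRACKING_SERVER_CERT_PATH: C:/xxx/ca-bundle.crt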

2 comments

Question on project setup.

My workflow usually looks like:

mkdir new-project
cd new-project
uv venv --python 3.xx
source .venv/bin/activate
uv pip install kedro
kedro new --name new-project 
Then my directories look like:
new-project/
    .venv/
    new-project/
        ... kedro stuff ...
but really I wanted the current directory to be my Kedro project (at the level where .venv is).
Is there a good way to do this?

of course I could just create the venv a directory up, like so:
new-project/
    ... kedro stuff ...
.venv/
but I wanted things all in the same directory without having to move all the Kedro project files one directory up.
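
One hedged workaround, assuming you're fine with the venv living inside the generated project root, is to run `kedro new` first and only then create the environment inside the project it generates:

uv venv --python 3.xx                # throwaway env just to get the kedro CLI
source .venv/bin/activate
uv pip install kedro
kedro new --name new-project         # creates ./new-project with the Kedro files
cd new-project
uv venv --python 3.xx                # the real venv now sits at the project root
source .venv/bin/activate
uv pip install -r requirements.txt   # requirements.txt comes from the kedro new template

This ends up with .venv and the Kedro files in the same directory, just one level deeper than where you started.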

2 comments

Hey team, how can I dynamically overwrite an existing dataset in the Kedro catalog with a new configuration or data (e.g., changing the file path or dataset content) when running a pipeline from a Jupyter notebook on databricks? Same for dynamically overwriting a parameter. This would be as a one time test run so currently trying to change the notebook on Databricks and then would delete the added code for future runs. Any help on this would be great!
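
A hedged sketch of one way to do this from the notebook, assuming the globals exposed by %load_ext kedro.ipython (catalog, pipelines) are available; dataset and parameter names below are placeholders:

from kedro.io import MemoryDataset
from kedro.runner import SequentialRunner
from kedro_datasets.pandas import CSVDataset

# point an existing catalog entry at a different file for this one-off run
catalog.add("my_dataset", CSVDataset(filepath="/dbfs/tmp/override.csv"), replace=True)

# overwrite a parameter the same way
catalog.add("params:my_param", MemoryDataset(42), replace=True)

# run against the modified catalog; session.run() rebuilds the catalog from config,
# so calling a runner directly is what keeps the overrides in place
SequentialRunner().run(pipelines["__default__"], catalog=catalog)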

6 comments

Hey team, looking to add a couple more of our team members here, but we're being told we have reached our limit? Is anyone able to help out? Cheers!

8 comments

Did anyone encounter an issue with the greenlet transitive dependency of Kedro? Python 3.10, macOS with an M chip.

3 comments

When you have an expensive operation, is there a good way of loading from an existing dataset? I am trying to check if a certain ID already exists and only perform the functionality of a node when it is new. If it is new, I then add those new entries to the saved dataset so that next time I don't recalculate it. Effectively caching results.
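
A hedged sketch of the node-level pattern, assuming pandas and an "id" column. Kedro typically rejects a node that reads and writes the same catalog entry, so the cache is often registered under two catalog names pointing at the same file (e.g. cached_results as input, cached_results_out as output); names here are placeholders:

import pandas as pd


def expensive_computation(df: pd.DataFrame) -> pd.DataFrame:
    # stand-in for the real costly step
    return df.assign(result=df["id"])


def compute_incrementally(new_data: pd.DataFrame, cached_results: pd.DataFrame) -> pd.DataFrame:
    """Only run the expensive step for IDs we haven't seen before."""
    todo = new_data[~new_data["id"].isin(cached_results["id"])]
    if todo.empty:
        return cached_results
    fresh = expensive_computation(todo)
    return pd.concat([cached_results, fresh], ignore_index=True)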

2 comments

Bit of a long shot, but I could use some help with how to do this. My team is using an infrastructure-as-code tool called Pulumi. Bootstrapping and running a Kedro project inside a Pulumi project is easy enough:

bootstrap_project(Path("."))
with KedroSession.create() as session:
    session.run()
But Pulumi suppresses logs from anything that isn't explicitly a Pulumi asset. Pulumi does allow you to log from arbitrary Python code (https://www.pulumi.com/docs/iac/concepts/logging/) by running, for example, pulumi.info(message), pulumi.warning(message), etc.

Is there any way I could intercept the kedro logging system so that instead of it running something like logger.info(message) to emit a message, it could run pulumi.info(message)?
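
One hedged idea, rather than intercepting Kedro specifically: attach a standard logging.Handler to the root logger that forwards every record to Pulumi's diagnostic functions (using the pulumi.log module from the page linked above):

import logging

from pulumi import log as pulumi_log


class PulumiLogHandler(logging.Handler):
    """Forward standard-library log records (including Kedro's) to Pulumi."""

    def emit(self, record: logging.LogRecord) -> None:
        msg = self.format(record)
        if record.levelno >= logging.ERROR:
            pulumi_log.error(msg)
        elif record.levelno >= logging.WARNING:
            pulumi_log.warn(msg)
        else:
            pulumi_log.info(msg)


logging.getLogger().addHandler(PulumiLogHandler())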

2 comments

Hey folks.
I am trying to pass MLflow credentials to a Streamlit page via oc.env (via Kedro). Any idea what could be causing the credentials not to be passed? They exist in the ECS task definition but are not being passed to the Streamlit page.

credentials.yml

mlflow:
  tracking_uri: ${oc.env:MLFLOW_TRACKING_URI}
  username: ${oc.env:MLFLOW_TRACKING_USERNAME}
  password: ${oc.env:MLFLOW_TRACKING_PASSWORD}

ERROR:
Exception: MLflow tracking URI not found in credentials.

This works perfectly fine when testing in a local env and manually passing the credentials as environment variables, something like
export MLFLOW_TRACKING_USERNAME=username

more info on kedro way
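
For reference, a hedged sketch of how that credentials entry resolves outside a normal kedro run (e.g. from the Streamlit process): oc.env only sees variables of the process that builds the config, so the first thing to confirm is that the variable is actually visible there. Paths and env names below follow the post.

import os

from kedro.config import OmegaConfigLoader

# if this prints None in the Streamlit/ECS process, oc.env has nothing to resolve
print(os.environ.get("MLFLOW_TRACKING_URI"))

loader = OmegaConfigLoader(conf_source="conf", base_env="base", default_run_env="local")
print(loader["credentials"]["mlflow"])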

10 comments

Hi, I've set up credentials hooks (Kedro 0.19) and want to access the azure_creds values from inside the pipelines file to pass them to the node function, but OmegaConfigLoader is returning an empty value. Is there a solution to this?

#hooks.py
from kedro.framework.hooks import hook_impl
from azure.keyvault.secrets import SecretClient
from azure.identity import DefaultAzureCredential


class AzureSecretsHook:
    @hook_impl
    def after_context_created(self, context) -> None:
        keyVaultName = "keyvault-0542abb"  # or os.environ["KEY_VAULT_NAME"] if you would like to provide it through environment variables
        KVUri = f"https://{keyVaultName}.vault.azure.net"

        my_credential = DefaultAzureCredential()
        client = SecretClient(vault_url=KVUri, credential=my_credential)

        secrets = {
            "abs_creds": "azure-blob-store",
            "s3_creds": "s3-bucket-creds",
        }
        azure_creds = {
            cred_name: client.get_secret(secret_name).value
            for cred_name, secret_name in secrets.items()
        }

        context.config_loader["credentials"] = {
            **context.config_loader["credentials"],
            **azure_creds,
        }

1 comment

Guys, I would like to check with you if there's a simpler way to use a run_identifier in the dataset path in the catalog:

I'm loading a base from BigQuery and splitting each row to run in another pipeline, where I load and save the inputs/outputs dynamically.

I would like to get a value from a column and use it as the run_identifier in the catalog path:

filepath: ${root_folder}/${current_datetime}/${run_identifier}/data/model/{placeholder:name}.pt

Is there a known way to do something like that? I'm open to suggestions...
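
One hedged option, assuming the identifier can be supplied at run time through something like an environment variable, is a custom resolver registered in settings.py, which the catalog path can then interpolate (names below are hypothetical):

(in settings.py)

import os

from kedro.config import OmegaConfigLoader

CONFIG_LOADER_CLASS = OmegaConfigLoader
CONFIG_LOADER_ARGS = {
    "custom_resolvers": {
        # falls back to the given default when RUN_IDENTIFIER is not set
        "run_id": lambda default: os.getenv("RUN_IDENTIFIER", default),
    },
}

and then in the catalog: filepath: ${root_folder}/${current_datetime}/${run_id:local-run}/data/model/{placeholder:name}.pt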

49 comments

In this example: https://docs.kedro.org/en/stable/extend_kedro/plugins.html#project-context

I see that _get_project_metadata does not get called. Is it relevant to this example?

from pathlib import Path

from kedro.framework.startup import _get_project_metadata
from kedro.framework.session import KedroSession


project_path = Path.cwd()
session = KedroSession.create(project_path=project_path)
context = session.load_context()

Am I assuming correctly that project_path would get defined elsewhere?

1 comment

I am looking at this example: https://docs.kedro.org/en/stable/development/commands_reference.html#customise-or-override-project-specific-kedro-commands

"""Command line tools for manipulating a Kedro project.
Intended to be invoked via `kedro`."""
import click
from kedro.framework.cli.project import (
    ASYNC_ARG_HELP,
    CONFIG_FILE_HELP,
    CONF_SOURCE_HELP,
    FROM_INPUTS_HELP,
    FROM_NODES_HELP,
    LOAD_VERSION_HELP,
    NODE_ARG_HELP,
    PARAMS_ARG_HELP,
    PIPELINE_ARG_HELP,
    RUNNER_ARG_HELP,
    TAG_ARG_HELP,
    TO_NODES_HELP,
    TO_OUTPUTS_HELP,
)
from kedro.framework.cli.utils import (
    CONTEXT_SETTINGS,
    _config_file_callback,
    _split_params,
    _split_load_versions,
    env_option,
    split_string,
    split_node_names,
)
from kedro.framework.session import KedroSession
from kedro.utils import load_obj


@click.group(context_settings=CONTEXT_SETTINGS, name=__file__)
def cli():
    """Command line tools for manipulating a Kedro project."""


@cli.command()
@click.option(
    "--from-inputs", type=str, default="", help=FROM_INPUTS_HELP, callback=split_string
)
@click.option(
    "--to-outputs", type=str, default="", help=TO_OUTPUTS_HELP, callback=split_string
)
@click.option(
    "--from-nodes", type=str, default="", help=FROM_NODES_HELP, callback=split_node_names
)
@click.option(
    "--to-nodes", type=str, default="", help=TO_NODES_HELP, callback=split_node_names
)
@click.option("--nodes", "-n", "node_names", type=str, multiple=True, help=NODE_ARG_HELP)
@click.option(
    "--runner", "-r", type=str, default=None, multiple=False, help=RUNNER_ARG_HELP
)
@click.option("--async", "is_async", is_flag=True, multiple=False, help=ASYNC_ARG_HELP)
@env_option
@click.option("--tags", "-t", type=str, multiple=True, help=TAG_ARG_HELP)
@click.option(
    "--load-versions",
    "-lv",
    type=str,
    multiple=True,
    help=LOAD_VERSION_HELP,
    callback=_split_load_versions,
)
@click.option("--pipeline", "-p", type=str, default=None, help=PIPELINE_ARG_HELP)
@click.option(
    "--config",
    "-c",
    type=click.Path(exists=True, dir_okay=False, resolve_path=True),
    help=CONFIG_FILE_HELP,
    callback=_config_file_callback,
)
@click.option(
    "--conf-source",
    type=click.Path(exists=True, file_okay=False, resolve_path=True),
    help=CONF_SOURCE_HELP,
)
@click.option(
    "--params",
    type=click.UNPROCESSED,
    default="",
    help=PARAMS_ARG_HELP,
    callback=_split_params,
)
def run(
    tags,
    env,
    runner,
    is_async,
    node_names,
    to_nodes,
    from_nodes,
    from_inputs,
    to_outputs,
    load_versions,
    pipeline,
    config,
    conf_source,
    params,
):
    """Run the pipeline."""

    runner = load_obj(runner or "SequentialRunner", "kedro.runner")
    tags = tuple(tags)
    node_names = tuple(node_names)

    with KedroSession.create(
        env=env, conf_source=conf_source, extra_params=params
    ) as session:
        session.run(
            tags=tags,
            runner=runner(is_async=is_async),
            node_names=node_names,
            from_nodes=from_nodes,
            to_nodes=to_nodes,
            from_inputs=from_inputs,
            to_outputs=to_outputs,
            load_versions=load_versions,
            pipeline_name=pipeline,
        )

Generally in Python, users are supposed to stay away from single-underscore-prefixed variables; however, this example in the docs illustrates using them. When functions like _config_file_callback appear in user documentation for constructing examples, it makes it less clear what is intended for end users.

Are such methods / functions supposed to be part of the public API?

3 comments

Hi Kedroids! :kedro:

Is there a way to override global parameters through a cli kedro pipeline trigger?

kedro run --pipeline <my_pipeline> --params "<my_list_of_global_params>"
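
If it's ordinary parameters (rather than globals) that need overriding, a hedged sketch of the --params syntax in recent Kedro versions is below; comma-separated key=value pairs, with dots for nested keys. Whether runtime params can also reach values defined in globals.yml depends on your Kedro version, so treat that part as something to verify. Parameter names are placeholders.

kedro run --pipeline my_pipeline --params "learning_rate=0.01,model_options.test_size=0.3"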

6 comments

I have a fairly simple objective: I have data in Cloud SQL (GCP managed SQL Server), and in response to Cloud Pub/Sub messages I want to run pipelines to extract data from Cloud SQL, do some transformation, and write results to BigQuery. I'm looking through the docs but don't see how I'd trigger a pipeline in response to a Pub/Sub message (whatever the orchestrator); or is this the kind of thing that's outside Kedro's remit, and I'd have to do that bit separately?
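
Kedro itself doesn't listen for messages, so the trigger wiring would sit outside it. A hedged sketch of a Pub/Sub-triggered Cloud Function (legacy background-function signature) that just starts a Kedro session; pipeline and parameter names are placeholders:

import base64
from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project


def handle_pubsub(event, context):
    """Entry point wired to a Pub/Sub topic; runs the Kedro pipeline."""
    payload = base64.b64decode(event["data"]).decode("utf-8") if "data" in event else ""
    bootstrap_project(Path(__file__).parent)  # assumes the project ships alongside this function
    with KedroSession.create(extra_params={"trigger_payload": payload}) as session:
        session.run(pipeline_name="cloudsql_to_bq")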

1 comment

Hello Kedro experts. We’re trying to evaluate how Kedro might fit into our data engineering processes as we deploy ML models for our customers. The nature of our work is such that we expect to deploy similar solutions across different customers who will have different environments. As such there are certain python scripts/packages that we’re expecting to want to port across different environments, as well as aspects of every deployment that we’ll expect to be custom. That probably means we want to have “nodes” in our data engineering pipelines that potentially run with a different set of package requirements as some of the ported code may have conflicting requirements. However, I believe a kedro pipeline typically requires the same requirements.txt to be used throughout. Is that right?

9 comments

Hello!!!
I currently have an image classification use case. I have 7 classes and save the images for each class separately (one class, one folder). Now I set up the catalog.yml like this:
"{class_name}_data":
type: partitions.PartitionedDataset
filepath: ../data/01_raw/B4CD/{class_name}
dataset:
type: pillow.ImageDataset
But when I use catalog.load('XXXX'), what should I write for 'XXXX'? {class_name}_data, or do I have to load each folder/class separately?
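
For what it's worth, a hedged sketch of how that factory pattern resolves: each concrete class name gets its own dataset (e.g. "cats_data" if one of the folders is "cats"; class names here are placeholders), and loading one returns a dict of partition ids mapped to lazy load functions:

# e.g. in a kedro ipython session
partitions = catalog.load("cats_data")        # matches the "{class_name}_data" pattern
for partition_id, load_func in partitions.items():
    image = load_func()                        # each image is loaded lazily per file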

3 comments

Hi everyone! Has anyone implemented a custom log handler and successfully configured it in logging.yml? I'm getting a "No module named 'myproject.handlers'" error. I guess the logging is instantiated at a point where the project hasn't been loaded yet. Any idea how to get a custom logger running?
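
A hedged sketch of the shape this usually takes, assuming the myproject package is installed (e.g. pip install -e .) in the same environment before Kedro configures logging, since the handler class is imported by its dotted path at startup; class and level names are placeholders:

(fragment of logging.yml)

handlers:
  custom_handler:
    class: myproject.handlers.MyCustomHandler
    level: INFO

root:
  handlers: [custom_handler]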

6 comments

Hello everyone!
I'm having some trouble using the geopandas.GenericDataset. Here is my dataset:

raw_line:
  type: geopandas.GenericDataset
  filepath: "data/01_raw/lines/lines.shp"
  file_format: file

I'm facing the error:
DatasetError: Failed while loading data from dataset GenericDataset(file_format=file,
filepath=C:/MyCodes/my_project/data/01_raw/lines/lines.shp, load_args={}, protocol=file, save_args={}).
Failed to open dataset (flags=68): /vsimem/6485f3632b634505a3cf8c07708393b2

It looks like there is an old issue related to fsspec + geopandas:
https://github.com/kedro-org/kedro/issues/695#issuecomment-973953139

My libs:
kedro==0.19.9
kedro-datasets==5.1.0
fiona==1.10.1
fsspec==2024.10.0
geopandas==1.0.1

Is anyone able to use geopandas.GenericDataset with .shp files?

7 comments

Hello 🙂
I would like to know whether, when generating the Airflow DAG for a Kedro project using the kedro-airflow tool, it is possible to create a separate DAG for each pipeline in the project rather than a single DAG per project. If so, how can I configure each DAG to specify start times and other parameters for the DAG corresponding to each pipeline?
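
A hedged sketch, since flag names vary across kedro-airflow versions (check kedro airflow create --help): the create command takes a pipeline name, so running it once per registered pipeline yields one DAG file each, and schedule or start-date settings are then adjusted in the generated DAG files or via a custom --jinja-file template. Pipeline names below are placeholders.

kedro airflow create --pipeline data_processing --target-dir dags/
kedro airflow create --pipeline model_training --target-dir dags/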

2 comments

How does the Kedro dev team think about delineating what components belong to the public API vs being internal-use only?

I see single leading underscores _<foo> are used, which I assume means they belong to the private API.

'Sometimes' I see __all__ is used. Are things in that list safe to assume as part of the public API?

If a variable (function/method/class/etc) does not have a leading underscore, and is not in __all__, does that mean it is safe to assume it is also part of the public API?

4 comments

Is there a way to add args during Kedro pipeline execution from the terminal?

I mean, something like this

kedro run --parameter==01

3 comments

Hello all,
I have a question about datasets and namespaces. I am not even sure if what I am asking for is possible.
Here is a simplified version of the issue:

I have a PostgreSQL database which updates daily with data (predictions from some other models, but that is beside the point).
One of the columns in the results_table is called "run_date". So if I want today's results I can do this:

(in catalog.yml):

oneday_of_data:
  type: pandas.SQLQueryDataset
  credentials: db_credentials
  sql: "select * from results_table where run_date = %(run_date)s"
  load_args:
    params:
      run_date: 2024-11-01

This dataset, combined with the one-node pipeline below, lets me get the data from the database onto my local drive.

(in pipeline.py)

pipeline([
    node(
        func=lambda x: x,
        inputs="database_data",
        outputs="local_data",
        name="sql_to_local_node",
    ),
])

Now, if I wanted more than one day's data as different datasets, it seems like this is a great candidate for namespacing because nothing changes except for the run date. Like this:

(in catalog.yml)
_run_dates:
  run_1: 2024-11-01 # today
  run_2: 2024-10-30 # yesterday
  run_3: 2024-10-25 # a week ago

"{run_name}.oneday_of_data":
  type: pandas.SQLQueryDataset
  credentials: db_credentials
  sql: "select * from results_table where run_date = %(run_date)"
  load_args:
    params:
      run_date: ${_run_dates.{run_name}}

but no matter what I try, I can't get this to work. I know I can specify {run_name} in the filepath field (if it were a csv dataset, say), but is it possible to use it inside a templated/variable-interpolated field like this?

I have tried writing my own custom resolver (called "picker") defined as:

(in settings.py)
def pick_from_rundates(dict1, key):
    return dict1[key]

CONFIG_LOADER_ARGS = {
    "base_env": "base",
    "default_run_env": "local",
    "custom_resolvers": {"picker": lambda x, y: pick_from_rundates(x, y)},
}


and then tried this...which also failed:

(in catalog.yml)
"{run_name}.oneday_of_data":
type: pandas.SQLQueryDataset
credentials: db_credentials
sql: "select * from results_table where run_date = %(run_date)"
load_args:
params:
run_date: ${picker:${_run_dates},"{run_name}"}

So am I missing something simple here, or is this fundamentally not allowed? Ideally the run dates would be specified in globals.yml instead of directly in catalog.yml, but I am trying to walk before I run here.

I will be grateful for any advice here.
minesh
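
For what it's worth, a hedged fallback that avoids interpolating inside the factory placeholder altogether: spell out one entry per run date and share the common settings with a YAML anchor (assuming anchors and merge keys survive your config loader; the underscore-prefixed entry is skipped as a dataset):

(in catalog.yml)

_oneday_base: &oneday_base
  type: pandas.SQLQueryDataset
  credentials: db_credentials
  sql: "select * from results_table where run_date = %(run_date)s"

run_1.oneday_of_data:
  <<: *oneday_base
  load_args:
    params:
      run_date: 2024-11-01

run_2.oneday_of_data:
  <<: *oneday_base
  load_args:
    params:
      run_date: 2024-10-30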

7 comments

When using BigQuery Datasets how do you define a default dataset project wide?
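
One hedged approach is to keep the project and dataset names in globals.yml and interpolate them in every BigQuery catalog entry (field names follow kedro-datasets' pandas.GBQTableDataset; all values below are placeholders):

(in conf/base/globals.yml)

bq_project: my-gcp-project
bq_dataset: analytics

(in conf/base/catalog.yml)

my_table:
  type: pandas.GBQTableDataset
  project: ${globals:bq_project}
  dataset: ${globals:bq_dataset}
  table_name: my_table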

12 comments

Hey Kedro!
We are migrating to the latest versions of kedro and kedro-datasets and we are confused by the new geopandas.GenericDataset.
We have an error that suggests problems with dependencies, but we did install the specific group of kedro-datasets using pip. We checked the source code and the class is defined. Geopandas is installed too.
Here's the dataset in the catalog:

renab_geoloc_vul:
  filepath: data/vectors/renab/renab_vul_geoloc.geojson
  type: geopandas.GenericDataset
  versioned: false
Here's the error:
DatasetError: An exception occurred when parsing config for dataset 'renab_geoloc_vul': You can only set the value of existing options. Please see the documentation on how to install relevant dependencies for kedro_datasets.geopandas.GenericDataset:
<a target="_blank" rel="noopener noreferrer" href="https://docs.kedro.org/en/stable/kedro_project_setup/dependencies.html#install-dependencies-related-to-the-data-catalog">https://docs.kedro.org/en/stable/kedro_project_setup/dependencies.html#install-dependencies-related-to-the-data-catalog</a>
Can you help please ? 🙏

2 comments

Hi, I just wanted to run "kedro catalog list", and I got the error below:

NotSupportedError: deterministic=True requires SQLite 3.8.3 or higher
Which library with which version should I install?
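
That error usually means the SQLite library your Python links against predates 3.8.3; a quick hedged check before reinstalling anything:

import sqlite3

print(sqlite3.sqlite_version)  # version of the linked SQLite library, needs >= 3.8.3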

3 comments