Guys, I would like to check with you if there's a simpler way to use a run_identifier in the path of a catalog entry:
I'm loading a table from BigQuery and splitting each row to run in another pipeline, where I load and save the inputs/outputs dynamically.
I would like to get a value from a column and use it as a run_identifier in the path in the catalog:
```yaml
filepath: ${root_folder}/${current_datetime}/${run_identifier}/data/model/{placeholder:name}.pt
```
Is there a known way to do something like that? I'm open to suggestions...
yeah, but I need it to be updated dynamically, based on the value from a row that comes from the input...
So I understand that you basically want to override some parameters based on a node output.
https://docs.kedro.org/en/stable/extend_kedro/architecture_overview.html
I'm loading a table from BigQuery and splitting each row to run in another pipeline
If you are calling a separate Kedro pipeline, you can simply inject those identifiers as part of the runtime_params.
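For example, a minimal sketch of what that injection could look like if you trigger the second pipeline from Python; the pipeline name `model_pipeline` and the parameter key `run_identifier` are placeholders, not names from your project:
```python
# Hedged sketch: drive the per-row pipeline from Python and inject the
# identifier as a runtime parameter.
from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

bootstrap_project(Path.cwd())
with KedroSession.create(extra_params={"run_identifier": "row_001"}) as session:
    session.run(pipeline_name="model_pipeline")
```
The injected value then behaves like any other parameter, so nodes can receive it as `params:run_identifier`.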
I do have a pipeline that loads the BQ table and splits that data into rows, which are saved dynamically in the catalog.
Once that orchestration pipeline is done, the model pipeline runs.
```python
from functools import reduce
from operator import add

from kedro.pipeline import Pipeline, node


def create_pipeline(**kwargs) -> Pipeline:
    params = load_catalog_params()
    print("input_tables entry from parameters/general.yml: ", params["input_tables"])
    all_pipelines = []
    for group in range(int(params["n_cores"])):
        p = Pipeline(
            [
                node(
                    func=train_setup,
                    inputs=["imagens", "masks", "split_data_input", f"train_config_params_{group}"],
                    outputs=[
                        f"metrics_{group}",
                        f"epoch_loss_{group}",
                        f"validity_metrics_{group}",
                        f"model_save_{group}",
                        f"train_params_{group}",
                    ],
                    name=f"train_setup_{group}",
                )
            ],
            tags="training",
        )
        all_pipelines += [p]
    return reduce(add, all_pipelines)
```
Otherwise I am thinking of using a namespaced pipeline + dataset factory. So the catalog looks like:
```yaml
"{run_identifier}_xxx_dataset":
  filepath: "{run_identifier}/some_folder/some_dataset.parquet"
```
Then in those namespaced pipelines you use the run_identifier as part of the input/output name.
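A rough sketch of how the namespaced side could look (purely illustrative; `train_setup`, the dataset names and the identifiers are placeholders, and note that a namespace prefixes dataset names with `<namespace>.`, so the factory pattern has to match that prefixed name):
```python
# Hedged sketch of "namespace pipeline + dataset factory"; every name here
# is a placeholder.
from kedro.pipeline import Pipeline, node, pipeline


def train_setup(data):  # stand-in for the real training function
    ...


def create_pipeline(**kwargs) -> Pipeline:
    base = Pipeline([node(train_setup, inputs="xxx_dataset", outputs="model")])
    run_identifiers = ["run_a", "run_b"]  # in practice these come from the split step
    # namespace=rid renames the datasets to e.g. "run_a.xxx_dataset",
    # which a catalog pattern like "{run_identifier}.xxx_dataset" can then catch.
    return sum((pipeline(base, namespace=rid) for rid in run_identifiers), Pipeline([]))
```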
So it's the outputs from that train_setup where I would like to be able to add the run_identifier into the path,
so that for every whole run, I'll have a folder with date/run_identifier/data..
I see, I don't have an immediate solution. This is tricky because you have a runtime output defining the dataset, which was initialised way before the node is run.
https://docs.kedro.org/en/stable/extend_kedro/architecture_overview.html
The other way is likely using a hook to override the catalog during the node run.
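As a rough illustration of the hook idea (all names below are assumptions, and the dataset class is just an example), something along these lines could re-register an output with a path built from a runtime value:
```python
# Hedged sketch of a hook that swaps in a dataset whose filepath embeds a
# run identifier taken from a node input. Dataset/node names are placeholders.
from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog
from kedro_datasets.pandas import ParquetDataset


class DynamicPathHook:
    @hook_impl
    def before_node_run(self, node, catalog: DataCatalog, inputs: dict):
        run_id = inputs.get("run_identifier")  # assumed upstream output fed into the node
        if run_id and node.name.startswith("train_setup"):
            # Replace the catalog entry so the output is saved under the run id.
            catalog.add(
                "model_save_0",
                ParquetDataset(filepath=f"data/{run_id}/model_save_0.parquet"),
                replace=True,
            )
```
You would still need to register the hook in your project's `settings.py`, e.g. `HOOKS = (DynamicPathHook(),)`.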
Alternatively, if you can split this into two separate Kedro runs, this would be very simple.
First run: generate the rows to run.
Second run: a kedro run with runtime_params that reads the result of the previous run, potentially from a table.
The downside is that you cannot use a single `kedro run` command to do what you think of as a whole job. In an orchestrator it shouldn't be a problem, since you can treat the two runs as one job and it doesn't have to be mapped 1:1.
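To make that concrete, here is a hedged sketch of a tiny driver for the second stage; the CSV path, column name, and pipeline name are all assumptions about what the first run produced:
```python
# Hedged sketch: loop over the rows produced by the first run and launch one
# "kedro run" per row, passing the identifier as a runtime parameter.
import csv
import subprocess

with open("data/02_intermediate/rows_to_run.csv") as f:  # assumed output of run #1
    run_identifiers = [row["run_identifier"] for row in csv.DictReader(f)]

for rid in run_identifiers:
    subprocess.run(
        ["kedro", "run", "--pipeline", "model", "--params", f"run_identifier={rid}"],
        check=True,
    )
```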
Hey, do you have documentation on how to implement that example you gave: namespace pipeline + dataset....
```yaml
"{run_identifier}_xxx_dataset":
  filepath: "{run_identifier}/some_folder/some_dataset.parquet"
```
Let's see if this helps you.
Docs:
nice...
I get the namespace part now, but the layer one, where is it defined? I mean, the namespace is defined in the node... and the layer?
I see, this is using both a namespace and a factory at the same time. For example, if you only use a factory but not a namespace, it may look like this:
```yaml
"{layer}_some_dataset":
  filepath: data/{layer}/data.pq
```
ok, but I just don't know where the value "layer" is coming from, sorry about that. I'm somewhat new to the more advanced Kedro concepts, so I don't know about the factory.
So for example, when you have a node without any dataset factory:
```python
node(my_func, inputs="some_data", outputs="my_fav_dataset")
```
then your catalog may look like:
```yaml
some_data:
  ...
"{some}_dataset":
  filepath: data/{some}/dataset.pq
```
The `outputs` will automatically match the 2nd dataset in the catalog (here `my_fav_dataset` matches the `{some}_dataset` pattern, so `{some}` resolves to `my_fav` and the filepath becomes `data/my_fav/dataset.pq`). Without a pattern, Kedro defaults to an in-memory dataset that is thrown away after the run.
So how would a node and catalog that have a layer look? Just to see if I truly got it.
Don't worry about that, I can see the confusion, as it requires both the node and the catalog to reason about the dataset. It's not super clear in the docs; I will try to add some explanation.
ohhh so that will be the layer? Or can I call it anything, as long as it follows a pattern according to that mentioned rank...
```yaml
some_data:
  ...
"{some}_dataset_{abc}":
  filepath: data/{some}/dataset.pq
  layer: "{abc}"
"{some}_dataset":
  filepath: data/{some}/dataset.pq
```
There is also `kedro catalog rank` to help you understand the resolution. `kedro_dataset` will match `{some}_dataset`, while `kedro_dataset_something` will match `{some}_dataset_{abc}`. We use `parse` as the underlying library, which is a bit like the reverse of an f-string. This example should help you understand more:
```python
>>> from parse import compile
>>> p = compile("It's {}, I love it!")
>>> print(p)
<Parser "It's {}, I love it!">
>>> p.parse("It's spam, I love it!")
<Result ('spam',) {}>
```
I see now, I believe that I'll be able to solve that issue I'm having with these features.
I'll use a for loop inside the node to extract the information I need and use it as the layer.
Would it be possible to access data from an input in the pipeline node definition?
Added some docs here: https://github.com/kedro-org/kedro/pull/4308
The build is stuck for some reason but you can review this temporarily with this link: https://5500-kedroorg-kedro-ov91qdu83us.ws-us116.gitpod.io/docs/build/html/data/kedro_dataset_factories.html
But like in that example you shared, you create a list of months that you use as namespaces to save the outputs. Would it be possible to access a value from the catalog instead of creating that list?
Yeah, they are created in another pipeline; basically it would be accessing a catalog entry inside the pipeline.py file... like you did there with the months, but accessing an entry...
```python
months = ["jan", "feb", "mar", "apr"]
# instead use:
months = catalog.load("months")  # let's say... something like that
```
It is possible, though not the most conventional way: https://github.com/kedro-org/kedro/issues/2627#issuecomment-1691596460
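For completeness, a hedged sketch of one way this could be done (not necessarily what the linked comment does); the conf path and the `months` entry are assumptions:
```python
# Hedged sketch: load a catalog entry at pipeline-construction time.
# Not the conventional approach, and "conf" / "months" are placeholder names.
from kedro.config import OmegaConfigLoader
from kedro.io import DataCatalog

conf_loader = OmegaConfigLoader(conf_source="conf")
catalog = DataCatalog.from_config(conf_loader["catalog"])
months = catalog.load("months")  # value written by the other pipeline
```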
If you can afford two runs, this may be a simpler approach: https://github.com/noklam/kedro-example/blob/master/conditional-kedro-runs/conditional_run.py
Hey
I do have a param:
conf/base/parameters/test.yml
group_id: null
I'm trying to update it during a pipeline run with catalog.save(); is it possible?
I managed to work around what I needed; it's just the namespace that I'm not getting to work for some reason.