Abhishek Bhatia
Joined October 10, 2024

Hi Team! :kedro:

  • I have deployed my model inference pipeline as kedro pipelines served behind a Dockerized web API.
  • Input data and parameters from the incoming HTTP request are handled, and I am able to run the kedro pipeline by initializing a KedroSession in code ✅ (sketch below).
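For context, the per-request run roughly follows the standard programmatic KedroSession pattern; the sketch below is illustrative (the project path, handler name, and pipeline name are placeholders, not my exact code):

from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project


def run_inference(request_params: dict):
    # Placeholder path to the project inside the container
    project_path = Path("/app/my-kedro-project")
    bootstrap_project(project_path)

    # One session per request; parameters come from the HTTP request body
    with KedroSession.create(project_path=project_path, extra_params=request_params) as session:
        return session.run(pipeline_name="inference")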


However, I am concerned about the kedro pipeline run time per request, which is too high (~1 minute).
Questions:

  1. Is there a way to reduce kedro startup time?
  2. My pipelines have a lot of persisted catalog entries. My idea is that if I convert every entry into a MemoryDataSet, persistence won't be needed, saving I/O time; however, transcoding would be a problem in this case. Any ideas? (A rough sketch of this idea is at the end of this post.)
  3. Are there any other ways to speed up kedro init and the general pipeline run?

Ideally I want to make zero changes between the actual kedro pipeline and the inference kedro pipeline.
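For illustration, the MemoryDataSet idea from question 2 would amount to something like the hook below — a rough sketch only (in practice the free inputs coming from the request, and anything transcoded, would need special handling):

from kedro.framework.hooks import hook_impl
from kedro.io import MemoryDataSet


class InMemoryCatalogHook:
    """Swap persisted catalog entries for MemoryDataSet so the run skips disk I/O."""

    @hook_impl
    def after_catalog_created(self, catalog):
        for name in catalog.list():
            # Keep parameters untouched; everything else becomes in-memory only
            if name == "parameters" or name.startswith("params:"):
                continue
            catalog.add(name, MemoryDataSet(), replace=True)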

Thanks! 🙂

3 comments

Hi Team! :kedro:

My kedro pipeline just gets stuck before running any nodes:

[11/14/24 17:09:07] WARNING  /root/.venv/lib/python3.9/site-packages/kedro/framework/startup.py:99:
                             KedroDeprecationWarning: project_version in pyproject.toml is deprecated,
                             use kedro_init_version instead
[11/14/24 17:09:15] INFO     Kedro project project                                  session.py:365
[11/14/24 17:09:17] WARNING  /root/.venv/lib/python3.9/site-packages/kedro/framework/session/session.py:267:
                             KedroDeprecationWarning: Jinja2TemplatedConfigLoader will be deprecated in
                             Kedro 0.19. Please use the OmegaConfigLoader instead. To consult the
                             documentation for OmegaConfigLoader, see:
                             https://docs.kedro.org/en/stable/configuration/advanced_configuration.html#omegaconfigloader
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/14 17:09:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[11/14/24 17:12:53] WARNING  /root/.venv/lib/python3.9/site-packages/pyspark/pandas/__init__.py:49:
                             UserWarning: 'PYARROW_IGNORE_TIMEZONE' environment variable was not set.
                             It is required to set this environment variable to '1' in both driver and
                             executor sides if you use pyarrow>=2.0.0. pandas-on-Spark will set it for
                             you but it does not work if there is a Spark context already launched.

  • kedro: 0.18.14
  • python: 3.9
  • Running inside a docker container (since requirements don't compile on M* macs)

I understand this is too little information to help with, but I have the same problem. Is there any place I could look to see where it is stuck?
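In the meantime, the only probe I could think of is turning up log verbosity in the entrypoint before the session is created, roughly like this (just a diagnostic sketch, not a fix):

import logging

# Surface DEBUG-level messages from kedro and the Spark/py4j bridge so the phase
# where the run hangs (config loading, Spark session start, hooks) becomes visible
logging.basicConfig(level=logging.DEBUG)
for name in ("kedro", "py4j", "pyspark"):
    logging.getLogger(name).setLevel(logging.DEBUG)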

8 comments
J
A

Hi Team,

Is there a way to not run certain kedro hooks when kedro viz loads? I have a Spark hook defined which runs every time I run kedro viz, and I want to disable it there.
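The kind of workaround I have in mind is conditional registration in settings.py, something like the sketch below (the environment variable and hook class name are made up for illustration):

# settings.py (sketch)
import os

from my_project.hooks import SparkHooks  # hypothetical hook class

# Skip the Spark hook when, e.g., kedro viz sets this variable
HOOKS = () if os.environ.get("DISABLE_SPARK_HOOKS") else (SparkHooks(),)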

Thanks! 🙂

19 comments

Hi Kedroids! :kedro:

Is there a way to override global parameters through a CLI kedro pipeline trigger?

kedro run --pipeline <my_pipeline> --params "<my_list_of_global_params>"
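To make the question concrete: suppose conf/base/globals.yml defines a value such as run_date that parameters.yml interpolates; what I would like is to override that value at trigger time, along the lines of the following (the parameter names are illustrative):

kedro run --pipeline my_pipeline --params "run_date=2024-11-01,environment=prod"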

6 comments

Kedro + GetInData Folks! :kedro:

I am following this repo to submit a kedro pyspark job to dataproc serverless: https://github.com/getindata/kedro-pyspark-dataproc-demo

On submitting the job

gcloud dataproc batches submit pyspark file:///home/kedro/src/entrypoint.py \
    --project my-project \
    --region=europe-central2 \
    --container-image=europe-central2-docker.pkg.dev/my-project/kedro-dataproc-demo/kedro-dataproc-iris:latest \
    --service-account dataproc-worker@my-project.iam.gserviceaccount.com \
    --properties spark.app.name="kedro-pyspark-iris",spark.dynamicAllocation.minExecutors=2,spark.dynamicAllocation.maxExecutors=2 \
    -- \
    run

The entry point script contains the following:

import os
from kedro.framework import cli

# Run the Kedro CLI (e.g. `kedro run`) from the project root inside the container
os.chdir("/home/kedro")
cli.main()


I am getting the following error:


[10/15/24 17:30:21] INFO     Loading data from 'example_iris_data' (SparkDataSet)...   data_catalog.py:343
[10/15/24 17:30:22] WARNING  There are 3 nodes that have not run. You can resume the   runner.py:178
                             pipeline run by adding the following argument to your
                             previous command:

Traceback (most recent call last):
  /usr/local/lib/python3.9/site-packages/kedro/io/core.py:186 in load
      return self._load()
      # message = 'Failed while loading data from data set SparkDataSet(file_format=csv, filepath=g'+2319
  /usr/local/lib/python3.9/site-packages/kedro/extras/datasets/spark/spark_dataset.py:380 in _load
      read_obj = self._get_spark().read
      # load_path = 'gs://aa-dev-crm-users/abhishek/misc/iris.csv'
  /usr/lib/spark/python/pyspark/sql/session.py:1706 in read
      return DataFrameReader(self)
  /usr/lib/spark/python/pyspark/sql/readwriter.py:70 in __init__
      self._jreader = spark._jsparkSession.read()
  /usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322 in __call__
      [Errno 20] Not a directory: '/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py'
  /usr/lib/spark/python/pyspark/errors/exceptions/captured.py:185 in deco
      raise converted from None

IllegalArgumentException: The value of property spark.app.name must not be null


I am almost 100% sure this error is not due to any mis-specification in my Dockerfile or requirements, because it works perfectly if I change the entrypoint script to the following:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SimpleApp").getOrCreate()

df = spark.read.csv("gs://aa-dev-crm-users/abhishek/misc/iris.csv", inferSchema=True, header=True)
print(df.show())

5 comments

Hey Kedroids! :kedro:

(Apologies in advance for the long message, but I would really appreciate a good discussion on the points below from the kedro community! 🙂)

I have a use case of deploying kedro pipelines using the Vertex AI SDK.

  1. In the production system (web app), I want to be able to trigger a kedro pipeline (or multiple pipelines) with specified parameters (say, from the UI).
  2. Let's say we have an API endpoint https://my.web.app/api/v1/some-task whose body includes the parameters to trigger one or multiple kedro pipelines as a Vertex AI DAG.

My Vertex AI DAG has a combination of nodes (steps), and each node:

  1. May or may not be a kedro pipeline
  2. May be a pyspark workload running on Dataproc or a non-Spark workload running on a single compute VM
  3. May run a BigQuery job
  4. May or may not run in a docker container

Let's take the example of submitting a kedro pipeline to Dataproc Serverless, running on a custom docker container, using the Vertex AI SDK.

Questions:

  1. Do you package the kedro code as part of the Docker container, or just the dependencies?

For example, I have seen this done a lot, which packages the kedro code as well:

RUN mkdir /usr/kedro
WORKDIR /usr/kedro/
COPY . .

which means copying the whole project, and then in src/entrypoint.py:

from kedro.framework import cli
import os

os.chdir("/usr/kedro")
cli.main()

2. Do I need to package my kedro project as a wheel file and submit it with the job to Dataproc? If so, how have you seen that done with DataprocPySparkBatchOp?

3. How do you recommend passing dynamic parameters to the kedro pipeline run?

As I understand it, cli.main() picks up sys.argv to infer the pipeline name and parameters, so one could do:

kedro run --pipeline <my_pipeline> --params=param_key1=value1,param_key2=2.0
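Programmatically, my understanding is that this is equivalent to setting sys.argv before calling cli.main() in the entrypoint (values below are illustrative):

import sys

from kedro.framework import cli

# cli.main() reads sys.argv, so the web app can forward whatever the request specifies
sys.argv = [
    "kedro", "run",
    "--pipeline", "my_pipeline",
    "--params", "param_key1=value1,param_key2=2.0",
]
cli.main()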

Is there a better recommended way of doing this?

Thanks a lot, and hoping for a good discussion! 🙂

3 comments

Hi kedroids :kedro:

We have a use case in which we schedule BigQuery queries to run in a specific order using a kedro pipeline.

We use the BigQuery client simply to trigger the SQL query on BigQuery, as follows:

from google.cloud import bigquery


def trigger_query_on_bigquery(
    query: str,
):
    # Run the query on BigQuery and block until it finishes
    client = bigquery.Client()
    query_job = client.query_and_wait(query)

    return True

The kedro DAG that schedules multiple queries in order looks as follows:

from kedro.pipeline import Pipeline, node, pipeline


def create_retail_data_primary_pipeline() -> Pipeline:
    nodes = [
        node(
            func=trigger_prm_customer_on_big_query,
            inputs=None,
            outputs="prm_customer@status",
        ),
        node(
            func=trigger_prm_transaction_detail_ecom_on_big_query,
            inputs=["prm_product_hierarchy@status"],
            outputs="prm_transaction_detail_ecom@status",
        ),
        node(
            func=trigger_prm_transaction_detail_retail_on_big_query,
            inputs=["prm_product_hierarchy@status"],
            outputs="prm_transaction_detail_retail@status",
        ),
        node(
            func=trigger_prm_transaction_detail_on_big_query,
            inputs=[
                "prm_transaction_detail_ecom@status",
                "prm_transaction_detail_retail@status",
                "prm_product_hierarchy@status",
                "prm_customer@status",
            ],
            outputs="prm_transaction_detail@status",
        ),
        node(
            func=trigger_prm_incident_on_big_query,
            inputs=None,
            outputs="prm_incident@status",
        ),
        node(
            func=trigger_prm_product_hierarchy_on_big_query,
            inputs=None,
            outputs="prm_product_hierarchy@status",
        ),
    ]

    return pipeline(nodes)

Since a node can't output the BigQuery dataframe itself, we output a transcoded entry with the @status suffix (which is just True), and then use the actual BigQuery spark.SparkDataset transcoded entry versions of these datasets in downstream pipelines to enforce the order.

So I will use the prm_product_hierarchy@bigquery dataset in a downstream node, just so that kedro runs the node that triggers the BigQuery query first.
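Concretely, a downstream node then looks something like this (the downstream function and output name are hypothetical; the transcoded input is the real mechanism):

from kedro.pipeline import node

node(
    func=build_customer_segments,  # hypothetical downstream function
    inputs="prm_product_hierarchy@bigquery",  # forces the trigger node to run first
    outputs="customer_segments",
)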

Is there a better way to do this?

14 comments