Abhishek Bhatia

Kedro + GetInData Folks! :kedro:

I am following this repo to submit a kedro pyspark job to dataproc serverless: https://github.com/getindata/kedro-pyspark-dataproc-demo

On submitting the job:

gcloud dataproc batches submit pyspark file:///home/kedro/src/entrypoint.py \
    --project my-project \
    --region=europe-central2 \
    --container-image=europe-central2-docker.pkg.dev/my-project/kedro-dataproc-demo/kedro-dataproc-iris:latest \
    --service-account dataproc-worker@my-project.iam.gserviceaccount.com \
    --properties spark.app.name="kedro-pyspark-iris",spark.dynamicAllocation.minExecutors=2,spark.dynamicAllocation.maxExecutors=2 \
    -- \
    run

The entry point script contains the following:

import os
from kedro.framework import cli

os.chdir("/home/kedro")
cli.main()


I am getting the following error:


[10/15/24 17:30:21] INFO     Loading data from               data_catalog.py:343
                             'example_iris_data'                                
                             (SparkDataSet)...                                  
[10/15/24 17:30:22] WARNING  There are 3 nodes that have not run.  runner.py:178
                             You can resume the pipeline run by                 
                             adding the following argument to your              
                             previous command:                                  
                                                                                
╭───────────────────── Traceback (most recent call last) ──────────────────────╮
│ /usr/local/lib/python3.9/site-packages/kedro/io/core.py:186 in load          │
│                                                                              │
│   183 │   │   self._logger.debug("Loading %s", str(self))                    │
│   184 │   │                                                                  │
│   185 │   │   try:                                                           │
│ ❱ 186 │   │   │   return self._load()                                        │
│   187 │   │   except DataSetError:                                           │
│   188 │   │   │   raise                                                      │
│   189 │   │   except Exception as exc:                                       │
│                                                                              │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ message = 'Failed while loading data from data set                       │ │
│ │           SparkDataSet(file_format=csv, filepath=g'+2319                 │ │
│ │    self = <kedro.extras.datasets.spark.spark_dataset.SparkDataSet object │ │
│ │           at 0x7f4163077730>                                             │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│                                                                              │
│ /usr/local/lib/python3.9/site-packages/kedro/extras/datasets/spark/spark_dat │
│ aset.py:380 in _load                                                         │
│                                                                              │
│   377 │                                                                      │
│   378 │   def _load(self) -> DataFrame:                                      │
│   379 │   │   load_path = _strip_dbfs_prefix(self._fs_prefix + str(self._get │
│ ❱ 380 │   │   read_obj = self._get_spark().read                              │
│   381 │   │                                                                  │
│   382 │   │   # Pass schema if defined                                       │
│   383 │   │   if self._schema:                                               │
│                                                                              │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ load_path = 'gs://aa-dev-crm-users/abhishek/misc/iris.csv'                │ │
│ │      self = <kedro.extras.datasets.spark.spark_dataset.SparkDataSet      │ │
│ │             object at 0x7f4163077730>                                    │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│                                                                              │
│ /usr/lib/spark/python/pyspark/sql/session.py:1706 in read                    │
│                                                                              │
│   1703 │   │   |100|Hyukjin Kwon|                                            │
│   1704 │   │   +---+------------+                                            │
│   1705 │   │   """                                                           │
│ ❱ 1706 │   │   return DataFrameReader(self)                                  │
│   1707 │                                                                     │
│   1708 │   @property                                                         │
│   1709 │   def readStream(self) -> DataStreamReader:                         │
│                                                                              │
│ ╭────────────────────────────── locals ──────────────────────────────╮       │
│ │ self = <pyspark.sql.session.SparkSession object at 0x7f4174ebcf40> │       │
│ ╰────────────────────────────────────────────────────────────────────╯       │
│                                                                              │
│ /usr/lib/spark/python/pyspark/sql/readwriter.py:70 in __init__               │
│                                                                              │
│     67 │   """                                                               │
│     68 │                                                                     │
│     69 │   def __init__(self, spark: "SparkSession"):                        │
│ ❱   70 │   │   self._jreader = spark._jsparkSession.read()                   │
│     71 │   │   self._spark = spark                                           │
│     72 │                                                                     │
│     73 │   def _df(self, jdf: JavaObject) -> "DataFrame":                    │
│                                                                              │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │  self = <pyspark.sql.readwriter.DataFrameReader object at                │ │
│ │         0x7f41631fa700>                                                  │ │
│ │ spark = <pyspark.sql.session.SparkSession object at 0x7f4174ebcf40>      │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│                                                                              │
│ /usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322 in │
│ __call__                                                                     │
│                                                                              │
│ [Errno 20] Not a directory:                                                  │
│ '/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py'       │
│                                                                              │
│ /usr/lib/spark/python/pyspark/errors/exceptions/captured.py:185 in deco      │
│                                                                              │
│   182 │   │   │   if not isinstance(converted, UnknownException):            │
│   183 │   │   │   │   # Hide where the exception came from that shows a non- │
│   184 │   │   │   │   # JVM exception message.                               │
│ ❱ 185 │   │   │   │   raise converted from None                              │
│   186 │   │   │   else:                                                      │
│   187 │   │   │   │   raise                                                  │
│   188                                                                        │
│                                                                              │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │         a = (                                                            │ │
│ │             │   'xro91',                                                 │ │
│ │             │   <py4j.clientserver.JavaClient object at 0x7f417cb199d0>, │ │
│ │             │   'o88',                                                   │ │
│ │             │   'read'                                                   │ │
│ │             )                                                            │ │
│ │ converted = IllegalArgumentException()                                   │ │
│ │         f = <function get_return_value at 0x7f417b8c0310>                │ │
│ │        kw = {}                                                           │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰──────────────────────────────────────────────────────────────────────────────╯
IllegalArgumentException: The value of property spark.app.name must not be null


I'm almost 100% sure that this error is not due to any mis-specification in my Dockerfile or requirements, because it works perfectly if I change the entrypoint script to the following:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SimpleApp").getOrCreate()

df = spark.read.csv("<a target="_blank" rel="noopener noreferrer" href="gs://aa-dev-crm-users/abhishek/misc/iris.csv">gs://aa-dev-crm-users/abhishek/misc/iris.csv</a>", inferSchema=True, header=True)
print(df.show())
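
For context on where spark.app.name normally comes from: as far as I understand, the SparkSession in a Kedro run is created by the project's SparkHooks from conf/base/spark.yml rather than from the --properties flags, which is one place the two entrypoints differ. A rough sketch of that standard hook pattern from the Kedro PySpark docs (illustrative; the demo repo's actual hook may differ):

from kedro.framework.hooks import hook_impl
from pyspark import SparkConf
from pyspark.sql import SparkSession


class SparkHooks:
    @hook_impl
    def after_context_created(self, context) -> None:
        """Initialise a SparkSession using the config in conf/base/spark.yml."""
        # Sketch of the documented pattern -- not the demo repo's exact code.
        parameters = context.config_loader["spark"]
        spark_conf = SparkConf().setAll(parameters.items())

        spark_session_conf = (
            SparkSession.builder.appName(context.project_path.name)
            .enableHiveSupport()
            .config(conf=spark_conf)
        )
        _spark_session = spark_session_conf.getOrCreate()
        _spark_session.sparkContext.setLogLevel("WARN")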

2 comments

Hey Kedroids! :kedro:

(Apologies in advance for the long message, but I would really appreciate a good discussion on the points below from the Kedro community! 🙂)

I have a use case of deploying Kedro pipelines using the Vertex AI SDK.

  1. In the production system (web app), I want to be able to trigger a kedro pipeline (or multiple pipelines) with specified parameters (say from the UI).
  2. Let's say we have an API endpoint https://my.web.app/api/v1/some-task
     1. The request body includes parameters to trigger one or multiple kedro pipelines as a Vertex AI DAG.

My VertexAI DAG has a combination of nodes (steps), and each node:

  1. May or may not be a kedro pipeline
  2. May be a pyspark workload running on dataproc or non spark workload running on a single compute VM
  3. May run a bigquery job
  4. May or may not run in a docker container

Let's take the example of submitting a kedro pipeline to Dataproc Serverless, running in a custom Docker container, using the Vertex AI SDK.

Questions:

  1. Do you package the kedro code as part of the Docker container or just the dependencies?

For example, I have seen this done a lot, packaging the kedro code as well:

RUN mkdir /usr/kedro
WORKDIR /usr/kedro/
COPY . .

which means copying the whole project, and then in src/entrypoint.py:

from kedro.framework import cli
import os

os.chdir("/usr/kedro")
cli.main()

2. Do I need to package my kedro project as a wheel file and submit it with the job to Dataproc? If so, how have you seen that done with DataprocPySparkBatchOp?
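
For concreteness, the shape I have in mind for question 2 is something like the sketch below, mirroring the gcloud command above. I'm not certain of the exact parameter names of DataprocPySparkBatchOp (main_python_file_uri, container_image, args, etc. are my assumptions from memory), so please treat this as illustrative:

from google_cloud_pipeline_components.v1.dataproc import DataprocPySparkBatchOp
from kfp import dsl


@dsl.pipeline(name="kedro-on-dataproc")  # illustrative pipeline name
def kedro_dataproc_pipeline():
    # Mirrors the gcloud batches submit command above; parameter names are
    # my best recollection of the component's interface, not verified.
    DataprocPySparkBatchOp(
        project="my-project",
        location="europe-central2",
        main_python_file_uri="file:///home/kedro/src/entrypoint.py",
        container_image=(
            "europe-central2-docker.pkg.dev/my-project/"
            "kedro-dataproc-demo/kedro-dataproc-iris:latest"
        ),
        service_account="dataproc-worker@my-project.iam.gserviceaccount.com",
        args=["run", "--pipeline", "my_pipeline"],
    )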

3. How do you recommend to pass dynamic parameters to the kedro pipeline run?

As I understand it, cli.main() picks up sys.argv to infer the pipeline name and parameters, so one could do:

kedro run --pipeline <my_pipeline> --params=param_key1=value1,param_key2=2.0

Is there a better recommended way of doing this?
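
One alternative I'm aware of (please correct me if there's a better pattern) is to skip cli.main() and drive the run programmatically through KedroSession, passing the dynamic values as extra_params, roughly like this (project path and parameter names are illustrative):

from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

# Illustrative project path -- wherever the project is copied in the image
project_path = Path("/usr/kedro")
bootstrap_project(project_path)

# extra_params overrides values from parameters.yml for this run only
with KedroSession.create(
    project_path=project_path,
    extra_params={"param_key1": "value1", "param_key2": 2.0},
) as session:
    session.run(pipeline_name="my_pipeline")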

Thanks a lot and hoping for a good discussion! 🙂

3 comments

Hi kedroids :kedro:

We have a use case in which we are scheduling BigQuery queries to run in a specific order using a kedro pipeline.

We simply use the BigQuery client to trigger the SQL query on BigQuery, as follows:

from google.cloud import bigquery


def trigger_query_on_bigquery(
    query: str,
):
    client = bigquery.Client()
    # Run the query and block until it finishes
    client.query_and_wait(query)

    return True

The kedro DAG to schedule multiple queries in order looks as follows:

from kedro.pipeline import Pipeline, node


def create_retail_data_primary_pipeline() -> Pipeline:
    nodes = [
        node(
            func=trigger_prm_customer_on_big_query,
            inputs=None,
            outputs="prm_customer@status",
        ),
        node(
            func=trigger_prm_transaction_detail_ecom_on_big_query,
            inputs=["prm_product_hierarchy@status"],
            outputs="prm_transaction_detail_ecom@status",
        ),
        node(
            func=trigger_prm_transaction_detail_retail_on_big_query,
            inputs=["prm_product_hierarchy@status"],
            outputs="prm_transaction_detail_retail@status",
        ),
        node(
            func=trigger_prm_transaction_detail_on_big_query,
            inputs=[
                "prm_transaction_detail_ecom@status",
                "prm_transaction_detail_retail@status",
                "prm_product_hierarchy@status",
                "prm_customer@status",
            ],
            outputs="prm_transaction_detail@status",
        ),
        node(
            func=trigger_prm_incident_on_big_query,
            inputs=None,
            outputs="prm_incident@status",
        ),
        node(
            func=trigger_prm_product_hierarchy_on_big_query,
            inputs=None,
            outputs="prm_product_hierarchy@status",
        ),
    ]

    return Pipeline(nodes)

Since a node can't output the BigQuery table itself, we output a transcoded entry with @status (which is just True), and then use the actual BigQuery spark.SparkDataset transcoded versions of these datasets in downstream pipelines to enforce the order.

So I will use the prm_product_hierarchy@bigquery dataset in a downstream node, just so that kedro runs the node that triggers the BigQuery query first.
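
To make the trick concrete, a downstream node just declares the transcoded @bigquery entry as an input, so Kedro schedules the trigger node first (the node and dataset names below are illustrative):

from kedro.pipeline import node


def build_customer_features(prm_product_hierarchy):
    # Illustrative: receives whatever prm_product_hierarchy@bigquery loads
    # (e.g. a Spark DataFrame read from the table the trigger node populated).
    return prm_product_hierarchy


# Because transcoded entries share a base name, Kedro orders the node that
# outputs prm_product_hierarchy@status (the BigQuery trigger) before this one.
downstream_node = node(
    func=build_customer_features,
    inputs="prm_product_hierarchy@bigquery",
    outputs="ftr_customer_features",
)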

Is there a better way to do this?

14 comments