Kedro pyspark job submission issues with dataproc serverless

Kedro + GetInData Folks! :kedro:

I am following this repo to submit a kedro pyspark job to dataproc serverless: https://github.com/getindata/kedro-pyspark-dataproc-demo

On submitting the job:

gcloud dataproc batches submit pyspark file:///home/kedro/src/entrypoint.py \
    --project my-project \
    --region=europe-central2 \
    --container-image=europe-central2-docker.pkg.dev/my-project/kedro-dataproc-demo/kedro-dataproc-iris:latest \
    --service-account dataproc-worker@my-project.iam.gserviceaccount.com \
    --properties spark.app.name="kedro-pyspark-iris",spark.dynamicAllocation.minExecutors=2,spark.dynamicAllocation.maxExecutors=2 \
    -- \
    run

The entry point script contains the following:

import os
from kedro.framework import cli

os.chdir("/home/kedro")
cli.main()
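
For reference, in a typical Kedro + PySpark project the SparkSession (and hence spark.app.name) is set up by a project hook rather than by the entrypoint script. A rough sketch based on the standard kedro pyspark starter (the hook location, config pattern, and method body are assumptions, not copied from the demo repo):

from kedro.framework.hooks import hook_impl
from pyspark import SparkConf
from pyspark.sql import SparkSession


class SparkHooks:
    @hook_impl
    def after_context_created(self, context) -> None:
        # Read Spark settings from conf/base/spark.yml via the config loader
        parameters = context.config_loader.get("spark*", "spark*/**")
        spark_conf = SparkConf().setAll(parameters.items())

        # Build (or reuse) the session; this is where the app name gets set
        spark = (
            SparkSession.builder.appName(context.project_path.name)
            .config(conf=spark_conf)
            .getOrCreate()
        )
        spark.sparkContext.setLogLevel("WARN")

If that hook or its spark.yml is not picked up inside the container, the properties the builder ends up with may not match what the submit command set.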


I am getting the following error:


[10/15/24 17:30:21] INFO     Loading data from               data_catalog.py:343
                             'example_iris_data'                                
                             (SparkDataSet)...                                  
[10/15/24 17:30:22] WARNING  There are 3 nodes that have not run.  runner.py:178
                             You can resume the pipeline run by                 
                             adding the following argument to your              
                             previous command:                                  
                                                                                
╭───────────────────── Traceback (most recent call last) ──────────────────────╮
│ /usr/local/lib/python3.9/site-packages/kedro/io/core.py:186 in load          │
│                                                                              │
│   183 │   │   self._logger.debug("Loading %s", str(self))                    │
│   184 │   │                                                                  │
│   185 │   │   try:                                                           │
│ ❱ 186 │   │   │   return self._load()                                        │
│   187 │   │   except DataSetError:                                           │
│   188 │   │   │   raise                                                      │
│   189 │   │   except Exception as exc:                                       │
│                                                                              │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ message = 'Failed while loading data from data set                       │ │
│ │           SparkDataSet(file_format=csv, filepath=g'+2319                 │ │
│ │    self = <kedro.extras.datasets.spark.spark_dataset.SparkDataSet object │ │
│ │           at 0x7f4163077730>                                             │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│                                                                              │
│ /usr/local/lib/python3.9/site-packages/kedro/extras/datasets/spark/spark_dat │
│ aset.py:380 in _load                                                         │
│                                                                              │
│   377 │                                                                      │
│   378 │   def _load(self) -> DataFrame:                                      │
│   379 │   │   load_path = _strip_dbfs_prefix(self._fs_prefix + str(self._get │
│ ❱ 380 │   │   read_obj = self._get_spark().read                              │
│   381 │   │                                                                  │
│   382 │   │   # Pass schema if defined                                       │
│   383 │   │   if self._schema:                                               │
│                                                                              │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ load_path = 'gs://aa-dev-crm-users/abhishek/misc/iris.csv'                │ │
│ │      self = <kedro.extras.datasets.spark.spark_dataset.SparkDataSet      │ │
│ │             object at 0x7f4163077730>                                    │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│                                                                              │
│ /usr/lib/spark/python/pyspark/sql/session.py:1706 in read                    │
│                                                                              │
│   1703 │   │   |100|Hyukjin Kwon|                                            │
│   1704 │   │   +---+------------+                                            │
│   1705 │   │   """                                                           │
│ ❱ 1706 │   │   return DataFrameReader(self)                                  │
│   1707 │                                                                     │
│   1708 │   @property                                                         │
│   1709 │   def readStream(self) -> DataStreamReader:                         │
│                                                                              │
│ ╭────────────────────────────── locals ──────────────────────────────╮       │
│ │ self = <pyspark.sql.session.SparkSession object at 0x7f4174ebcf40> │       │
│ ╰────────────────────────────────────────────────────────────────────╯       │
│                                                                              │
│ /usr/lib/spark/python/pyspark/sql/readwriter.py:70 in __init__               │
│                                                                              │
│     67 │   """                                                               │
│     68 │                                                                     │
│     69 │   def __init__(self, spark: "SparkSession"):                        │
│ ❱   70 │   │   self._jreader = spark._jsparkSession.read()                   │
│     71 │   │   self._spark = spark                                           │
│     72 │                                                                     │
│     73 │   def _df(self, jdf: JavaObject) -> "DataFrame":                    │
│                                                                              │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │  self = <pyspark.sql.readwriter.DataFrameReader object at                │ │
│ │         0x7f41631fa700>                                                  │ │
│ │ spark = <pyspark.sql.session.SparkSession object at 0x7f4174ebcf40>      │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│                                                                              │
│ /usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322 in │
│ __call__                                                                     │
│                                                                              │
│ [Errno 20] Not a directory:                                                  │
│ '/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py'       │
│                                                                              │
│ /usr/lib/spark/python/pyspark/errors/exceptions/captured.py:185 in deco      │
│                                                                              │
│   182 │   │   │   if not isinstance(converted, UnknownException):            │
│   183 │   │   │   │   # Hide where the exception came from that shows a non- │
│   184 │   │   │   │   # JVM exception message.                               │
│ ❱ 185 │   │   │   │   raise converted from None                              │
│   186 │   │   │   else:                                                      │
│   187 │   │   │   │   raise                                                  │
│   188                                                                        │
│                                                                              │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │         a = (                                                            │ │
│ │             │   'xro91',                                                 │ │
│ │             │   <py4j.clientserver.JavaClient object at 0x7f417cb199d0>, │ │
│ │             │   'o88',                                                   │ │
│ │             │   'read'                                                   │ │
│ │             )                                                            │ │
│ │ converted = IllegalArgumentException()                                   │ │
│ │         f = <function get_return_value at 0x7f417b8c0310>                │ │
│ │        kw = {}                                                           │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰──────────────────────────────────────────────────────────────────────────────╯
IllegalArgumentException: The value of property spark.app.name must not be null


Almost 100% sure that this error is not due to any mis-specification in my Dockerfile or requirements, because it works perfectly if I change the entrypoint script to the following:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SimpleApp").getOrCreate()

df = spark.read.csv("<a target="_blank" rel="noopener noreferrer" href="gs://aa-dev-crm-users/abhishek/misc/iris.csv">gs://aa-dev-crm-users/abhishek/misc/iris.csv</a>", inferSchema=True, header=True)
print(df.show())
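
Not part of the original setup, but a debugging sketch that may help narrow this down: print whatever Spark configuration the Dataproc Serverless runtime already provides before handing control to Kedro. It keeps the original chdir and cli.main() call; everything else is an assumption.

import os

from pyspark.sql import SparkSession
from kedro.framework import cli

os.chdir("/home/kedro")

# Show the properties of any SparkSession the runtime has already created,
# so it is obvious whether spark.app.name is present before Kedro runs.
existing = SparkSession.getActiveSession()
if existing is not None:
    for key, value in sorted(existing.sparkContext.getConf().getAll()):
        print(key, "=", value)
else:
    print("No active SparkSession before Kedro starts")

cli.main()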

2 comments

Hi Abhishek, I am new to Dataproc, but while looking through the accepted arguments for the --properties flag I could not find spark.app.name. Could you try without passing it? I do not see the git repo passing it either -

gcloud dataproc batches submit pyspark file:///home/kedro/src/entrypoint.py \
    --project gid-ml-ops-sandbox \
    --region=europe-west1 \
    --container-image=gcr.io/gid-ml-ops-sandbox/pyspark-tutorial-mb:20220920105 \
    --service-account kedro-pyspark-dataproc@gid-ml-ops-sandbox.iam.gserviceaccount.com \
    --properties spark.dynamicAllocation.minExecutors=2,spark.dynamicAllocation.maxExecutors=2 -- \
    run 

Yes, I initially tried without passing spark.app.name, but got the same error.
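
One more thing that might be worth trying (a hedged sketch, not something confirmed in this thread): set the app name explicitly in the project's Spark hook so it does not depend on --properties at submit time. The hook context and the app name below are assumptions.

from pyspark import SparkConf
from pyspark.sql import SparkSession

# Hypothetical change inside the project's SparkHooks.after_context_created:
# pin the application name in the builder itself.
spark_conf = SparkConf()  # in the real hook this is populated from spark.yml
spark = (
    SparkSession.builder.appName("kedro-pyspark-iris")
    .config(conf=spark_conf)
    .getOrCreate()
)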
