Join the Kedro community

Updated last month

Kedro pyspark job submission issues with dataproc serverless

Kedro + GetInData Folks! :kedro:

I am following this repo to submit a kedro pyspark job to dataproc serverless: https://github.com/getindata/kedro-pyspark-dataproc-demo

On submitting the job

gcloud dataproc batches submit pyspark file:///home/kedro/src/entrypoint.py \
    --project my-project \
    --region=europe-central2 \
    --container-image=europe-central2-docker.pkg.dev/my-project/kedro-dataproc-demo/kedro-dataproc-iris:latest \
    --service-account dataproc-worker@my-project.iam.gserviceaccount.com \
    --properties spark.app.name="kedro-pyspark-iris",spark.dynamicAllocation.minExecutors=2,spark.dynamicAllocation.maxExecutors=2 \
    -- \
    run

Entry point script contains the following:

import os
from kedro.framework import cli

os.chdir("/home/kedro")
cli.main()


I am getting the following error:


[10/15/24 17:30:21] INFO     Loading data from               data_catalog.py:343
                             'example_iris_data'                                
                             (SparkDataSet)...                                  
[10/15/24 17:30:22] WARNING  There are 3 nodes that have not run.  runner.py:178
                             You can resume the pipeline run by                 
                             adding the following argument to your              
                             previous command:                                  
                                                                                
โ•ญโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ Traceback (most recent call last) โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฎ
โ”‚ /usr/local/lib/python3.9/site-packages/kedro/io/core.py:186 in load          โ”‚
โ”‚                                                                              โ”‚
โ”‚   183 โ”‚   โ”‚   self._logger.debug("Loading %s", str(self))                    โ”‚
โ”‚   184 โ”‚   โ”‚                                                                  โ”‚
โ”‚   185 โ”‚   โ”‚   try:                                                           โ”‚
โ”‚ โฑ 186 โ”‚   โ”‚   โ”‚   return self._load()                                        โ”‚
โ”‚   187 โ”‚   โ”‚   except DataSetError:                                           โ”‚
โ”‚   188 โ”‚   โ”‚   โ”‚   raise                                                      โ”‚
โ”‚   189 โ”‚   โ”‚   except Exception as exc:                                       โ”‚
โ”‚                                                                              โ”‚
โ”‚ โ•ญโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ locals โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฎ โ”‚
โ”‚ โ”‚ message = 'Failed while loading data from data set                       โ”‚ โ”‚
โ”‚ โ”‚           SparkDataSet(file_format=csv, filepath=g'+2319                 โ”‚ โ”‚
โ”‚ โ”‚    self = <kedro.extras.datasets.spark.spark_dataset.SparkDataSet object โ”‚ โ”‚
โ”‚ โ”‚           at 0x7f4163077730>                                             โ”‚ โ”‚
โ”‚ โ•ฐโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฏ โ”‚
โ”‚                                                                              โ”‚
โ”‚ /usr/local/lib/python3.9/site-packages/kedro/extras/datasets/spark/spark_dat โ”‚
โ”‚ aset.py:380 in _load                                                         โ”‚
โ”‚                                                                              โ”‚
โ”‚   377 โ”‚                                                                      โ”‚
โ”‚   378 โ”‚   def _load(self) -> DataFrame:                                      โ”‚
โ”‚   379 โ”‚   โ”‚   load_path = _strip_dbfs_prefix(self._fs_prefix + str(self._get โ”‚
โ”‚ โฑ 380 โ”‚   โ”‚   read_obj = self._get_spark().read                              โ”‚
โ”‚   381 โ”‚   โ”‚                                                                  โ”‚
โ”‚   382 โ”‚   โ”‚   # Pass schema if defined                                       โ”‚
โ”‚   383 โ”‚   โ”‚   if self._schema:                                               โ”‚
โ”‚                                                                              โ”‚
โ”‚ โ•ญโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ locals โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฎ โ”‚
โ”‚ โ”‚ load_path = '<a target="_blank" rel="noopener noreferrer" href="gs://aa-dev-crm-users/abhishek/misc/iris.csv">gs://aa-dev-crm-users/abhishek/misc/iris.csv</a>'               โ”‚ โ”‚
โ”‚ โ”‚      self = <kedro.extras.datasets.spark.spark_dataset.SparkDataSet      โ”‚ โ”‚
โ”‚ โ”‚             object at 0x7f4163077730>                                    โ”‚ โ”‚
โ”‚ โ•ฐโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฏ โ”‚
โ”‚                                                                              โ”‚
โ”‚ /usr/lib/spark/python/pyspark/sql/session.py:1706 in read                    โ”‚
โ”‚                                                                              โ”‚
โ”‚   1703 โ”‚   โ”‚   |100|Hyukjin Kwon|                                            โ”‚
โ”‚   1704 โ”‚   โ”‚   +---+------------+                                            โ”‚
โ”‚   1705 โ”‚   โ”‚   """                                                           โ”‚
โ”‚ โฑ 1706 โ”‚   โ”‚   return DataFrameReader(self)                                  โ”‚
โ”‚   1707 โ”‚                                                                     โ”‚
โ”‚   1708 โ”‚   @property                                                         โ”‚
โ”‚   1709 โ”‚   def readStream(self) -> DataStreamReader:                         โ”‚
โ”‚                                                                              โ”‚
โ”‚ โ•ญโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ locals โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฎ       โ”‚
โ”‚ โ”‚ self = <pyspark.sql.session.SparkSession object at 0x7f4174ebcf40> โ”‚       โ”‚
โ”‚ โ•ฐโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฏ       โ”‚
โ”‚                                                                              โ”‚
โ”‚ /usr/lib/spark/python/pyspark/sql/readwriter.py:70 in __init__               โ”‚
โ”‚                                                                              โ”‚
โ”‚     67 โ”‚   """                                                               โ”‚
โ”‚     68 โ”‚                                                                     โ”‚
โ”‚     69 โ”‚   def __init__(self, spark: "SparkSession"):                        โ”‚
โ”‚ โฑ   70 โ”‚   โ”‚   self._jreader = spark._jsparkSession.read()                   โ”‚
โ”‚     71 โ”‚   โ”‚   self._spark = spark                                           โ”‚
โ”‚     72 โ”‚                                                                     โ”‚
โ”‚     73 โ”‚   def _df(self, jdf: JavaObject) -> "DataFrame":                    โ”‚
โ”‚                                                                              โ”‚
โ”‚ โ•ญโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ locals โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฎ โ”‚
โ”‚ โ”‚  self = <pyspark.sql.readwriter.DataFrameReader object at                โ”‚ โ”‚
โ”‚ โ”‚         0x7f41631fa700>                                                  โ”‚ โ”‚
โ”‚ โ”‚ spark = <pyspark.sql.session.SparkSession object at 0x7f4174ebcf40>      โ”‚ โ”‚
โ”‚ โ•ฐโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฏ โ”‚
โ”‚                                                                              โ”‚
โ”‚ /usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322 in โ”‚
โ”‚ __call__                                                                     โ”‚
โ”‚                                                                              โ”‚
โ”‚ [Errno 20] Not a directory:                                                  โ”‚
โ”‚ '/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py'       โ”‚
โ”‚                                                                              โ”‚
โ”‚ /usr/lib/spark/python/pyspark/errors/exceptions/captured.py:185 in deco      โ”‚
โ”‚                                                                              โ”‚
โ”‚   182 โ”‚   โ”‚   โ”‚   if not isinstance(converted, UnknownException):            โ”‚
โ”‚   183 โ”‚   โ”‚   โ”‚   โ”‚   # Hide where the exception came from that shows a non- โ”‚
โ”‚   184 โ”‚   โ”‚   โ”‚   โ”‚   # JVM exception message.                               โ”‚
โ”‚ โฑ 185 โ”‚   โ”‚   โ”‚   โ”‚   raise converted from None                              โ”‚
โ”‚   186 โ”‚   โ”‚   โ”‚   else:                                                      โ”‚
โ”‚   187 โ”‚   โ”‚   โ”‚   โ”‚   raise                                                  โ”‚
โ”‚   188                                                                        โ”‚
โ”‚                                                                              โ”‚
โ”‚ โ•ญโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ locals โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฎ โ”‚
โ”‚ โ”‚         a = (                                                            โ”‚ โ”‚
โ”‚ โ”‚             โ”‚   'xro91',                                                 โ”‚ โ”‚
โ”‚ โ”‚             โ”‚   <py4j.clientserver.JavaClient object at 0x7f417cb199d0>, โ”‚ โ”‚
โ”‚ โ”‚             โ”‚   'o88',                                                   โ”‚ โ”‚
โ”‚ โ”‚             โ”‚   'read'                                                   โ”‚ โ”‚
โ”‚ โ”‚             )                                                            โ”‚ โ”‚
โ”‚ โ”‚ converted = IllegalArgumentException()                                   โ”‚ โ”‚
โ”‚ โ”‚         f = <function get_return_value at 0x7f417b8c0310>                โ”‚ โ”‚
โ”‚ โ”‚        kw = {}                                                           โ”‚ โ”‚
โ”‚ โ•ฐโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฏ โ”‚
โ•ฐโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฏ
IllegalArgumentException: The value of property spark.app.name must not be null


Almost 100% sure that this error is not due to my any mis-spec in my Dockerfile or requirements, because it works perfectly if I change the entrpoint script to the following:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SimpleApp").getOrCreate()

df = spark.read.csv("<a target="_blank" rel="noopener noreferrer" href="gs://aa-dev-crm-users/abhishek/misc/iris.csv">gs://aa-dev-crm-users/abhishek/misc/iris.csv</a>", inferSchema=True, header=True)
print(df.show())

R
A
A
5 comments

Hi Abhishek, I am new to dataproc, as I was looking for the accepted arguments for --properties flag, I could not find spark.app.name . Could you try without passing that ? I do not see the git repo passing the same -

gcloud dataproc batches submit pyspark file:///home/kedro/src/entrypoint.py \
    --project gid-ml-ops-sandbox \
    --region=europe-west1\
    --container-image=gcr.io/gid-ml-ops-sandbox/pyspark-tutorial-mb:20220920105 \
    --service-account kedro-pyspark-dataproc@gid-ml-ops-sandbox.iam.gserviceaccount.com \
    --properties spark.dynamicAllocation.minExecutors=2,spark.dynamicAllocation.maxExecutors=2 -- \
    run 

Yes initially tried without passing spark.app.name but got the same error.

Appreciate any suggestions here cc:

Hi, I was a bit busy lately, I'll have a look at it tomorrow, but I don't promise solutions as I'm not familiar with this particular example. It's 2 years old and wasn't updated, likely some api changed. Can u just confirm which kedro version are you using?

So, it got solved by the following:

  • Dataproc runtime version: 1.1
  • Dataproc ships and mounts pyspark installation: 3.3.4
  • Since SparkDataset installs pyspark: pin to 3.3.4
  • (although google docs say not to install our own pyspark)

Then it worked ๐Ÿ™‚

Add a reply
Sign up and join the conversation on Slack