Hi Team! :kedro:
If we run a KedroSession in code and keep the intermediate datasets as MemoryDataSet, then persistence won't be needed, saving on I/O time. However, transcoding would be a problem in this case. Any ideas?
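For reference, a minimal sketch of what running the session in code could look like (the project path and pipeline name are illustrative); any intermediate dataset not declared in the catalog then defaults to MemoryDataSet:

from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

# Datasets missing from the catalog are created as MemoryDataSet, so nothing
# is persisted between nodes within this run.
bootstrap_project(Path.cwd())
with KedroSession.create(project_path=Path.cwd()) as session:
    session.run(pipeline_name="my_pipeline")  # illustrative pipeline name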
Hi Team! :kedro:
My kedro pipeline is just stuck even before running any nodes:
[11/14/24 17:09:07] WARNING /root/.venv/lib/python3.9/site-packages/kedro/framework/startup.py:99: KedroDeprecationWarning: project_version in pyproject.toml is deprecated, use kedro_init_version instead
[11/14/24 17:09:15] INFO    Kedro project project (session.py:365)
[11/14/24 17:09:17] WARNING /root/.venv/lib/python3.9/site-packages/kedro/framework/session/session.py:267: KedroDeprecationWarning: Jinja2TemplatedConfigLoader will be deprecated in Kedro 0.19. Please use the OmegaConfigLoader instead. To consult the documentation for OmegaConfigLoader, see here: https://docs.kedro.org/en/stable/configuration/advanced_configuration.html#omegaconfigloader
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/14 17:09:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[11/14/24 17:12:53] WARNING /root/.venv/lib/python3.9/site-packages/pyspark/pandas/__init__.py:49: UserWarning: 'PYARROW_IGNORE_TIMEZONE' environment variable was not set. It is required to set this environment variable to '1' in both driver and executor sides if you use pyarrow>=2.0.0. pandas-on-Spark will set it for you but it does not work if there is a Spark context already launched.
Kedro version: 0.18.14
Python version: 3.9
Hi Team,
Is there a way to not run certain Kedro hooks when Kedro Viz loads? I have a Spark hook defined which runs every time I run kedro viz, and I want to disable it.
Thanks!
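I'm not aware of a built-in Kedro Viz flag for this, but one workaround is to guard the hook itself behind an environment variable that you set before launching kedro viz. A sketch; the KEDRO_DISABLE_SPARK_HOOK variable name is made up:

import os

from kedro.framework.hooks import hook_impl
from pyspark.sql import SparkSession


class SparkHooks:
    @hook_impl
    def after_context_created(self, context) -> None:
        # Made-up flag: run as `KEDRO_DISABLE_SPARK_HOOK=1 kedro viz` to skip
        # Spark start-up while browsing the pipeline.
        if os.environ.get("KEDRO_DISABLE_SPARK_HOOK") == "1":
            return
        SparkSession.builder.appName("my-project").getOrCreate()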
Hi Kedroids! :kedro:
Is there a way to override global parameters through a CLI kedro pipeline trigger?
kedro run --pipeline <my_pipeline> --params "<my_list_of_global_params>"
Kedro + GetInData Folks! :kedro:
I am following this repo to submit a kedro pyspark job to dataproc serverless: https://github.com/getindata/kedro-pyspark-dataproc-demo
On submitting the job:

gcloud dataproc batches submit pyspark file:///home/kedro/src/entrypoint.py \
    --project my-project \
    --region=europe-central2 \
    --container-image=europe-central2-docker.pkg.dev/my-project/kedro-dataproc-demo/kedro-dataproc-iris:latest \
    --service-account dataproc-worker@my-project.iam.gserviceaccount.com \
    --properties spark.app.name="kedro-pyspark-iris",spark.dynamicAllocation.minExecutors=2,spark.dynamicAllocation.maxExecutors=2 \
    -- \
    run
where src/entrypoint.py is:

import os
from kedro.framework import cli

os.chdir("/home/kedro")
cli.main()
the job fails with:

[10/15/24 17:30:21] INFO    Loading data from 'example_iris_data' (SparkDataSet)...
[10/15/24 17:30:22] WARNING There are 3 nodes that have not run. You can resume the pipeline run by adding the following argument to your previous command:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/kedro/io/core.py", line 186, in load
    return self._load()
  File "/usr/local/lib/python3.9/site-packages/kedro/extras/datasets/spark/spark_dataset.py", line 380, in _load
    read_obj = self._get_spark().read
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 1706, in read
    return DataFrameReader(self)
  File "/usr/lib/spark/python/pyspark/sql/readwriter.py", line 70, in __init__
    self._jreader = spark._jsparkSession.read()
  File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/usr/lib/spark/python/pyspark/errors/exceptions/captured.py", line 185, in deco
    raise converted from None
IllegalArgumentException: The value of property spark.app.name must not be null
It doesn't seem to be an issue with the Dockerfile or requirements, because it works perfectly if I change the entrypoint script to the following:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
df = spark.read.csv("gs://aa-dev-crm-users/abhishek/misc/iris.csv", inferSchema=True, header=True)
df.show()
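One thing that might be worth ruling out (a sketch, not a confirmed fix): set spark.app.name explicitly in the SparkSession that the project's own hook builds, rather than relying only on the --properties flag, since getOrCreate() inside SparkDataSet reuses whatever session already exists:

from kedro.framework.hooks import hook_impl
from pyspark import SparkConf
from pyspark.sql import SparkSession


class SparkHooks:
    @hook_impl
    def after_context_created(self, context) -> None:
        # Name the session before any dataset triggers session creation;
        # later getOrCreate() calls will reuse this named session.
        conf = SparkConf().setAppName("kedro-pyspark-iris")
        SparkSession.builder.config(conf=conf).getOrCreate()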
Hey Kedroids! :kedro:
(Apologies in advance for the long message, but I would really appreciate a good discussion with the Kedro community on the below!)
I have a use case of deploying Kedro pipelines using the Vertex AI SDK.
https://my.web.app/api/v1/some-task
RUN mkdir /usr/kedro
WORKDIR /usr/kedro/
COPY . .
src/entrypoint.py:

from kedro.framework import cli
import os

os.chdir("/usr/kedro")
cli.main()
cli.main() picks up sys.argv to infer the pipeline name and parameters, so one could run something like:

kedro run --pipeline <my_pipeline> --params=param_key1=value1,param_key2=2.0
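For illustration (hypothetical pipeline name and parameter), passing those arguments to the container makes cli.main() behave as if the command had been typed in a shell:

import sys

from kedro.framework import cli

# Equivalent to: kedro run --pipeline my_pipeline --params=param_key1=value1
sys.argv = ["kedro", "run", "--pipeline", "my_pipeline", "--params=param_key1=value1"]
cli.main()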
Hi kedroids :kedro:
We have a use case in which we are scheduling BigQuery queries to run in a specific order using a Kedro pipeline.
We use the BigQuery client simply to trigger the SQL query on BigQuery, as follows:
from google.cloud import bigquery


def trigger_query_on_bigquery(query: str):
    client = bigquery.Client()
    query_job = client.query_and_wait(query)  # blocks until the query finishes
    return True
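Each trigger_*_on_big_query node is presumably a thin wrapper around this helper; a sketch with an illustrative SQL statement:

def trigger_prm_customer_on_big_query() -> bool:
    # Illustrative SQL; the real statement lives in the project
    query = "CREATE OR REPLACE TABLE prm.customer AS SELECT * FROM raw.customer"
    return trigger_query_on_bigquery(query)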
from kedro.pipeline import Pipeline, node


def create_retail_data_primary_pipeline() -> Pipeline:
    nodes = [
        node(
            func=trigger_prm_customer_on_big_query,
            inputs=None,
            outputs="prm_customer@status",
        ),
        node(
            func=trigger_prm_transaction_detail_ecom_on_big_query,
            inputs=["prm_product_hierarchy@status"],
            outputs="prm_transaction_detail_ecom@status",
        ),
        node(
            func=trigger_prm_transaction_detail_retail_on_big_query,
            inputs=["prm_product_hierarchy@status"],
            outputs="prm_transaction_detail_retail@status",
        ),
        node(
            func=trigger_prm_transaction_detail_on_big_query,
            inputs=[
                "prm_transaction_detail_ecom@status",
                "prm_transaction_detail_retail@status",
                "prm_product_hierarchy@status",
                "prm_customer@status",
            ],
            outputs="prm_transaction_detail@status",
        ),
        node(
            func=trigger_prm_incident_on_big_query,
            inputs=None,
            outputs="prm_incident@status",
        ),
        node(
            func=trigger_prm_product_hierarchy_on_big_query,
            inputs=None,
            outputs="prm_product_hierarchy@status",
        ),
    ]
    return Pipeline(nodes)
Each node outputs a @status entry (which is just True), and we then use the actual BigQuery spark.SparkDataset transcoded versions of these datasets in the downstream pipeline to enforce the order. For example, we consume the prm_product_hierarchy@bigquery dataset in a downstream node, just so that Kedro runs the node that triggers the BigQuery query first.
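For illustration, a downstream node that consumes the transcoded BigQuery entry (the function name and output dataset below are hypothetical):

# Consuming the @bigquery entry makes Kedro schedule the node that triggers
# the BigQuery query before this one.
node(
    func=build_customer_features,  # hypothetical downstream function
    inputs=["prm_product_hierarchy@bigquery"],
    outputs="customer_features",
)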