Hello! :kedro:
I am on kedro 0.18.14, using a custom config loader based on TemplatedConfigLoader. Is there a way to access globals defined in globals.yml in kedro nodes?
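For context, a minimal sketch (assumed, not the asker's actual code) of the kind of settings.py setup described, with a loader based on TemplatedConfigLoader picking up globals.yml:

# settings.py -- sketch only; the real project uses a custom subclass of TemplatedConfigLoader
from kedro.config import TemplatedConfigLoader

CONFIG_LOADER_CLASS = TemplatedConfigLoader
CONFIG_LOADER_ARGS = {
    # values declared in conf/<env>/globals.yml are what the question wants to reach from nodes
    "globals_pattern": "*globals.yml",
}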
Hi Team!
I am trying to read a BigQuery table using the spark.SparkDataSet with an arbitrary query, as follows:
trx_agg_data:
  type: spark.SparkDataSet
  file_format: bigquery
  load_args:
    viewsEnabled: true
    query: |
      SELECT ph.category, MAX(trx.sales)
      FROM {project}.{dataset}.trx_data trx
      LEFT JOIN {project}.{dataset}.prod_hierarchy ph
  filepath: gs://my-bucket/trx_agg_data.parquet
The filepath is not in the correct format (BigQuery expected <project>.<dataset>.<table>), but I am trying to read it with a query, i.e. something like:

spark.read.format("bigquery").option("query", "SELECT ph.category, MAX(trx.sales) FROM {project}.{dataset}.trx_data trx LEFT JOIN {project}.{dataset}.prod_hierarchy ph").load()

spark.SparkDataSet does not have this functionality. Should I create a custom dataset here?
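If a custom dataset is the way to go, a rough sketch (class and option names assumed, not a tested implementation) of a dataset that runs an arbitrary query through the Spark BigQuery connector, mirroring the spark.read call above:

from typing import Any, Dict, Optional

from kedro.io import AbstractDataSet
from pyspark.sql import DataFrame, SparkSession


class SparkBigQueryQueryDataSet(AbstractDataSet):
    """Hypothetical read-only dataset that loads a Spark DataFrame from a BigQuery SQL query."""

    def __init__(self, query: str, materialization_dataset: Optional[str] = None):
        self._query = query
        self._materialization_dataset = materialization_dataset

    def _load(self) -> DataFrame:
        spark = SparkSession.builder.getOrCreate()
        reader = (
            spark.read.format("bigquery")
            .option("viewsEnabled", "true")
            .option("query", self._query)
        )
        # the connector typically also wants a dataset to materialise query results into
        if self._materialization_dataset:
            reader = reader.option("materializationDataset", self._materialization_dataset)
        return reader.load()

    def _save(self, data: DataFrame) -> None:
        raise NotImplementedError("This sketch is read-only")

    def _describe(self) -> Dict[str, Any]:
        return {"query": self._query}

In the catalog such a class would be referenced by its import path, e.g. type: my_project.datasets.SparkBigQueryQueryDataSet, with the query passed as an argument.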
Hi Team! :kedro:
Running the KedroSession in code ✅: if datasets are kept as MemoryDataSet then persistence won't be needed, saving on I/O time. However, transcoding would be a problem in this case. Any ideas?

Hi Team! :kedro:
My kedro pipeline is just stuck even before running any nodes
[11/14/24 17:09:07] WARNING  /root/.venv/lib/python3.9/site-packages/kedro/framework/startup.py:99: KedroDeprecationWarning: project_version in pyproject.toml is deprecated, use kedro_init_version instead  warnings.py:109
                      warnings.warn(
[11/14/24 17:09:15] INFO     Kedro project project  session.py:365
[11/14/24 17:09:17] WARNING  /root/.venv/lib/python3.9/site-packages/kedro/framework/session/session.py:267: KedroDeprecationWarning: Jinja2TemplatedConfigLoader will be deprecated in Kedro 0.19. Please use the OmegaConfigLoader instead. To consult the documentation for OmegaConfigLoader, see here: https://docs.kedro.org/en/stable/configuration/advanced_configuration.html#omegaconfigloader  warnings.py:109
                      warnings.warn(
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/14 17:09:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[11/14/24 17:12:53] WARNING  /root/.venv/lib/python3.9/site-packages/pyspark/pandas/__init__.py:49: UserWarning: 'PYARROW_IGNORE_TIMEZONE' environment variable was not set. It is required to set this environment variable to '1' in both driver and executor sides if you use pyarrow>=2.0.0. pandas-on-Spark will set it for you but it does not work if there is a Spark context already launched.  warnings.py:109
                      warnings.warn(
Kedro version: 0.18.14
Python version: 3.9
Hi Team,
Is there a way to not run certain kedro hooks when kedro viz loads? I have a Spark hook defined that runs every time I run kedro viz, which I want to disable.
Thanks! 🙂
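For reference, a minimal sketch (assumed, not the actual hook in question) of the kind of Spark hook described above; because it is registered on after_context_created, kedro viz triggers it too when it builds the project context:

# hooks.py -- sketch of a typical SparkSession-creating hook (configuration details assumed)
from kedro.framework.hooks import hook_impl
from pyspark import SparkConf
from pyspark.sql import SparkSession


class SparkHooks:
    @hook_impl
    def after_context_created(self, context) -> None:
        # Start a SparkSession as soon as a Kedro context exists,
        # which also happens whenever `kedro viz` loads the project.
        spark_conf = SparkConf().setAll(
            context.config_loader.get("spark*", "spark*/**").items()
        )
        _spark = (
            SparkSession.builder.appName(context.project_path.name)
            .config(conf=spark_conf)
            .getOrCreate()
        )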
Hi Kedroids! :kedro:
Is there a way to override global parameters when triggering a kedro pipeline from the CLI? Something like:
kedro run --pipeline <my_pipeline> --params "<my_list_of_global_params>"
Kedro + GetInData Folks! :kedro:
I am following this repo to submit a kedro pyspark job to dataproc serverless: https://github.com/getindata/kedro-pyspark-dataproc-demo
On submitting the job:
gcloud dataproc batches submit pyspark file:///home/kedro/src/entrypoint.py \
    --project my-project \
    --region=europe-central2 \
    --container-image=europe-central2-docker.pkg.dev/my-project/kedro-dataproc-demo/kedro-dataproc-iris:latest \
    --service-account dataproc-worker@my-project.iam.gserviceaccount.com \
    --properties spark.app.name="kedro-pyspark-iris",spark.dynamicAllocation.minExecutors=2,spark.dynamicAllocation.maxExecutors=2 \
    -- \
    run
where /home/kedro/src/entrypoint.py is:

import os
from kedro.framework import cli

# run the kedro CLI from the project root baked into the container image
os.chdir("/home/kedro")
cli.main()
The job then fails with the following error:

[10/15/24 17:30:21] INFO     Loading data from 'example_iris_data' (SparkDataSet)...  data_catalog.py:343
[10/15/24 17:30:22] WARNING  There are 3 nodes that have not run. You can resume the pipeline run by adding the following argument to your previous command:  runner.py:178
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/kedro/io/core.py", line 186, in load
    return self._load()
  File "/usr/local/lib/python3.9/site-packages/kedro/extras/datasets/spark/spark_dataset.py", line 380, in _load
    read_obj = self._get_spark().read
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 1706, in read
    return DataFrameReader(self)
  File "/usr/lib/spark/python/pyspark/sql/readwriter.py", line 70, in __init__
    self._jreader = spark._jsparkSession.read()
  File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/usr/lib/spark/python/pyspark/errors/exceptions/captured.py", line 185, in deco
    raise converted from None
IllegalArgumentException: The value of property spark.app.name must not be null

The dataset it fails to load is SparkDataSet(file_format=csv, filepath=gs://aa-dev-crm-users/abhishek/misc/iris.csv).
I don't think the issue is with the Dockerfile or requirements, because it works perfectly if I change the entrypoint script to the following:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
df = spark.read.csv("gs://aa-dev-crm-users/abhishek/misc/iris.csv", inferSchema=True, header=True)
print(df.show())
Hey Kedroids! :kedro:
(Apologies in advance for the long message, but I would really appreciate a good discussion from the kedro community on the below! 🙂)
I have a use case of deploying kedro pipelines using the VertexAI SDK.
https://my.web.app/api/v1/some-task
RUN mkdir /usr/kedro
WORKDIR /usr/kedro/
COPY . .
src/entrypoint.py:

from kedro.framework import cli
import os

os.chdir("/usr/kedro")
cli.main()
cli.main() picks up sys.argv to infer the pipeline name and parameters, so one could pass the same arguments as kedro run --pipeline <my_pipeline> --params=param_key1=value1,param_key2=2.0.
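To illustrate that last point, a small sketch (pipeline name and parameter values are placeholders, not from the original message) of driving cli.main() programmatically by populating sys.argv first:

import os
import sys

from kedro.framework import cli

os.chdir("/usr/kedro")
# cli.main() reads sys.argv, so fill it in as if `kedro run ...` had been typed on the CLI
sys.argv = [
    "kedro",
    "run",
    "--pipeline", "my_pipeline",
    "--params", "param_key1=value1,param_key2=2.0",
]
cli.main()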
Hi kedroids :kedro:
We have a use case in which we are scheduling BigQuery queries to run in a specific order using a kedro pipeline.
We use the BigQuery client simply to trigger the SQL query on BigQuery, as follows:
from google.cloud import bigquery


def trigger_query_on_bigquery(
    query: str,
):
    client = bigquery.Client()
    query_job = client.query_and_wait(query)
    return True
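Each trigger_prm_*_on_big_query function referenced in the pipeline below is presumably a thin wrapper around this helper; a purely hypothetical example (the actual SQL is not shown in the original message):

def trigger_prm_customer_on_big_query() -> bool:
    # placeholder SQL; the real query lives elsewhere in the project
    query = "CREATE OR REPLACE TABLE my_project.my_dataset.prm_customer AS SELECT ..."
    return trigger_query_on_bigquery(query)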
from kedro.pipeline import Pipeline, node, pipeline


def create_retail_data_primary_pipeline() -> Pipeline:
    # The dummy "@status" outputs/inputs only exist to make kedro run the queries in the right order.
    nodes = [
        node(
            func=trigger_prm_customer_on_big_query,
            inputs=None,
            outputs="prm_customer@status",
        ),
        node(
            func=trigger_prm_transaction_detail_ecom_on_big_query,
            inputs=["prm_product_hierarchy@status"],
            outputs="prm_transaction_detail_ecom@status",
        ),
        node(
            func=trigger_prm_transaction_detail_retail_on_big_query,
            inputs=["prm_product_hierarchy@status"],
            outputs="prm_transaction_detail_retail@status",
        ),
        node(
            func=trigger_prm_transaction_detail_on_big_query,
            inputs=[
                "prm_transaction_detail_ecom@status",
                "prm_transaction_detail_retail@status",
                "prm_product_hierarchy@status",
                "prm_customer@status",
            ],
            outputs="prm_transaction_detail@status",
        ),
        node(
            func=trigger_prm_incident_on_big_query,
            inputs=None,
            outputs="prm_incident@status",
        ),
        node(
            func=trigger_prm_product_hierarchy_on_big_query,
            inputs=None,
            outputs="prm_product_hierarchy@status",
        ),
    ]
    return pipeline(nodes)
Each node outputs a dummy @status dataset (which is just True), and then we use the actual BigQuery spark.SparkDataset transcoded entry versions of these datasets in the downstream pipeline to enforce the order, e.g. we read the prm_product_hierarchy@bigquery dataset in a downstream node just so that kedro runs the node that triggers the BigQuery query first.
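A rough sketch (dataset names taken from the message, everything else assumed) of what the transcoded catalog entries for one of these tables might look like:

prm_product_hierarchy@status:
  type: MemoryDataSet  # holds the dummy True flag returned by the trigger node

prm_product_hierarchy@bigquery:
  type: spark.SparkDataSet
  file_format: bigquery
  filepath: my_project.my_dataset.prm_product_hierarchy  # placeholder <project>.<dataset>.<table> reference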