Mohamed El Guendouz
Joined October 2, 2024

Hello 🙂 I'd like to read a BigQuery table using spark.SparkDataset, but I'm getting an error saying that I need to configure the project ID. Has anyone encountered this issue before?

Spark session:

spark.jars.packages: io.delta:delta-spark_2.12:3.2.0
spark.jars: https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar,https://repo1.maven.org/maven2/com/google/cloud/spark/spark-bigquery-with-dependencies_2.12/0.36.1/spark-bigquery-with-dependencies_2.12-0.36.1.jar
spark.sql.extensions: io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog: org.apache.spark.sql.delta.catalog.DeltaCatalog
spark.hadoop.fs.gs.impl: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem

Error:
DatasetError: Failed while loading data from dataset SparkDataset(file_format=bigquery, filepath=/tmp/dummy.parquet, load_args={'table': project_id.dataset_id.table_id}, save_args={}).
An error occurred while calling o45.load.
: com.google.cloud.spark.bigquery.repackaged.com.google.inject.ProvisionException: Unable to provision, see the following errors:

1) [Guice/ErrorInCustomProvider]: IllegalArgumentException: A project ID is required for this service but could not be determined from the builder or the environment.  Please set a project ID using the 
builder.
  at SparkBigQueryConnectorModule.provideSparkBigQueryConfig(SparkBigQueryConnectorModule.java:102)
  while locating SparkBigQueryConfig

Learn more:
  https://github.com/google/guice/wiki/ERROR_IN_CUSTOM_PROVIDER

1 error

======================
Full classname legend:
======================
SparkBigQueryConfig:          "com.google.cloud.spark.bigquery.SparkBigQueryConfig"
SparkBigQueryConnectorModule: "com.google.cloud.spark.bigquery.SparkBigQueryConnectorModule"
========================
End of classname legend:
========================
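
For reference, the spark-bigquery connector usually needs a billing project in addition to the fully qualified table name. Below is a minimal PySpark sketch of the same read outside Kedro, assuming the connector's parentProject option does the trick here (in the Kedro catalog this would be an extra key in load_args); the IDs are placeholders.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = (
    spark.read.format("bigquery")
    .option("table", "project_id.dataset_id.table_id")  # placeholder table reference
    .option("parentProject", "project_id")              # project used for billing/quota
    .load()
)
df.show()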

13 comments

Hello everyone 😄 !

I'm currently using kedro-airflow to generate my Airflow DAGs from my Kedro project. I followed the recommendation in the documentation and used a custom template to adapt the DAG for execution on Cloud Composer.
According to the documentation, it is possible to create TaskGroups if needed: Kedro-Airflow Documentation.

I’d like to group multiple nodes into TaskGroups, but I can't find any parameters that are automatically passed to the Jinja2 template to enable this grouping.

Has anyone done this before? Or does anyone know exactly what the documentation is referring to?

Thanks in advance!
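
In case it helps: Airflow's TaskGroup can be emitted directly from a custom template. Here is a minimal, hand-written DAG sketch (not the kedro-airflow template itself) showing the grouping idea; the group names, node names and the run_kedro_node stub are illustrative only.

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup


def run_kedro_node(node_name: str) -> None:
    # Placeholder: the generated template would run the node through a
    # KedroSession (or a custom KedroOperator) instead of this stub.
    ...


with DAG("kedro_grouped_dag", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    with TaskGroup(group_id="preprocessing") as preprocessing:
        clean = PythonOperator(task_id="clean_data", python_callable=run_kedro_node, op_args=["clean_data_node"])
        split = PythonOperator(task_id="split_data", python_callable=run_kedro_node, op_args=["split_data_node"])
        clean >> split

    with TaskGroup(group_id="training") as training:
        train = PythonOperator(task_id="train_model", python_callable=run_kedro_node, op_args=["train_model_node"])

    preprocessing >> training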

8 comments

Subject: Dependency Issue Between Two Nodes in Kedro

Hello everyone,

I’m facing an issue regarding dependency management between two Nodes in Kedro, and I’d appreciate your insights 🙂

I have a Node A that is supposed to be a dependency for Node B. However, Node A does not return any data as output, which prevents me from creating an explicit link between these two Nodes in the pipeline. As a result, Node B can execute before Node A, which is not the desired behavior.

My question is: how can I force Kedro to treat Node B as dependent on Node A, even if Node A doesn’t produce any output data? Is there a clean way to define an explicit dependency between two Nodes in this case?

Thanks in advance for your help! 😊
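
A common workaround, for reference: have node A return a small marker value and declare that dataset as an extra input of node B, so Kedro infers the ordering from the data dependency. A minimal sketch with placeholder names:

from kedro.pipeline import Pipeline, node


def node_a() -> bool:
    # ... side effect only (no real data produced) ...
    return True  # marker output, kept as a MemoryDataset by default


def node_b(_a_done: bool, data):
    # actual work that must run after node A
    return data


pipeline = Pipeline(
    [
        node(node_a, inputs=None, outputs="a_done", name="node_a"),
        node(node_b, inputs=["a_done", "some_input"], outputs="b_output", name="node_b"),
    ]
)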

6 comments

Subject: How to Initialize a Delta Table That Doesn't Exist Using Kedro?
Hello everyone,
I’m facing an issue related to the Delta library. When I attempt to read a Delta table using spark.DeltaTableDataset, I receive a message stating that the table does not exist. This is expected since the table hasn't been created yet. However, my goal is to initialize the table with data that I will subsequently provide.
Unfortunately, the DeltaTableDataset does not support write operations. Does anyone know how to handle the initialization of a Delta table in this scenario?
Currently, I am working on a custom hook using the @hook_impl decorator:

@hook_impl
def before_dataset_loaded(self, dataset_name: str, node: Node) -> None:
    # My logic to initialize the Delta table
    ...
The idea is to initialize the Delta table (if it doesn’t already exist) using PySpark within this hook. However, I am struggling to dynamically retrieve the schema of the table for its creation.
If anyone has encountered a similar situation or has insights on how to resolve this, I would greatly appreciate your help!
Thank you in advance for your support!
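
For reference, one way such a hook can check for and initialise the table with plain PySpark; the path and schema below are placeholders, and the schema would still need to come from your own configuration or reference data.

from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType

spark = SparkSession.builder.getOrCreate()
table_path = "gs://my-bucket/path/to/table"  # placeholder location

if not DeltaTable.isDeltaTable(spark, table_path):
    schema = StructType([StructField("id", StringType(), True)])  # placeholder schema
    # Writing an empty DataFrame once creates the Delta log with this schema.
    spark.createDataFrame([], schema).write.format("delta").save(table_path)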

2 comments

Hello 🙂
I would like to know whether, when generating Airflow DAGs for a Kedro project with the kedro-airflow tool, it is possible to create a separate DAG for each pipeline in the project rather than a single DAG for the whole project. If so, how can I configure the start time and other parameters of the DAG corresponding to each pipeline?

3 comments

Hello, I would like to know how to manage merge conditions with Kedro for Delta tables. For example: whenMatchedUpdate() ...
https://docs.delta.io/latest/delta-update.html
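
Since spark.DeltaTableDataset is load-only, the merge itself usually lives inside a node (or a custom dataset) using the Delta Lake Python API. A minimal sketch with placeholder path, keys and columns:

from delta.tables import DeltaTable
from pyspark.sql import DataFrame, SparkSession


def upsert_into_delta(updates: DataFrame, table_path: str) -> None:
    spark = SparkSession.builder.getOrCreate()
    target = DeltaTable.forPath(spark, table_path)
    (
        target.alias("t")
        .merge(updates.alias("s"), "t.id = s.id")        # placeholder join condition
        .whenMatchedUpdate(set={"value": "s.value"})     # placeholder column mapping
        .whenNotMatchedInsertAll()
        .execute()
    )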

ISSUE: Deployment of Kedro Pipelines on GCP with Dataproc and Cloud Composer

Description
I am conducting a POC with Kedro on a GCP environment and need assistance deploying my Kedro project in a GCP-compatible format. The goal is to package the Kedro project for execution on Cloud Dataproc clusters.

The intended workflow is as follows:

  1. Create a separate Dataproc cluster for each Kedro pipeline.
  2. Execute the pipelines on their respective Dataproc clusters.
  3. Use Cloud Composer (Airflow) to orchestrate the process.
  4. Store data in GCS buckets.

I have not found clear documentation or guidelines on how to structure or deploy Kedro projects for this specific setup. Any guidance or resources to achieve this would be greatly appreciated.

Requirements
  • Package Kedro project for GCP compatibility.
  • Deploy and run pipelines on Dataproc clusters.
  • Orchestrate pipeline execution using Cloud Composer.
  • GCS as the storage location for data.

This request is urgent, as it is critical to the success of the POC and to the subsequent project deployment.
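
For anyone in the same situation, here is a hedged sketch of how steps 1–3 are often wired together from Cloud Composer with the Google provider's Dataproc operators, assuming the packaged Kedro project and a PySpark entry point have already been uploaded to GCS; every name, region, URI and config value below is a placeholder.

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator,
)

PROJECT_ID = "my-gcp-project"
REGION = "europe-west1"
CLUSTER_NAME = "kedro-data-processing"

with DAG("kedro_dataproc_data_processing", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        cluster_config={
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-4"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-4"},
        },
    )

    run_pipeline = DataprocSubmitJobOperator(
        task_id="run_pipeline",
        project_id=PROJECT_ID,
        region=REGION,
        job={
            "reference": {"project_id": PROJECT_ID},
            "placement": {"cluster_name": CLUSTER_NAME},
            "pyspark_job": {
                # Placeholder entry point that runs the packaged Kedro project,
                # e.g. via a KedroSession and session.run(pipeline_name=...).
                "main_python_file_uri": "gs://my-bucket/entrypoints/run_pipeline.py",
                "args": ["--pipeline", "data_processing"],
            },
        },
    )

    delete_cluster = DataprocDeleteClusterOperator(
        task_id="delete_cluster",
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        trigger_rule="all_done",  # clean up even if the job fails
    )

    create_cluster >> run_pipeline >> delete_cluster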

3 comments

Hello,
I would like to work with Delta Tables using PySpark in a GCS bucket, but I'm having trouble using spark.DeltaTableDataset:

table_name:
  type: spark.DeltaTableDataset
  filepath: "gs://XXXX/poc-kedro/table_name/*.parquet"
Could you tell me what might be wrong with this?
Additionally, could you explain how to specify the credentials for accessing the table with this Dataset?
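
For context: with Spark-backed datasets, GCS credentials normally go through the SparkSession / GCS connector configuration rather than the catalog entry, and a Delta table is addressed by its root directory rather than a *.parquet glob. A hedged PySpark sketch, with the key-file path as a placeholder:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "/path/to/key.json")  # placeholder
    .getOrCreate()
)

# Read the Delta table from its directory root, not a file glob.
df = spark.read.format("delta").load("gs://XXXX/poc-kedro/table_name")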

24 comments

Hi everyone,

I’m a Data Engineer, and my team is working on multiple pipelines, each addressing different use cases (1 use case = 1 pipeline). We have both ingestion pipelines and export pipelines delivering data to various clients.
We’re considering grouping certain nodes into a common library to be shared across these pipelines. I wanted to ask if this is considered a good practice within the Kedro framework. If so, could you recommend an approach or best practices for implementing this?
Additionally, do you have any recommendations for structuring a Kedro project when working with multiple pipelines like this?
Thanks in advance for your help!

Best regards,
El Guendouz Mohamed
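
For reference, one common pattern is to keep shared node functions in an internal package and reuse them across use-case pipelines with namespaces. A minimal sketch; the my_company_lib package and the dataset names are hypothetical:

from kedro.pipeline import Pipeline, node, pipeline

from my_company_lib.nodes import clean, validate  # shared node functions (hypothetical package)


def create_ingestion_pipeline(namespace: str) -> Pipeline:
    base = pipeline(
        [
            node(clean, inputs="raw_data", outputs="clean_data", name="clean"),
            node(validate, inputs="clean_data", outputs="validated_data", name="validate"),
        ]
    )
    # Namespacing prefixes dataset and node names (e.g. client_a.raw_data),
    # so the same shared nodes can be reused per use case without collisions.
    return pipeline(base, namespace=namespace)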

3 comments