Storing a Spark DataFrame in Delta Format Using Kedro

Hello Kedro Community,
I am working on a project where I need to store a Spark DataFrame in Delta format using Kedro. In particular, I want the data to be written exactly as in the following function:

from delta.tables import DeltaTable

# assumes an active SparkSession named `spark` is in scope
def export_results_to_delta(summary_df, output_path="/mnt/success5/Success5_results/metric_changes"):
    if DeltaTable.isDeltaTable(spark, output_path):
        # table already exists: upsert the new results with a MERGE on the business keys
        DeltaTable.forPath(spark, output_path).alias("target").merge(
            summary_df.alias("source"),
            """target.reference_id = source.reference_id AND
               target.country = source.country AND
               target.provider_id = source.provider_id AND
               target.matching_run_id = source.matching_run_id"""
        ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
    else:
        # first write: create the Delta table, partitioned by the key columns
        summary_df.write.format("delta").mode("overwrite").partitionBy(
            "country", "matching_run_id", "provider_id"
        ).save(output_path)
Is it possible to create a catalog entry in Kedro that allows me to store the dataset in this manner? If so, could you please provide an example of how to configure the catalog entry?
Thank you in advance for your help!


hi @Carlos Prieto - Tomtom!

I notice that our kedro_datasets.spark.DeltaTableDataset is read-only for historical reasons. Still, have a look at the code:

https://github.com/kedro-org/kedro-plugins/blob/1a5e0f/kedro-datasets/kedro_datasets/spark/deltatable_dataset.py

My advice would be to create your own dataset, with more or less this structure (pseudocode):

# needs roughly: from kedro.io import AbstractDataset
#                from delta.tables import DeltaTable
#                from pyspark.sql import DataFrame
# a real dataset also needs _load and _describe, omitted here
class SparkDeltaDataset(AbstractDataset):
    def _save(self, data: DataFrame) -> None:  # Kedro calls this via the public save()
        spark = get_spark()  # helper returning the active SparkSession
        output_path = self._filepath
        if DeltaTable.isDeltaTable(spark, output_path):
            # table already exists: upsert with MERGE on the configured predicate,
            # which should reference the configured target alias and the "source" alias
            (
                DeltaTable.forPath(spark, output_path).alias(self._alias)
                .merge(data.alias("source"), self._merge_predicate)
                .whenMatchedUpdateAll()
                .whenNotMatchedInsertAll()
                .execute()
            )
        else:
            # first write: create the Delta table, partitioned if columns were configured
            write_cmd = data.write.format("delta").mode("overwrite")
            if partition_cols := self._partition_cols:
                write_cmd = write_cmd.partitionBy(*partition_cols)
            write_cmd.save(output_path)
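
As for the catalog entry part of the question: once such a dataset is importable from your project, registering it is just a matter of pointing type at the class and passing the constructor arguments as keys. A rough sketch, assuming the class above takes filepath, merge_predicate, partition_cols and alias in its __init__ and lives in a hypothetical my_project.datasets module:

metric_changes:
  type: my_project.datasets.SparkDeltaDataset
  filepath: /mnt/success5/Success5_results/metric_changes
  alias: target
  merge_predicate: >
    target.reference_id = source.reference_id AND
    target.country = source.country AND
    target.provider_id = source.provider_id AND
    target.matching_run_id = source.matching_run_id
  partition_cols: [country, matching_run_id, provider_id]

Kedro will then call the dataset's save() with the DataFrame produced by whichever node outputs metric_changes.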
