Hello Kedro Community,
I am working on a project where I need to store a Spark DataFrame in Delta format using Kedro. Specifically, I want to ensure that the data is stored in a specific way, as shown in the following function:
```python
def export_results_to_delta(
    summary_df,
    output_path="/mnt/success5/Success5_results/metric_changes",
):
    if DeltaTable.isDeltaTable(spark, output_path):
        (
            DeltaTable.forPath(spark, output_path)
            .alias("target")
            .merge(
                summary_df.alias("source"),
                """target.reference_id = source.reference_id
                AND target.country = source.country
                AND target.provider_id = source.provider_id
                AND target.matching_run_id = source.matching_run_id""",
            )
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )
    else:
        summary_df.write.format("delta").mode("overwrite").partitionBy(
            "country", "matching_run_id", "provider_id"
        ).save(output_path)
```

Is it possible to create a catalog entry in Kedro that allows me to store the dataset in this manner? If so, could you please provide an example of how to configure the catalog entry?
hi @Carlos Prieto - Tomtom!
I notice that our `kedro_datasets.spark.DeltaTableDataset` is read-only for historical reasons. Still, have a look at the code:
https://github.com/kedro-org/kedro-plugins/blob/1a5e0f/kedro-datasets/kedro_datasets/spark/deltatable_dataset.py
My advice would be to create your own, with more or less this structure (pseudocode):
```python
from delta.tables import DeltaTable
from kedro.io import AbstractDataset
from pyspark.sql import DataFrame


class SparkDeltaDataset(AbstractDataset):
    # __init__, _load and _describe are omitted here; the constructor would set
    # self._filepath, self._alias, self._merge_predicate and self._partition_cols.

    def _save(self, data: DataFrame) -> None:
        # AbstractDataset dispatches save() to _save()
        spark = get_spark()  # however you obtain the active SparkSession
        output_path = self._filepath

        if DeltaTable.isDeltaTable(spark, output_path):
            # Table already exists: upsert the incoming rows on the merge predicate.
            # The source alias must match whatever the predicate references.
            (
                DeltaTable.forPath(spark, output_path)
                .alias(self._alias)
                .merge(data.alias("source"), self._merge_predicate)
                .whenMatchedUpdateAll()
                .whenNotMatchedInsertAll()
                .execute()
            )
        else:
            # First write: create the Delta table, optionally partitioned.
            write_cmd = data.write.format("delta").mode("overwrite")
            if partition_cols := self._partition_cols:
                write_cmd = write_cmd.partitionBy(*partition_cols)
            write_cmd.save(output_path)
```
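To answer the catalog part of your question: once a class like that lives somewhere importable in your project, the entry could look roughly like this. This is a minimal sketch; the module path `my_project.datasets` and the key names are assumptions that have to match your custom class, not an existing Kedro dataset.

```yaml
metric_changes:
  type: my_project.datasets.SparkDeltaDataset   # assumed location of the custom class
  filepath: /mnt/success5/Success5_results/metric_changes
  alias: target
  merge_predicate: >
    target.reference_id = source.reference_id
    AND target.country = source.country
    AND target.provider_id = source.provider_id
    AND target.matching_run_id = source.matching_run_id
  partition_cols: [country, matching_run_id, provider_id]
```

Kedro passes the keys other than `type` (and reserved ones like `credentials`/`versioned`) to the dataset's constructor, so the `__init__` of the custom class just needs to accept `filepath`, `alias`, `merge_predicate` and `partition_cols` and store them on `self`.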