Join the Kedro community

Updated 2 months ago

Defining a Default BigQuery Dataset Project-Wide

When using BigQuery Datasets how do you define a default dataset project wide?

d
L
J
12 comments

So you can define defaults this way
https://docs.kedro.org/en/stable/data/kedro_dataset_factories.html

But I would encourage Ibis these days over any of the pandas datadets when working with sql

Are you referring to dataset in this case as a Kedro or BigQuery concept? If you like to use a default BigQuery dataset project wide, you can set that via a global.

# Globals.yml
bq_dataset: dataset_name_here

# Catalog.yaml
dataset:
  type: YouBigQuereBigQueryTableDatasetHere
  dataset: ${globals:bq_dataset}
  table: nodes
  filepath: ${globals:paths.int}/rtx_kg2/nodes
  save_args:
    mode: overwrite
    labels:
      kg: rtx_kg2
      git_sha: ${globals:git_sha}

We have a custom implementation of a BigQuery dataset that uses Spark under the hood, but registers the table in BigQuery as an external dataset for interactive analysis. Happy to share if that helps you.

Thank you Laurens, that's what I was looking for!

class BigQueryTableDataset(SparkDataset):
    """Implementation fo a BigQueryTableDataset.

    The class delegates dataset save and load invocations to the native SparkDataset
    and registers the dataset into BigQuery through External Data Configuration.
    """

    def __init__(  # noqa: PLR0913
        self,
        *,
        filepath: str,
        project_id: str,
        dataset: str,
        table: str,
        identifier: str,
        file_format: str,
        load_args: dict[str, Any] = None,
        save_args: dict[str, Any] = None,
        version: Version = None,
        credentials: dict[str, Any] = None,
        metadata: dict[str, Any] = None,
        **kwargs,
    ) -> None:
        """Creates a new instance of ``BigQueryTableDataset``.

        Args:
            project_id: project identifier.
            dataset: Name of the BigQuery dataset.
            table: name of the table.
            identifier: unique identfier of the table.
            file_format: file format to use
            load_args: Arguments to pass to the load method.
            save_args: Arguments to pass to the save
            version: Version of the dataset.
            credentials: Credentials to connect to the Neo4J instance.
            metadata: Metadata to pass to neo4j connector.
            kwargs: Keyword Args passed to parent.
        """
        self._project_id = project_id
        self._path = filepath
        self._format = file_format
        self._labels = save_args.pop("labels", {})

        self._table = self._sanitize_name(f"{table}_{identifier}")
        self._dataset_id = f"{self._project_id}.{self._sanitize_name(dataset)}"

        self._client = bigquery.Client(project=self._project_id)

        super().__init__(
            filepath=filepath,
            file_format=file_format,
            save_args=save_args,
            load_args=load_args,
            credentials=credentials,
            version=version,
            metadata=metadata,
            **kwargs,
        )

    def _load(self) -> Any:
        SparkHooks._initialize_spark()
        return super()._load()

    def _save(self, data: DataFrame) -> None:
        # Invoke saving of the underlying spark dataset
        super()._save(data)

        # Ensure dataset exists
        self._create_dataset()

        # Create external table
        external_config = bigquery.ExternalConfig(self._format.upper())
        external_config.source_uris = [f"{self._path}/*.{self._format}"]

        # Register the external table
        table = bigquery.Table(f"{self._dataset_id}.{self._table}")
        table.labels = self._labels
        table.external_data_configuration = external_config
        table = self._client.create_table(table, exists_ok=False)

    def _create_dataset(self) -> str:
        try:
            self._client.get_dataset(self._dataset_id)
            print(f"Dataset {self._dataset_id} already exists")
        except exceptions.NotFound:
            print(f"Dataset {self._dataset_id} is not found, will attempt creating it now.")

            # Dataset doesn't exist, so let's create it
            dataset = bigquery.Dataset(self._dataset_id)
            # dataset.location = "US"  # Specify the location, e.g., "US" or "EU"

            dataset = self._client.create_dataset(dataset, timeout=30)
            print(f"Created dataset {self._project_id}.{dataset.dataset_id}")

    @staticmethod
    def _sanitize_name(name: str) -> str:
        """Function to sanitise BigQuery table or dataset identifiers.

        Args:
            name: str
        Returns:
            Sanitized name
        """
        return re.sub(r"[^a-zA-Z0-9_]", "_", str(name))

This is an implementation that loads and saves the table as a plain dataset, but registers in BigQuery for exploratory analysis (which is more cost effective than storing in BQ directly depending on your data access patterns)

What is the advantage of ibis? Since there are no bigquery specific ibis datasets in kedro, I am assuming you just configure the ibis.TableDataset?

Yeah ibis generates sql behind the scenes so there is minimal infrastructure overhead and also the same code works with 20+ alternative backends

I also like the idea of using BigQuery in prod, but duckdb in dev

yeah I think that's quite cool and I think it kinda depends on ur pipeline setup. We have a base env that uses Spark datasets all over the place, and then a cloud env that uses the dataset above to move some stuff to BigQuery (so we don't really use BQ as a query engine).

On a similar note, how do you guys provide parameters in sql queries? It seems like the GBQQueryDataset doesn't have the option compared to pandas.SQLQueryDataset

On a similar note, how do you guys provide parameters in sql queries? It seems like the GBQQueryDataset doesn't have the option compared to pandas.SQLQueryDataset
This is another argument for Ibis, you'll have to write a custom dataset to do that

Add a reply
Sign up and join the conversation on Slack