When using BigQuery Datasets, how do you define a default dataset project-wide?
So you can define defaults this way
https://docs.kedro.org/en/stable/data/kedro_dataset_factories.html
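For example, a dataset factory pattern can hold the shared project and dataset once and match on the table name. A minimal sketch, assuming pandas.GBQTableDataset and placeholder project/dataset names:

# catalog.yml
"{name}_bq":
  type: pandas.GBQTableDataset
  project: my-gcp-project        # placeholder
  dataset: my_default_dataset    # placeholder
  table_name: "{name}"

Any dataset name used in your pipeline that matches the pattern (e.g. nodes_bq) then picks up the shared project and dataset.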
But I would encourage Ibis these days over any of the pandas datasets when working with SQL.
Are you referring to dataset in this case as a Kedro or BigQuery concept? If you'd like to use a default BigQuery dataset project-wide, you can set that via a global.
# globals.yml
bq_dataset: dataset_name_here
# catalog.yml
dataset:
  type: YourBigQueryTableDatasetHere
  dataset: ${globals:bq_dataset}
  table: nodes
  filepath: ${globals:paths.int}/rtx_kg2/nodes
  save_args:
    mode: overwrite
    labels:
      kg: rtx_kg2
      git_sha: ${globals:git_sha}
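For completeness, the other ${globals:...} references above resolve against the same globals.yml; a placeholder sketch (the actual keys and values are whatever you keep in your own globals):

# globals.yml (placeholder values)
bq_dataset: dataset_name_here
git_sha: abc1234
paths:
  int: gs://my-bucket/data/02_intermediate

Nested keys like paths.int are resolved with dot notation by the OmegaConfigLoader's globals resolver.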
import re
from typing import Any

from google.cloud import bigquery, exceptions
from kedro.io.core import Version
from kedro_datasets.spark import SparkDataset
from pyspark.sql import DataFrame


class BigQueryTableDataset(SparkDataset):
    """Implementation of a BigQueryTableDataset.

    The class delegates dataset save and load invocations to the native
    SparkDataset and registers the dataset in BigQuery through an external
    data configuration.
    """

    def __init__(  # noqa: PLR0913
        self,
        *,
        filepath: str,
        project_id: str,
        dataset: str,
        table: str,
        identifier: str,
        file_format: str,
        load_args: dict[str, Any] = None,
        save_args: dict[str, Any] = None,
        version: Version = None,
        credentials: dict[str, Any] = None,
        metadata: dict[str, Any] = None,
        **kwargs,
    ) -> None:
        """Creates a new instance of ``BigQueryTableDataset``.

        Args:
            filepath: Path to the underlying data files.
            project_id: Project identifier.
            dataset: Name of the BigQuery dataset.
            table: Name of the table.
            identifier: Unique identifier of the table.
            file_format: File format to use.
            load_args: Arguments to pass to the load method.
            save_args: Arguments to pass to the save method.
            version: Version of the dataset.
            credentials: Credentials passed to the underlying SparkDataset.
            metadata: Metadata passed to the underlying SparkDataset.
            kwargs: Keyword args passed to the parent class.
        """
        self._project_id = project_id
        self._path = filepath
        self._format = file_format

        save_args = save_args or {}
        self._labels = save_args.pop("labels", {})

        self._table = self._sanitize_name(f"{table}_{identifier}")
        self._dataset_id = f"{self._project_id}.{self._sanitize_name(dataset)}"
        self._client = bigquery.Client(project=self._project_id)

        super().__init__(
            filepath=filepath,
            file_format=file_format,
            save_args=save_args,
            load_args=load_args,
            credentials=credentials,
            version=version,
            metadata=metadata,
            **kwargs,
        )

    def _load(self) -> Any:
        # SparkHooks is a project-specific hook that lazily initialises the Spark session
        SparkHooks._initialize_spark()
        return super()._load()

    def _save(self, data: DataFrame) -> None:
        # Invoke saving of the underlying Spark dataset
        super()._save(data)

        # Ensure the BigQuery dataset exists
        self._create_dataset()

        # Point an external data configuration at the files written by Spark
        external_config = bigquery.ExternalConfig(self._format.upper())
        external_config.source_uris = [f"{self._path}/*.{self._format}"]

        # Register the external table
        table = bigquery.Table(f"{self._dataset_id}.{self._table}")
        table.labels = self._labels
        table.external_data_configuration = external_config
        table = self._client.create_table(table, exists_ok=False)

    def _create_dataset(self) -> str:
        try:
            self._client.get_dataset(self._dataset_id)
            print(f"Dataset {self._dataset_id} already exists")
        except exceptions.NotFound:
            print(f"Dataset {self._dataset_id} is not found, will attempt creating it now.")

            # Dataset doesn't exist, so let's create it
            dataset = bigquery.Dataset(self._dataset_id)
            # dataset.location = "US"  # Specify the location, e.g. "US" or "EU"
            dataset = self._client.create_dataset(dataset, timeout=30)
            print(f"Created dataset {self._project_id}.{dataset.dataset_id}")

    @staticmethod
    def _sanitize_name(name: str) -> str:
        """Sanitise BigQuery table or dataset identifiers.

        Args:
            name: Raw name.

        Returns:
            Sanitized name.
        """
        return re.sub(r"[^a-zA-Z0-9_]", "_", str(name))
This is an implementation that loads and saves the table as a plain dataset but registers it in BigQuery for exploratory analysis (which is more cost-effective than storing in BQ directly, depending on your data access patterns).
What is the advantage of Ibis? Since there are no BigQuery-specific Ibis datasets in Kedro, I am assuming you just configure the ibis.TableDataset?
Yeah, Ibis generates SQL behind the scenes, so there is minimal infrastructure overhead, and the same code works with 20+ alternative backends.
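On the Kedro side you'd just point ibis.TableDataset at the BigQuery backend; roughly something like this (assuming the Ibis BigQuery backend is installed, with placeholder project and dataset names):

# catalog.yml
nodes:
  type: ibis.TableDataset
  table_name: nodes
  connection:
    backend: bigquery
    project_id: my-gcp-project   # placeholder
    dataset_id: my_dataset       # placeholder
  save_args:
    materialized: table

As far as I know, the keys under connection (apart from backend) are passed to the backend's connect() call, so they follow whatever ibis.bigquery.connect accepts.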
yeah I think that's quite cool, and I think it kinda depends on your pipeline setup. We have a base env that uses Spark datasets all over the place, and then a cloud env that uses the dataset above to move some stuff to BigQuery (so we don't really use BQ as a query engine).
On a similar note, how do you guys provide parameters in SQL queries? It seems like the GBQQueryDataset doesn't have that option, compared to pandas.SQLQueryDataset.
This is another argument for Ibis; otherwise you'll have to write a custom dataset to do that.
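If you do go the custom dataset route, one way is to pass query parameters through the google-cloud-bigquery client directly. A minimal sketch, not an existing kedro-datasets API; the class name, the string-only parameter handling, and the read-only behaviour are all assumptions, and it assumes a recent Kedro where AbstractDataset lives in kedro.io:

from typing import Any

import pandas as pd
from google.cloud import bigquery
from kedro.io import AbstractDataset


class ParameterisedGBQQueryDataset(AbstractDataset):
    """Runs a BigQuery SQL query that references named parameters as @name."""

    def __init__(self, *, sql: str, project: str, query_parameters: dict[str, Any] = None):
        self._sql = sql
        self._project = project
        self._params = query_parameters or {}

    def _load(self) -> pd.DataFrame:
        client = bigquery.Client(project=self._project)
        job_config = bigquery.QueryJobConfig(
            query_parameters=[
                # Assumes all parameters are strings, for brevity
                bigquery.ScalarQueryParameter(name, "STRING", value)
                for name, value in self._params.items()
            ]
        )
        return client.query(self._sql, job_config=job_config).to_dataframe()

    def _save(self, data: pd.DataFrame) -> None:
        raise NotImplementedError("This dataset is read-only")

    def _describe(self) -> dict[str, Any]:
        return {"sql": self._sql, "project": self._project}

With Ibis you'd skip all of this and just filter the table in Python; Ibis compiles it to SQL for you.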