hey guys, has anyone had this issue before?
AttributeError: 'CustomDataCatalog' object has no attribute '_data_sets'
this is coming from a custom hook you have running; it will be registered in settings.py
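(in case it helps, registration usually looks roughly like this in settings.py; the package and module names below are just placeholders, adapt them to your project)

    # src/my_project/settings.py  (illustrative paths)
    from my_project.hooks import ProjectHooks

    HOOKS = (ProjectHooks(),)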
I did install kedro into a conda environment, created a Jupyter notebook in a folder, and tried to run:
%load_ext kedro.ipython
%reload_kedro ../
That's where the error happened
and strangely enough, the _hooks.py file doesn't contain anything like that:
self.datasets = catalog._data_sets
but from the logs it looks like you have an after_context_created
hook which is trying to access that attribute on the custom catalog class.
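something along these lines would trigger it (purely illustrative, the class name is made up; this is not your _hooks.py or any specific plugin's code):

    from kedro.framework.hooks import hook_impl

    class DatasetCollectorHook:
        @hook_impl
        def after_context_created(self, context):
            # raises AttributeError if the catalog class doesn't expose `_data_sets`
            self.datasets = context.catalog._data_sets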
Indeed, I do have some custom hooks:
class ProjectHooks:
    def __init__(self):
        self.client = None
        self.gcp_project = None
        self.checked_datasets = []
        self.data_quality_report = DataQualityReport()

    def report_export(self, catalog):
        minimal_report = self.data_quality_report.create_technical_report(hide_not_tested=True)
        print("\n--- DATA QUALITY RESULTS ---\n")
        print(minimal_report)
        print("----------------------------")
        detailed_report = self.data_quality_report.create_technical_report()
        catalog.save("data_quality_report", detailed_report)
        id = catalog.load("params:job_name")
        bq_report = self.data_quality_report.create_dataframe_report(id)
        catalog.save("bq_data_quality_report", bq_report)

    @property
    def _logger(self):
        return logging.getLogger(self.__class__.__name__)

    @staticmethod
    def _create_log(msg, run_params, catalog):
        save_bq_logs = run_params["extra_params"].get("save_bq_logs", False)
        save_bq_logs = save_bq_logs == True or save_bq_logs == "True"
        if save_bq_logs:
            if not isinstance(msg, list):
                msg = [msg]
            log = pd.DataFrame({"status": msg, "datetime": [dt.datetime.now()] * len(msg)})
            catalog.save("bq_logs", log)

    @staticmethod
    def _call_bq_sheets_tables_update(resource_id):
        now = dt.datetime.now()
        now = dt.datetime(now.year, now.month, now.day, tzinfo=dt.timezone.utc)
        # create run
        transfer_client = bigquery_datatransfer.DataTransferServiceClient()
        response = transfer_client.schedule_transfer_runs(
            parent=resource_id, start_time=now, end_time=now
        ).runs[0]
        print(f"Google Sheets Tables Update - RUNNING")
        # wait run to be finished
        while True:
            run_response = transfer_client.get_transfer_run(name=response.name)
            run_status = run_response.state.name
            if run_status in ['PENDING', 'RUNNING']:
                time.sleep(5)
            elif run_status == 'SUCCEEDED':
                print("Google Sheets Tables Update - DONE")
                break
            else:
                raise NameError(f'********** Error from run_status')

    @hook_impl
    def before_pipeline_run(self, run_params, pipeline, catalog) -> None:
        if catalog.exists("params:save_parameters") and catalog.load("params:save_parameters"):
            catalog.save("all_parameters", dict(catalog.load("parameters")))
        if catalog.exists("params:scheduled_query_resource_id"):
            self._call_bq_sheets_tables_update(catalog.load("params:scheduled_query_resource_id"))
        pipeline_name = run_params["pipeline_name"]
        self._create_log(f"Starting {pipeline_name} pipeline", run_params, catalog)

    @hook_impl
    def after_pipeline_run(self, run_params, run_result, pipeline, catalog) -> None:
        pipeline_name = run_params["pipeline_name"]
        self._create_log(f"Finished {pipeline_name} pipeline", run_params, catalog)
        self.report_export(catalog)

    @hook_impl
    def on_pipeline_error(self, error, run_params, pipeline, catalog) -> None:
        pipeline_name = run_params["pipeline_name"]
        self._create_log(f"Error occurred in {pipeline_name} pipeline.\n{error}", run_params, catalog)
        self.report_export(catalog)

    @hook_impl
    def before_node_run(self, node, catalog, inputs, is_async, session_id):
        for k, v in inputs.items():
            if k == "params:snapshot_date" and isinstance(v, str):
                inputs[k] = pd.to_datetime(v).date()
            if isinstance(v, dict):
                inputs[k] = DefaultMunch.fromDict(v)
        self._save_bq_backup_tables(catalog, inputs, exclude=self.checked_datasets)
        apply_data_quality_tests(inputs.copy(), exclude=self.checked_datasets, report=self.data_quality_report)
        self.checked_datasets.extend(list(inputs.keys()))
        return inputs

    @hook_impl
    def after_node_run(self, node, catalog, outputs, is_async, session_id):
        self._save_bq_backup_tables(catalog, outputs, exclude=self.checked_datasets)
        apply_data_quality_tests(outputs.copy(), exclude=self.checked_datasets, report=self.data_quality_report)
        self.checked_datasets.extend(list(outputs.keys()))
        return None

    @staticmethod
    def _save_bq_backup_tables(catalog, inputs, exclude):
        if not (catalog.exists("params:save_bq_tables") and catalog.load("params:save_bq_tables")):
            return
        inputs = {name: inputs[name] for name in inputs if name not in exclude}
        for name in inputs:
            if is_dataset_from_bq(name, catalog):
                catalog.save(f"bq_backup_{name}", inputs[name])
even when using grep -r "catalog._data_sets" I can't locate it anywhere in the project
It looks like you're using an old version of kedro-viz, because by default it should access datasets via catalog._datasets, and catalog._data_sets only for Kedro 0.18.x (see the sketch after the link below for a quick way to check what your environment actually has)
https://github.com/kedro-org/kedro-viz/blob/65f2c5ac6ee82a5c707d87ed4c277132418c5a2d/package/kedro_viz/integrations/kedro/hooks.py#L30
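if it helps, a quick sanity check you can run in the same notebook (just a sketch) to confirm the installed versions and which private attribute the base catalog actually exposes:

    from importlib.metadata import version
    from kedro.io import DataCatalog

    print("kedro:", version("kedro"))
    print("kedro-viz:", version("kedro-viz"))

    # an empty base DataCatalog is enough to see which private attribute this Kedro version uses
    catalog = DataCatalog()
    print("has _data_sets:", hasattr(catalog, "_data_sets"))
    print("has _datasets:", hasattr(catalog, "_datasets"))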