Attribute Error: 'CustomDataCatalog' Object Has No Attribute '_data_sets'

hey guys, has anyone had this issue before?

AttributeError: 'CustomDataCatalog' object has no attribute '_data_sets'

11 comments

this is being raised by a custom hook you have running; it will be registered in settings.py

we can't really diagnose what's going on without seeing what you're doing there
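
For context, custom hooks get registered in src/<package>/settings.py roughly like this (a minimal sketch; the import path here is hypothetical):

from my_project.hooks import ProjectHooks  # hypothetical import path

HOOKS = (ProjectHooks(),)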

I installed Kedro into a conda environment, created a Jupyter notebook in a folder, and tried to run:

%load_ext kedro.ipython
%reload_kedro ../
That's where the error happened.

and strangely enough, the file _hooks.py doesn't contain anything like that:

self.datasets = catalog._data_sets

It looks like this project is using a CustomDataCatalog class too

but from the logs it looks like you have an after_context_created hook that is trying to access that attribute on this custom catalog class.
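
Something like this would reproduce it (an illustrative sketch, not your actual code): on newer Kedro the catalog's private attribute was renamed from _data_sets to _datasets, so any hook still using the old name breaks.

from kedro.framework.hooks import hook_impl

class StatsHook:
    @hook_impl
    def after_context_created(self, context):
        # _data_sets was renamed to _datasets in newer Kedro,
        # so this line raises AttributeError there
        self.datasets = context.catalog._data_sets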

Indeed, I have some custom hooks:

import datetime as dt
import logging
import time

import pandas as pd
from google.cloud import bigquery_datatransfer
from kedro.framework.hooks import hook_impl
from munch import DefaultMunch

# DataQualityReport, apply_data_quality_tests and is_dataset_from_bq
# are project-specific helpers defined elsewhere in this project.


class ProjectHooks:
    def __init__(self):
        self.client = None
        self.gcp_project = None
        self.checked_datasets = []
        self.data_quality_report = DataQualityReport()

    def report_export(self, catalog):
        minimal_report = self.data_quality_report.create_technical_report(hide_not_tested=True)
        print("\n--- DATA QUALITY RESULTS ---\n")
        print(minimal_report)
        print("----------------------------")

        detailed_report = self.data_quality_report.create_technical_report()
        catalog.save("data_quality_report", detailed_report)

        job_id = catalog.load("params:job_name")  # avoid shadowing the builtin id()
        bq_report = self.data_quality_report.create_dataframe_report(job_id)
        catalog.save("bq_data_quality_report", bq_report)

    @property
    def _logger(self):
        return logging.getLogger(self.__class__.__name__)

    @staticmethod
    def _create_log(msg, run_params, catalog):
        save_bq_logs = run_params["extra_params"].get("save_bq_logs", False)
        save_bq_logs = save_bq_logs in (True, "True")
        if save_bq_logs:
            if not isinstance(msg, list):
                msg = [msg]
            log = pd.DataFrame({"status": msg, "datetime": [dt.datetime.now()] * len(msg)})
            catalog.save("bq_logs", log)

    @staticmethod
    def _call_bq_sheets_tables_update(resource_id):
        now = dt.datetime.now()
        now = dt.datetime(now.year, now.month, now.day, tzinfo=dt.timezone.utc)

        # create run
        transfer_client = bigquery_datatransfer.DataTransferServiceClient()
        response = transfer_client.schedule_transfer_runs(
            parent=resource_id,
            start_time=now,
            end_time=now
        ).runs[0]

        print(f"Google Sheets Tables Update - RUNNING")

        # wait run to be finished
        while True:
            run_response = transfer_client.get_transfer_run(name=response.name)
            run_status = run_response.state.name

            if run_status in ['PENDING', 'RUNNING']:
                time.sleep(5)
            elif run_status == 'SUCCEEDED':
                print("Google Sheets Tables Update - DONE")
                break
            else:
                raise RuntimeError(f"Google Sheets Tables Update failed with status: {run_status}")

    @hook_impl
    def before_pipeline_run(self, run_params, pipeline, catalog) -> None:
        if catalog.exists("params:save_parameters") and catalog.load("params:save_parameters"):
            catalog.save("all_parameters", dict(catalog.load("parameters")))
        if catalog.exists("params:scheduled_query_resource_id"):
            self._call_bq_sheets_tables_update(catalog.load("params:scheduled_query_resource_id"))
        pipeline_name = run_params["pipeline_name"]
        self._create_log(f"Starting {pipeline_name} pipeline", run_params, catalog)

    @hook_impl
    def after_pipeline_run(self, run_params, run_result, pipeline, catalog) -> None:
        pipeline_name = run_params["pipeline_name"]
        self._create_log(f"Finished {pipeline_name} pipeline", run_params, catalog)
        self.report_export(catalog)

    @hook_impl
    def on_pipeline_error(self, error, run_params, pipeline, catalog) -> None:
        pipeline_name = run_params["pipeline_name"]
        self._create_log(f"Error occurred in {pipeline_name} pipeline.\n{error}", run_params, catalog)
        self.report_export(catalog)

    @hook_impl
    def before_node_run(self, node, catalog, inputs, is_async, session_id):
        for k, v in inputs.items():
            if k == "params:snapshot_date" and isinstance(v, str):
                inputs[k] = pd.to_datetime(v).date()
            if isinstance(v, dict):
                inputs[k] = DefaultMunch.fromDict(v)

        self._save_bq_backup_tables(catalog, inputs, exclude=self.checked_datasets)
        apply_data_quality_tests(inputs.copy(), exclude=self.checked_datasets, report=self.data_quality_report)
        self.checked_datasets.extend(list(inputs.keys()))
        return inputs

    @hook_impl
    def after_node_run(self, node, catalog, outputs, is_async, session_id):
        self._save_bq_backup_tables(catalog, outputs, exclude=self.checked_datasets)
        apply_data_quality_tests(outputs.copy(), exclude=self.checked_datasets, report=self.data_quality_report)
        self.checked_datasets.extend(list(outputs.keys()))
        return None

    @staticmethod
    def _save_bq_backup_tables(catalog, inputs, exclude):
        if not (catalog.exists("params:save_bq_tables") and catalog.load("params:save_bq_tables")):
            return

        inputs = {name: inputs[name] for name in inputs if name not in exclude}
        for name in inputs:
            if is_dataset_from_bq(name, catalog):
                catalog.save(f"bq_backup_{name}", inputs[name])

but none of them access that attribute...

even grep -r "catalog._data_sets" can't locate it anywhere

It looks like you're using an old version of kedro-viz. By default it should access datasets via catalog._datasets, falling back to catalog._data_sets for Kedro 0.18.x:
https://github.com/kedro-org/kedro-viz/blob/65f2c5ac6ee82a5c707d87ed4c277132418c5a2d/package/kedro_viz/integrations/kedro/hooks.py#L30
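
Newer kedro-viz versions guard against this; a minimal sketch of the version-tolerant pattern (not the exact kedro-viz code):

# Prefer the new private name, fall back to the Kedro 0.18.x one
datasets = getattr(catalog, "_datasets", None) or getattr(catalog, "_data_sets", {})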

I'll try upgrading to a newer version
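
For anyone finding this later, upgrading is just:

pip install --upgrade kedro-viz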

it did work, @Elena Khaustova. Appreciate the help!
