Join the Kedro community

M
M
M
D
M

Using Ibis as a Data Connector for Kedro ETL Pipeline

Hey Everyone, I am writing a basic ETL pipeline using Kedro. My data sources are Postgresql and AWS EFS and the destination Storage is again a SQL backend only.
As per some of the discussions in the forum , I found ibis as the best choice for good data connector as it can connect to many of the backends and thus provides flexibility in switching the backends and ofcourse the pythonic way of writing complex queries.

This is actually regarding ibis , So I have used Ibis to extract data from a postgres backend. For extraction i have used simple functions like .select(), filter(), joins() .

Did some transformations and pushed the transformed data as a csv file to S3.

Now one of the node would like to read data from S3 and push the data into our warehouse which is again an sql backend for this use case. How do we do that now , can i somehow utilise ibis again ?

any headsup guys ?

D
V
J
33 comments

You can read files from cloud storage, including S3, using Ibis. In the current TableDataset implementation, you can do this read from filepath.

(Note that there is a plan to create a separate FileDataset to read and write from files, but I don't have that PR up yet.)

But how do we move the file to a database using ibis ?

What do you mean "move to a database"? Do you want to create a non-temporary table?

So i would like to read a file from s3 into a node and push this data into a sql database

I am looking for insert functionality

Insert into existing table?

document_extraction_ibis:
type: ibis.TableDataset
table_name: document_classification
connection:
backend : postgres
host : ##
port : ##
database : ##
user : ##
password : ##

This is the existing table from which I have read the data and have done the transformations.

Now I would like to save the transformed data into a csv format on S3 for dataset versioning basically.

Once that is done I would like to pull the csv from s3 and save to another database with an existing table . But it is a different database and a different table not the one from where we read it .

To create a new table:

When you load the file using ibis.TableDataset I to your node, you will have an Ibis table object. Return that from the node (don't need to modify), and write to a different ibis.TableDataset

Thanks for your response. I got it now, but i wanted to know using TableDataset can we achieve the below operations -

  1. if this process can append new entries to the existing table in the sql backend.
  2. Also, is there a way we can update existing records in an existing sql table

  1. if this process can append new entries to the existing table in the sql backend.
  2. Also, is there a way we can update existing records in an existing sql table
Not right now, but I was thinking to mention this. We can create an append mode, because Ibis does support insert.

do you think there are some other workarounds for now that I can do to append data and make updates too

You can always modify the dataset (basically, create a custom dataset that copy-pastes the code) to support this functionality. See https://github.com/kedro-org/kedro-plugins/blob/main/kedro-datasets/kedro_datasets/ibis/table_dataset.py#L180-L181 — what you need is to basically create something like mode parameter for the dataset, and if mode=="append" use insert (https://ibis-project.org/backends/duckdb#ibis.backends.duckdb.Backend.insert)

How soon do you need this? 😅

I'm a little stretched thin, and there's already a different ibis.TableDataset enhancement I've promised to do this weekend, but I could try to address this next week.

Do you think we can implement update queries in sql as well by implementing a custom dataset.

Do you think we can implement update queries in sql as well by implementing a custom dataset.
Can you give an example?

Suppose I have a pandas dataframe which i have created by reading data from sql db and have already done Transformations , it looks like as shown below -

id , col_b , col_c

  1. 100, a , b

You can assume we have many such records in the pandas dataframe.


Now, there exists an sql table in a database with similar schema and some records.


id , col_b , col_c
  1. 100, a , 100


An update query in sql simply updates any given column. In this case, we would like to update the col_c whose current value is 100 since in the pandas dataframe the record with id=100 has a different value in col_c i.e "b" .


An update query should basically update the existing rows in an sql table nothing fancy.

Got it, you're looking for UPSERT functionality, right?

oh yeah you got it now 🙂

So now please tell me if we can do inserts as well as upserts using existing ibis integration with kedro.

Infact I do not see these functionalities with any of the existing data connectors in kedro-datasets

Earlier I was trying to make use of SqlTableDataset and SqlQueryDataset but there also i hit a roadblock.

Existing Ibis integration: No, it will require some code modification. Right now, you can only create table or view + choose to overwrite

Insert: We can definitely enable this in the Kedro dataset code (i.e. I'm happy to put it on my list of things to do and take care of it, can probably get it done by sometime next week). There's no challenge here that I can think of, since Ibis supports insert method already; just need to make sure end up with a reasonable design. 🙂

Upsert: As far as I'm aware, Ibis doesn't support upsert functionality out of the box (I did see mention of this in the H1 roadmap, but don't see work done on it). I am confirming with the team. That said, Ibis supports "escape hatches" to be able to leverage underlying functionality; in this case, the dataset could implement upsert using the raw_sql() method, at least until Ibis natively support upserting.

That said, a raw_sql()-based upsert may or may not work cleanly for all backends (a big part of the value of Ibis is that it unifies the syntax for doing this across different SQL dialects). We can probably figure out a way to unblock you based on the backends you use (Postgres?) in the interim, even if the same may not work perfectly for MS SQL or another backend.

I am going to convert this thread to a GitHub issue on the kedro-plugins repo, as the ideal solution—especially for upsert—will require some work on the Ibis side, and not too many Ibis team members on the Kedro Slack. Does that work? If you can share your GitHub username, I'll tag you in there—or, if you want, feel free to open the issue yourself!

Refer this comment 🙂

Oh nice, and I've linked it before. 😅 Thanks!

Yeaah i went through these discussions and was trying to find out some ways to get my current poc done.

OK, so there's no reason we can't implement UPSERT in Ibis; it's just something we were waiting for user asks for, we definitely have those now, and we have it on the list of things to consider for Ibis Q4 planning. I will push for it's inclusion.

That said, I'm guessing you don't want to wait that long; I can implement an initial version with raw_sql and make sure it works for you (again, shouldn't be hard), if you're down to collaborate on this.

Realistically, creating a fork of the ibis.TableDataset in your project is probably the right way to go. 🙂 That way we can implement some of this functionality quicker, and you also won't have to wait for a new Kedro-Datasets release.

Add a reply
Sign up and join the conversation on Slack
Join