Building a Kafka playground on AWS — Part 3: Streaming database events and querying with KSQL
This is Part 3 of the blog series. We are now going to use the architecture described in Parts 1 and 2 to stream database events to Kafka and consume them through KSQL.
All steps described in this post can be reproduced by deploying my GitHub project, kafka-aws-deployment. The project uses Terraform to deploy the architecture to AWS, and every deployment step is also detailed in its README file.
The first step is to either clone it with git or download it to your machine.
git clone https://github.com/maikelpenz/kafka-aws-deployment
Important: the AWS infrastructure deployed by this project is not free of charge. Remember to destroy the architecture when you stop working with it.
Introducing two more technologies
Two additional technologies are introduced in this post as they are required for our database use case.
AWS RDS
Our source Postgres database is hosted on an AWS service called Amazon RDS. I didn't touch on RDS previously because it is not part of the core architecture, only a service needed to reproduce our use case.
Debezium
Previously I mentioned Kafka Connect, a framework that facilitates moving data in and out of Kafka. For our use case we are going to use Debezium, an open-source project built on Kafka Connect. Debezium is a distributed platform for Change Data Capture (CDC): it listens for database changes and pushes them into Kafka. We deploy Debezium as the source image for our Kafka Connect container.
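To get a feel for this outside of ECS, a standalone Kafka Connect worker can be started from the Debezium image. This is an illustrative sketch only, not part of the project's deployment; the broker address, image tag and topic names are placeholders:

```shell
# Illustrative only: run the Debezium Kafka Connect image locally.
# BOOTSTRAP_SERVERS and the storage topic names are placeholder values.
docker run -d --name kafka-connect \
  -p 8083:8083 \
  -e BOOTSTRAP_SERVERS=<your Kafka brokers> \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=connect_configs \
  -e OFFSET_STORAGE_TOPIC=connect_offsets \
  -e STATUS_STORAGE_TOPIC=connect_statuses \
  debezium/connect:1.0
```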
Requirements
These are some requirements that must be fulfilled before we start:
- Create an AWS Account;
- Create an EC2 Key Pair: to enable SSH access to the EC2 instances created through ECS;
- Create an S3 bucket: to store the deployment artifacts;
- Install Terraform;
- Install the AWS CLI and configure a profile on your machine;
- Install Make (optional but recommended).
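For the AWS CLI step, configuring a named profile looks like this (the profile name below is just an example):

```shell
# Creates/updates a named profile in ~/.aws/credentials and ~/.aws/config.
# You will be prompted for an access key, secret key, region and output format.
aws configure --profile kafka-playground
```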
Pre-deployment
Before we deploy the architecture to AWS, a few variables must be set in your local copy of the repository. Each of the files below contains examples of how to fill them in.
Global variables
File: kafka-aws-deployment\aws-infrastructure\configuration\global-config.tfvars
ECS variables
File: kafka-aws-deployment\aws-infrastructure\configuration\ecs-config.tfvars
MSK variables
File: kafka-aws-deployment\aws-infrastructure\configuration\msk-config.tfvars
RDS variables
File: kafka-aws-deployment\aws-infrastructure\configuration\rds-config.tfvars
Terraform backend variables
File: kafka-aws-deployment\aws-infrastructure\configuration\terraform-backend.tfvars
Deploying the Architecture
There are two ways to deploy the architecture: using make, a tool that puts all deployment steps behind a single command, or manually deploying the Terraform projects. If you decide to perform a manual deploy, please check this section of the GitHub project's README file; otherwise just keep following the instructions.
The code is split into four Terraform projects: VPC, MSK, ECS and RDS. To deploy all projects at once, use your command line tool of preference and run the following from the root level of the GitHub project.
make deploykad
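For reference, the manual route deploys each Terraform project individually. The folder layout below is an assumption for illustration; follow the repository's README for the authoritative commands:

```shell
# Hypothetical sketch of a manual deploy for one project.
# Folder and file names are assumptions; see the project's README.
cd aws-infrastructure/vpc
terraform init -backend-config=../configuration/terraform-backend.tfvars
terraform apply -var-file=../configuration/global-config.tfvars
# Repeat for the MSK, ECS and RDS projects, in that order.
```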
This will take between 20 and 30 minutes. Once the deployment finishes, let's verify that our services are up. Log in to your AWS account and check the following services:
- AWS MSK: Check if an active MSK cluster appears as part of the Clusters list
- AWS ECS: A cluster called kad-ecs must be in place with six active services.
- AWS RDS: An RDS Postgres database should be available.
- AWS EC2 (Load Balancer): Go to the EC2 service and choose Load Balancers from the left pane. You should see a load balancer named kad-ecs-application-lb. Copy its DNS name from the table listing load balancers and, in your browser, access ports 9000 (Schema Registry UI), 9001 (Kafka Connect UI) and 9002 (REST API). Make sure they are all available.
Configuring Kafka to stream database events
With all services up and running it’s time to configure CDC.
1 - Setting up our Postgres database:
Note: the RDS database has been intentionally placed in a public subnet so we can access it from our own machine and reproduce the database inserts. This is not recommended for a production deployment.
Use a database tool of your choice to connect to the RDS database. I will use DBeaver in the examples below.
There is an icon on the left pane (Database Navigator) called "New Database Connection". Use it to create a new Postgres connection.
Host: the RDS endpoint (found on RDS under Connectivity & security).
E.g: kad-rds.csqffiobkrka.ap-southeast-2.rds.amazonaws.com
Port: 5432
Database: postgres
Username: <as informed on the pre-deployment section>
Password: <as informed on the pre-deployment section>
In DBeaver, double-click your connection in the Database Navigator. If you see a database named "postgres", you have successfully connected. Choose SQL Editor > New SQL Editor from the top menu. We can now write SQL queries against the database.
The only database step for now is to create a table we will use to stream events to Kafka.
CREATE TABLE teams(
    team_id integer PRIMARY KEY,
    team_name varchar(50),
    team_country varchar(50)
);
2 - Creating a Source Connector on Kafka Connect
Use your browser to access the Kafka Connect UI (Load balancer DNS on port 9001) and create a new connector.
Click on the blue New button and select PostgresConnector. Paste the properties as shown below and replace the <placeholder> variables.
name=PostgresConnector
connector.class=io.debezium.connector.postgresql.PostgresConnector
database.user=<username>
database.password=<password>
database.dbname=postgres
tasks.max=1
database.server.name=kadrds
database.port=5432
plugin.name=wal2json_rds
schema.whitelist=public
database.hostname=<RDS endpoint>
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.UnwrapFromEnvelope
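If you prefer the command line over the UI, the same connector can be registered through Kafka Connect's REST API, which this deployment exposes on port 9002 of the load balancer. This is a sketch using the same properties in JSON form; replace the <placeholder> values:

```shell
# Sketch: register the connector via Kafka Connect's REST API
# (port 9002 in this deployment). Replace the <placeholder> values.
curl -X POST http://<LOAD BALANCER DNS>:9002/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "PostgresConnector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.user": "<username>",
      "database.password": "<password>",
      "database.dbname": "postgres",
      "tasks.max": "1",
      "database.server.name": "kadrds",
      "database.port": "5432",
      "plugin.name": "wal2json_rds",
      "schema.whitelist": "public",
      "database.hostname": "<RDS endpoint>",
      "transforms": "unwrap",
      "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"
    }
  }'
```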
Side Note: One issue I faced when consuming database events for the first time was that messages weren't coming through in a simple field:value format. By default Debezium generates messages with a complex structure (adding extra metadata) that consumers like KSQL don't expect. To get around it I added the two transforms settings at the bottom of the configuration above to extract only the information I wanted.
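To illustrate (the values below are made up), a raw Debezium change event wraps the row in an envelope roughly like this:

```json
{
  "before": null,
  "after": { "team_id": 1, "team_name": "Gremio", "team_country": "Brazil" },
  "source": { "connector": "postgresql", "db": "postgres", "table": "teams" },
  "op": "c",
  "ts_ms": 1566380112345
}
```

With the unwrap transform applied, consumers receive only the `after` portion as a flat record.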
3 - Accessing KSQL to query the events
We are going to query KSQL by connecting to the EC2 instance through SSH. First, to identify which instance is running our KSQL container, go to AWS ECS, click our kad-ecs cluster, select the Tasks tab and click the task id (first column) of the task related to the kad-kafka-ksql:xx task definition.
Find the EC2 instance id property, click on it (this will take you to the EC2 service), and copy the public DNS so we can connect to it.
On my computer I use Git Bash, but feel free to use a client like PuTTY or any other tool of preference to SSH into your instance. The SSH command requires the instance DNS (which we just identified above) and the SSH key name (the value set for the "ecs_key_name" variable in the ECS variables of the Pre-deployment section).
ssh -i <YOUR EC2 KEY>.pem ec2-user@<EC2 INSTANCE PUBLIC DNS>
Once you are on your instance, run docker ps to list the running containers. Find the container with the confluentinc/cp-ksql-server image and copy the value from the CONTAINER ID column. Now access the container in shell mode by running the following:
docker exec -it <CONTAINER ID> /bin/bash
We are now inside the container running KSQL. To access KSQL’s CLI simply run ksql.
Run the command SHOW TOPICS; to list the topics that currently exist on Kafka.
You might notice that the listed topics are all Kafka-managed and there's no topic for our teams table (created earlier). That's because Debezium only creates the topic once we insert a record into the table. Let's go back to DBeaver (or the database tool you are using) and create our first team.
insert into teams(team_id, team_name, team_country) values (1, 'Gremio', 'Brazil');
If we run SHOW TOPICS; again, we will see that we now have a topic for our teams table.
If you access the Schema Registry UI (port 9000) you will also see that a schema was created for our table.
Back in KSQL, something important to note is that KSQL doesn't interact with topics directly but offers abstractions named STREAMS and TABLES. The difference between the two is that a STREAM presents data in motion, showing every event as it happens, while a TABLE is a materialized view of the stream of events. To exemplify: on top of a topic holding banking transactions, we could have a STREAM presenting all transactions as they happen and a TABLE showing the current balance per person.
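As a sketch of that banking example (the topic and column names are made up for illustration), the two abstractions could be declared like this:

```sql
-- Illustrative only: a stream of raw transactions and a table of balances.
CREATE STREAM transactions (account_id VARCHAR, amount DOUBLE)
  WITH (KAFKA_TOPIC='transactions', VALUE_FORMAT='AVRO');

CREATE TABLE account_balance AS
  SELECT account_id, SUM(amount) AS balance
  FROM transactions
  GROUP BY account_id;
```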
In our case, let's create a STREAM called teams pointing to our Kafka topic. Run the following query on KSQL:
CREATE STREAM teams
WITH (KAFKA_TOPIC='kadrds.public.teams',
VALUE_FORMAT='AVRO');
Time to see data coming through!
Run a KSQL query to list new teams as we create them.
SELECT TEAM_ID, TEAM_NAME, TEAM_COUNTRY FROM TEAMS;
You will notice that no data is coming through yet. That's because the query only captures new events. Leave the query running, go to DBeaver and insert more records.
insert into teams(team_id, team_name, team_country) values (2, 'Internacional', 'Brazil');
insert into teams(team_id, team_name, team_country) values (3, 'Corinthians', 'Brazil');
insert into teams(team_id, team_name, team_country) values (4, 'Flamengo', 'Brazil');
insert into teams(team_id, team_name, team_country) values (5, 'Fortaleza', 'Brazil');
insert into teams(team_id, team_name, team_country) values (6, 'Bahia', 'Brazil');
insert into teams(team_id, team_name, team_country) values (7, 'Manchester City', 'England');
insert into teams(team_id, team_name, team_country) values (8, 'Borussia Dortmund', 'Germany');
If we switch to the KSQL screen again, we will see the new teams showing up.
Let's say we also want a live count of the number of teams per country. In this case we create a table from an aggregated query running on top of our stream. Run the following on KSQL:
CREATE TABLE TEAMS_COUNT_PER_COUNTRY
AS
SELECT COUNT(*) AS COUNT, TEAM_COUNTRY
FROM TEAMS
GROUP BY TEAM_COUNTRY;
Now run the following to consume the table:
SELECT TEAM_COUNTRY, COUNT FROM TEAMS_COUNT_PER_COUNTRY;
Switch to DBeaver and insert more teams.
insert into teams(team_id, team_name, team_country) values (9, 'Galatasaray', 'Turkey');
insert into teams(team_id, team_name, team_country) values (10, 'Juventus', 'Italy');
insert into teams(team_id, team_name, team_country) values (11, 'AFC Ajax', 'Netherlands');
insert into teams(team_id, team_name, team_country) values (12, 'SK Slavia Prague', 'Czech Republic');
insert into teams(team_id, team_name, team_country) values (13, 'FC Basel', 'Switzerland');
insert into teams(team_id, team_name, team_country) values (14, 'Lazio', 'Italy');
insert into teams(team_id, team_name, team_country) values (15, 'Red Bull Salzburg', 'Austria');
insert into teams(team_id, team_name, team_country) values (16, 'Club Brugge', 'Belgium');
insert into teams(team_id, team_name, team_country) values (17, 'Napoli', 'Italy');
insert into teams(team_id, team_name, team_country) values (18, 'PSV Eindhoven', 'Netherlands');
insert into teams(team_id, team_name, team_country) values (19, 'Galatasaray', 'Turkey');
insert into teams(team_id, team_name, team_country) values (20, 'Juventus', 'Italy');
insert into teams(team_id, team_name, team_country) values (21, 'AFC Ajax', 'Netherlands');
insert into teams(team_id, team_name, team_country) values (22, 'SK Slavia Prague', 'Czech Republic');
insert into teams(team_id, team_name, team_country) values (23, 'FC Basel', 'Switzerland');
We can now see on KSQL a live update of the number of teams per country.
There are plenty of KSQL queries, joins and aggregations you can write. To explore and learn more, I suggest checking out Confluent's KSQL examples.
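As one illustration of what else is possible, a stream-table join could enrich each team event with its country's running count. This is a hedged sketch: classic KSQL joins a stream to a table on the table's key, so the stream is re-keyed by country first.

```sql
-- Sketch: re-key the stream by country, then join it to the count table.
CREATE STREAM teams_by_country AS
  SELECT * FROM TEAMS PARTITION BY TEAM_COUNTRY;

SELECT t.TEAM_NAME, c.COUNT
FROM teams_by_country t
LEFT JOIN TEAMS_COUNT_PER_COUNTRY c
  ON t.TEAM_COUNTRY = c.TEAM_COUNTRY;
```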
4 — Evolving the Schema
Something very important for any data platform is the ability to evolve the schema of its data. We are now going to add a new column to the teams table in Postgres and see what needs to be done downstream.
In DBeaver, let's add a new column called team_stadium_name and insert a new team.
alter table teams add team_stadium_name varchar(80);

insert into teams(team_id, team_name, team_country, team_stadium_name) values (24, 'Chapecoense', 'Brazil', 'Arena Conda');
A new version of the schema is automatically created on Schema Registry. Please access the Schema Registry UI (port 9000) to see the new column.
Unfortunately, at the time of writing, KSQL doesn't offer a way to update a stream or table. The recommendation is to drop and recreate the objects. This can become a problem if you have stream dependencies, as they also need to be recreated.
I am going to recreate the table and the stream we created earlier.
drop table TEAMS_COUNT_PER_COUNTRY;
drop stream TEAMS;

CREATE STREAM teams
WITH (KAFKA_TOPIC='kadrds.public.teams',
VALUE_FORMAT='AVRO');

CREATE TABLE TEAMS_COUNT_PER_COUNTRY
AS
SELECT COUNT(*) AS COUNT, TEAM_COUNTRY
FROM TEAMS
GROUP BY TEAM_COUNTRY;
Now, by inserting records into the database table, we can see the new column on KSQL.
insert into teams(team_id, team_name, team_country, team_stadium_name) values (26, 'Santos', 'Brazil', 'Vila Belmiro');
Conclusion
This concludes my series of posts about running Kafka on AWS and streaming database events. I would like to reiterate that I built this to help me explore Kafka, and that a proper production use case raises operational concerns that haven't been covered here.
Something I identified while researching streaming platforms and Kafka is that, while there is plenty of good content out there, the decision about which architecture to go with depends entirely on the team you are working with and your specific business requirements. Streaming is a hot topic at the moment; new projects and features come up every day, and this might change the way you implement your solution*.
*ksqlDB, for example, came up while writing this post :)
Thanks for reading, and please leave your feedback!
If you missed parts 1 and 2: