Building a Kafka playground on AWS — Part 3: Streaming database events and querying with KSQL

Maikel Penz
11 min read · Mar 2, 2020

This is Part 3 of the blog series. We are now going to use the architecture described in Parts 1 and 2 to stream database events to Kafka and consume them through KSQL.

All steps described in this post can be reproduced by deploying my GitHub project, kafka-aws-deployment. The project uses Terraform to deploy the architecture to AWS, and all of the deployment steps are also detailed in its README file.

The first step is to either clone it with git or download it to your machine.

git clone https://github.com/maikelpenz/kafka-aws-deployment

Important: the AWS infrastructure part of this project is not free of cost. Remember to destroy the architecture when you stop working with it.

Introducing two more technologies

Two additional technologies are introduced in this post as they are required for our database use case.

AWS RDS

Our source Postgres database is hosted on an AWS service called AWS RDS. I didn’t touch on RDS previously because it is not actually part of the architecture, but a service needed to reproduce our use case.

Debezium

Previously I mentioned Kafka Connect, a framework that facilitates moving data in and out of Kafka. For our use case we are going to use Debezium, an open-source project built on Kafka Connect. Debezium is a distributed platform for Change Data Capture (CDC): it listens to database changes and pushes them into Kafka. We deploy Debezium as the source image for our Kafka Connect container.

Requirements

These are some requirements that must be fulfilled before we start:

Pre-deployment

Before we deploy the architecture to AWS, a few variables must be set in your local copy of the repository according to your preference. Each of these files contains examples of how to fill the variables in to help you out.

Global variables
File: kafka-aws-deployment\aws-infrastructure\configuration\global-config.tfvars

Global Variables

ECS variables
File: kafka-aws-deployment\aws-infrastructure\configuration\ecs-config.tfvars

ECS Variables

MSK variables
File: kafka-aws-deployment\aws-infrastructure\configuration\msk-config.tfvars

MSK Variables

RDS variables
File: kafka-aws-deployment\aws-infrastructure\configuration\rds-config.tfvars

RDS Variables

Terraform backend variables
File: kafka-aws-deployment\aws-infrastructure\configuration\terraform-backend.tfvars

Terraform Backend Variables

Deploying the Architecture

There are two ways to deploy the architecture: using make, a tool that puts all deployment steps behind a single command, or manually deploying the Terraform projects. If you decide to perform a manual deploy please check this section of the GitHub project’s README file; otherwise just keep following the instructions.

The code is split into four Terraform projects: VPC, MSK, ECS and RDS. To deploy all projects at once, use your command line tool of preference and run the following from the root level of the GitHub project.

make deploykad
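If you prefer not to use make, a manual deploy boils down to running Terraform for each project in dependency order. The sketch below is only an approximation: the directory names are assumptions, and the README section linked above has the exact commands.

# Sketch of a manual deploy — directory names are assumptions, the README is the source of truth.
# Deploy the projects in dependency order: VPC, then MSK, ECS and RDS.
cd aws-infrastructure/msk

# Initialise Terraform against the remote state described in terraform-backend.tfvars
terraform init -backend-config=../configuration/terraform-backend.tfvars

# Apply with the shared and project-specific variable files
terraform apply \
  -var-file=../configuration/global-config.tfvars \
  -var-file=../configuration/msk-config.tfvars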

This will take between 20 and 30 minutes. Once the deployment finishes, let’s verify that our services are up. Log into your AWS account and jump into the following services:

  • AWS MSK: Check if an active MSK cluster appears as part of the Clusters list
MSK Cluster
  • AWS ECS: A cluster called kad-ecs must be in place with six active services.
ECS Cluster
  • AWS RDS: An RDS Postgres database should be available.
RDS Database
  • AWS EC2 (Load Balancer): Go to the EC2 service and choose Load Balancers from the left pane. You should see a load balancer named kad-ecs-application-lb. Copy the DNS name from the table listing load balancers and, in your browser, access ports 9000 (Schema Registry UI), 9001 (Kafka Connect UI) and 9002 (REST API). Make sure they are all available.
Load Balancer

http://<DNS NAME>:9000/

http://<DNS NAME>:9001/

http://<DNS NAME>:9002/
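If you would rather check these from the command line, a quick reachability test could look like the sketch below (replace <DNS NAME> with the load balancer’s DNS name):

# Sketch: confirm the three UIs respond. -o /dev/null discards the body, -w prints the HTTP status code.
for port in 9000 9001 9002; do
  curl -s -o /dev/null -w "port ${port}: %{http_code}\n" "http://<DNS NAME>:${port}/"
done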

Configuring Kafka to stream database events

With all services up and running it’s time to configure CDC.

1 - Setting up our Postgres database:

Note: the RDS database has been intentionally placed on a public subnet so we can access it from our own machine and reproduce the database inserts. This is not recommended for a production deployment.

Use a database tool of your choice to connect to the RDS database. I will use DBeaver for this example.

There is an icon on the left pane (Database Navigator) called “New Database Connection”. Create a new Postgres connection.

Host: the RDS endpoint (found on RDS under Connectivity & security).
E.g: kad-rds.csqffiobkrka.ap-southeast-2.rds.amazonaws.com
Port: 5432
Database: postgres
Username: <as informed on the pre-deployment section>
Password: <as informed on the pre-deployment section>

In DBeaver, double-click the connection listed in the Database Navigator. If you see a database named “postgres” you have successfully connected to the database. From the top menu choose SQL Editor > New SQL Editor. We can now write SQL queries to interact with the database.
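If you prefer the command line over a GUI, the same connection can be tested with psql (a sketch, assuming the psql client is installed on your machine):

# Sketch: connect with psql using the RDS endpoint and the credentials from the Pre-deployment section.
# psql prompts for the password; once connected, \dt lists the tables in the database.
psql "host=<RDS endpoint> port=5432 dbname=postgres user=<username>"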

The only database step for now is to create a table we will use to stream events to Kafka.

CREATE TABLE teams(
team_id integer PRIMARY KEY,
team_name varchar(50),
team_country varchar(50)
);

2 - Creating a Source Connector on Kafka Connect

Use your browser to access the Kafka Connect UI (Load balancer DNS on port 9001) and create a new connector.

Click on the blue New button and select PostgresConnector. Paste the properties as shown below and replace the <placeholder> variables.

name=PostgresConnector
connector.class=io.debezium.connector.postgresql.PostgresConnector
database.user=<username>
database.password=<password>
database.dbname=postgres
tasks.max=1
database.server.name=kadrds
database.port=5432
plugin.name=wal2json_rds
schema.whitelist=public
database.hostname=<RDS endpoint>
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.UnwrapFromEnvelope

Side Note: One issue I faced when consuming database events for the first time was that messages weren’t coming through in a simple field:value format. By default Debezium generates messages with a complex structure (adding extra metadata) that consumers like KSQL don’t expect. To get around it I added the two transforms settings at the bottom of the configuration above to extract only the information I wanted.
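If you prefer to skip the UI, the same connector can be registered through Kafka Connect’s REST API. The sketch below assumes the Connect worker’s REST endpoint is reachable from where you run it (Connect listens on port 8083 by default; in this deployment you may need to call it from inside the container or expose it through the load balancer):

# Sketch: register the connector via Kafka Connect's REST API (host/port are assumptions for this setup).
curl -X POST -H "Content-Type: application/json" http://<CONNECT HOST>:8083/connectors -d '{
  "name": "PostgresConnector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "<RDS endpoint>",
    "database.port": "5432",
    "database.user": "<username>",
    "database.password": "<password>",
    "database.dbname": "postgres",
    "database.server.name": "kadrds",
    "plugin.name": "wal2json_rds",
    "schema.whitelist": "public",
    "tasks.max": "1",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"
  }
}'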

3 - Accessing KSQL to query the events

We are going to query KSQL by connecting to the EC2 instance through SSH. First, to identify which instance is running our KSQL container, go to AWS ECS, click on the kad-ecs cluster, select the Tasks tab and click on the task id (first column) related to the kad-kafka-ksql:xx task definition.

Identifying the ECS Task ID for KSQL

Find the EC2 instance id property, click on it (this will take you to the EC2 service), and copy the public DNS so we can connect to it.

ECS Task Instance
Finding Instance’s public DNS

On my computer I use Git Bash, but feel free to use a client like PuTTY or any other tool of preference to SSH into your instance. The SSH command requires the instance DNS (which we just identified by following the steps above) and the SSH key name (the value set for the variable “ecs_key_name” as part of the ECS variables in the Pre-deployment section).

ssh -i <YOUR EC2 KEY>.pem ec2-user@<EC2 INSTANCE PUBLIC DNS>

Once you are on your instance, run docker ps to list the running containers. Find the container with the confluentinc/cp-ksql-server image and copy the value from the CONTAINER ID column. Now access the container in shell mode by running the following:

docker exec -it <CONTAINER ID> /bin/bash
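As a shortcut, the two steps can be combined into a one-liner (assuming a single KSQL server container is running on the instance):

# Sketch: find the KSQL server container by image and open a shell in it in one go.
docker exec -it $(docker ps -q --filter "ancestor=confluentinc/cp-ksql-server") /bin/bash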

We are now inside the container running KSQL. To access KSQL’s CLI simply run ksql.

KSQL Cli

Run the command SHOW TOPICS to list the topics that currently exist in Kafka.

You might notice that the listed topics are all Kafka-managed and there’s no topic for our teams table (created earlier). That’s because Debezium only creates the topic once we insert a record into the table. Let’s go back to DBeaver (or the database tool you are using) and create our first team.

insert into teams(team_id, team_name, team_country) values (1, 'Gremio', 'Brazil');

If we run SHOW TOPICS again you will see that we now have a topic for our teams table.

If you access the Schema Registry UI (port 9000) you will also see that a schema was created for our table.

Back in KSQL, something important to note is that KSQL doesn’t interact with topics directly but offers abstractions named TABLES and STREAMS. The difference between the two is that STREAMS present data in motion, showing all events as they happen, while TABLES are a materialized view of the stream of events. To exemplify, consider that on top of a topic that holds banking transactions we could have a STREAM presenting all transactions as they happen and a TABLE showing the current balance per person, as in the sketch below.
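To make the banking example concrete, the two objects could be declared roughly as follows. This is a hypothetical sketch: the transactions topic and its columns are made up for illustration.

-- Hypothetical stream over a topic of banking transactions (topic name and columns are made up).
CREATE STREAM transactions (account_id VARCHAR, amount DOUBLE)
WITH (KAFKA_TOPIC='transactions', VALUE_FORMAT='JSON');

-- Table materializing the current balance per account from that stream.
CREATE TABLE balance_per_account AS
SELECT account_id, SUM(amount) AS balance
FROM transactions
GROUP BY account_id;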

In our case, let’s create a STREAM called teams pointing to our Kafka topic. Run the following query in KSQL:

CREATE STREAM teams
WITH (KAFKA_TOPIC='kadrds.public.teams',
VALUE_FORMAT='AVRO');

Time to see data coming through!
Run a KSQL query to list new teams as we create them.

SELECT TEAM_ID, TEAM_NAME, TEAM_COUNTRY FROM TEAMS;
KSQL Stream query

You will notice that no new data is coming through. That’s because the query only captures new events (see the side note below). Leave the query running, go to DBeaver and insert more records.
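Side note: if you also want the query to return rows that already exist in the topic, you can tell KSQL to read from the beginning before running the SELECT:

-- Read topics from the earliest offset instead of only new events.
SET 'auto.offset.reset' = 'earliest';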

insert into teams(team_id, team_name, team_country) values (2, 'Internacional', 'Brazil');
insert into teams(team_id, team_name, team_country) values (3, 'Corinthians', 'Brazil');
insert into teams(team_id, team_name, team_country) values (4, 'Flamengo', 'Brazil');
insert into teams(team_id, team_name, team_country) values (5, 'Fortaleza', 'Brazil');
insert into teams(team_id, team_name, team_country) values (6, 'Bahia', 'Brazil');
insert into teams(team_id, team_name, team_country) values (7, 'Manchester City', 'England');
insert into teams(team_id, team_name, team_country) values (8, 'Borussia Dortmund', 'Germany');

If we switch to the KSQL screen again you will see the new teams showing up.

Stream capturing events

Let’s say that we also want to create a live count of the number of teams per country. In this case we create a table based on an aggregated query running on top of our stream. Run the following on KSQL:

CREATE TABLE TEAMS_COUNT_PER_COUNTRY 
AS
SELECT COUNT(*) AS COUNT, TEAM_COUNTRY
FROM TEAMS
GROUP BY TEAM_COUNTRY;

Now run the following to consume the table:

SELECT TEAM_COUNTRY, COUNT FROM TEAMS_COUNT_PER_COUNTRY;
Consuming aggregation

Switch to DBeaver and insert more teams.

insert into teams(team_id, team_name, team_country) values (9, 'Galatasaray', 'Turkey');
insert into teams(team_id, team_name, team_country) values (10, 'Juventus', 'Italy');
insert into teams(team_id, team_name, team_country) values (11, 'AFC Ajax', 'Netherlands');
insert into teams(team_id, team_name, team_country) values (12, 'SK Slavia Prague', 'Czech Republic');
insert into teams(team_id, team_name, team_country) values (13, 'FC Basel', 'Switzerland');
insert into teams(team_id, team_name, team_country) values (14, 'Lazio', 'Italy');
insert into teams(team_id, team_name, team_country) values (15, 'Red Bull Salzburg', 'Austria');
insert into teams(team_id, team_name, team_country) values (16, 'Club Brugge', 'Belgium');
insert into teams(team_id, team_name, team_country) values (17, 'Napoli', 'Italy');
insert into teams(team_id, team_name, team_country) values (18, 'PSV Eindhoven', 'Netherlands');
insert into teams(team_id, team_name, team_country) values (19, 'Galatasaray', 'Turkey');
insert into teams(team_id, team_name, team_country) values (20, 'Juventus', 'Italy');
insert into teams(team_id, team_name, team_country) values (21, 'AFC Ajax', 'Netherlands');
insert into teams(team_id, team_name, team_country) values (22, 'SK Slavia Prague', 'Czech Republic');
insert into teams(team_id, team_name, team_country) values (23, 'FC Basel', 'Switzerland');

We can now see in KSQL a live update of the number of teams per country.

Aggregated list of teams per country

There are plenty of KSQL queries, joins and aggregations you can write. To explore and learn more about it I suggest you check out Confluent’s KSQL examples.

4 - Evolving the Schema

Something very important for any data platform is the ability to evolve the schema of our data. We are now going to add a new column to the teams table in Postgres and see what needs to be done downstream.

In DBeaver, let’s add a new column called team_stadium_name and insert a new team.

alter table teams
add team_stadium_name varchar(80);
insert into teams(team_id, team_name, team_country, team_stadium_name) values (24, 'Chapecoense', 'Brazil', 'Arena Conda');

A new version of the schema is automatically created on Schema Registry. Please access the Schema Registry UI (port 9000) to see the new column.

New column automatically added to Schema Registry

Unfortunately, at the time of writing this blog post KSQL doesn’t offer a way to update a stream or table. The recommendation is to drop and recreate the objects. This may become a problem if you have stream dependencies, as they also need to be recreated.

I am going to recreate the table and the stream we created earlier.

drop table TEAMS_COUNT_PER_COUNTRY;
drop stream TEAMS;
CREATE STREAM teams
WITH (KAFKA_TOPIC='kadrds.public.teams',
VALUE_FORMAT='AVRO');
CREATE TABLE TEAMS_COUNT_PER_COUNTRY
AS
SELECT COUNT(*) AS COUNT, TEAM_COUNTRY
FROM TEAMS
GROUP BY TEAM_COUNTRY;
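Before inserting data, you can confirm that the recreated stream picked up the new column:

-- Lists the columns KSQL now associates with the stream, including team_stadium_name.
DESCRIBE TEAMS;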

Now we can see the new column in KSQL by inserting records into the database table.

insert into teams(team_id, team_name, team_country, team_stadium_name) values (26, 'Santos', 'Brazil', 'Vila Belmiro');
New column being listed

Conclusion

This concludes my series of posts about running Kafka on AWS and streaming database events. I would like to reiterate that I built this to help me explore Kafka, and that a proper production use case raises operational concerns that haven’t been covered here.

Something I identified while researching streaming platforms and Kafka is that, while there is plenty of good content out there, the decision about which architecture to go with will depend entirely on the team you are working with and your specific business requirements. Streaming is a hot topic at the moment; new projects and features come up every day, and this might change the way you implement your solution*.

*ksqlDB, for example, came up while writing this post :)

Thanks for reading and please leave your feedback!

If you missed parts 1 and 2:


Maikel Penz

Data Engineering Team Lead @ Plexure | AWS Certified Solution Architect | LinkedIn: https://www.linkedin.com/in/maikel-alexsander-penz/