In this lesson we will:
- Explore connecting ClickHouse to Kafka in order to ingest real-time streaming data.
Introduction
Because ClickHouse is so fast, it is commonly used to store and analyse high volumes of event-based data such as clickstream, log or IoT data. This data is often sourced from Kafka, the leading event streaming platform in use today. For this reason, ClickHouse has developed a strong offering for integrating with Kafka.
What Is Kafka?
Kafka is the leading open source platform for real-time event streaming. It allows us to transfer data from source to destination in a highly performant, scalable and reliable way.
Ensemble also host a full training course on Kafka.
Kafka Table Engine
The most common task for data engineers is to subscribe to data being published to Kafka topics and consume it directly into a ClickHouse table.
The first step in this process is to create a table backed by the Kafka table engine. At creation time, we need to specify details of the Kafka connection, including the broker address, the topic, the consumer group name and the message format.
create table orders_queue
(
    order_id integer,
    description varchar
)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'localhost:9092',
    kafka_topic_list = 'new_pizza_orders',
    kafka_group_name = 'clickhouse_consumer',
    kafka_format = 'JSONEachRow';
This command creates a table that listens to a topic on the Kafka broker running on our training virtual machine.
Let's test it at this stage. In a new tmux pane we can start the Kafka console producer to send a test message:
cd /kafka_2.13-3.0.0
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic new_pizza_orders
And publish the following message:
{ "order_id" : 321, "description" : "margherita pizza" }
If we then go back to our ClickHouse client and query the table:
select * from orders_queue;
We should see the record has been ingested into ClickHouse directly from Kafka:
┌─order_id─┬─description──────┐
│      321 │ margherita pizza │
└──────────┴──────────────────┘
Destination Table
The Kafka table engine backing this table is not appropriate for long-term storage. In fact, if we were to query the table a second time, the row above would not appear, because each message is only intended to be read once.
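As a quick illustration of this read-once behaviour, repeating the earlier query would typically return no rows, assuming the first read committed its consumer offsets (which is the default behaviour):
-- The message was already consumed by the first read,
-- so this query typically returns no rows.
select * from orders_queue;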
For this reason, a second step is needed to take data from this Kafka table and place it into longer-term storage.
We typically do this by creating a destination table, usually a MergeTree table, and using a ClickHouse materialised view to populate that table as new data streams in. The destination table can be created like so:
create table orders
(
    order_id integer,
    description varchar
)
ENGINE = MergeTree
ORDER BY order_id;
And the final step is to move data from the Kafka queue table to the destination table using the materialised view:
create materialized view orders_mv TO orders AS
select order_id, description
from orders_queue;
Because ClickHouse materialised views are effectively insert triggers, this select runs for every block of records that arrives in the orders_queue table, and the results are written into the orders table.
We can test this end-to-end process by publishing another message from the Kafka console producer:
{ "order_id" : 321, "description" : "margherita pizza" }
And selecting from our table:
select * from orders;
Which should show that the new record has been streamed all the way through into the orders table. Note that the first test message will not reappear here, since it was already consumed when we queried orders_queue directly.
Configuring Ingestion Properties
There are some choices to be made around how ClickHouse ingests data from Kafka. Generally, ingesting in larger batches is more efficient, but it increases the delay before data becomes visible for querying. How you configure this therefore depends on the particular requirements of your users. These options are applied as additional SETTINGS on the Kafka engine table, as sketched below.
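As a rough sketch, a queue table tuned for larger, less frequent batches might look something like the following. The table and topic names simply reuse the earlier example, the values shown are illustrative rather than recommendations, and the exact set of available settings varies by ClickHouse version:
-- Illustrative only: values are examples, not recommendations.
create table orders_queue
(
    order_id integer,
    description varchar
)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'localhost:9092',
    kafka_topic_list = 'new_pizza_orders',
    kafka_group_name = 'clickhouse_consumer',
    kafka_format = 'JSONEachRow',
    kafka_num_consumers = 2,          -- parallel consumers for higher throughput
    kafka_max_block_size = 65536,     -- flush to the materialised view in larger blocks
    kafka_flush_interval_ms = 7500;   -- upper bound on how long data waits before flushing
Smaller block sizes and flush intervals give fresher data at the cost of more, smaller inserts into the destination table; larger values favour throughput over latency.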