Connecting ClickHouse® to Apache Kafka
Apache Kafka is an open-source distributed event streaming platform used by millions around the world. In this section we’ll cover how to connect a ClickHouse® cluster to a Kafka server. With the connection configured, you can subscribe to topics on the Kafka server, store messages from the Kafka server in a ClickHouse database, then use the power of ClickHouse for real-time analytics against those messages.
ClickHouse has a Kafka table engine that lets you tie Kafka and ClickHouse together. As you would expect, the Altinity Cloud Manager (ACM) makes it extremely easy to configure the Kafka table engine.
Setting up your Kafka connection
To set up the connection between your Kafka server and your ClickHouse cluster, go to the cluster view and click the Kafka Connection Setup menu item on the TOOLS menu:
Figure 1 - The Kafka Connection Setup menu item
This takes you to the Kafka Connection Setup dialog. To start, fill in the broker string and the topic name you want to connect to this ClickHouse cluster:
Figure 2 - The Kafka Connection Setup dialog
The broker string is one or more hostnames, each with an optional port number, separated by commas (for example, `kafka1.example.com:9092,kafka2.example.com:9092`). The example in Figure 2 is a wide-open, locally hosted Kafka server with no authentication whatsoever. That’s useful for testing, but not a good idea in the real world. You’ll probably need something like this:
Figure 3 - The Kafka Connection Setup dialog with authentication parameters
The section Configuring your Kafka connection below has all the details on adding parameters.
When you’ve entered all the values you need, click CHECK to test the connection to the broker and topic. If all goes well, you’ll see something like this:
Figure 4 - A successful connection
On the other hand, if there’s any problem with the connection, you’ll see an error message:
Figure 5 - A connection error
Obviously you’ll need to fix the error before you continue.
The Config name field at the bottom is the name of the ClickHouse configuration file that will be stored in your ClickHouse cluster. In this example, the configuration is named `retail_data`, so the XML here will be stored in the file `config.d/retail_data.xml`. In that configuration file, the Config name becomes an XML element inside a `<named_collection>` element. The example in Figure 2 above creates an element named `<retail_data>`, as shown in Figure 4. That means the config name must be a valid XML element name. If not, you’ll get an error message:
Figure 6 - An invalid connection name
We won’t go into the details here, but an XML element name must start with a letter or an underscore. We also recommend that you use underscores instead of hyphens; that will make things simpler when you use this configuration to create a table with the Kafka table engine. (In other words, use `retail_data` instead of `retail-data`.)
We don’t recommend it, but once the configuration is created, you can edit the XML as needed from the Server Settings list:
Figure 7 - The XML configuration file in the list of server settings
See the section Working with server settings for all the details on modifying server settings. Be aware that the Kafka Connection Setup dialog overwrites an existing configuration file with the same Config name. We recommend that you only work with these configuration files through the Kafka Connection Setup dialog.
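By the way, if you want to confirm from SQL that ClickHouse sees the named collection, newer ClickHouse versions expose it through a system table. This is just a quick sanity check, assuming your ClickHouse version has `system.named_collections` and your user is allowed to view named collections (collections defined in config files are typically listed alongside ones created with DDL):
-- Quick check that the retail_data collection is visible; requires a
-- ClickHouse version with named collection support and the privilege to
-- view named collections. Values may be hidden.
SELECT name
FROM system.named_collections
WHERE name = 'retail_data'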
Those are the basics of creating a Kafka connection, but you’ll almost certainly need to add configuration parameters to the connection as well. Read on….
Configuring your Kafka connection
Depending on how your Kafka server is set up, there are a number of other connection parameters you need to set. There are preset groups of options for common Kafka providers, as well as custom options that let you define the value for any parameter you need. We’ll go through those now.
In this section we’re configuring the details of ClickHouse’s connection to Kafka. That includes details such as the URL of the Kafka server, the name of the topic, and any authentication parameters you need. Those parameters can go in your Kafka configuration file. There are other parameters that configure a ClickHouse table that uses the Kafka engine; those are not part of a Kafka configuration file. (More on the Kafka table engine in a minute.)
A word about certificates
If the connection to your Kafka server requires certificates, you need to go through a couple of steps:
- Create a new file in the `config.d` directory. To do that, create a new setting with the name of your certificate, then paste the value of the certificate in the text box. Here’s an example for the file `service.cert`:
Figure 8 - Creating a certificate file in the `config.d` directory
See the section Working with server settings for all the details on modifying server settings.
- Once you’ve created the settings for all the certificates you need, you can specify the locations of those certificates. The certificates are in the directory `/etc/clickhouse-server/config.d`. For `service.cert`, the location is `/etc/clickhouse-server/config.d/service.cert`.
Preset option groups
There are several groups of preset options available under the ADD PRESET button:
Figure 9 - The Kafka Preset options menu
Each set of options is targeted for a specific platform or Kafka deployment type, but check with your Kafka provider to see which parameters you need. In addition to the parameters added for you automatically, you can add your own custom parameters if needed.
Amazon MSK parameters
If your Kafka server is hosted by Amazon Managed Streaming for Apache Kafka (MSK), three parameters are added to the dialog:
- `security.protocol`: Available options are `plaintext`, `ssl`, `sasl_plaintext`, or `sasl_ssl`.
- `sasl.username` and `sasl.password`: Your SASL username and password
The parameters will look like this:
Figure 10 - Amazon MSK parameters
Values with a down arrow icon let you select from a list of values; other value fields let you type whatever you need.
You also need to create a VPC connection to your MSK service. See the page Amazon VPC endpoint for Amazon MSK for complete details on creating the VPC connection.
SASL/SCRAM parameters
- `security.protocol`: Available options are `plaintext`, `ssl`, `sasl_plaintext`, or `sasl_ssl`.
- `sasl.mechanism`: Available options are `GSSAPI`, `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`, or `OAUTHBEARER`.
- `sasl.username` and `sasl.password`: Your SASL username and password
The parameters will look like this:
Figure 11 - SASL/SCRAM parameters
Inline Kafka certificates parameters
- `ssl.key.pem`: The path to your `key.pem` file.
- `ssl.certificate.pem`: The path to your `certificate.pem` file.
See the section A word about certificates above for details on working with certificates.
The parameters will look like this:
Figure 12 - Inline Kafka certificates parameters
Kerberos parameters
- `security.protocol`: Available options are `plaintext`, `ssl`, `sasl_plaintext`, or `sasl_ssl`.
- `sasl.kerberos.keytab`: The path to your `.keytab` file. See the section A word about certificates above for details on working with certificates.
- `sasl.kerberos.principal`: The name of your Kerberos principal.
The parameters will look like this:
Figure 13 - Kerberos parameters
Confluent Cloud parameters
The Confluent documentation site has the details of all the Confluent parameters. The ones provided for you automatically are:
- `security.protocol`: Available options are `plaintext`, `ssl`, `sasl_plaintext`, or `sasl_ssl`.
- `sasl.mechanism`: Available options are `GSSAPI`, `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`, or `OAUTHBEARER`.
- `sasl.username` and `sasl.password`: Your SASL username and password
- `auto.offset.reset`: Confluent supports three predefined values: `smallest`, `latest`, and `none`, although you may not need this parameter at all. Any other value throws an exception. See the auto.offset.reset documentation for all the details.
- `ssl.endpoint.identification.algorithm`: `https` is the only option supported. This is typically used only with older servers; you can click the trash can icon to delete it if you don’t need it.
- `ssl.ca.location`: `probe` is the only option supported. As with `ssl.endpoint.identification.algorithm`, this is typically used only with older servers; you can click the trash can icon to delete it if you don’t need it.
Check the Confluent site to see which parameters you need and the values you should use. Here’s a working example:
Figure 14 - Confluent Cloud parameters
Adding other configuration options
The ADD OPTION button lets you add other configuration parameters. You can use a predefined option or create a custom option:
Figure 15 - The ADD OPTION menu
Predefined options
There are seven predefined options:
- `security.protocol`: `plaintext`, `ssl`, `sasl_plaintext`, or `sasl_ssl`
- `sasl.mechanism`: `GSSAPI`, `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`, or `OAUTHBEARER`
- `sasl.username` and `sasl.password`
- `ssl.ca.location`: `probe` is the only option supported
- `enable.ssl.certificate.verification`: `true` or `false`
- `ssl.endpoint.identification.algorithm`: `https` is the only option supported
- `debug`: `all` is the only option supported
Custom options
A custom option simply gives you entry fields for a name and a value, letting you define any parameters your Kafka server needs. As an example, a connection to a Kafka topic hosted on Aiven cloud looks like this:
Figure 16 - Custom parameters to connect to Aiven Cloud
Once your parameters are set, click the CHECK button to make sure the connection to your Kafka server is configured correctly.
Working with Kafka data in ClickHouse
Now that we’ve defined our connection to a Kafka server, we need to set up our ClickHouse infrastructure to get data from Kafka and put it into ClickHouse. We’ll go through an example with a Kafka topic named `retail_data`. The topic contains JSON documents that look like this:
{
  "transaction_id": "T43379",
  "customer_id": "C004",
  "timestamp": "2024-09-12T16:16:38.935Z",
  "items": [
    {
      "item_id": "I005",
      "quantity": 3,
      "price": 100
    },
    {
      "item_id": "I001",
      "quantity": 1,
      "price": 40
    }
  ],
  "total_amount": 340,
  "store_id": "S001"
}
Each document represents a sale of some number of items to a particular customer in a particular store. Because there can be any number of items in an order, we’ll store item data in a separate table, with the `transaction_id` acting as a foreign key if we need to join the tables. (There’s an example join at the end of this section.)
The architecture looks like this:
Figure 17 - Architecture to consume Kafka data in ClickHouse
We’ll set up three types of things to consume the topic’s data in the `sales` database:
- A table with engine type `Kafka` that receives data from the Kafka server (`kafka_sales_data`)
- Materialized Views to take data from the `kafka_sales_data` table and store it in two `ReplicatedMergeTree` tables (`kafka_to_sales_transactions` and `kafka_to_sales_items`)
- Tables with engine type `ReplicatedMergeTree` to hold the data (`sales_transactions` and `sales_items`)
Creating a table with a Kafka engine
First we’ll create a table with a Kafka engine. This will use the Kafka configuration we created earlier to consume messages from the topic on the Kafka server:
CREATE TABLE sales.kafka_sales_data
(
`transaction_id` String,
`customer_id` String,
`timestamp` String,
`items` Array(Tuple(item_id String, quantity UInt32, price Float64)),
`total_amount` Float64,
`store_id` String
)
ENGINE = Kafka(retail_data)
SETTINGS kafka_format = 'JSONEachRow', kafka_num_consumers = 1
The columns in the table map to the fields in the JSON document. Because there can be more than one item in each document, we use `Array(Tuple(...))` to retrieve the items from each document. We’re using our Kafka configuration `retail_data`, and we’re using the `kafka_format` parameter to define the format of the data we’ll get from Kafka. See the Kafka engine documentation for a complete list of settings for the Kafka engine.
Be aware that if your configuration name contains characters like a hyphen (`kafka-37`), you need to put the name of the configuration in double quotes:
ENGINE = Kafka("kafka-37")
Using `kafka-37` without quotes returns the error `DB::Exception: Bad cast from type DB::ASTFunction to DB::ASTLiteral` as ClickHouse tries to process the configuration name as the mathematical expression `kafka - 37`.
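One more caution: selecting directly from a Kafka engine table consumes messages, so it’s not a good way to verify that data is flowing. A safer quick check, assuming your ClickHouse version is recent enough to include the `system.kafka_consumers` table (roughly 23.8 and later), is to look at the consumer state for the table:
-- Shows the consumers behind the Kafka engine table, including assigned
-- partitions, the number of messages read, and any exceptions.
SELECT *
FROM system.kafka_consumers
WHERE database = 'sales' AND table = 'kafka_sales_data'
FORMAT Vertical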
Creating Materialized Views
Now we need Materialized Views that take data as it is received by the Kafka engine and store it in the appropriate tables. Here’s how we populate the transactions table:
CREATE MATERIALIZED VIEW sales.kafka_to_sales_transactions
TO sales.sales_transactions
(
`transaction_id` String,
`customer_id` String,
`timestamp` DateTime,
`total_amount` Float64,
`store_id` String
)
AS SELECT
transaction_id,
customer_id,
parseDateTimeBestEffort(timestamp) AS timestamp,
total_amount,
store_id
FROM sales.kafka_sales_data
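The `parseDateTimeBestEffort` function converts the ISO 8601 timestamp strings from the JSON documents into DateTime values. If you want to see what the view stores, you can try it on the sample timestamp from earlier:
-- Parses the ISO 8601 string (including milliseconds and the Z suffix)
-- into a DateTime value.
SELECT parseDateTimeBestEffort('2024-09-12T16:16:38.935Z') AS parsed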
And here’s the Materialized View to populate the table of items:
CREATE MATERIALIZED VIEW sales.kafka_to_sales_items
TO sales.sales_items
(
`transaction_id` String,
`item` Tuple(item_id String, quantity UInt32, price Float64),
`item_id` String,
`quantity` UInt32,
`price` Float64,
`store_id` String
)
AS SELECT
transaction_id,
arrayJoin(items) AS item,
item.1 AS item_id,
item.2 AS quantity,
item.3 AS price,
store_id
FROM sales.kafka_sales_data
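The `arrayJoin` function is what turns a single Kafka message into one row per item. To see the expansion on its own, here’s a minimal standalone query that uses literal values taken from the sample document above:
-- Expands an array of (item_id, quantity, price) tuples into rows and
-- pulls out the tuple elements, just as the Materialized View does.
SELECT
    'T43379' AS transaction_id,
    arrayJoin([('I005', 3, 100.), ('I001', 1, 40.)]) AS item,
    item.1 AS item_id,
    item.2 AS quantity,
    item.3 AS price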
Creating tables to hold data from Kafka
We use two tables, one to hold the basic information about a sale, and another to hold all the item details for every sale. Here’s the schema for the sales data:
CREATE TABLE sales.sales_transactions
(
`transaction_id` String,
`customer_id` String,
`timestamp` DateTime,
`total_amount` Float64,
`store_id` String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/sales_transactions', '{replica}')
ORDER BY (store_id, timestamp, transaction_id)
SETTINGS index_granularity = 8192
The schema that stores the details of each item is similarly straightforward:
CREATE TABLE sales.sales_items
(
`transaction_id` String,
`item_id` String,
`quantity` UInt32,
`price` Float64,
`store_id` String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/sales_items', '{replica}')
ORDER BY (transaction_id, item_id)
SETTINGS index_granularity = 8192
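The `{shard}` and `{replica}` placeholders in the ReplicatedMergeTree paths are substitution macros that ClickHouse fills in on each node; in an Altinity.Cloud cluster they’re typically defined for you. You can see the values on any node with a quick query:
-- Lists the macro names and the values substituted on this node.
SELECT macro, substitution
FROM system.macros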
Sample data
As sales are reported on the Kafka topic, our tables are populated. We’ll look at some sample data from the Cluster Explorer. First the `sales.sales_transactions` table:
┌─transaction_id─┬─customer_id─┬───────────timestamp─┬─total_amount─┬─store_id─┐
1. │ T79867 │ C004 │ 2024-09-12 16:16:07 │ 690 │ S001 │
2. │ T35561 │ C003 │ 2024-09-12 16:16:18 │ 120 │ S001 │
3. │ T90469 │ C002 │ 2024-09-12 16:16:22 │ 80 │ S001 │
4. │ T21884 │ C002 │ 2024-09-12 16:16:25 │ 430 │ S001 │
5. │ T78661 │ C002 │ 2024-09-12 16:16:32 │ 200 │ S001 │
└────────────────┴─────────────┴─────────────────────┴──────────────┴──────────┘
And now the `sales.sales_items` table:
┌─transaction_id─┬─item_id─┬─quantity─┬─price─┬─store_id─┐
1. │ T10476 │ I006 │ 2 │ 10 │ S001 │
2. │ T10476 │ I008 │ 1 │ 10 │ S001 │
3. │ T10476 │ I010 │ 2 │ 20 │ S001 │
4. │ T11741 │ I010 │ 1 │ 100 │ S003 │
5. │ T12091 │ I004 │ 1 │ 10 │ S004 │
└────────────────┴─────────┴──────────┴───────┴──────────┘
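As promised earlier, `transaction_id` lets us join the two tables back together when we need item-level detail alongside the transaction header. For example, this query (based on the schemas above) lists each item sold along with the customer and the transaction total:
-- Joins item detail back to the transaction header on transaction_id.
SELECT
    t.transaction_id,
    t.customer_id,
    t.timestamp,
    i.item_id,
    i.quantity,
    i.price,
    t.total_amount
FROM sales.sales_items AS i
INNER JOIN sales.sales_transactions AS t ON i.transaction_id = t.transaction_id
ORDER BY t.timestamp, t.transaction_id, i.item_id
LIMIT 10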
At this point we’ve set up a connection to a topic on a Kafka server and created the tables and materialized views we need to store that data in ClickHouse. Now we can write queries to analyze sales data in real time, view live sales data in a tool like Grafana, or do any number of other useful things.
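For example, a query like this one (a simple sketch against the tables above) reports revenue and order counts per store over the last hour:
-- Per-store revenue and order counts for the most recent hour of sales.
SELECT
    store_id,
    count() AS orders,
    sum(total_amount) AS revenue
FROM sales.sales_transactions
WHERE timestamp >= now() - INTERVAL 1 HOUR
GROUP BY store_id
ORDER BY revenue DESC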