Connecting ClickHouse® to Apache Kafka®

How to connect a ClickHouse® cluster to a Kafka server

Apache Kafka is an open-source distributed event streaming platform used by millions around the world. In this section we’ll cover how to connect a ClickHouse® cluster to a Kafka server. With the connection configured, you can subscribe to topics on the Kafka server, store messages from the Kafka server in a ClickHouse database, then use the power of ClickHouse for real-time analytics against those messages.

ClickHouse has a Kafka table engine that lets you tie Kafka and ClickHouse together. As you would expect, the Altinity Cloud Manager (ACM) makes it extremely easy to configure the Kafka table engine.

Setting up your Kafka connection

To set up the connection between your Kafka server and your ClickHouse cluster, go to the cluster view and click the Kafka Connections menu item on the CONFIGURE menu:

Kafka Connection Setup menu

Figure 1 - The Kafka Connection Setup menu item

This takes you to the Kafka Connections dialog. This dialog lets you configure any number of Kafka configuration files, each of which contains any number of configured connections to Kafka servers.

Initial Kafka Connections dialog

Figure 2 - The initial Kafka Connections dialog

In Figure 2, there are no Kafka settings files. Click the button to create one. That takes you to this dialog:

A new Kafka settings file

Figure 3 - The New Kafka Settings File dialog

The fields in Figure 3 create a new Kafka settings file named kafka_settings.xml in the config.d directory. A settings file contains configuration information for some number of Kafka connections. Here we’re creating a new configuration named localhost_testing that connects to the topic named retail_data on a Kafka server at 2.tcp.ngrok.io:14624. Typically you would also define a number of connection parameters for this connection, but for this example we’re connecting to a Kafka server with no authentication required. (We obviously don’t recommend that; we’ll get to more realistic examples in the section Configuring your Kafka connection below.)

Click CHECK to test the connection. If the ACM successfully connects to the Kafka server and the topic you specified, you’ll see a dialog like this:

A successful connection to a Kafka server

Figure 4 - Configuration information for a successful Kafka connection

Click the button to save these settings into the Kafka settings file. You’ll see your new settings file and its one configuration:

A Kafka settings file with one configuration

Figure 5 - The new Kafka settings file with a single configuration

From this dialog you can click the button to edit a given configuration or click the button to delete one. You’ll be asked to confirm your choice if you ask to delete a configuration. And as you would expect, the button lets you create a new configuration in the current settings file and the button lets you create a new settings file altogether.

Be aware that deleting the last configuration in a settings file will also delete the settings file altogether. You’ll be asked to confirm that choice:

Deleting the last configuration

Figure 6 - Deleting the last configuration in a Kafka settings file

Some complications:

First of all, if there’s any problem with the configuration parameters you entered, you’ll see an error message:

A configuration error

Figure 7 - A configuration error

Obviously you’ll need to fix the error before you continue.

Secondly, the value in the Configuration name field in the dialog becomes the name of an XML element in the Kafka settings file. In Figure 3, the configuration is named localhost_testing, so the configuration settings will be stored in the XML element <localhost_testing> as shown in Figure 4. That, of course, means the configuration name must be a valid XML element name. If not (a configuration named 42, for example), you’ll get an error message:

Bad configuration name

Figure 8 - The error message for a connection name that can’t be used as an XML element name

We won’t go into the details here, but an XML element name should start with a letter or an underscore. We also recommend that you use underscores instead of hyphens; that will make things simpler when you use this configuration to create a table with the Kafka table engine. (In other words, use localhost_testing instead of localhost-testing.)

A Kafka settings file can contain multiple connections to multiple Kafka servers and/or topics:

Multiple connections in a single Kafka settings file

Figure 9 - A Kafka settings file with multiple connections

Click DONE to save the file and exit the dialog.

Once the settings file is created, it appears in the list of settings in the Server Settings list:

Kafka settings file

Figure 10 - The Kafka settings file in the list of server settings

Although you can edit the XML file directly, we don’t recommend it. Any changes you make in the Kafka Connections dialog will overwrite any changes you make directly. You’ll get a confirmation message if you try to edit the file directly:

Warning message for editing an XML file directly

Figure 11 - Warning message against editing XML directly

If you’re sure this is what you want to do, see the section Configuring Settings for all the details on modifying server settings.

Those are the basics of creating a Kafka connection, but you’ll almost certainly need to add configuration parameters to the connection as well. Read on….

Configuring your Kafka connection

Depending on how your Kafka server is set up, there are a number of other connection parameters you need to set. There are preset groups of options for common Kafka providers, as well as custom options that let you define the value for any parameter you need. We’ll go through those now.

In this section we’re configuring the details of ClickHouse’s connection to Kafka. That includes details such as the URL of the Kafka server, the name of the topic, and any authentication parameters you need. Those parameters can go in your Kafka configuration file. There are other parameters that configure a ClickHouse table that uses the Kafka engine; those are not part of a Kafka configuration file. (More on the Kafka table engine in a minute.)

A word about certificates

If the connection to your Kafka server requires certificates, you need to go through a couple of steps:

  1. Create a new file in the config.d directory. To do that, create a new setting with the name of your certificate, then paste the value of the certificate in the text box. Here’s an example for the file service.cert:

    Creating a certificate file
    Figure 12 - Creating a certificate file in the config.d directory

    See the section Configuring Settings for all the details on modifying server settings.
  2. Once you’ve created the settings for all the certificates you need, you can specify the locations of those certificates. The certificates are in the directory /etc/clickhouse-server/config.d. For service.cert, the location is /etc/clickhouse-server/config.d/service.cert.

Preset option groups

There are several groups of preset options available under the ADD PRESET button:

The Kafka Preset options menu

Figure 13 - The Kafka Preset options menu

Each set of options is targeted for a specific platform or Kafka deployment type, but check with your Kafka provider to see which parameters you need. In addition to the parameters added for you automatically, you can add your own custom parameters if needed.

Amazon MSK parameters

If your Kafka server is hosted by Amazon Managed Streaming for Apache Kafka (MSK), three parameters are added to the dialog:

  • security.protocol: Available options are plaintext, ssl, sasl_plaintext, or sasl_ssl.
  • sasl.username and sasl.password - Your SASL username and password

The parameters will look like this:

Amazon MSK parameters

Figure 14 - Amazon MSK parameters

Values with a down arrow icon let you select from a list of values; other value fields let you type whatever you need.

You also need to create a VPC connection to your MSK service. See the page Amazon VPC endpoint for Amazon MSK for complete details on creating the VPC connection.

SASL/SCRAM parameters

  • security.protocol: Available options are plaintext, ssl, sasl_plaintext, or sasl_ssl.
  • sasl.mechanism: Available options are GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, or OAUTHBEARER.
  • sasl.username and sasl.password - Your SASL username and password

The parameters will look like this:

SASL/SCRAM parameters

Figure 15 - SASL/SCRAM parameters

Inline Kafka certificates parameters

  • ssl.key.pem: The path to your key.pem file.
  • ssl.certificate.pem: The path to your certificate.pem file.

See the section A word about certificates above for details on working with certificates.

The parameters will look like this:

Inline Kafka certificates parameters

Figure 16 - Inline Kafka certificates parameters

Kerberos parameters

  • security.protocol: Available options are plaintext, ssl, sasl_plaintext, or sasl_ssl.
  • sasl.kerberos.keytab: The path to your .keytab file. See the section A word about certificates above for details on working with certificates.
  • sasl.kerberos.principal: The name of your Kerberos principal.

The parameters will look like this:

Kerberos parameters

Figure 17 - Kerberos parameters

Confluent Cloud parameters

The Confluent documentation site has the details of all the Confluent parameters. The ones provided for you automatically are:

  • security.protocol: Available options are plaintext, ssl, sasl_plaintext, or sasl_ssl.
  • sasl.mechanism: Available options are GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, or OAUTHBEARER.
  • sasl.username and sasl.password: Your SASL username and password
  • auto.offset.reset: Confluent supports three predefined values: smallest, latest, and none, although you may not need this parameter at all. Any other value throws an exception. See the auto.offset.reset documentation for all the details.
  • ssl.endpoint.identification.algorithm: https is the only option supported. This is typically used only with older servers; you can click the trash can icon to delete it if you don’t need it.
  • ssl.ca.location: The location of your SSL certificate file. See the discussion of certificates above for more information on working with certificates. As with ssl.endpoint.identification.algorithm, this is typically used only with older servers; you can click the trash can icon to delete it if you don’t need it.

Check the Confluent site to see which parameters you need and the values you should use. Here’s a working example:

Confluent Cloud parameters

Figure 18 - Confluent Cloud parameters

Adding other configuration options

The ADD OPTION button lets you add other configuration parameters. You can use a predefined option or create a custom options:

The Kafka Add Option menu

Figure 19 - The ADD OPTION menu

Predefined options

There are seven predefined options:

  • security.protocol: plaintext, ssl, sasl_plaintext, or sasl_ssl.
  • sasl.mechanism: GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, or OAUTHBEARER.
  • sasl.username and sasl.password
  • ssl.ca.location: the location of your SSL certificate
  • enable.ssl.certificate.verification: true or false
  • ssl.endpoint.identification.algorithm: https is the only option supported
  • debug: all is the only option supported

Custom options

A custom option simply gives you entry fields for a name and a value, letting you define any parameters your Kafka server needs. As an example, a connection to a Kafka topic hosted on Aiven cloud looks like this:

Aiven parameters

Figure 20 - Custom parameters to connect to Aiven Cloud

Once your parameters are set, click the CHECK button to make sure the connection to your Kafka server is configured correctly.

Working with Kafka data in ClickHouse

Now that we’ve defined our connection to a Kafka server, we need to set up our ClickHouse infrastructure to get data from Kafka and put it into ClickHouse. We’ll go through an example with a Kafka server with a topic named retail_data. (For a complete ClickHouse and Kafka application, see the article Connecting ClickHouse® and Apache Kafka® in the Altinity blog.)

The Kafka topic contains JSON documents that look like this:

{
  "transaction_id": "T43379",
  "customer_id": "C004",
  "timestamp": "2024-09-12T16:16:38.935Z",
  "items": [
    {
      "item_id": "I005",
      "quantity": 3,
      "price": 100
    },
    {
      "item_id": "I001",
      "quantity": 1,
      "price": 40
    }
  ],
  "total_amount": 340,
  "store_id": "S001"
}

Each document represents a sale of some number of items to a particular customer in a particular store. Because there can be any number of items in an order, we’ll store item data in a separate table, with the transaction_id acting as a foreign key if we need to do a join on the tables.

The architecture looks like this:

Custom parameters

Figure 21 - Architecture to consume Kafka data in ClickHouse

We’ll set up three types of things to consume the topic’s data in the sales database:

  • A table with engine type Kafka that receives data from the Kafka server (kafka_sales_data)
  • Tables with engine type ReplicatedMergeTree to hold the data (sales_transactions and sales_items)
  • Materialized Views (kafka_to_sales_transactions and kafka_to_sales_items) to take data from the kafka_sales_data table and store it in our two ReplicatedMergeTree tables.

Creating table with a Kafka engine

First we’ll create a table with a Kafka engine. This will use the Kafka configuration we created earlier to consume messages from the topic on the Kafka server:

CREATE TABLE sales.kafka_sales_data
(
    `transaction_id` String,
    `customer_id` String,
    `timestamp` DateTime,
    `items` Array(Tuple(item_id String, quantity UInt32, price Float64)),
    `total_amount` Float64,
    `store_id` String
)
ENGINE = Kafka(aiven)
SETTINGS kafka_format = 'JSONEachRow', date_time_input_format = 'best_effort'

The fields in the database map to the fields in the JSON document. Because there can be more than one item in each document, we use Array(Tuple(...)) to retrieve the items from each document.

We’re using the Kafka configuration aiven from the settings file in Figure 9, and we’re using the kafka_format parameter JSONEachRow to define the format of the data we’ll get from Kafka. The setting date_time_input_format = 'best_effort' tells the Kafka engine to try to parse the string value timestamp into a DateTime value. We don’t know what applications are writing data to the Kafka topic, and we don’t know what date format those applications might use, so this is a safe way to create DateTime values. Doing that conversion on the data as it comes in from Kafka ensures that we’ll have a valid DateTime value whenever we use that data.

See the Kafka engine documentation for a complete list of settings for the Kafka engine.

Be aware that if your configuration name contains characters like a hyphen (kafka-37), you need to put the name of the configuration in double quotes:

ENGINE = Kafka("kafka-37")

Using kafka-37 without quotes returns the error DB::Exception: Bad cast from type DB::ASTFunction to DB::ASTLiteral as ClickHouse tries to process the configuration name as the mathematical expression kafka - 37.

Creating tables to hold data from Kafka

We use two tables, one to hold the basic information about a sale, and another to hold all the item details for every sale. Here’s the schema for the sales data:

CREATE TABLE sales.sales_transactions
(
    `transaction_id` String,
    `customer_id` String,
    `timestamp` DateTime,
    `total_amount` Float64,
    `store_id` String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/sales_transactions', '{replica}')
ORDER BY (store_id, timestamp, transaction_id)
SETTINGS index_granularity = 8192

The schema that stores the details of each item is similarly straightforward:

CREATE TABLE sales.sales_items
(
    `transaction_id` String,
    `item_id` String,
    `quantity` UInt32,
    `price` Float64,
    `store_id` String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/sales_items', '{replica}')
ORDER BY (transaction_id, item_id)
SETTINGS index_granularity = 8192

Creating Materialized Views

Finally, we need Materialized Views that take data as it is received by the Kafka engine and store it in the appropriate tables. Here’s how we populate the transactions table:

CREATE MATERIALIZED VIEW sales.kafka_to_sales_transactions 
  TO sales.sales_transactions
(
    `transaction_id` String,
    `customer_id` String,
    `timestamp` DateTime,
    `total_amount` Float64,
    `store_id` String
)
AS SELECT
    transaction_id,
    customer_id,
    parseDateTimeBestEffort(timestamp) AS timestamp,
    total_amount,
    store_id
FROM sales.kafka_sales_data

And here’s the Materialized View to populate the table of items:

CREATE MATERIALIZED VIEW sales.kafka_to_sales_items 
  TO sales.sales_items
(
    `transaction_id` String,
    `item` Tuple(item_id String, quantity UInt32, price Float64),
    `item_id` String,
    `quantity` UInt32,
    `price` Float64,
    `store_id` String
)
AS SELECT
    transaction_id,
    arrayJoin(items) AS item,
    item.1 AS item_id,
    item.2 AS quantity,
    item.3 AS price,
    store_id
FROM sales.kafka_sales_data

Sample data

As sales are reported on the Kafka topic, our tables are populated. We’ll look at some sample data from the Cluster Explorer. First the sales.sales_transactions table:

   ┌─transaction_id─┬─customer_id─┬───────────timestamp─┬─total_amount─┬─store_id─┐
1. │ T79867         │ C004        │ 2024-09-12 16:16:07 │          690 │ S001     │
2. │ T35561         │ C003        │ 2024-09-12 16:16:18 │          120 │ S001     │
3. │ T90469         │ C002        │ 2024-09-12 16:16:22 │           80 │ S001     │
4. │ T21884         │ C002        │ 2024-09-12 16:16:25 │          430 │ S001     │
5. │ T78661         │ C002        │ 2024-09-12 16:16:32 │          200 │ S001     │
   └────────────────┴─────────────┴─────────────────────┴──────────────┴──────────┘

And now the sales.sales_items table:

   ┌─transaction_id─┬─item_id─┬─quantity─┬─price─┬─store_id─┐
1. │ T10476         │ I006    │        2 │    10 │ S001     │
2. │ T10476         │ I008    │        1 │    10 │ S001     │
3. │ T10476         │ I010    │        2 │    20 │ S001     │
4. │ T11741         │ I010    │        1 │   100 │ S003     │
5. │ T12091         │ I004    │        1 │    10 │ S004     │
   └────────────────┴─────────┴──────────┴───────┴──────────┘

At this point we’ve set up a connection to a topic on a Kafka server and created the tables and materialized views we need to store that data in ClickHouse. Now we can write queries to analyze sales data in real time, view live sales data in a tool like Grafana, or any number of other useful things.