Working with Kafka® data in ClickHouse®

Putting things together

Now that we’ve defined our connection to a Kafka® server, we need to create the ClickHouse® objects that read data from Kafka and store it in ClickHouse tables. We’ll walk through an example that uses a Kafka server with a topic named retail_data. (For a complete ClickHouse and Kafka application, see the article Connecting ClickHouse® and Apache Kafka® in the Altinity blog.)

The Kafka topic contains JSON documents that look like this:

{
  "transaction_id": "T43379",
  "customer_id": "C004",
  "timestamp": "2024-09-12T16:16:38.935Z",
  "items": [
    {
      "item_id": "I005",
      "quantity": 3,
      "price": 100
    },
    {
      "item_id": "I001",
      "quantity": 1,
      "price": 40
    }
  ],
  "total_amount": 340,
  "store_id": "S001"
}

Each document represents a sale of one or more items to a particular customer in a particular store. Because an order can contain any number of items, we’ll store item data in a separate table. The transaction_id column appears in both tables, so it serves as the join key when we need to combine them. (ClickHouse does not enforce foreign keys, so this relationship holds by convention only.)

The architecture looks like this:

Figure 1 - Architecture to consume Kafka data in ClickHouse

We’ll set up three kinds of objects in the sales database to consume the topic’s data:

  • A table with engine type Kafka that receives data from the Kafka server (kafka_sales_data)
  • Tables with engine type ReplicatedMergeTree to hold the data (sales_transactions and sales_items)
  • Materialized Views (kafka_to_sales_transactions and kafka_to_sales_items) to take data from the kafka_sales_data table and store it in our two ReplicatedMergeTree tables.

Creating a table with a Kafka engine

First we’ll create a table with a Kafka engine. This will use the Kafka configuration we created earlier to consume messages from the topic on the Kafka server:

CREATE TABLE sales.kafka_sales_data
(
    `transaction_id` String,
    `customer_id` String,
    `timestamp` DateTime,
    `items` Array(Tuple(item_id String, quantity UInt32, price Float64)),
    `total_amount` Float64,
    `store_id` String
)
ENGINE = Kafka(aiven)
SETTINGS kafka_format = 'JSONEachRow', date_time_input_format = 'best_effort'

The columns in the table map to the fields in the JSON document. Because each document can contain more than one item, we declare items as Array(Tuple(...)), so every element of the JSON array becomes one tuple in the column.
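To see how a nested JSON array lands in an Array(Tuple(...)) column without involving Kafka, one option is the format table function, which parses a literal string. This is only a sketch: it assumes a ClickHouse release whose format function accepts an explicit structure argument, and the one-line document is a trimmed copy of the sample shown earlier.

```sql
-- Parse one JSON document with the same column types the Kafka table uses,
-- reading from a string literal instead of a Kafka message.
SELECT
    transaction_id,
    items,
    toTypeName(items) AS items_type
FROM format(
    JSONEachRow,
    'transaction_id String, items Array(Tuple(item_id String, quantity UInt32, price Float64))',
    '{"transaction_id":"T43379","items":[{"item_id":"I005","quantity":3,"price":100},{"item_id":"I001","quantity":1,"price":40}]}'
);
```

The query should return a single row whose items column holds two tuples, one for each element of the JSON array.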

We’re using the Kafka configuration aiven from the settings file in Figure 9, and the kafka_format setting JSONEachRow tells the engine that each message is a JSON document. The setting date_time_input_format = 'best_effort' tells the Kafka engine to use ClickHouse’s more permissive date parser when it converts the string value timestamp into a DateTime value. We don’t know which applications write to the Kafka topic or which date formats they use, so this is the more tolerant choice. Doing the conversion as the data arrives from Kafka means the value is already a DateTime everywhere we use it downstream.
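As a quick illustration of best-effort parsing, the function parseDateTimeBestEffort can be run directly on the timestamp from the sample document. The explicit 'UTC' time zone argument is an assumption made here so the result does not depend on the server’s time zone.

```sql
-- Parse the ISO 8601 timestamp from the sample document.
-- The trailing 'Z' marks the value as UTC.
SELECT
    parseDateTimeBestEffort('2024-09-12T16:16:38.935Z', 'UTC') AS parsed,
    toTypeName(parsed) AS parsed_type;
```

Because DateTime has one-second precision, the fractional part (.935) is dropped and parsed should come back as 2024-09-12 16:16:38. If sub-second precision matters, a DateTime64 column is the usual alternative.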

See the Kafka engine documentation for a complete list of settings for the Kafka engine.
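For comparison, the same table can be declared with the connection details spelled out as engine settings instead of being taken from a named configuration. kafka_broker_list, kafka_topic_list, kafka_group_name, and kafka_format are the engine’s required settings. The table name, broker address, and consumer group name below are placeholders, and a broker that requires authentication would still need the server-side configuration described earlier.

```sql
CREATE TABLE sales.kafka_sales_data_inline   -- hypothetical table name
(
    `transaction_id` String,
    `customer_id` String,
    `timestamp` DateTime,
    `items` Array(Tuple(item_id String, quantity UInt32, price Float64)),
    `total_amount` Float64,
    `store_id` String
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'kafka.example.com:9092',  -- placeholder broker address
    kafka_topic_list = 'retail_data',              -- topic from this example
    kafka_group_name = 'clickhouse_sales',         -- placeholder consumer group
    kafka_format = 'JSONEachRow',
    date_time_input_format = 'best_effort';
```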

Be aware that if your configuration name contains characters like a hyphen (kafka-37), you need to put the name of the configuration in double quotes:

ENGINE = Kafka("kafka-37")

Using kafka-37 without quotes returns the error DB::Exception: Bad cast from type DB::ASTFunction to DB::ASTLiteral, because ClickHouse tries to parse the unquoted name as the arithmetic expression kafka - 37.

Creating tables to hold data from Kafka

We use two tables, one to hold the basic information about a sale, and another to hold all the item details for every sale. Here’s the schema for the sales data:

CREATE TABLE sales.sales_transactions
(
    `transaction_id` String,
    `customer_id` String,
    `timestamp` DateTime,
    `total_amount` Float64,
    `store_id` String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/sales_transactions', '{replica}')
ORDER BY (store_id, timestamp, transaction_id)
SETTINGS index_granularity = 8192

The schema that stores the details of each item is similarly straightforward:

CREATE TABLE sales.sales_items
(
    `transaction_id` String,
    `item_id` String,
    `quantity` UInt32,
    `price` Float64,
    `store_id` String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/sales_items', '{replica}')
ORDER BY (transaction_id, item_id)
SETTINGS index_granularity = 8192

Creating Materialized Views

Finally, we need Materialized Views that take data as it is received by the Kafka engine and store it in the appropriate tables. Here’s how we populate the transactions table:

CREATE MATERIALIZED VIEW sales.kafka_to_sales_transactions 
  TO sales.sales_transactions
(
    `transaction_id` String,
    `customer_id` String,
    `timestamp` DateTime,
    `total_amount` Float64,
    `store_id` String
)
AS SELECT
    transaction_id,
    customer_id,
    timestamp,  -- already a DateTime: the Kafka engine table parses it on ingest
    total_amount,
    store_id
FROM sales.kafka_sales_data

And here’s the Materialized View to populate the table of items:

CREATE MATERIALIZED VIEW sales.kafka_to_sales_items 
  TO sales.sales_items
(
    `transaction_id` String,
    `item` Tuple(item_id String, quantity UInt32, price Float64),
    `item_id` String,
    `quantity` UInt32,
    `price` Float64,
    `store_id` String
)
AS SELECT
    transaction_id,
    arrayJoin(items) AS item,
    item.1 AS item_id,
    item.2 AS quantity,
    item.3 AS price,
    store_id
FROM sales.kafka_sales_data
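
The effect of arrayJoin in the view above can be tried in isolation. The following sketch uses a literal array holding the two items from the sample document rather than reading from Kafka, so its tuples are unnamed and accessed by position only.

```sql
-- arrayJoin emits one output row per array element;
-- the other selected columns are repeated on every row.
SELECT
    'T43379' AS transaction_id,
    arrayJoin([('I005', 3, 100), ('I001', 1, 40)]) AS item,
    item.1 AS item_id,
    item.2 AS quantity,
    item.3 AS price;
```

This returns two rows, one per item. Note that arrayJoin over an empty array produces no rows, so a transaction whose items array is empty would not appear in sales.sales_items at all.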

Sample data

As sales are reported on the Kafka topic, our tables are populated. We’ll look at some sample data from the Cluster Explorer. First the sales.sales_transactions table:

   ┌─transaction_id─┬─customer_id─┬───────────timestamp─┬─total_amount─┬─store_id─┐
1. │ T79867         │ C004        │ 2024-09-12 16:16:07 │          690 │ S001     │
2. │ T35561         │ C003        │ 2024-09-12 16:16:18 │          120 │ S001     │
3. │ T90469         │ C002        │ 2024-09-12 16:16:22 │           80 │ S001     │
4. │ T21884         │ C002        │ 2024-09-12 16:16:25 │          430 │ S001     │
5. │ T78661         │ C002        │ 2024-09-12 16:16:32 │          200 │ S001     │
   └────────────────┴─────────────┴─────────────────────┴──────────────┴──────────┘

And now the sales.sales_items table:

   ┌─transaction_id─┬─item_id─┬─quantity─┬─price─┬─store_id─┐
1. │ T10476         │ I006    │        2 │    10 │ S001     │
2. │ T10476         │ I008    │        1 │    10 │ S001     │
3. │ T10476         │ I010    │        2 │    20 │ S001     │
4. │ T11741         │ I010    │        1 │   100 │ S003     │
5. │ T12091         │ I004    │        1 │    10 │ S004     │
   └────────────────┴─────────┴──────────┴───────┴──────────┘
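
Because both tables carry transaction_id, they can be joined to put item details next to their transaction. The sketch below also recomputes each order’s total from its items. In the sample document, 3 × 100 + 1 × 40 = 340 matches total_amount, but whether every producer calculates total_amount this way is an assumption.

```sql
-- Compare the total reported on each transaction
-- with the total computed from its line items.
SELECT
    t.transaction_id,
    t.store_id,
    t.total_amount,
    sum(i.quantity * i.price) AS items_total
FROM sales.sales_transactions AS t
INNER JOIN sales.sales_items AS i
    ON i.transaction_id = t.transaction_id
GROUP BY
    t.transaction_id,
    t.store_id,
    t.total_amount
ORDER BY t.transaction_id
LIMIT 10;
```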

At this point we’ve set up a connection to a topic on a Kafka server and created the tables and materialized views we need to store that data in ClickHouse. Now we can write queries to analyze sales data in real time, view live sales data in a tool like Grafana, or do any number of other useful things.
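
As one example of such a query, the following sketch summarizes the last hour of sales by store. The one-hour window is an arbitrary choice for illustration.

```sql
-- Transactions and revenue per store over the last hour.
SELECT
    store_id,
    count() AS transactions,
    sum(total_amount) AS revenue
FROM sales.sales_transactions
WHERE timestamp >= now() - INTERVAL 1 HOUR
GROUP BY store_id
ORDER BY revenue DESC;
```

Filtering on store_id and timestamp lines up with the leading columns of the table’s ORDER BY key, which generally helps ClickHouse skip data it does not need to read.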