Working with Replication

Using the ReplicatedMergeTree engine

At this point our ClickHouse® cluster has replication enabled. Whether you used ZooKeeper or ClickHouse Keeper, we can now create ReplicatedMergeTree tables that automatically keep our replicas in sync.

First of all, let’s look at the infrastructure beneath our ClickHouse cluster:

kubectl get all -n quick

We’ve got two pods, three services, and two statefulsets:

NAME                                READY   STATUS    RESTARTS        AGE
pod/chi-cluster01-cluster01-0-0-0   1/1     Running   1 (8m23s ago)   19m
pod/chi-cluster01-cluster01-0-1-0   1/1     Running   0               7m39s

NAME                                  TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)                      AGE
service/chi-cluster01-cluster01-0-0   ClusterIP   None         <none>        9000/TCP,8123/TCP,9009/TCP   35m
service/chi-cluster01-cluster01-0-1   ClusterIP   None         <none>        9000/TCP,8123/TCP,9009/TCP   7m23s
service/clickhouse-cluster01          ClusterIP   None         <none>        8123/TCP,9000/TCP            35m

NAME                                           READY   AGE
statefulset.apps/chi-cluster01-cluster01-0-0   1/1     30m
statefulset.apps/chi-cluster01-cluster01-0-1   1/1     7m44s
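
If you also want to see the persistent volumes we attached to the cluster earlier, the PersistentVolumeClaims live in the same namespace. This is just an optional check; the names, sizes, and storage class in the output depend on the storage settings you used:

kubectl get pvc -n quick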

Current database structure

The analytics database currently has a single table on each of our two hosts. We can take a look from either of the pods:

kubectl exec -it chi-cluster01-cluster01-0-0-0 -n quick -- clickhouse-client
SELECT
    name,
    engine
FROM system.tables
WHERE database = 'analytics'

We’ve got our MergeTree table, as we’d expect:

   ┌─name───────┬─engine────┐
1. │ page_views │ MergeTree │
   └────────────┴───────────┘

1 row in set. Elapsed: 0.003 sec.

👉 Type exit to end the clickhouse-client session.

We set up page_views to hold our data. Let’s run SELECT * FROM analytics.page_views on both pods:

for pod in 0-0-0 0-1-0; do
  echo "Running query on chi-cluster01-cluster01-$pod..."
  kubectl exec -it chi-cluster01-cluster01-$pod -n quick \
  -- clickhouse-client -q "SELECT * from analytics.page_views;"
done
Running query on chi-cluster01-cluster01-0-0-0...
2025-01-01 12:00:00	101	/home	google.com	mobile	USA
2025-01-01 12:05:00	102	/products	facebook.com	desktop	Canada
2025-01-01 12:10:00	103	/cart	twitter.com	tablet	UK
2025-01-02 14:00:00	101	/checkout	google.com	mobile	USA
2025-01-06 08:20:00	110	/blog	twitter.com	desktop	Australia
Running query on chi-cluster01-cluster01-0-1-0...

As you can see, host chi-cluster01-cluster01-0-0-0 has data, but its replica, chi-cluster01-cluster01-0-1-0, doesn’t have any.

(By the way, we’ll use this for loop technique to execute a single command on multiple nodes. If we need to do something more in-depth, we’ll connect to a node and use clickhouse-client interactively.)
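
If you find yourself running lots of these one-liners, you could wrap the pattern in a small shell function. This is just a convenience sketch; the chq name and its arguments are our own invention, not part of the tutorial:

# Hypothetical helper: run a single query against one pod.
# Usage: chq 0-0-0 "SELECT count() FROM analytics.page_views"
chq() {
  local pod="$1" query="$2"
  kubectl exec -it "chi-cluster01-cluster01-$pod" -n quick \
    -- clickhouse-client -q "$query"
}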

What we need is a new table that uses the ReplicatedMergeTree engine. When we insert data into that table, ClickHouse writes the new data to every replica. (Updates and deletes are replicated as well.) Having multiple synchronized copies of data is one of the basic principles of highly available systems, and ReplicatedMergeTree gives us that.
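
The table definition below uses two placeholders, {shard} and {replica}. These are macros that the operator defines on every host so each replica registers itself under its own path in ZooKeeper/Keeper. If you’re curious what they expand to on a given pod, you can peek at the system.macros table (an optional check; the exact values come from the operator’s generated configuration):

kubectl exec -it chi-cluster01-cluster01-0-0-0 -n quick \
  -- clickhouse-client -q "SELECT * FROM system.macros;"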

So let’s create the table. We’ll do this using ON CLUSTER to put a new table with that engine on both hosts:

kubectl exec -it chi-cluster01-cluster01-0-0-0 -n quick -- clickhouse-client
CREATE TABLE analytics.page_views_replicated ON CLUSTER cluster01
(
    event_time DateTime,
    user_id UInt32,
    page_url String,
    referrer_url String,
    device String,
    country String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/analytics/page_views', '{replica}')
ORDER BY event_time;

That creates the table on both hosts:

   ┌─host────────────────────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
1. │ chi-cluster01-cluster01-0-1 │ 9000 │      0 │       │                   1 │                0 │
2. │ chi-cluster01-cluster01-0-0 │ 9000 │      0 │       │                   0 │                0 │
   └─────────────────────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘

2 rows in set. Elapsed: 0.067 sec.

👉 Type exit to end the clickhouse-client session.
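
Before we move on, we can confirm that both hosts registered the new table with ZooKeeper/Keeper by looking at the system.replicas table. This is an optional sanity check, and the exact counters will vary on your cluster; the important thing is that each replica reports itself as active and not read-only:

for pod in 0-0-0 0-1-0; do
  echo "Checking replica status on chi-cluster01-cluster01-$pod..."
  kubectl exec -it chi-cluster01-cluster01-$pod -n quick \
  -- clickhouse-client -q "SELECT database, table, replica_name, is_readonly, total_replicas, active_replicas FROM system.replicas WHERE table = 'page_views_replicated';"
done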

So we’ve got our ReplicatedMergeTree tables; let’s query them.

for pod in 0-0-0 0-1-0; do
  echo "Running query on chi-cluster01-cluster01-$pod..."
  kubectl exec -it chi-cluster01-cluster01-$pod -n quick \
  -- clickhouse-client -q "SELECT * from analytics.page_views_replicated;"
done

Our results are underwhelming:

Running query on chi-cluster01-cluster01-0-0-0...
Running query on chi-cluster01-cluster01-0-1-0...

Our ReplicatedMergeTree table has the same schema as our MergeTree table, but it isn’t initialized with any data. We’ll have to do that ourselves. It’s easy enough; we’ll just insert everything from analytics.page_views into analytics.page_views_replicated. Note that only chi-cluster01-cluster01-0-0-0 has any rows in page_views, so running the loop on both pods won’t duplicate anything:

for pod in 0-0-0 0-1-0; do
  echo "Running query on chi-cluster01-cluster01-$pod..."
  kubectl exec -it chi-cluster01-cluster01-$pod -n quick \
  -- clickhouse-client -q "INSERT INTO analytics.page_views_replicated SELECT * FROM analytics.page_views;"
done

Now we should get the same results when we query both replicas:

for pod in 0-0-0 0-1-0; do
  echo "Running query on chi-cluster01-cluster01-$pod..."
  kubectl exec -it chi-cluster01-cluster01-$pod -n quick \
  -- clickhouse-client -q "SELECT * from analytics.page_views_replicated;"
done
Running query on chi-cluster01-cluster01-0-0-0...
2025-01-01 12:00:00	101	/home	google.com	mobile	USA
2025-01-01 12:05:00	102	/products	facebook.com	desktop	Canada
2025-01-01 12:10:00	103	/cart	twitter.com	tablet	UK
2025-01-02 14:00:00	101	/checkout	google.com	mobile	USA
2025-01-06 08:20:00	110	/blog	twitter.com	desktop	Australia
Running query on chi-cluster01-cluster01-0-1-0...
2025-01-01 12:00:00	101	/home	google.com	mobile	USA
2025-01-01 12:05:00	102	/products	facebook.com	desktop	Canada
2025-01-01 12:10:00	103	/cart	twitter.com	tablet	UK
2025-01-02 14:00:00	101	/checkout	google.com	mobile	USA
2025-01-06 08:20:00	110	/blog	twitter.com	desktop	Australia

All the replicas are in sync. Now if we insert data into a host, it should be copied to its replica as well. Here goes:

kubectl exec -it chi-cluster01-cluster01-0-0-0 -n quick \
  -- clickhouse-client -q "INSERT INTO analytics.page_views_replicated (event_time, user_id, page_url, referrer_url, device, country)  VALUES('2025-01-01 12:00:00', 106, '/home', 'google.com', 'mobile', 'Nigeria')"

That’s the first record we’ve inserted with a user from Nigeria. Let’s query all our hosts to see where that data is:

for pod in 0-0-0 0-1-0; do
  echo "Running query on chi-cluster01-cluster01-$pod..."
  kubectl exec -it chi-cluster01-cluster01-$pod -n quick \
  -- clickhouse-client -q "SELECT * FROM analytics.page_views_replicated where country = 'Nigeria';"
done
Running query on chi-cluster01-cluster01-0-0-0...
2025-01-01 12:00:00	101	/home	google.com	mobile	Nigeria
Running query on chi-cluster01-cluster01-0-1-0...
2025-01-01 12:00:00	101	/home	google.com	mobile	Nigeria

We added data to chi-cluster01-cluster01-0-0-0 but nowhere else. As you can see from the output, however, that value was also copied to its replica, chi-cluster01-cluster01-0-1-0. That’s what we want from replication: whenever we make a change to our data, it’s automatically synchronized with all of our replicas.
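
Under the hood, each replica pulls new parts from its peers based on a shared log in ZooKeeper/Keeper. If you ever want to watch that happen (or debug a replica that’s lagging), the system.replication_queue table lists the fetches and merges that are still pending; on a small, in-sync cluster like this one it should normally be empty:

for pod in 0-0-0 0-1-0; do
  echo "Checking replication queue on chi-cluster01-cluster01-$pod..."
  kubectl exec -it chi-cluster01-cluster01-$pod -n quick \
  -- clickhouse-client -q "SELECT database, table, type, create_time, num_tries FROM system.replication_queue;"
done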

For housekeeping, now that we’ve got all of our data into the analytics.page_views_replicated table, we can drop the original analytics.page_views table:

for pod in 0-0-0 0-1-0; do
  echo "Running query on chi-cluster01-cluster01-$pod..."
  kubectl exec -it chi-cluster01-cluster01-$pod -n quick \
  -- clickhouse-client -q "DROP TABLE analytics.page_views;"
done
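
Because both hosts belong to cluster01, an equivalent option is to issue the drop once with ON CLUSTER instead of looping over the pods. Use whichever form you prefer, but only one of them; a second attempt will fail because the table is already gone:

kubectl exec -it chi-cluster01-cluster01-0-0-0 -n quick \
  -- clickhouse-client -q "DROP TABLE analytics.page_views ON CLUSTER cluster01;"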

The last thing we’ll do with replication is create a new Distributed table that works with our replicated data. Whenever we query, update, or delete data, we’ll do it against the Distributed table. Connect to one of the nodes in our cluster:

kubectl exec -it chi-cluster01-cluster01-0-0-0 -n quick -- clickhouse-client
CREATE TABLE analytics.page_views_distributed ON CLUSTER cluster01 AS analytics.page_views_replicated
    ENGINE = Distributed(cluster01, analytics, page_views_replicated, rand());

   ┌─host────────────────────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
1. │ chi-cluster01-cluster01-0-0 │ 9000 │      0 │       │                   1 │                0 │
2. │ chi-cluster01-cluster01-0-1 │ 9000 │      0 │       │                   0 │                0 │
   └─────────────────────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘

2 rows in set. Elapsed: 0.066 sec.

👉 Type exit to end the clickhouse-client session.
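
The first argument to the Distributed engine, cluster01, refers to a cluster definition that the operator generates for us. If you want to see exactly which hosts that definition covers, the system.clusters table spells it out (an optional check):

kubectl exec -it chi-cluster01-cluster01-0-0-0 -n quick \
  -- clickhouse-client -q "SELECT cluster, shard_num, replica_num, host_name FROM system.clusters WHERE cluster = 'cluster01';"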

Now querying our new Distributed table should show the same results from every host:

for pod in 0-0-0 0-1-0; do
  echo "Running query on chi-cluster01-cluster01-$pod..."
  kubectl exec -it chi-cluster01-cluster01-$pod -n quick \
  -- clickhouse-client -q "SELECT * FROM analytics.page_views_distributed where country = 'Nigeria';"
done

Sure enough, we get the same results everywhere:

Running query on chi-cluster01-cluster01-0-0-0...
2025-01-01 12:00:00	101	/home	google.com	mobile	Nigeria
Running query on chi-cluster01-cluster01-0-1-0...
2025-01-01 12:00:00	101	/home	google.com	mobile	Nigeria
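
The Distributed table handles writes as well as reads: an INSERT against it is forwarded to the underlying page_views_replicated table, and in this setup replication takes care of the other copy. Here’s a quick way to try it; the row below is just an example we made up for this check, and because distributed inserts are forwarded asynchronously by default, you may need to wait a moment before the row shows up everywhere:

kubectl exec -it chi-cluster01-cluster01-0-0-0 -n quick \
  -- clickhouse-client -q "INSERT INTO analytics.page_views_distributed VALUES ('2025-01-07 09:15:00', 120, '/pricing', 'bing.com', 'desktop', 'Germany');"

for pod in 0-0-0 0-1-0; do
  echo "Running query on chi-cluster01-cluster01-$pod..."
  kubectl exec -it chi-cluster01-cluster01-$pod -n quick \
  -- clickhouse-client -q "SELECT * FROM analytics.page_views_replicated WHERE country = 'Germany';"
done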

Where we are

That’s as far as we’ll go in this tutorial. Along the way, we:

  1. Installed the Altinity Kubernetes Operator for ClickHouse
  2. Created a ClickHouse cluster with a single shard and a single node
  3. Added persistent storage to the cluster
  4. Enabled replication with either ZooKeeper or ClickHouse Keeper
  5. Created a ReplicatedMergeTree table (plus a Distributed table on top of it) to take advantage of replication

Although this ClickHouse cluster isn’t quite ready for managing massive data sets, all the basics are in place.

What’s next

Feel free to go ahead to the Adding a second shard to your cluster page, but be aware that sharding is usually only worthwhile once you have more than 5 TB of data.

What you add to your running cluster is up to you, but there are lots of things the operator can do. Here are just a few; for the complete list, see the table of contents for the docs in the operator's repo: