How to configure a Clickhouse cluster with sharding and replication

Introduction

Clickhouse is a column-oriented database developed by Yandex and used for data analytics. In this article I will walk through setting up a distributed, fault-tolerant Clickhouse cluster.

If you want to install and configure Clickhouse in single-node mode, you should read this article.

Clickhouse supports distribution and replication of data, but the documentation around it is confusing and setting up a cluster is not straightforward. Below I will explain in detail the configuration required for setting up a Clickhouse cluster.

Data distribution refers to splitting your dataset into multiple shards which are then stored on different servers. I won’t be talking about data distribution strategies, but I will be focusing on how to access the different shards across the cluster nodes.

Data replication refers to keeping a copy of the data on one or more nodes for ensuring availability in case of a node crash or a network partition. It can also improve performance by allowing multiple servers to process parallel queries that use the same data.

This article assumes that you already know how to configure and run Clickhouse on a single node.

Data distribution

In order to create a distributed table we need to do two things:

  1. Configure the Clickhouse nodes to make them aware of all the available nodes in the cluster.
  2. Create a new table using the Distributed engine.

In the config.xml file there is a configuration section called remote_servers, where you can specify a list of clusters containing your shards. Each shard is then defined as a list of replicas containing the server addresses. The replica definition has the following parameters:

  • default_database: The database to be used by the Distributed table engine, if no database is specified.
  • host: The host of the server where the replica resides.
  • port: The port of the server where the replica resides.

Inside a shard you can define as many replicas as you want. The data will be accessed on the first available replica.

The example below shows how to configure a cluster with two nodes, with one shard on each node:

<remote_servers>
    <example_cluster>
        <shard>
            <replica>
                <default_database>shard</default_database>
                <host>hostname1</host>
                <port>9000</port>
            </replica>
        </shard>

        <shard>
            <replica>
                <default_database>shard</default_database>
                <host>hostname2</host>
                <port>9000</port>
            </replica>
        </shard>
    </example_cluster>
</remote_servers>
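If you want to confirm that the configuration was picked up, you can ask Clickhouse which clusters it knows about by querying the system.clusters table (a quick sanity check based on the configuration above):

SELECT cluster, shard_num, replica_num, host_name, port
FROM system.clusters
WHERE cluster = 'example_cluster';

Each replica defined in remote_servers should show up as one row.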

We create shard tables on each of the two configured nodes. Then, we create the main table, which we’ll use for global access.

On each server:

Create the table holding the data (the shard table):

CREATE DATABASE shard;
CREATE TABLE shard.test
(
 id Int64,
 event_time DateTime
)
Engine=MergeTree()
PARTITION BY toYYYYMMDD(event_time)
ORDER BY id;

Create the distributed table. The distributed table does not store any data itself; it acts like a view over the shard tables, so it needs to have the same schema definition as the shards. Once it is created, queries against it are sent to each shard and the results are aggregated on the node where the query was initially issued.

CREATE TABLE default.test
(
id Int64,
event_time DateTime
)
ENGINE = Distributed('example_cluster', '', test, rand());

The second parameter of the Distributed engine is the database (schema) name; if we leave it empty, the default database defined for each replica in the cluster configuration will be used.
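As a quick sanity check (a sketch, assuming the shard and distributed tables above exist on both nodes), insert a row into the local shard table on each server and then query the distributed table; the count should include the rows from both shards:

-- On hostname1
INSERT INTO shard.test (id, event_time) VALUES (1, now());

-- On hostname2
INSERT INTO shard.test (id, event_time) VALUES (2, now());

-- On either node: the Distributed table aggregates results from both shards
SELECT count() FROM default.test;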

Data replication

Now that we know how to read data from multiple nodes, we want to make sure that our data is also replicated in order to tolerate node failures.

To achieve this we need to:

  1. Point our Clickhouse servers to a Zookeeper instance, which is used by the replication engine.
  2. Use the ReplicatedMergeTree engine when we create the tables.

In the config.xml file we need to set up the Zookeeper connection.

<zookeeper>
    <node index="1">
        <host>zookeeper-host-1</host>
        <port>2181</port>
    </node>

    <node index="2">
        <host>zookeeper-host-2</host>
        <port>2181</port>
    </node>

    <node index="3">
        <host>zookeeper-host-3</host>
        <port>2181</port>
    </node>
</zookeeper>

Make sure that Zookeeper is up and running when Clickhouse starts, otherwise the database won’t start.
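A simple way to verify the connection (assuming the Zookeeper configuration above) is to query the system.zookeeper table, which requires a path filter and fails if Clickhouse cannot reach Zookeeper:

-- Lists the top-level znodes visible to Clickhouse
SELECT name FROM system.zookeeper WHERE path = '/';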

Data replication is achieved using the ReplicatedMergeTree engine. The parameters of the ReplicatedMergeTree are the zookeeper_path and the replica_name. Tables with the same zookeeper_path will be kept in sync.

Example:

On server 1:

CREATE TABLE test
(
id Int64,
partition Int16
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/replicated/test', 'replica_1')
ORDER BY id
PARTITION BY partition;

On server 2:

CREATE TABLE test
(
id Int64,
partition Int16
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/replicated/test', 'replica_2')
ORDER BY id
PARTITION BY partition;

The Zookeeper path can be anything, as long as it matches across all replicas of the same table. For simplicity, you should settle on a standardized naming scheme, e.g. /clickhouse/tables/<main_replica_host>/<table_name>.
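To see the replication in action (a sketch based on the two tables above), insert a row on one server and read it back from the other; the system.replicas table also shows the state of each replicated table:

-- On server 1
INSERT INTO test (id, partition) VALUES (1, 1);

-- On server 2: the row appears once replication has caught up
SELECT * FROM test;

-- On either server: inspect the replication state
SELECT database, table, replica_name, is_leader, queue_size
FROM system.replicas
WHERE table = 'test';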

Distributed and replicated configuration

In the previous sections I talked about how to either access data on multiple nodes or how to replicate the data. In this section, I will show you how to combine the two to obtain a distributed and replicated cluster.

This can be achieved following the below model:

  1. Each shard configuration will contain two replicas, one having the default database shard and one having the default database replica.
  2. Each server has 3 databases (schemas): default, shard and replica.
  3. In the default schema we create the tables with the Distributed table engine.
  4. In the shard schema we create a ReplicatedMergeTree table keeping the main data.
  5. In the replica schema we create a ReplicatedMergeTree table keeping the copy of the other server's data (i.e. the replica of server1's shard is stored on server2).

Server configuration

The first step is to configure Zookeeper as described in the Data replication section.

Second, you have to configure the cluster such that each shard is hosted on one server and its replica on the other.

<remote_servers>
    <example_cluster>
        <shard>
            <replica>
                <default_database>shard</default_database>
                <host>hostname1</host>
                <port>9000</port>
            </replica>
            <replica>
                <default_database>replica</default_database>
                <host>hostname2</host>
                <port>9000</port>
            </replica>
        </shard>

        <shard>
            <replica>
                <default_database>shard</default_database>
                <host>hostname2</host>
                <port>9000</port>
            </replica>
            <replica>
                <default_database>replica</default_database>
                <host>hostname1</host>
                <port>9000</port>
            </replica>
        </shard>
    </example_cluster>
</remote_servers>

Third, create the databases and the tables:

On server 1:

CREATE DATABASE shard;
CREATE DATABASE replica;


CREATE TABLE shard.test
(
id Int64,
event_time DateTime
)
Engine=ReplicatedMergeTree('/clickhouse/tables/shard1/test', 'replica_1')
PARTITION BY toYYYYMMDD(event_time)
ORDER BY id;


CREATE TABLE replica.test
(
id Int64,
event_time DateTime
)
Engine=ReplicatedMergeTree('/clickhouse/tables/shard2/test', 'replica_2')
PARTITION BY toYYYYMMDD(event_time)
ORDER BY id;


CREATE TABLE default.test
(
id Int64,
event_time DateTime
)
ENGINE = Distributed('example_cluster', '', test, rand());

On server 2:

CREATE DATABASE shard;
CREATE DATABASE replica;


CREATE TABLE shard.test
(
id Int64,
event_time DateTime
)
Engine=ReplicatedMergeTree('/clickhouse/tables/shard2/test', 'replica_1')
PARTITION BY toYYYYMMDD(event_time)
ORDER BY id;


CREATE TABLE replica.test
(
id Int64,
event_time DateTime
)
Engine=ReplicatedMergeTree('/clickhouse/tables/shard1/test', 'replica_2')
PARTITION BY toYYYYMMDD(event_time)
ORDER BY id;


CREATE TABLE default.test
(
id Int64,
event_time DateTime
)
ENGINE = Distributed('example_cluster', '', test, rand());

With the above configuration, the test table can be queried on both servers through the default database.

Data insertion can be done through the Distributed table, letting Clickhouse distribute your data across the shards. If you choose to do that, you need to configure the shard with the <internal_replication> parameter, as described in the docs. Since the underlying tables use the ReplicatedMergeTree engine, the parameter should be set to true so that the Distributed table writes each insert to only one replica and lets the engine replicate the data; otherwise the Distributed table would write to every replica itself and duplicate the replication work.
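For example, each <shard> section of the cluster configuration above would be extended with the flag (shown here for the first shard only):

<shard>
    <internal_replication>true</internal_replication>
    <replica>
        <default_database>shard</default_database>
        <host>hostname1</host>
        <port>9000</port>
    </replica>
    <replica>
        <default_database>replica</default_database>
        <host>hostname2</host>
        <port>9000</port>
    </replica>
</shard>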

Another method, which gives you more control over how the data is distributed across the servers, is to insert the data directly into the shard table of each server (e.g. INSERT INTO shard.test(id, event_time) VALUES (1, now())). This puts less stress on the node performing the insert and also takes advantage of parallel ingestion, since multiple inserts can run locally on each server.

In both cases, since the shard tables use the ReplicatedMergeTree engine, the data will be replicated into the replica schema. As described in the documentation, replication happens asynchronously, so the replica and shard data might be out of sync until the replica catches up.
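A simple way to check that the copies have caught up (based on the tables created above) is to compare the row counts of a shard and its replica on the other server, and to look at the total through the distributed table:

-- On server 1: rows stored locally in shard 1
SELECT count() FROM shard.test;

-- On server 2: the replicated copy of shard 1; the counts match once replication has caught up
SELECT count() FROM replica.test;

-- On either server: total rows across both shards
SELECT count() FROM default.test;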

In this article I explained how a Clickhouse cluster can be set up with data distribution and replication. Please let me know if anything is unclear. Thank you for reading!
