PostHog · Altinity OSA meetup

Stateless ClickHouse
for stream processing

How we turned ClickHouse itself into our Kafka → CH inserter — and deleted the service we were about to build.

James Greenhill  ·  no MergeTree required

Who's talking

Me, and the firehose

James Greenhill — Member of Token Burning Staff, PostHog.

PostHog = open-source product analytics, session replay, feature flags, data warehouse. All the analytical data lives in ClickHouse.

trillions
of events
01

The pipeline that stopped scaling

Kafka table engines, living on the data nodes.

The textbook pattern

Kafka → CH, the classic recipe

Kafka topic
events
Data node (MergeTree)
kafka_events → events_mv → sharded_events

The Kafka table engine is a consumer. A materialized view fires on each inserted block and writes into the real MergeTree table. Elegant: pull-based, resilient to spikes, no external writer.

The problem

One box, two jobs, fighting

  • Ingestion and queries share the same nodes. A query storm starves Kafka consumption — and vice-versa.
  • No headroom to recover. If ingestion ever fell behind (paused for any reason), we didn't have the spare throughput at above-average peak to catch back up. We'd just keep falling further behind.
  • Scaling consumers meant scaling storage nodes — the expensive, stateful, hard-to-move ones.
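
The "no headroom" point can be made concrete with a little arithmetic. This is a minimal sketch that assumes a constant incoming rate and a constant maximum consume rate; the function name and all numbers are made up for illustration and are not PostHog measurements.

```python
import math


def catch_up_seconds(lag_events: float, incoming_per_s: float, capacity_per_s: float) -> float:
    """Seconds needed to drain a backlog, or infinity if there is no headroom.

    Assumes constant rates; real traffic is bursty, so treat this as a rough bound.
    """
    headroom = capacity_per_s - incoming_per_s  # spare throughput left for the backlog
    if headroom <= 0:
        return math.inf  # consumers cannot even keep pace, so lag only grows
    return lag_events / headroom


# Illustrative numbers only: a 10-minute pause at 100k events/s leaves 60M events of lag.
lag = 600 * 100_000
print(catch_up_seconds(lag, incoming_per_s=100_000, capacity_per_s=120_000))  # 3000.0
print(catch_up_seconds(lag, incoming_per_s=100_000, capacity_per_s=100_000))  # inf
```

With 20% headroom the backlog clears in 50 minutes; with none it never clears, which is the situation described in the bullet above.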
Why ingestion is expensive

The insert is the cheap part. Most of the CPU goes to denormalizing properties off every event — parsing the JSON and writing it into Map columns and dozens of materialized columns. On shared nodes, that work fights your queries for the same cores.
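
To give a feel for the per-event work, here is a rough Python sketch of that denormalization step. In the real pipeline this happens in ClickHouse SQL, not Python; the event shape, the `denormalize` helper, and the `mat_<key>` column naming are all assumptions made for illustration.

```python
import json


def denormalize(raw_event: str, materialized_keys: list[str]) -> dict:
    """Parse one JSON event and flatten its properties for insertion.

    Returns the properties as a string-to-string map (like a Map(String, String)
    column) plus one top-level column per key in `materialized_keys`.
    """
    event = json.loads(raw_event)
    props = event.get("properties", {})
    # Map columns hold strings, so non-string values are re-serialized as JSON.
    prop_map = {k: v if isinstance(v, str) else json.dumps(v) for k, v in props.items()}
    row = {"event": event["event"], "properties_map": prop_map}
    for key in materialized_keys:
        # Hypothetical naming scheme for materialized columns: mat_<key>.
        row[f"mat_{key}"] = prop_map.get(key, "")
    return row


raw = '{"event": "pageview", "properties": {"$browser": "Firefox", "depth": 3}}'
print(denormalize(raw, ["$browser", "plan"]))
```

Every event pays for a JSON parse, a map build, and one lookup per materialized column, which is why this work dominates the cost of the insert itself.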

meme break This Is Fine meme: ingestion and queries on the same ClickHouse nodes

Our data nodes, every time a dashboard fired a heavy query while the Kafka consumers were trying to keep up.

The fork in the road

Plan A: build an inserter service

Two buttons meme: scale the storage tier vs build an inserter service

The hard part was never the consumer loop — it was the schema. An inserter has to denormalize events into our ever-changing dynamic event schema, which we'd then maintain in two surface areas: once in Go, once in ClickHouse SQL. Wrangling prop Maps and materialized columns in one place was already painful.

And the types would drift subtly between Go and ClickHouse. With ClickHouse feeding ClickHouse, there is one type system end to end.

How far Plan A got, in full (~/inserter):

module github.com/posthog/inserter
go 1.20
// ...line 3 was never written.   (Apr 2023)

galaxy brain Expanding brain meme: Kafka on data nodes, to inserter service, to stateless ClickHouse nodes, to a cluster is just a remote_servers list

The evolution of our ingestion takes — ending on the enlightenment.

02

The insight

We didn't need a new service. We needed a new role for ClickHouse.

Key realization #1

A "cluster" is just a logical list

In ClickHouse, a cluster is just a <remote_servers> entry — and one config can declare many. Nodes can live anywhere, run anything, and be added at will.

<remote_servers>
    <posthog>                       <!-- storage cluster: 10×3 on EC2, NVMe -->
        <shard><replica><host>data-1</host></replica> ... </shard> ...
    </posthog>
    <ingestion-events>              <!-- a SEPARATE cluster: stateless K8s pods -->
        <shard><replica><host>chi-ingestion-events-0</host></replica></shard> ...
    </ingestion-events>
</remote_servers>

EC2 storage and stateless K8s ingestion are separate clusters — just two entries in the same list. The pods are their own cluster; their Distributed tables target posthog to forward writes. Different hardware, bridged by the Distributed engine — never one mixed cluster.
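
One way to see that a cluster really is just a named list is to read the config as data. The sketch below parses a trimmed-down, made-up `remote_servers` fragment in the same shape as the one above (the `data-2` host is hypothetical) and prints each cluster with its shards and replica hosts.

```python
import xml.etree.ElementTree as ET

# A trimmed-down, made-up config in the same shape as the slide's example.
CONFIG = """
<remote_servers>
    <posthog>
        <shard><replica><host>data-1</host></replica></shard>
        <shard><replica><host>data-2</host></replica></shard>
    </posthog>
    <ingestion-events>
        <shard><replica><host>chi-ingestion-events-0</host></replica></shard>
    </ingestion-events>
</remote_servers>
"""


def clusters(config_xml: str) -> dict[str, list[list[str]]]:
    """Map each cluster name to its shards, each shard being a list of replica hosts."""
    root = ET.fromstring(config_xml)
    return {
        cluster.tag: [
            [host.text for host in shard.findall("replica/host")]
            for shard in cluster.findall("shard")
        ]
        for cluster in root
    }


for name, shards in clusters(CONFIG).items():
    print(name, shards)
```

Two top-level entries, two independent clusters: nothing in the structure ties them to the same hardware or even the same platform.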

Key realization #2

The inserter we wanted... is a ClickHouse node

Stateless CH node
Kafka table engine
+ materialized view
+ Distributed table
no MergeTree · no data

Consume from Kafka? Kafka table engine.

Transform + batch? Materialized view.

Route to the right shard and INSERT? Distributed engine.

Every feature we'd have built into the inserter, ClickHouse already ships. So run those three objects on a node that stores nothing.

drake says Drake meme: rejecting a stateful inserter service, preferring a stateless ClickHouse node with Kafka + MV + Distributed

Build-vs-reuse, in one frame.

You probably don't need an inserter service.

ClickHouse is already a perfectly good stream processor. Give it a node with no disk to worry about.

03

How it works

Four objects. Two node roles. One native-protocol hop.

The anatomy

Four objects, split across two roles

DATA nodes

① Data table
ReplicatedMergeTree · stores rows

INGESTION nodes (stateless)

② Writable Distributed table
routes INSERTs to the data table
③ Kafka table engine
consumes the topic
④ Materialized view
Kafka table → writable table

The MV reads from the Kafka table and writes to the writable Distributed table. The Distributed table forwards to the MergeTree on the data nodes. Done.

① Data table — on DATA nodes

The only place that actually stores rows

CREATE TABLE IF NOT EXISTS groups
(
    group_type_index UInt8,
    group_key        VARCHAR,
    created_at       DateTime64,
    team_id          Int64,
    group_properties VARCHAR,
    _timestamp       DateTime,
    _offset          UInt64
)
ENGINE = ReplicatedReplacingMergeTree(
    '/clickhouse/tables/{shard}/posthog.groups', '{replica}', _timestamp)
ORDER BY (team_id, group_type_index, group_key)

A plain ReplicatedReplacingMergeTree. It has no idea Kafka exists. This is the only stateful piece.

② Writable table — on INGESTION nodes

A Distributed table is the "INSERT client"

CREATE TABLE IF NOT EXISTS writable_groups
(
    group_type_index UInt8, group_key VARCHAR, created_at DateTime64,
    team_id Int64, group_properties VARCHAR, _timestamp DateTime, _offset UInt64
)
ENGINE = Distributed('posthog_single_shard', 'default', 'groups');

Non-sharded target → point the Distributed table at posthog_single_shard. Sharded target → use the full posthog cluster with a sharding key:

ENGINE = Distributed('posthog', 'default', 'sharded_events', sipHash64(distinct_id));
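
For intuition on what the sharding key does: the Distributed engine sends each row to the shard selected by the remainder of the sharding expression divided by the total shard weight. The sketch below mimics that rule in Python. It is an illustration only: `stand_in_hash64` is a stdlib substitute and does not produce the same values as ClickHouse's sipHash64, and the equal weights are an assumption.

```python
import hashlib


def stand_in_hash64(value: str) -> int:
    """64-bit hash of a string. NOT sipHash64; a stdlib stand-in for illustration."""
    digest = hashlib.blake2b(value.encode("utf-8"), digest_size=8).digest()
    return int.from_bytes(digest, "big")


def pick_shard(distinct_id: str, shard_weights: list[int]) -> int:
    """Return the index of the shard whose weight range contains hash % total_weight."""
    remainder = stand_in_hash64(distinct_id) % sum(shard_weights)
    upper = 0
    for index, weight in enumerate(shard_weights):
        upper += weight
        if remainder < upper:
            return index
    raise AssertionError("unreachable: remainder is always below the total weight")


weights = [1] * 10  # ten equally weighted shards (assumed)
for distinct_id in ["user-a", "user-b", "user-a"]:
    print(distinct_id, "-> shard", pick_shard(distinct_id, weights))
```

The property that matters is determinism: the same distinct_id always lands on the same shard, so the ingestion node needs no routing state of its own.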
③ Kafka table engine — on INGESTION nodes

The consumer

CREATE TABLE IF NOT EXISTS kafka_groups
(
    group_type_index UInt8, group_key VARCHAR, created_at DateTime64,
    team_id Int64, group_properties VARCHAR
)
ENGINE = Kafka(warpstream_ingestion,            -- named collection (broker list)
    kafka_topic_list = 'groups',
    kafka_group_name = 'group1',
    kafka_format     = 'JSONEachRow');

Broker lists come from named collections — which is how we run MSK and WarpStream side by side (more later). One consumer per INGESTION_SMALL node; more on the hotter tiers.

④ Materialized view — on INGESTION nodes

The wire between consumer and writer

CREATE MATERIALIZED VIEW IF NOT EXISTS groups_mv
TO writable_groups            -- ← writes into the Distributed table
AS SELECT
    group_type_index, group_key, created_at, team_id, group_properties,
    _timestamp, _offset
FROM kafka_groups;            -- ← reads from the Kafka table engine

Selecting from the Kafka table advances consumer offsets. The MV fires per block, transforms, and pushes into writable_groups → which forwards to the data nodes. That's the whole inserter.
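
As a mental model, the three objects on an ingestion node behave like a small poll, transform, forward loop. The toy below is not how ClickHouse implements it; the class and method names are invented, and it only illustrates the order of operations: read a block, apply the view's SELECT (payload plus the `_timestamp` and `_offset` metadata), forward the rows, then move past the block.

```python
from dataclasses import dataclass, field


@dataclass
class Message:
    offset: int
    timestamp: int  # broker timestamp, seconds since epoch
    payload: dict


@dataclass
class ToyIngestionNode:
    topic: list[Message]
    position: int = 0  # next offset to read
    forwarded: list[dict] = field(default_factory=list)  # stands in for writable_groups

    def run_once(self, max_block_rows: int) -> int:
        """Poll one block, apply the view's SELECT, forward it, then advance."""
        block = self.topic[self.position:self.position + max_block_rows]
        rows = [
            # The view's SELECT: payload columns plus Kafka metadata columns.
            {**m.payload, "_timestamp": m.timestamp, "_offset": m.offset}
            for m in block
        ]
        self.forwarded.extend(rows)  # the "INSERT" into the Distributed table
        self.position += len(block)  # only now move past the block
        return len(rows)


topic = [Message(i, 1_700_000_000 + i, {"team_id": 1, "group_key": f"g{i}"}) for i in range(5)]
node = ToyIngestionNode(topic)
while node.run_once(max_block_rows=2):
    pass
print(len(node.forwarded), "rows forwarded; next offset", node.position)
```

Note that the node keeps nothing except a read position, and in the real system even that lives in Kafka's consumer group, not on the pod.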

Application A · end to end

The whole path

Kafka
topic: groups

stateless ingestion pod (K8s)

kafka_groups
groups_mv
writable_groups
Distributed
groups
MergeTree
(data nodes)

Native-protocol INSERT over the cluster. The pod holds no data — kill it, replace it, scale it, nothing is lost.

How DDL knows where to land

One migration, role-aware placement

operations = [
    run_sql_with_exceptions(GROUPS_TABLE_SQL(),          node_roles=[NodeRole.DATA]),
    run_sql_with_exceptions(GROUPS_WRITABLE_TABLE_SQL(), node_roles=[NodeRole.INGESTION_SMALL]),
    run_sql_with_exceptions(KAFKA_GROUPS_TABLE_SQL(),    node_roles=[NodeRole.INGESTION_SMALL]),
    run_sql_with_exceptions(GROUPS_TABLE_MV_SQL(),       node_roles=[NodeRole.INGESTION_SMALL]),
]

No ON CLUSTER. run_sql_with_exceptions fans each statement to exactly the hosts whose macro hostClusterRole matches — so the MergeTree lands on storage, and Kafka/MV/Distributed land on the stateless tier.
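
The placement rule itself is simple enough to sketch. The code below is a guess at the shape of the idea, not PostHog's implementation of run_sql_with_exceptions: the `Host` type, the `hosts_for` helper, the host names, and the role strings are all hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Host:
    name: str
    role: str  # value of the host's hostClusterRole macro


def hosts_for(statement_roles: list[str], inventory: list[Host]) -> list[str]:
    """Names of the hosts that should run a statement tagged with the given roles."""
    wanted = set(statement_roles)
    return [h.name for h in inventory if h.role in wanted]


# Hypothetical inventory; real host names and role values will differ.
INVENTORY = [
    Host("data-1", "data"),
    Host("data-2", "data"),
    Host("chi-ingestion-small-0", "small"),
    Host("chi-ingestion-events-0", "events"),
]

print(hosts_for(["data"], INVENTORY))   # ['data-1', 'data-2']
print(hosts_for(["small"], INVENTORY))  # ['chi-ingestion-small-0']
```

Because each statement carries its own role list, one migration file can describe objects that live on entirely different tiers.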

The roles

Ingestion is tiered by throughput

class NodeRole(StrEnum):
    DATA              = "data"        # the sharded MergeTree storage cluster
    INGESTION_SMALL   = "small"       # 1 Kafka consumer  — low-volume topics
    INGESTION_MEDIUM  = "medium"      # 4 Kafka consumers — mid-volume
    INGESTION_EVENTS  = "events"      # the main event firehose, many shards
    SHUFFLEHOG        = "shufflehog"  # re-partitions events so each key has one consumer
    # separate clusters: AI_EVENTS, AUX, OPS, SESSIONS, LOGS, ENDPOINTS ...

Each role is its own logical cluster of stateless nodes, sized to the topic it drains. Scaling ingestion now means adding cheap, stateless pods — never touching the storage tier.

The catch behind the tiers

Tuning the Kafka table engine is the fiddly part

ENGINE = Kafka(warpstream_ingestion,
    kafka_topic_list = 'events', kafka_group_name = 'clickhouse',
    kafka_format = 'JSONEachRow',
    kafka_num_consumers       = 4,    -- ≤ topic partitions  AND  ≤ physical cores
    kafka_thread_per_consumer = 1);   -- without this, all 4 squash onto ONE thread

The trap

kafka_thread_per_consumer defaults to 0, so rows from every consumer are squashed into a single block on one thread. Raising kafka_num_consumers alone buys you nothing; you only get independent, parallel flushes with kafka_thread_per_consumer = 1.

Both are ENGINE settings in the DDL, and kafka_num_consumers is capped by partition count and core count. Retuning means dropping and recreating the Kafka table, and getting it wrong shows up only as silent lag. This is the real lever behind small / medium / events.
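
The two caps on kafka_num_consumers combine into a one-line rule. A small sketch, with a hypothetical helper name and made-up inputs:

```python
def usable_consumers(requested: int, topic_partitions: int, physical_cores: int) -> int:
    """Largest kafka_num_consumers value that respects both caps from the slide."""
    if min(requested, topic_partitions, physical_cores) < 1:
        raise ValueError("all inputs must be at least 1")
    return min(requested, topic_partitions, physical_cores)


print(usable_consumers(4, topic_partitions=64, physical_cores=2))  # 2
print(usable_consumers(4, topic_partitions=3, physical_cores=16))  # 3
```

In the first case the node is the bottleneck, so a bigger tier helps; in the second the topic is, so only more partitions would.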
04

In production

Two regions, ~60 stateless ingestion nodes, zero stored bytes.

The shape of it

Storage vs ingestion, by the numbers

Storage (stateful, EC2 + NVMe)

  • US: 10 shards × 3 replicas = 30 nodes
  • EU: 8 shards × 3 replicas = 24 nodes
  • i8ge metal, tiered NVMe→EBS→S3

Ingestion (stateless, K8s + EC2)

  • US: ~38 nodes — small / medium / events tiers
  • EU: ~22 nodes
  • commodity instances, recycled freely

Ingestion outnumbers storage by node count — but costs a fraction, because there's no data to hold or protect.

"Stateless" — literally

The config that proves there's nothing to lose

EC2 ingestion hosts (Ansible inventory):

hostClusterRole: ingestion
raid_volumes: []        # no NVMe / EBS data disks
backup_tables: []       # nothing to back up
clickhouse_disk_balancer_enabled: false

K8s ingestion pods (chart values):

volumeTemplates:
  - name: ingestion
    reclaimPolicy: Delete   # PVC dies with the pod
    resources:
      requests: { storage: 10Gi }   # local cache only

🙌 The K8s ingestion tier runs on Altinity's clickhouse-operator: the pods are defined by ClickHouseInstallation custom resources (they show up as the chi-ingestion-* pods in kubectl get pods). Thanks for the operator, Altinity 👋

Pods get recycled and lose their disk — and it doesn't matter. The data is in Kafka and on the storage cluster; the ingestion tier is pure compute.

One pattern, two jobs

Same shape, different sink

Every stateless node is the same core — a Kafka table engine → materialized view, no MergeTree. The only thing that changes is what the MV writes into.

Application A · ingestion
MV
Distributed
table
sharded MergeTree
data nodes

Sink = ClickHouse.
The inserter service we never built.

Application B · ShuffleHog
MV
Kafka table
producer
re-partitioned
topic

Sink = Kafka, re-keyed.
Re-shapes the stream for a downstream service.

Application B · why ShuffleHog exists

Partition by the key your consumer aggregates on

events
raw stream
ShuffleHog
re-key: project+event
re-partitioned
key → 1 partition
propdefs
buffer + squash
Postgres
upserts

Downstream is propdefs (property-defs-rs): it tracks every project + event + property key seen, buffers them, and flushes idempotent upserts into Postgres.

The bug
The topic wasn't partitioned by project+event. The same (project, event) arrived on many partitions → many consumers upserting the same Postgres rows → tuple-lock contention → everything crawled.
The fix: re-partition so (project + event) hashes to one partition. Exactly one consumer owns each key, so the squash upserts with zero row contention in PG.
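
The difference between the bug and the fix can be shown with a small simulation. This is a sketch under assumptions: the hash is a stdlib stand-in (not Kafka's partitioner), the 8-partition topic and the sample messages are made up, and random assignment stands in for "partitioned by something other than project+event".

```python
import hashlib
import random
from collections import defaultdict


def partition_for(project_id: int, event: str, num_partitions: int) -> int:
    """Keyed partitioning: the same (project, event) always maps to one partition."""
    key = f"{project_id}:{event}".encode("utf-8")
    digest = hashlib.blake2b(key, digest_size=8).digest()  # stand-in hash, not Kafka's
    return int.from_bytes(digest, "big") % num_partitions


def partitions_seen(messages, assign) -> dict:
    """For each (project, event) key, the set of partitions its messages landed on."""
    seen = defaultdict(set)
    for project_id, event in messages:
        seen[(project_id, event)].add(assign(project_id, event))
    return dict(seen)


messages = [(1, "pageview"), (2, "signup"), (1, "pageview")] * 50
rng = random.Random(0)
spread = partitions_seen(messages, lambda p, e: rng.randrange(8))       # before: not keyed
keyed = partitions_seen(messages, lambda p, e: partition_for(p, e, 8))  # after: re-keyed

print("before:", {k: len(v) for k, v in spread.items()})
print("after: ", {k: len(v) for k, v in keyed.items()})
```

After re-keying, every key reports exactly one partition, so exactly one consumer ever writes its Postgres rows.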
A bonus the pattern unlocked

Swapping Kafka under a live firehose

Migrating MSK → WarpStream with zero downtime: run a fully parallel write path per backend (its own Kafka table engine → MV → Distributed table, with separate consumer groups), both landing in the same data table.

kafka_groups
MSK
groups_mv
writable_groups
Distributed
kafka_groups_ws
WarpStream
groups_ws_mv
writable_groups_ws
Distributed
groups
MergeTree

Cut over by adding or removing a whole leg. No inserter to rewrite, no dual-write code — just DDL.
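
Since each leg is just three named objects in front of a shared data table, the set of objects per leg can be derived mechanically. The helper below is hypothetical; it only reproduces the naming visible in the diagram above (no suffix for the MSK leg, `ws` for the WarpStream leg).

```python
def leg_objects(table: str, suffix: str = "") -> dict[str, str]:
    """Object names for one write path; both legs share the same data table."""
    tail = f"_{suffix}" if suffix else ""
    return {
        "kafka_table": f"kafka_{table}{tail}",
        "materialized_view": f"{table}{tail}_mv",
        "writable_table": f"writable_{table}{tail}",
        "data_table": table,  # the shared sink on the data nodes
    }


for suffix in ["", "ws"]:
    print(leg_objects("groups", suffix))
```

Adding a leg means creating its three ingestion-side objects; removing one means dropping them. The data table is never touched.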

05

What we learned

The tradeoff

Sharp edges

The cost of this pattern is schema management. Every object — the data table, the Kafka table engine, the MV, the Distributed writable table — is schema we own and migrate, now across multiple node roles.

It's real engineering — but far simpler than building, scaling, and being on call for a stateful inserter service.

And we didn't start from zero: our complex topology already forced us to build a migration system. The stateless ingestion tiers just added a few layers: small, medium, events.

Why it stays fast: the operator-managed clusters are shards-only (replicasCount: 1) by design. A migration lands on every node at once, instead of rolling through replicas and waiting on ZooKeeper. (The 3× replicas live on the EC2 data cluster, where durability matters; the stateless tier's tables aren't replicated anyway.)

Did it work?

The payoff

Ingestion scales independently from the data & query tiers. We add stateless ingestion pods without touching storage — and because the query tier's resources are now dedicated, query performance is far more deterministic.

  • Query storms no longer stall ingestion (and vice-versa).
  • Scale ingestion = add stateless pods, minutes not days.
  • Zero inserter service to build, staff, or be paged for.

The one thing

Reach for ClickHouse's own primitives before you build a service around it.

Kafka engine + materialized view + Distributed table, on a node that stores nothing, is a stream processor.

Thanks — and we're hiring

Questions?

The pattern, the migrations, and the topology are all open source.

We're hiring: posthog.com/careers

Slides & more talks
posthog.github.io/presentations