Manual Record Sharding for High-Performance Updates in Postgres
A wise person once said, "the only mutations in a high-performance postgres database are INSERT and TRUNCATE," or something like that, and they were essentially right: if you are handling on the order of a thousand operations per second or more on a table, UPDATE and DELETE operations can easily become untenable. When many operations attempt to update a "hot" row at once, they must all be serialized rather than occurring in parallel. If such updates occur within the context of larger transactions, the transactions also in effect become serialized. It takes exquisite and painstaking care to add any row-locking operation into a high-volume database, and so the general wisdom is to translate schemas requiring UPDATE operations into insert-only schemas: rather than updating a row, insert a new one.
The Naive Update
For example, let's say we want to write to a table for a web application to track when we last saw some IP address, and how many times we have seen it in total. A naive schema may look like:
column | type |
---|---|
id | bigint (generated by default as identity) |
ip_address | inet |
last_seen | timestamptz |
count | bigint |
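In DDL, that might look something like the following sketch (the NOT NULL constraints are an assumption):
CREATE TABLE ip_addresses (
    id bigint GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
    ip_address inet NOT NULL,
    last_seen timestamptz NOT NULL,
    count bigint NOT NULL
);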
Throw a unique index on ip_address, and issue a query like the following, and bing bang boom we're done, right?
INSERT INTO ip_addresses (ip_address, last_seen, count)
VALUES ('192.168.0.1', now(), 1)
ON CONFLICT (ip_address) DO UPDATE
SET last_seen = EXCLUDED.last_seen,
count = ip_addresses.count + EXCLUDED.count
Well, not necessarily. It obviously "works," but if some IP address is sending thousands of requests per second, this will quickly lock up all of your connections in your connection pool, which will spiral into your webserver holding thousands of open connections, and then all of a sudden you're completely offline. The reason of course is that UPDATE requires a lock on the row in question, and that lock must be released before the next UPDATE can occur, and so on down the line.
Insert-Only Solutions
So, let's take the common wisdom and design an insert-only solution. We might imagine using the exact same columns as above, but with no unique index on ip_address. You could then do your inserts like:
WITH previous AS (
SELECT count FROM ip_addresses
WHERE ip_address = '192.168.0.1'
ORDER BY last_seen DESC
LIMIT 1
)
INSERT INTO ip_addresses (ip_address, last_seen, count)
SELECT '192.168.0.1', now(), COALESCE((SELECT count FROM previous), 0) + 1
Getting last seen and current count is then a simple:
SELECT last_seen, count
FROM ip_addresses
WHERE ip_address = '192.168.0.1'
ORDER BY last_seen desc
LIMIT 1
This will improve your maximum throughput, to be sure. However, there are a few problems with it. One is that we now must have an index on (ip_address, last_seen desc) in order to enable the query in the previous CTE, rather than just on ip_address. Maybe this was already an index you wanted, and either way it's not that large an index, so it's not a big deal. A bigger problem is that we now have to do the extra query in the previous CTE, but with the index on (ip_address, last_seen desc) that query should be pretty fast, so maybe it also is no big deal. The real problem is that, with many simultaneous requests, the final count is going to be significantly incorrect. Say that five requests come in all at once, and all make the same query in the previous CTE. They all see that the most recent value of count was 5, and so they all write 6 for the count. However, 6 is wrong! The count should be 10. For high-frequency data, this error will compound over time. Of course you can throw a SELECT ... FOR UPDATE in the previous query, but then you reintroduce the same issue as with the UPDATE query.
Ultimately, to use an insert-only solution and retain accuracy, you can simplify your schema:
column | type |
---|---|
id | bigint (generated by default as identity) |
ip_address | inet |
time | timestamptz |
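A sketch of the DDL, along with an index on (ip_address, time) to support the last-seen and count queries below (the index itself is an assumption, not something the inserts require):
CREATE TABLE ip_addresses (
    id bigint GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
    ip_address inet NOT NULL,
    time timestamptz NOT NULL
);

CREATE INDEX ip_addresses_ip_address_time_idx
    ON ip_addresses (ip_address, time DESC);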
And then simply insert new records:
INSERT INTO ip_addresses (ip_address, time)
VALUES ('192.168.0.1', now())
Last seen is then still the simple:
SELECT time
FROM ip_addresses
WHERE ip_address = '192.168.0.1'
ORDER BY time DESC
LIMIT 1
While getting the total count now becomes a query like:
SELECT count(*) FROM ip_addresses WHERE ip_address = '192.168.0.1'
This completely solves our problem with insert throughput. A table like the above can easily support tens of thousands of records being inserted per second on a sufficiently beefy machine. However, there is no free lunch. We now have a new problem: count() is slow. Now, maybe it is fast enough for your use case, but its latency grows linearly with table size, so if it is possible for you to reach millions or billions of rows, count() will eventually start to take a very long time.
Any insert-only solution also introduces the problem of bloat: you now have N records where before you would just have one. With the above solution, where you need a total count over time, this is especially bad, since you need to retain every record for all of time (aside: if you don't need to aggregate the entire table and only need recent rows, a much faster solution than running DELETEs is to partition the table by time and just drop old partitions).
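As a sketch of that aside, time-based partitioning might look something like the following, with purely illustrative partition names and date ranges:
CREATE TABLE ip_addresses (
    id bigint GENERATED BY DEFAULT AS IDENTITY,
    ip_address inet NOT NULL,
    time timestamptz NOT NULL
) PARTITION BY RANGE (time);

CREATE TABLE ip_addresses_2025_01 PARTITION OF ip_addresses
    FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');

-- dropping an old partition is a cheap metadata operation, unlike a bulk DELETE
DROP TABLE ip_addresses_2025_01;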
So, is there any way we can have our cake (fast inserts) and eat it too (fast aggregate queries)?
Manual Sharding to Reintroduce Updates
Sharding is the practice of splitting one object up into many, often in order to improve write performance. In distributed databases, each database instance is often called a "shard." Generally, in such databases, there must be some key to determine the appropriate shard for a given piece of data, both to determine where to write it to and where to read it from. In postgres, this of course is essentially the idea of partitions, where the partition key determines which partition a row will be on.
However, partitions in postgres can be difficult to manage, and they aren't really granular enough for what we want here. As such, we're going to manually shard our data into N buckets. Our schema might look like this:
column | type |
---|---|
id | bigint (generated by default as identity) |
ip_address | inet |
bucket | int |
last_seen | timestamptz |
count | bigint |
We will now want a unique index on (ip_address, bucket), rather than on ip_address alone.
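Concretely, a sketch of the DDL might look like this (the index name and NOT NULL constraints are assumptions):
CREATE TABLE ip_addresses (
    id bigint GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
    ip_address inet NOT NULL,
    bucket int NOT NULL,
    last_seen timestamptz NOT NULL,
    count bigint NOT NULL
);

CREATE UNIQUE INDEX ip_addresses_ip_address_bucket_idx
    ON ip_addresses (ip_address, bucket);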
When inserting data, we will randomly determine a bucket to write it to, and then issue a query much like the original, naive one:
INSERT INTO ip_addresses (ip_address, bucket, last_seen, count)
VALUES ('192.168.0.1', 2, now(), 1)
ON CONFLICT (ip_address, bucket) DO UPDATE
SET last_seen = EXCLUDED.last_seen,
count = ip_addresses.count + EXCLUDED.count
Retrieving last seen then looks like:
SELECT max(last_seen)
FROM ip_addresses
WHERE ip_address = '192.168.0.1'
Note that for a max() or min() column like this, you could also add an index on (ip_address, last_seen), which should make the query above even faster, or allow you to structure it like SELECT last_seen FROM ip_addresses WHERE ip_address = ? ORDER BY last_seen DESC LIMIT 1.
And similarly, to get the total count:
SELECT sum(count)
FROM ip_addresses
WHERE ip_address = '192.168.0.1'
GROUP BY ip_address
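If you want both values in one round trip, the two aggregates can be combined, as a simple variation on the queries above:
SELECT max(last_seen) AS last_seen, sum(count) AS count
FROM ip_addresses
WHERE ip_address = '192.168.0.1'
GROUP BY ip_address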
Provided you have fewer than around 1,000 buckets, it should be trivial and quite fast for postgres to perform the aggregation.
Because your updates are now spread out over N buckets for a given record, you are much less likely to encounter issues with update contention, and your aggregate queries are now virtually instantaneous.
For what it's worth, while I think it is possible to hide the bucketing behind a stored procedure or a view with some fancy rules, I think it is better to keep the query complexity in your application, both because it allows you to more easily adjust the bucket count and because I think, especially in high-performance applications, abstracting away too much of the data layer is a mistake: the schema of the table is essential information when designing effective queries, so other than for things like partitions (where the encapsulation is baked into postgres itself), I think it's usually better to see it as it is.
Targeting a Bucket
There are a lot of ways you could target a bucket. The two easiest are probably random choice and time.
For random choice, you use your favorite random number generator to pick a number between 1 and N, where N is your number of buckets.
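A sketch of the random approach in Rust, assuming the rand crate (0.8-style API) and 256 buckets, might be:
use rand::Rng;

const BUCKET_COUNT: u32 = 256;

// pick a bucket uniformly at random between 1 and N
let bucket = rand::thread_rng().gen_range(1..=BUCKET_COUNT);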
For time, get a microsecond timestamp (or if your language does not support microsecond time, millisecond), and then take it modulo N, where N is the number of buckets. The result is your target bucket.
The time approach has the natural benefit of distributing high-frequency updates across buckets fairly evenly, and using a microsecond or even nanosecond timestamp means that even for incoming records that are essentially concurrent, they are unlikely to hit the same bucket. Some Rust to accomplish the time-based approach might look like:
use std::time::SystemTime;
const BUCKET_COUNT: u32 = 256;
let bucket = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
// expect safety: the system time would need to be < 1970/01/01 for this to fail
.expect("System time is irretrievably broken")
.subsec_micros()
% BUCKET_COUNT;
sqlx::query!(
r#"
INSERT INTO ip_addresses (ip_address, bucket, last_seen, count)
VALUES ($1, $2, now(), 1)
ON CONFLICT (ip_address, bucket) DO UPDATE
SET last_seen = EXCLUDED.last_seen,
count = ip_addresses.count + 1"#,
"192.168.0.1",
// the bucket column is an int, so cast the u32 down to i32
bucket as i32,
)
.execute(cxn)
.await
.unwrap();
Choosing a Bucket Count
You should definitely benchmark this for your system! I have seen good results with bucket counts starting from around 128 and going up through around 1024.
The nice thing about this approach is that, since the SELECT queries are ignorant of how many buckets there are, you should be free to adjust the bucket count over time in order to tweak insert performance, even via something like an environment variable.
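For example, a sketch of reading the bucket count from the environment in Rust (the variable name and default here are assumptions):
// read the bucket count from the environment, falling back to a default
fn bucket_count() -> u32 {
    std::env::var("BUCKET_COUNT")
        .ok()
        .and_then(|value| value.parse().ok())
        .unwrap_or(256)
}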
Benchmarks
Beware benchmarks on other people's hardware, but here are some numbers from my local machine. You can find the benchmarks on GitHub at @mplanchard/manual-sharding-benchmarks.
Benchmark setup was:
- Machine: 20 cores, 64 GB RAM
- Postgres 17.6
- Multi-threaded async runtime, 10 threads
- sqlx postgres driver, connection pool with 20 connections
- 1,000,000 inserts, spread across 20 async tasks, each inserting in a loop
- Records inserted until all inserts complete or 60 seconds, whichever came first
Used two tables:
- sequential: (id bigint pk, count bigint)
- buckets: (id bigint, bucket int, count bigint, pk (id, bucket))
Benchmarks were:
- insert-only, no update: inserts with no key contention on sequential
- time-based buckets: insert with an on-conflict clause into buckets, bucket chosen by modulo on the current nanosecond portion of the timestamp
- random buckets: insert with an on-conflict clause into buckets, bucket chosen randomly
- sequential: insert with an on-conflict clause into sequential
For benchmarks inserting with an on-conflict clause, every insert was on the same primary key, so they are really 1 insert and 999,999 updates.
bench: sequential upsert
total: 257458
elapsed: 60 s
rate: 4290/s
bench: insert-only, no update
total: 1000000
elapsed: 33 s
rate: 30303/s
bench: bucket upsert: 1000 time-based buckets
total: 1000000
elapsed: 35 s
rate: 28571/s
bench: bucket upsert: 100 time-based buckets
total: 1000000
elapsed: 35 s
rate: 28571/s
bench: bucket upsert: 1000 random buckets
total: 1000000
elapsed: 35 s
rate: 28571/s
bench: bucket upsert: 100 random buckets
total: 1000000
elapsed: 36 s
rate: 27777/s
As you can see, with pure inserts we were able to hit around 30k rows/s, while with sequential upserts we were only able to hit 4.3k rows/s, which is an ~86% decrease. However, all upserts with manual sharding were able to hit ~28-28.5k rows/s, which is only a 6-8% decrease.
So, ultimately, we retain 92-94% of our insert throughput in a maximum contention scenario, while still enabling updates! Not a bad tradeoff, in my opinion.
While your mileage may vary, you can also see that you don't necessarily need a huge number of buckets. In this benchmark at least, 100 buckets was equivalent to 1,000 buckets across the board. However, I highly recommend running a benchmark more tuned to your specific data and use case in order to decide on your bucket count.
Manual Sharding for Other Aggregations
Here, we saw an example of manual sharding with the max()
aggregation and with the sum()
aggregation. Of course, these are not
the only ones you can use! Any aggregation that can derived based on
intermediate data should work. For example, sum()
doesn't require
intermediate data, since sum(1, 2, 3 4)
is the same as sum(sum(1, 2), sum(3, 4))
.
Aggregations that do not have this property are those that require all input data to calculate: things like average, median, etc.
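To illustrate that with a hypothetical: if each bucket row also tracked something like a total_response_ms column (not part of the schema above), an average could still be derived from the per-bucket intermediates:
-- hypothetical: total_response_ms is an assumed extra column, not part of the
-- schema above; the average decomposes into a sum divided by a count
SELECT sum(total_response_ms)::numeric / sum(count) AS avg_response_ms
FROM ip_addresses
WHERE ip_address = '192.168.0.1'
GROUP BY ip_address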