Postgres + Citus + Partman, Your IoT Database

Craig Kerstiens

8 min read

Postgres is a robust data platform. Yes, it's more than a boring old relational database. It has rich indexing, data types (including JSON), and so much more. It also has support for a variety of extensions that can further broaden its already great functionality. Two of those extensions, when coupled together, make Postgres a very compelling approach for IoT architectures. Today we're going to start from the ground up on how you would design your architecture with Postgres along with the Citus and pg_partman extensions.
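If you're following along at home, you'll want both extensions (plus pg_cron, which we'll use later for scheduling) available in your database. A rough sketch of enabling them, assuming they're installed on your Postgres instance and that Citus and pg_cron are in shared_preload_libraries:

-- Citus requires shared_preload_libraries = 'citus' before this will work
CREATE EXTENSION IF NOT EXISTS citus;

-- pg_partman is conventionally installed into its own schema
CREATE SCHEMA IF NOT EXISTS partman;
CREATE EXTENSION IF NOT EXISTS pg_partman SCHEMA partman;

-- pg_cron is used later to schedule partition maintenance
CREATE EXTENSION IF NOT EXISTS pg_cron;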

Citus and sharding

Citus is an extension that allows you to shard your database across multiple nodes, while allowing your application to remain largely unaware. Citus can be beneficial to your app if:

  • You expect to outgrow the performance a single Postgres instance can deliver
  • Your schema/data model can be mapped cleanly to Citus
  • The queries/workload pattern can be mapped cleanly to Citus

Lucky for us, IoT workloads check all of the boxes above.

Starting with our IoT dataset

We're going to begin with a simple schema that relates to vehicles and tracks a few basic measurements against them. We'll also have a table that tracks the location of the vehicle at the time of each sensor sampling as well.

CREATE TABLE sensor_data (
  id SERIAL,
  car_id VARCHAR(17) NOT NULL,
  sensor_type VARCHAR(20) NOT NULL,
  sensor_value INT NOT NULL,
  timestamp TIMESTAMP WITH TIME ZONE NOT NULL
);

CREATE TABLE location_data (
  id SERIAL,
  car_id VARCHAR(17) NOT NULL,
  latitude float8,
  longitude float8,
  timestamp TIMESTAMP WITH TIME ZONE NOT NULL
);

While our above schema is simple, it's not unrealistic compared to many IoT data models, though yours could be more complex.

How do we shard this dataset?

The key to sharding is that you can push down most of your joins to the node where the data is located. If you have to move data between nodes in order to join, your performance will suffer. In the case of IoT workloads, the device ID (car_id in our schema) is a very common choice for the sharding key.

To turn this into a sharded database with Citus installed, we simply need to run:

SELECT create_distributed_table('sensor_data', 'car_id');
SELECT create_distributed_table('location_data', 'car_id');

By default Citus will co-locate these two tables because they're sharded on the same kind of value and have the same number of shards, meaning a given car_id's rows in sensor_data and location_data land on the same node. Citus defaults to creating 32 shards, but that's configurable if you need more or fewer. It's worth noting that shards are separate from the number of nodes/instances: in this case, if you had a Citus configuration of 1 coordinator and 2 workers, each worker would hold 16 shards of sensor_data and 16 of location_data.
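If 32 shards isn't the right number for you, the shard count can be set before you distribute your tables. A minimal sketch using the citus.shard_count setting:

-- Choose the shard count before calling create_distributed_table
SET citus.shard_count = 64;

SELECT create_distributed_table('sensor_data', 'car_id');
SELECT create_distributed_table('location_data', 'car_id');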

Now that the tables are sharded, we can see how Citus works. Let's run two different queries and see how Citus handles each:

SELECT sensor_data.car_id, max(sensor_value)
FROM sensor_data
WHERE sensor_type = 'temperature'
GROUP BY 1
ORDER BY 2 DESC;

In the above case Citus will parallelize the query and run 32 queries in total, one against each shard, bring the results back to the coordinator, and compute the final result. This means for each individual query you run, you'd have 32 connections going out from your coordinator and executing queries. This is great for parallelism, but the trade-off is reduced concurrency in the number of queries you can run at once.
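You can see this fan-out for yourself with EXPLAIN. The exact plan output varies by Citus version, but for a multi-shard query like the one above you should see a Citus custom scan whose task count matches your shard count:

EXPLAIN
SELECT sensor_data.car_id, max(sensor_value)
FROM sensor_data
WHERE sensor_type = 'temperature'
GROUP BY 1
ORDER BY 2 DESC;

-- Look for a line like "Custom Scan (Citus Adaptive)" followed by "Task Count: 32"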

However, if we modify the query to include the car_id, Citus will execute a single query on the worker node where that car's data lives.

SELECT sensor_data.car_id, max(sensor_value)
FROM sensor_data
WHERE sensor_type = 'temperature'
  AND car_id='433P2C2S7TJ654181';

Even if we were to expand the query to return the location data as well, because the data is co-located Citus knows it can push down the join to that single node.

SELECT sensor_data.car_id,
       max(sensor_value),
       location_data.latitude,
       location_data.longitude
FROM sensor_data,
     location_data
WHERE sensor_type = 'temperature'
  AND sensor_data.car_id='433P2C2S7TJ654181'
  AND location_data.car_id='433P2C2S7TJ654181'
  AND sensor_data.car_id = location_data.car_id
  AND sensor_data.timestamp = location_data.timestamp
GROUP BY 1, 3, 4;

Again, if you anticipate a large data volume and challenges scaling performance, and your data model can be structured to shard cleanly and your query workload fits well into Citus, then Citus gives you a lot of peace of mind to scale out. But where does time series come in?

Time series and Citus

Postgres itself already has native partitioning built in, but we often recommend coupling that with pg_partman, which extends native partitioning with helper utilities to make it easier to work with. Partitioning is the process of separating data out into separate tables by particular buckets. In an IoT scenario you may want to retain data on all of your vehicles for the past year, but in most cases you're only querying the data for the last week. In that case you could partition your data by week: the smaller data set of the last week or two is more easily kept in memory, and the corresponding indexes are also smaller and easier to maintain.
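For a sense of what pg_partman automates, here's roughly what one hand-made weekly partition would look like with plain native partitioning. The partition name and date range are just illustrative, and this assumes sensor_data is declared PARTITION BY RANGE (timestamp), which we do below:

-- One weekly partition of a table declared with PARTITION BY RANGE (timestamp)
CREATE TABLE sensor_data_2023_11_13 PARTITION OF sensor_data
  FOR VALUES FROM ('2023-11-13') TO ('2023-11-20');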

In order to set up pg_partman with Citus we're actually going to start fresh and create our tables as partitioned tables. Here we can see the end-to-end setup similar to earlier with Citus, but this time with partitioned tables:

CREATE TABLE sensor_data (
  id SERIAL,
  car_id VARCHAR(17) NOT NULL,
  sensor_type VARCHAR(20) NOT NULL,
  sensor_value INT NOT NULL,
  timestamp TIMESTAMP WITH TIME ZONE NOT NULL
) PARTITION BY RANGE (timestamp);

CREATE TABLE location_data (
  id SERIAL,
  car_id VARCHAR(17) NOT NULL,
  latitude float8,
  longitude float8,
  timestamp TIMESTAMP WITH TIME ZONE NOT NULL
) PARTITION BY RANGE (timestamp);

SELECT create_distributed_table('sensor_data', 'car_id');
SELECT create_distributed_table('location_data', 'car_id');

Now if we look at our database, it still contains just a few tables, sensor_data and location_data:

\d
                             List of relations
 Schema |           Name            |       Type        |       Owner
--------+---------------------------+-------------------+-------------------
 public | location_data             | partitioned table | application
 public | location_data_id_seq      | sequence          | application
 public | pg_stat_statements        | view              | crunchy_superuser
 public | pg_stat_statements_info   | view              | crunchy_superuser
 public | sensor_data               | partitioned table | application
 public | sensor_data_id_seq        | sequence          | application
(6 rows)
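You can also confirm both parent tables are distributed, and co-located with each other, by querying the citus_tables view (available in recent Citus versions):

SELECT table_name, citus_table_type, distribution_column, colocation_id
FROM citus_tables;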

While we've declared sensor_data and location_data as partitioned tables, we haven't done anything yet to set up the initial partitions. Here we're going to leverage pg_partman to create them. We're going to have it create monthly partitions, though this could just as easily be weekly, daily, or some other granularity. We're going to have it create partitions starting 1 month ago:

SELECT partman.create_parent('public.sensor_data', 'timestamp', 'native', 'monthly',
p_start_partition := (now() - interval '1 month')::date::text );
SELECT partman.create_parent('public.location_data', 'timestamp', 'native', 'monthly',
p_start_partition := (now() - interval '1 month')::date::text );

-- Configure partman to continue creating partitions

UPDATE partman.part_config SET infinite_time_partitions = true;

-- Configure partman to regularly run to create new partitions
SELECT cron.schedule('@hourly', $$SELECT partman.run_maintenance()$$);

So now we're running partitioned data inside each of our Citus shards.
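If you'd like to double check what partman created, you can list the child partitions it manages, or just describe the parent table in psql:

-- Partitions pg_partman is managing for sensor_data
SELECT * FROM partman.show_partitions('public.sensor_data');

-- Or look at the partitioned table directly
\d+ sensor_data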

Long term data retention & columnar compression

The above approach to partitioning and sharding works great when it comes to building your application and keeping it performant. But then there's the cost management side of the equation. Retaining all data forever is valuable if the cost is free, but keeping all of your data easily queryable isn't actually going to be free. Enter Citus columnar support, which comes with a few caveats:

  • No support for updates or deletes
  • No support for logical replication or decoding

Fortunately for us, our IoT use case can still take full advantage of the columnar format, which provides:

  • Great storage compression
  • Faster querying when scanning lots of sequential data

Let's look at turning a table into a columnar one:

SELECT alter_table_set_access_method('sensor_data_2023_oct', 'columnar');

This converts the October partition of sensor_data into the columnar format.

We can now run VACUUM VERBOSE on the table and see that we have a 10.20x compression rate!

VACUUM VERBOSE sensor_data;
INFO:  statistics for "sensor_data":
storage id: 10000000068
total file size: 64897024, total data size: 64252933
compression rate: 10.20x
total row count: 11999999, stripe count: 80, average rows per stripe: 149999
chunk count: 6000, containing data for dropped columns: 0, zstd compressed: 6000

Because our IoT data generally comes in within a set period of time, and is immutable after a specific date, we can go back and compress partitions after a certain period of time. In this case we're going to convert all partitions older than 3 months. Bear with us, because the incantation is a bit gnarly, but it gets the job done (we'll wrap it in pg_cron right after):

DO $accessmethod$
DECLARE
    v_row_partitions    record;
    v_row_info          record;
    v_sql               text;
BEGIN

-- Walk every partition pg_partman manages for sensor_data
FOR v_row_partitions IN
    SELECT partition_schemaname||'.'||partition_tablename AS partition_name FROM partman.show_partitions('public.sensor_data')
LOOP
    FOR v_row_info IN
        SELECT child_start_time, child_end_time FROM partman.show_partition_info(v_row_partitions.partition_name)
    LOOP
        -- Convert any partition whose range ended more than 3 months ago
        IF v_row_info.child_end_time < CURRENT_TIMESTAMP - '3 months'::interval THEN
            v_sql := format('SELECT alter_table_set_access_method(%L, %L)', v_row_partitions.partition_name, 'columnar');
            EXECUTE v_sql;
        END IF;
    END LOOP;
END LOOP;
END
$accessmethod$;
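To actually run this on a schedule (the pg_cron piece we mentioned), one approach is to wrap the same logic in a cron.schedule call. This is a sketch, assuming a nightly run and a pg_cron version recent enough to accept a job name; the job body is a compact equivalent of the DO block above:

SELECT cron.schedule('compress-old-partitions', '30 3 * * *', $job$
DO $compress$
DECLARE
    v_part record;
BEGIN
    FOR v_part IN
        SELECT partition_schemaname||'.'||partition_tablename AS partition_name
        FROM partman.show_partitions('public.sensor_data')
    LOOP
        -- Convert any partition whose range ended more than 3 months ago
        IF (SELECT child_end_time FROM partman.show_partition_info(v_part.partition_name))
             < CURRENT_TIMESTAMP - interval '3 months' THEN
            EXECUTE format('SELECT alter_table_set_access_method(%L, %L)',
                           v_part.partition_name, 'columnar');
        END IF;
    END LOOP;
END
$compress$;
$job$);

Note that this naive version will try to convert partitions that are already columnar on subsequent runs; in practice you may want to skip those by checking the partition's current access method in pg_class.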

Your scalable IoT database with Citus, pg_partman, and columnar compression

And there we have it, a horizontally scalable database for an IoT workload driven by:

  • Citus based sharding for seamless scaling and performance
  • pg_partman for native time-series partitioning, giving us faster query recall and reporting
  • Columnar compression to help us better manage storage and longer term retention

Written by

Craig Kerstiens

November 17, 2023