Use Case

A number of sensors continuously generate data points at regular intervals (every 2 seconds). Each reading includes a humidity value and a temperature value.

CREATE TABLE IF NOT EXISTS dht_readings (
    time TIMESTAMPTZ NOT NULL,
    device_id TEXT,
    temperature FLOAT NOT NULL,
    humidity FLOAT NOT NULL
);

SELECT create_hypertable('dht_readings', 'time', if_not_exists => TRUE);
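
To have data to work with, a reading can be inserted like this (a minimal sketch; the device name and values are made up for illustration):

-- Insert one sample reading (illustrative values)
INSERT INTO dht_readings (time, device_id, temperature, humidity)
VALUES (now(), 'dht22-kitchen', 21.4, 48.2);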

What Is Downsampling?

Downsampling reduces the granularity of a data set, turning a fine-grained time series into a coarser one. As an example, if our sensors compute and send a reading every 2 seconds, we can downsample to 5-minute, 1-hour, and 1-day averages.

In our example each sensor generates (60 s / 2 s) * 60 min * 24 h = 43,200 samples per device per day. When we downsample to 1-hour averages we are left with only 24 samples per device per day.

We are trading data resolution for compactness, increasing the span of time we can cover with the same amount of storage.

To do this we use PostgreSQL aggregation functions to compute the aggregate values, continuous aggregates to keep those aggregates up to date and materialize the results, and data retention policies to remove the raw data after a period of time.

What is the best PostgreSQL aggregation function for our example use case?
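
For illustration, downsampling can first be tried as a plain one-off query; this sketch averages the raw readings into 1-hour buckets per device using AVG:

-- One-off downsample: 1-hour averages per device
SELECT time_bucket('1 hour', time) AS bucket,
       device_id,
       AVG(temperature) AS temperature_avg,
       AVG(humidity) AS humidity_avg
FROM dht_readings
GROUP BY bucket, device_id
ORDER BY bucket;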

Process

Continuous Aggregates

In this step we create a view that rolls up averages of the values over a time period coarser than our original sampling rate.

CREATE VIEW dht_readings_5m
WITH (
    timescaledb.continuous,
    timescaledb.ignore_invalidation_older_than = '5d',
    timescaledb.refresh_lag = '-30m',
    timescaledb.refresh_interval = '5m'
)
AS SELECT
  time_bucket('5m', time) as bucket,
  device_id,
  AVG(humidity) as humidity_avg,
  MAX(humidity) as humidity_max,
  MIN(humidity) as humidity_min,
  AVG(temperature) as temperature_avg,
  MAX(temperature) as temperature_max,
  MIN(temperature) as temperature_min
FROM dht_readings
GROUP BY bucket, device_id;

The time_bucket documentation includes a set of sample queries that provide good insight into how to use it.

We should create similar aggregates for our other buckets of 1 hour (dht_readings_1h) and 1 day (dht_readings_1d), as sketched below.
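
A sketch for the 1-hour view, mirroring the 5-minute one. The refresh setting is illustrative and may need tuning; note that in TimescaleDB 1.x a continuous aggregate must select from the hypertable itself, not from another aggregate:

CREATE VIEW dht_readings_1h
WITH (
    timescaledb.continuous,
    timescaledb.refresh_interval = '30m'  -- illustrative; tune to your needs
)
AS SELECT
  time_bucket('1 hour', time) as bucket,
  device_id,
  AVG(humidity) as humidity_avg,
  MAX(humidity) as humidity_max,
  MIN(humidity) as humidity_min,
  AVG(temperature) as temperature_avg,
  MAX(temperature) as temperature_max,
  MIN(temperature) as temperature_min
FROM dht_readings
GROUP BY bucket, device_id;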

Data Retention Policy

To reclaim the space taken by the raw data set, we create a retention policy that deletes the finer-grained sensor data after a set period.

SELECT add_drop_chunks_policy('dht_readings', INTERVAL '5 days', cascade_to_materializations => FALSE);

We are dropping dht_readings data older than 5 days, but we keep the aggregated views dht_readings_5m, dht_readings_1h, and dht_readings_1d.
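
To double-check what the policy will remove, you can list the chunks older than the cutoff (a sketch; the show_chunks signature may vary with your TimescaleDB version):

-- List raw-data chunks older than the 5-day retention window
SELECT show_chunks('dht_readings', older_than => INTERVAL '5 days');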

You can get some information regarding the continuous aggregation pipeline by issuing the following query:

SELECT * FROM timescaledb_information.continuous_aggregate_stats;
-[ RECORD 1 ]----------+-------------------------------
view_name              | dht_readings_5m
completed_threshold    | 2020-07-23 01:40:00+00
invalidation_threshold | 2020-07-23 01:40:00+00
job_id                 | 1002
last_run_started_at    | 2020-07-23 01:36:53.076732+00
last_successful_finish | 2020-07-23 01:36:53.158439+00
last_run_status        | Success
job_status             | Scheduled
last_run_duration      | 00:00:00.081707
next_scheduled_run     | 2020-07-23 01:41:53.158439+00
total_runs             | 1339
total_successes        | 1339
total_failures         | 0
total_crashes          | 0
-[ RECORD 2 ]----------+-------------------------------
view_name              | dht_readings_1h
completed_threshold    | 2020-07-23 01:00:00+00
invalidation_threshold | 2020-07-23 01:40:00+00
job_id                 | 1003
last_run_started_at    | 2020-07-23 01:29:37.550259+00
last_successful_finish | 2020-07-23 01:29:37.674958+00
last_run_status        | Success
job_status             | Scheduled
last_run_duration      | 00:00:00.124699
next_scheduled_run     | 2020-07-23 02:29:37.674958+00
total_runs             | 15
total_successes        | 15
total_failures         | 0
total_crashes          | 0
-[ RECORD 3 ]----------+-------------------------------
view_name              | dht_readings_1d
completed_threshold    | 2020-07-23 00:00:00+00
invalidation_threshold | 2020-07-23 01:40:00+00
job_id                 | 1004
last_run_started_at    | 2020-07-23 00:29:47.522506+00
last_successful_finish | 2020-07-23 00:29:48.247258+00
last_run_status        | Success
job_status             | Scheduled
last_run_duration      | 00:00:00.724752
next_scheduled_run     | 2020-07-24 00:29:48.247258+00
total_runs             | 1
total_successes        | 1
total_failures         | 0
total_crashes          | 0

Querying
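
With the pipeline in place, queries target the rollup views rather than the raw hypertable. A sketch fetching the last day of 5-minute averages per device:

-- Last 24 hours of 5-minute averages per device
SELECT bucket, device_id, temperature_avg, humidity_avg
FROM dht_readings_5m
WHERE bucket > now() - INTERVAL '1 day'
ORDER BY bucket, device_id;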

Data Backfill

See time_bucket_gapfill and the sketch below.
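
A minimal gap-filling sketch over the 5-minute view, assuming some buckets may have no readings; note that time_bucket_gapfill requires the time range to be constrained in the WHERE clause:

SELECT
  time_bucket_gapfill('5 minutes', bucket) AS gapfilled_bucket,
  device_id,
  locf(AVG(temperature_avg)) AS temperature_avg,   -- carry the last value forward into gaps
  interpolate(AVG(humidity_avg)) AS humidity_avg   -- linear interpolation across gaps
FROM dht_readings_5m
WHERE bucket > now() - INTERVAL '6 hours'
  AND bucket < now()
GROUP BY gapfilled_bucket, device_id
ORDER BY gapfilled_bucket;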

Notes

Resources: