Storage Backends Guide

Django DeviceHub uses pluggable storage backends for time-series reading data. The storage backend controls how readings are written and queried, while device metadata and raw messages always stay in the Django ORM.

Choosing a Backend

  • Django ORM (default): development and small deployments; works with any Django-supported database; no extra requirements.
  • TimescaleDB: medium to large deployments in PostgreSQL environments; requires the TimescaleDB extension.
  • InfluxDB 2.x: high-volume time-series with purpose-built compression and retention; requires an InfluxDB server.

Django ORM Backend (Default)

Works with any database Django supports -- PostgreSQL, SQLite, MySQL. No additional setup required.

DJANGO_DEVICEHUB = {
    "STORAGE": {
        "ENGINE": "django_devicehub.storage.django_orm.DjangoORMStorage",
    },
}

Since this is the default, you can omit the STORAGE key entirely.

API

from django_devicehub.storage.django_orm import DjangoORMStorage

storage = DjangoORMStorage()

store_reading(reading_model, device, timestamp, data)

Create a reading record in the database:

from django.utils import timezone

storage.store_reading(
    WeatherStationReading,
    device,
    timezone.now(),
    {"temperature": 25.3, "humidity": 80.1},
)

query_readings(reading_model, device=None, start=None, end=None, fields=None, limit=None, order="-timestamp")

Query readings with optional filters:

# Last 100 readings for a device
readings = storage.query_readings(
    WeatherStationReading,
    device=device,
    limit=100,
)

# Date range query, specific fields
from datetime import datetime, timezone as tz
readings = storage.query_readings(
    WeatherStationReading,
    device=device,
    start=datetime(2025, 1, 1, tzinfo=tz.utc),
    end=datetime(2025, 1, 31, tzinfo=tz.utc),
    fields=["temperature"],
)

aggregate(reading_model, field, function="avg", device=None, start=None, end=None, interval=None)

Aggregate readings. Without interval, returns a single {"result": value} dict. With interval, returns time-bucketed results:

# Average temperature across all readings
result = storage.aggregate(WeatherStationReading, "temperature", "avg")
# {"result": 23.5}

# Hourly averages
buckets = storage.aggregate(
    WeatherStationReading, "temperature", "avg",
    device=device,
    interval="hour",
)
# QuerySet of {"bucket": datetime, "result": float}

Supported aggregate functions: avg, min, max, sum, count.

Supported interval values (as strings): minute, hour, day, week, month. You can also pass a timedelta object.
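To picture what interval bucketing computes, imagine each timestamp floored to the start of its interval-sized bucket, with the aggregate function applied per bucket. A pure-Python sketch of that grouping (an illustration, not the backend's implementation):

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

def bucket_floor(ts, interval):
    # Floor a timestamp to the start of its interval-sized bucket.
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    return ts - (ts - epoch) % interval

readings = [
    (datetime(2025, 1, 1, 10, 5, tzinfo=timezone.utc), 22.0),
    (datetime(2025, 1, 1, 10, 40, tzinfo=timezone.utc), 24.0),
    (datetime(2025, 1, 1, 11, 10, tzinfo=timezone.utc), 26.0),
]

buckets = defaultdict(list)
for ts, temp in readings:
    buckets[bucket_floor(ts, timedelta(hours=1))].append(temp)

# Average per hourly bucket, mirroring aggregate(..., "avg", interval="hour")
hourly_avg = {b: sum(v) / len(v) for b, v in sorted(buckets.items())}
# 10:00 bucket -> 23.0, 11:00 bucket -> 26.0
```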

downsample(reading_model, field, interval, function="avg", ...)

Convenience wrapper around aggregate() with a required interval -- downsamples time-series data to a lower resolution:

hourly = storage.downsample(
    WeatherStationReading, "temperature", "hour", "avg",
    device=device,
)

TimescaleDB Backend

TimescaleDB is a PostgreSQL extension optimized for time-series data. The TimescaleDB backend inherits all ORM methods and adds:

  • time_bucket() for efficient time-series aggregation
  • Continuous aggregates (auto-refreshed materialized views)
  • Retention policies for automatic data expiration
  • Hypertable management

Installation

pip install django-devicehub[timescale]

Ensure TimescaleDB is installed on your PostgreSQL server and the extension is enabled:

CREATE EXTENSION IF NOT EXISTS timescaledb;

Configuration

DJANGO_DEVICEHUB = {
    "STORAGE": {
        "ENGINE": "django_devicehub.storage.timescale.TimescaleDBStorage",
    },
}

Creating Hypertables

Use the CreateHypertable migration operation to convert reading tables to hypertables:

# myapp/migrations/0002_hypertables.py
from django.db import migrations
from django_devicehub.storage.timescale import CreateHypertable

class Migration(migrations.Migration):
    dependencies = [("myapp", "0001_initial")]

    operations = [
        CreateHypertable(
            table_name="myapp_weatherstationreading",
            time_column="timestamp",
            chunk_time_interval="7 days",
            if_not_exists=True,
        ),
    ]

Parameters:

  • table_name (required): database table name to convert.
  • time_column (default "timestamp"): column used as the time dimension.
  • chunk_time_interval (default "7 days"): size of each hypertable chunk.
  • if_not_exists (default True): skip if the table is already a hypertable.

The operation is reversible -- the reverse migration is a no-op (TimescaleDB does not support reverting hypertables to regular tables).

Aggregation with time_bucket()

The TimescaleDB backend overrides aggregate() to use TimescaleDB's native time_bucket() function, which is significantly faster than Django's Trunc() on hypertables:

from django_devicehub.storage.timescale import TimescaleDBStorage

storage = TimescaleDBStorage()

# 5-minute averages (uses time_bucket() under the hood)
buckets = storage.aggregate(
    WeatherStationReading, "temperature", "avg",
    device=device,
    interval="5 minutes",
)
# [{"bucket": datetime, "result": 23.1}, {"bucket": datetime, "result": 23.4}, ...]

The interval parameter accepts TimescaleDB interval strings ("5 minutes", "1 hour", "1 day") or Python timedelta objects.

Continuous Aggregates

Create auto-refreshing materialized views for common queries:

storage.continuous_aggregate(
    name="hourly_avg_temperature",
    reading_model=WeatherStationReading,
    field="temperature",
    function="avg",
    interval="1 hour",
    start_offset="1 month",    # refresh data up to 1 month old
    end_offset="1 hour",       # freshness lag
    schedule_interval="1 hour", # how often the refresh job runs
    with_data=True,             # backfill existing data immediately
)

This creates a materialized view grouped by (time_bucket, device_id) with an automatic refresh policy.

Retention Policies

Automatically drop old data:

# Drop readings older than 90 days
storage.create_retention_policy(WeatherStationReading, drop_after="90 days")

# Or use timedelta
from datetime import timedelta
storage.create_retention_policy(WeatherStationReading, drop_after=timedelta(days=30))

# Remove the policy
storage.remove_retention_policy(WeatherStationReading)

Hypertable Info

Inspect hypertable metadata:

info = storage.hypertable_info(WeatherStationReading)
# {"hypertable_name": "myapp_weatherstationreading",
#  "num_chunks": 12,
#  "total_size": "45 MB"}

Returns None if the table is not a hypertable.

InfluxDB 2.x Backend

InfluxDB is a purpose-built time-series database with its own storage engine, compression, and Flux query language.

The InfluxDB backend writes readings as InfluxDB points while device metadata and raw messages remain in the Django ORM.

Installation

pip install django-devicehub[influxdb]

Configuration

DJANGO_DEVICEHUB = {
    "STORAGE": {
        "ENGINE": "django_devicehub.storage.influxdb.InfluxDBStorage",
        "INFLUXDB_URL": "http://localhost:8086",
        "INFLUXDB_TOKEN": "my-token",
        "INFLUXDB_ORG": "my-org",
        "INFLUXDB_BUCKET": "iot_readings",
    },
}

Create the bucket in InfluxDB:

influx bucket create -n iot_readings -o my-org -r 90d

How readings are stored

Readings are written as InfluxDB points:

  • measurement: device type name in lowercase (e.g., weatherstation)
  • tags: device_id
  • fields: reading values (e.g., temperature=25.3, humidity=80.1)
  • time: reading timestamp (nanosecond precision)

The Django ORM reading model is not written to by this backend. If you need both, subclass InfluxDBStorage and override store_reading() to dual-write.
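If you do need both, the override can be sketched as below. The parent class here is a simplified stand-in so the snippet is self-contained, and the reading model's field names (device, timestamp) are assumptions for illustration; in a real project you would subclass the actual InfluxDBStorage.

```python
class InfluxDBStorage:
    """Simplified stand-in for django_devicehub.storage.influxdb.InfluxDBStorage,
    used here only so the sketch runs standalone."""

    def __init__(self):
        self.points = []  # stands in for the InfluxDB write API

    def store_reading(self, reading_model, device, timestamp, data):
        self.points.append({"device": device, "timestamp": timestamp, **data})


class DualWriteStorage(InfluxDBStorage):
    """Write each reading to InfluxDB and mirror it into the Django ORM."""

    def store_reading(self, reading_model, device, timestamp, data):
        # Write the InfluxDB point first.
        super().store_reading(reading_model, device, timestamp, data)
        # Then create the ORM row. The keyword names are assumptions
        # about the reading model's schema.
        reading_model.objects.create(device=device, timestamp=timestamp, **data)
```

Point DJANGO_DEVICEHUB["STORAGE"]["ENGINE"] at your subclass to activate it.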

Querying

query_readings() returns a list of dicts (not a Django QuerySet):

from django_devicehub.storage.influxdb import InfluxDBStorage

storage = InfluxDBStorage()

readings = storage.query_readings(
    WeatherStationReading,
    device=device,
    start=start_dt,
    end=end_dt,
    fields=["temperature", "humidity"],
    limit=100,
)
# [{"device_id": "ws-001", "timestamp": datetime, "temperature": 25.3, "humidity": 80.1}, ...]

Aggregation

# Scalar aggregate
result = storage.aggregate(WeatherStationReading, "temperature", "avg")
# {"result": 23.5}

# Time-bucketed aggregate (uses InfluxDB aggregateWindow)
buckets = storage.aggregate(
    WeatherStationReading, "temperature", "avg",
    interval="1 hour",
)
# [{"bucket": datetime, "result": 23.1}, ...]

The interval parameter accepts:

  • Human-readable strings: "5 minutes", "1 hour", "1 day"
  • Flux duration syntax: "5m", "1h", "1d"
  • Python timedelta objects

InfluxDB-specific methods

delete_readings(reading_model, device=None, start=None, end=None)

Delete readings from InfluxDB:

storage.delete_readings(WeatherStationReading, device=device,
                        start=start_dt, end=end_dt)

health_check()

Check InfluxDB connectivity:

health = storage.health_check()
# {"status": "pass", "message": "", "version": "2.7.1"}

close()

Close the InfluxDB client connection:

storage.close()

Writing a Custom Backend

Extend BaseStorage to implement a custom storage backend:

from django_devicehub.storage.base import BaseStorage

class MyStorage(BaseStorage):
    def store_reading(self, reading_model, device, timestamp, data):
        """Store a sensor reading."""
        ...

    def query_readings(self, reading_model, device=None, start=None, end=None,
                       fields=None, limit=None, order="-timestamp"):
        """Query stored readings with optional filters."""
        ...

    def aggregate(self, reading_model, field, function="avg", device=None,
                  start=None, end=None, interval=None):
        """Aggregate readings (avg, min, max, sum, count).

        If interval is provided, return time-bucketed results.
        """
        ...

The downsample() method has a default implementation that delegates to aggregate(), so you only need to override it if you have a more efficient implementation.
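As a concrete (if toy) example, here is an in-memory backend of the kind that is handy in unit tests. BaseStorage is stubbed as a plain class so the sketch runs standalone, and interval-based aggregation is omitted for brevity:

```python
from datetime import datetime, timezone

class BaseStorage:  # stand-in for django_devicehub.storage.base.BaseStorage
    pass

class InMemoryStorage(BaseStorage):
    """Toy backend that keeps readings in a plain list."""

    def __init__(self):
        self._rows = []

    def store_reading(self, reading_model, device, timestamp, data):
        self._rows.append({"device": device, "timestamp": timestamp, **data})

    def query_readings(self, reading_model, device=None, start=None, end=None,
                       fields=None, limit=None, order="-timestamp"):
        rows = [r for r in self._rows
                if (device is None or r["device"] == device)
                and (start is None or r["timestamp"] >= start)
                and (end is None or r["timestamp"] <= end)]
        rows.sort(key=lambda r: r["timestamp"], reverse=order.startswith("-"))
        return rows[:limit]

    def aggregate(self, reading_model, field, function="avg", device=None,
                  start=None, end=None, interval=None):
        # interval (time-bucketing) is not implemented in this toy backend.
        values = [r[field] for r in self.query_readings(
            reading_model, device=device, start=start, end=end)]
        funcs = {"avg": lambda v: sum(v) / len(v), "min": min,
                 "max": max, "sum": sum, "count": len}
        return {"result": funcs[function](values)}

storage = InMemoryStorage()
storage.store_reading(None, "ws-001",
                      datetime(2025, 1, 1, tzinfo=timezone.utc),
                      {"temperature": 20.0})
storage.store_reading(None, "ws-001",
                      datetime(2025, 1, 2, tzinfo=timezone.utc),
                      {"temperature": 24.0})
latest = storage.query_readings(None, device="ws-001", limit=1)
```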

Register your backend in settings:

DJANGO_DEVICEHUB = {
    "STORAGE": {
        "ENGINE": "myapp.storage.MyStorage",
    },
}