Storage Backends Guide¶
Django DeviceHub uses pluggable storage backends for time-series reading data. The storage backend controls how readings are written and queried, while device metadata and raw messages always stay in the Django ORM.
Choosing a Backend¶
| Backend | Best for | Requirements |
|---|---|---|
| Django ORM (default) | Development, small deployments, any Django-supported database | None |
| TimescaleDB | Medium to large deployments, PostgreSQL environments | TimescaleDB extension |
| InfluxDB 2.x | High-volume time-series, purpose-built compression and retention | InfluxDB server |
Django ORM Backend (Default)¶
Works with any database Django supports -- PostgreSQL, SQLite, MySQL. No additional setup required.
DJANGO_DEVICEHUB = {
"STORAGE": {
"ENGINE": "django_devicehub.storage.django_orm.DjangoORMStorage",
},
}
Since this is the default, you can omit the STORAGE key entirely.
API¶
store_reading(reading_model, device, timestamp, data)¶
Create a reading record in the database:
from django.utils import timezone
storage.store_reading(
WeatherStationReading,
device,
timezone.now(),
{"temperature": 25.3, "humidity": 80.1},
)
query_readings(reading_model, device=None, start=None, end=None, fields=None, limit=None, order="-timestamp")¶
Query readings with optional filters:
# Last 100 readings for a device
readings = storage.query_readings(
WeatherStationReading,
device=device,
limit=100,
)
# Date range query, specific fields
from datetime import datetime, timezone as tz
readings = storage.query_readings(
WeatherStationReading,
device=device,
start=datetime(2025, 1, 1, tzinfo=tz.utc),
end=datetime(2025, 1, 31, tzinfo=tz.utc),
fields=["temperature"],
)
aggregate(reading_model, field, function="avg", device=None, start=None, end=None, interval=None)¶
Aggregate readings. Without interval, returns a scalar. With interval, returns time-bucketed results:
# Average temperature across all readings
result = storage.aggregate(WeatherStationReading, "temperature", "avg")
# {"result": 23.5}
# Hourly averages
buckets = storage.aggregate(
WeatherStationReading, "temperature", "avg",
device=device,
interval="hour",
)
# QuerySet of {"bucket": datetime, "result": float}
Supported aggregate functions: avg, min, max, sum, count.
Supported interval values (as strings): minute, hour, day, week, month. You can also pass a timedelta object.
downsample(reading_model, field, interval, function="avg", ...)¶
Convenience alias for aggregate() with an interval -- downsamples time-series data to a lower resolution:
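For example, hourly temperature averages via downsample() (a usage sketch; the call mirrors the aggregate() example above, with the interval passed positionally per the signature):

```python
# Downsample to hourly averages -- equivalent to
# aggregate(..., interval="hour")
hourly = storage.downsample(
    WeatherStationReading, "temperature", "hour",
    function="avg",
    device=device,
)
# Time-bucketed results of {"bucket": datetime, "result": float}
```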
TimescaleDB Backend¶
TimescaleDB is a PostgreSQL extension optimized for time-series data. The TimescaleDB backend inherits all ORM methods and adds:
- time_bucket() for efficient time-series aggregation
- Continuous aggregates (auto-refreshed materialized views)
- Retention policies for automatic data expiration
- Hypertable management
Installation¶
Ensure TimescaleDB is installed on your PostgreSQL server and the extension is enabled:
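For example, run against the target database as a superuser:

```sql
-- Enable the TimescaleDB extension in the current database
CREATE EXTENSION IF NOT EXISTS timescaledb;
```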
Configuration¶
DJANGO_DEVICEHUB = {
"STORAGE": {
"ENGINE": "django_devicehub.storage.timescale.TimescaleDBStorage",
},
}
Creating Hypertables¶
Use the CreateHypertable migration operation to convert reading tables to hypertables:
# myapp/migrations/0002_hypertables.py
from django.db import migrations
from django_devicehub.storage.timescale import CreateHypertable
class Migration(migrations.Migration):
dependencies = [("myapp", "0001_initial")]
operations = [
CreateHypertable(
table_name="myapp_weatherstationreading",
time_column="timestamp",
chunk_time_interval="7 days",
if_not_exists=True,
),
]
Parameters:
| Parameter | Default | Description |
|---|---|---|
| table_name | (required) | Database table name to convert |
| time_column | "timestamp" | Column used as the time dimension |
| chunk_time_interval | "7 days" | Size of each hypertable chunk |
| if_not_exists | True | Skip if already a hypertable |
The operation is reversible -- the reverse migration is a no-op (TimescaleDB does not support reverting hypertables to regular tables).
Aggregation with time_bucket()¶
The TimescaleDB backend overrides aggregate() to use TimescaleDB's native time_bucket() function, which is significantly faster than Django's Trunc() on hypertables:
from django_devicehub.storage.timescale import TimescaleDBStorage
storage = TimescaleDBStorage()
# 5-minute averages (uses time_bucket() under the hood)
buckets = storage.aggregate(
WeatherStationReading, "temperature", "avg",
device=device,
interval="5 minutes",
)
# [{"bucket": datetime, "result": 23.1}, {"bucket": datetime, "result": 23.4}, ...]
The interval parameter accepts TimescaleDB interval strings ("5 minutes", "1 hour", "1 day") or Python timedelta objects.
Continuous Aggregates¶
Create auto-refreshing materialized views for common queries:
storage.continuous_aggregate(
name="hourly_avg_temperature",
reading_model=WeatherStationReading,
field="temperature",
function="avg",
interval="1 hour",
start_offset="1 month", # refresh data up to 1 month old
end_offset="1 hour", # freshness lag
schedule_interval="1 hour", # how often the refresh job runs
with_data=True, # backfill existing data immediately
)
This creates a materialized view grouped by (time_bucket, device_id) with an automatic refresh policy.
Retention Policies¶
Automatically drop old data:
# Drop readings older than 90 days
storage.create_retention_policy(WeatherStationReading, drop_after="90 days")
# Or use timedelta
from datetime import timedelta
storage.create_retention_policy(WeatherStationReading, drop_after=timedelta(days=30))
# Remove the policy
storage.remove_retention_policy(WeatherStationReading)
Hypertable Info¶
Inspect hypertable metadata:
info = storage.hypertable_info(WeatherStationReading)
# {"hypertable_name": "myapp_weatherstationreading",
# "num_chunks": 12,
# "total_size": "45 MB"}
Returns None if the table is not a hypertable.
InfluxDB 2.x Backend¶
InfluxDB is a purpose-built time-series database with its own storage engine, compression, and Flux query language.
The InfluxDB backend writes readings as InfluxDB points while device metadata and raw messages remain in the Django ORM.
Installation¶
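The backend uses the official InfluxDB 2.x Python client, published on PyPI as influxdb-client:

```shell
pip install influxdb-client
```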
Configuration¶
DJANGO_DEVICEHUB = {
"STORAGE": {
"ENGINE": "django_devicehub.storage.influxdb.InfluxDBStorage",
"INFLUXDB_URL": "http://localhost:8086",
"INFLUXDB_TOKEN": "my-token",
"INFLUXDB_ORG": "my-org",
"INFLUXDB_BUCKET": "iot_readings",
},
}
Create the bucket in InfluxDB:
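Using the influx CLI (the bucket and org names must match the settings above):

```shell
influx bucket create --name iot_readings --org my-org
```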
How readings are stored¶
Readings are written as InfluxDB points:
- measurement: device type name in lowercase (e.g., weatherstation)
- tags: device_id
- fields: reading values (e.g., temperature=25.3, humidity=80.1)
- time: reading timestamp (nanosecond precision)
The Django ORM reading model is not written to by this backend. If you need both, subclass InfluxDBStorage and override store_reading() to dual-write.
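A minimal dual-write sketch under that approach (the super() call writes the InfluxDB point; the ORM write assumes the reading model has device/timestamp fields plus one column per data key, as in the examples above):

```python
from django_devicehub.storage.influxdb import InfluxDBStorage


class DualWriteStorage(InfluxDBStorage):
    """Write each reading to both InfluxDB and the Django ORM."""

    def store_reading(self, reading_model, device, timestamp, data):
        # Write the InfluxDB point first
        super().store_reading(reading_model, device, timestamp, data)
        # Then persist the same reading through the ORM model
        reading_model.objects.create(device=device, timestamp=timestamp, **data)
```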
Querying¶
query_readings() returns a list of dicts (not a Django QuerySet):
from django_devicehub.storage.influxdb import InfluxDBStorage
storage = InfluxDBStorage()
readings = storage.query_readings(
WeatherStationReading,
device=device,
start=start_dt,
end=end_dt,
fields=["temperature", "humidity"],
limit=100,
)
# [{"device_id": "ws-001", "timestamp": datetime, "temperature": 25.3, "humidity": 80.1}, ...]
Aggregation¶
# Scalar aggregate
result = storage.aggregate(WeatherStationReading, "temperature", "avg")
# {"result": 23.5}
# Time-bucketed aggregate (uses InfluxDB aggregateWindow)
buckets = storage.aggregate(
WeatherStationReading, "temperature", "avg",
interval="1 hour",
)
# [{"bucket": datetime, "result": 23.1}, ...]
The interval parameter accepts:
- Human-readable strings: "5 minutes", "1 hour", "1 day"
- Flux duration syntax: "5m", "1h", "1d"
- Python timedelta objects
InfluxDB-specific methods¶
delete_readings(reading_model, device=None, start=None, end=None)¶
Delete readings from InfluxDB:
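For example (a usage sketch; the filter parameters mirror query_readings()):

```python
from datetime import datetime, timezone as tz

# Delete one device's readings from January 2025
storage.delete_readings(
    WeatherStationReading,
    device=device,
    start=datetime(2025, 1, 1, tzinfo=tz.utc),
    end=datetime(2025, 2, 1, tzinfo=tz.utc),
)
```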
health_check()¶
Check InfluxDB connectivity:
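A usage sketch, assuming health_check() returns a truthy value when the server responds:

```python
if not storage.health_check():
    # Server unreachable -- alert, retry, or fall back
    raise RuntimeError("InfluxDB is unreachable")
```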
close()¶
Close the InfluxDB client connection:
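For example, at application shutdown:

```python
# Release the underlying client connection when done
storage.close()
```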
Writing a Custom Backend¶
Extend BaseStorage to implement a custom storage backend:
from django_devicehub.storage.base import BaseStorage
class MyStorage(BaseStorage):
def store_reading(self, reading_model, device, timestamp, data):
"""Store a sensor reading."""
...
def query_readings(self, reading_model, device=None, start=None, end=None,
fields=None, limit=None, order="-timestamp"):
"""Query stored readings with optional filters."""
...
def aggregate(self, reading_model, field, function="avg", device=None,
start=None, end=None, interval=None):
"""Aggregate readings (avg, min, max, sum, count).
If interval is provided, return time-bucketed results.
"""
...
The downsample() method has a default implementation that delegates to aggregate(), so you only need to override it if you have a more efficient implementation.
Register your backend in settings:
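Assuming the class lives at myapp/storage.py (a hypothetical location), the settings entry follows the same pattern as the built-in backends:

```python
DJANGO_DEVICEHUB = {
    "STORAGE": {
        # Dotted path to your custom storage class
        "ENGINE": "myapp.storage.MyStorage",
    },
}
```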