数据流采集服务

数据流采集服务

概述

本文描述了采集服务器如何处理设备上报的消息, 并解析和存储这些数据.

版本

V2.0

作者

@author ChengZhen

数据流消息

云平台将所有设备和云, 设备和设备之间的通信数据统称为消息.

数据流消息格式

设备通过发送数据流消息主动上报数据给云端

例如:

{
    "did": "1234567890",    // 发送者的 ID
    "type": "stream",       // 消息类型
    "at": 1234567890        // 数据采集时间, 默认单位为毫秒
    "data": {
        "on": false,         // 数据点 1, 布尔类型
        "temperature": 25.5, // 数据点 2, 浮点型
        "battery": 95,       // 数据点 3, 整数类型
        "status": "active"   // 数据点 4, 字符串类型
    }
}

数据流格式

指按时间采集和存储的数据, 主要组成如下:

数据流采集流程

设备->设备: 1.测量数据
MQ->MQ: 开始
采集服务->MQ: 订阅 'messages/+'
设备->MQ: 2.发布到 'messages/:id'
MQ->采集服务: 3.推送消息
采集服务->采集服务: 处理数据
采集服务->时序数据库: 4. 写入数据点

MQ 是一个支持 MQTT/CoAP 等协议的消息队列, 支持发布/订阅消息通信方式

数据流解析

本节主要描述采集服务器如何解析和处理收到的消息

解析数据点

数据点有以下表示方式 (以 temperature 为例):

使用标准格式表示的多个数据点:

{
    "did": String,
    "type": "stream",
    "data": [{
        "at": Number, 
        "<name>": Number
    }]
}

使用精简方式表示的多个数据点:

{
    "did": String,
    "type": "stream",
    "data": {
        "@columns": ["at", "<name>"],
        "points": [[Number, Number]]
    }
}

使用精简方式表示的单个数据点:

{
    "did": String,
    "type": "stream",
    "data": {
        "at": Number,
        "<name>": Number
    }
}

其中 points 属性保存了数据点的值

一般用 at 字段表示数据点的时间戳, 如果不存在则取服务器的当前时间

其他属性存放标签和字段的值, 具体在解析规则中定义.

数据流标签

{
    "did": String, // 网关设备 ID
    "type": "stream",
    "data": {
        "at": Number,
        "tid": String,
        "rssi1m": Number,
        "rssi": Number // RSSI 值
    }
}

根据查询数据流定义, 上述的数据中的 did, tidrssi1m 都是数据流标签

特殊标签

数据流存储

本节主要描述如何存储上报的数据流.

本系统使用时序数据库来存储数据流, 相比其他数据库时序数据库专门针对数据流存储优化, 实现了非常高的读写性能.

数据库设计

数据库

默认使用一个名为 stream 的数据库用来存储所有数据流数据.

数据表名称

数据表不需要事先创建, 在保存数据点时会自动创建相应名称的数据表

一般用产品的编码做为表名, 如 gateway, camera, 用来存储这个产品所有设备的数据

如果设备属性为 object 类型,则创建一个独立的表来存储这个属性的数据流,表名为: 产品 key + 属性名, 如 gateway.device.

由上可知,同一个产品下所有的设备上报的数据流会存到同一个数据表中

数据表结构

对于有多个字段的数据流可以按下面的结构定义数据表:

如表名为 gps, 则表结构为:

NAME FIELD TYPE DESCRIPTION
time 时间戳 Timestamp 时间戳, Influxdb 内置字段
did 标签 String 设备 ID
lat 字段 Number 纬度
lng 字段 Number 经度
battery 字段 Number 电池电量

数据点保存周期

因为数据流的存储空间毕竟是有限的, 而数据流是持续在产生, 所以限制数据保存周期是必要的, 这样新生产的数据会覆盖最早的数据.

数据降采样

数据库可以定期汇总一个数据流的数据并存到另一个数据表中, 这样可以大大减少保存的数据量, 可以延长数据的保存时间.

比如温度数据流, 当时间超过 1 周以后, 只需要保存每天的最大, 最小和平均温度. 而原始的数据流只需要保存一周.

https://jasper-zhang1.gitbooks.io/influxdb/content/Guide/downsampling_and_retention.html

数据流分发

/things/<company>/<key>/<did>/properties