InfluxDB入门教程2:数据模型、写入与Flux查询
# InfluxDB入门教程2:数据模型、写入与Flux查询
在上一篇教程中,我们完成了 InfluxDB 的安装配置。本教程将深入讲解 InfluxDB 的数据模型、数据写入方式以及强大的 Flux 查询语言。
## 一、InfluxDB 数据模型
InfluxDB 使用独特的时序数据模型,理解这个模型是高效使用 InfluxDB 的关键。
### 1.1 核心概念
#### Measurement(测量)
相当于关系型数据库中的表,是数据的容器:
measurement: cpu_usage
#### Tag(标签)
索引字段,用于快速查询和过滤:
- 只能存储字符串
- 自动索引
- 适合作为维度(如主机名、区域、环境)
tags:
- host: server01
- region: us-west
- env: production
#### Field(字段)
实际的数据值:
- 支持多种数据类型(float, int, string, boolean)
- 不被索引
- 适合存储测量值
fields:
- value: 75.5 (float)
- count: 100 (int)
- status: "active" (string)
- enabled: true (boolean)
#### Timestamp(时间戳)
每条数据必须有时间戳:
- 纳秒级精度
- 如果不指定,使用服务器当前时间
### 1.2 数据模型示例
# 完整的数据点结构
cpu_usage,host=server01,region=us-west,env=production value=75.5 1609459200000000000
# 解析:
# measurement: cpu_usage
# tags: host=server01, region=us-west, env=production
# field: value=75.5
# timestamp: 1609459200000000000
## 二、数据写入方式
### 2.1 行协议(Line Protocol)
InfluxDB 使用行协议格式写入数据:
# 基本语法
measurement,tag_key1=tag_value1,tag_key2=tag_value2 field_key1=field_value1,field_key2=field_value2 [timestamp]
# 多个数据点(用换行分隔)
cpu,host=server01 value=75.5 1609459200000000000
cpu,host=server02 value=82.3 1609459201000000000
memory,host=server01 used=8.5,total=16 1609459202000000000
### 2.2 使用 CLI 写入
# 进入 influx CLI
influx -c default
# 写入数据
write cpu_usage,host=server01 value=75.5 1609459200000000000
# 批量写入
write -b -p s <
### 2.3 使用 HTTP API 写入
# 单条写入
curl -XPOST 'http://localhost:8086/api/v2/write?org=myorg&bucket=mybucket' \
--header 'Authorization: Token your_api_token' \
--header 'Content-Type: text/plain; charset=utf-8' \
--data 'cpu_usage,host=server01 value=75.5'
# 批量写入
curl -XPOST 'http://localhost:8086/api/v2/write?org=myorg&bucket=mybucket' \
--header 'Authorization: Token your_api_token' \
--header 'Content-Type: text/plain; charset=utf-8' \
--data-raw 'cpu,host=server01 value=75.5
cpu,host=server02 value=82.3
memory,host=server01 used=8.5,total=16'
### 2.4 使用客户端库写入
#### Python 示例
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
import time
# 初始化客户端
client = InfluxDBClient(
url="http://localhost:8086",
token="your_api_token",
org="myorg"
)
# 创建写入 API
write_api = client.write_api(write_options=SYNCHRONOUS)
# 创建数据点
point = Point("cpu_usage") \
.tag("host", "server01") \
.tag("region", "us-west") \
.field("value", 75.5) \
.time(time.time_ns(), WritePrecision.NS)
# 写入单点
write_api.write(bucket="mybucket", record=point)
# 批量写入
points = [
Point("cpu").tag("host", "server01").field("value", 75.5),
Point("cpu").tag("host", "server02").field("value", 82.3),
Point("memory").tag("host", "server01").field("used", 8.5)
]
write_api.write(bucket="mybucket", record=points)
# 关闭客户端
client.close()
## 三、Flux 查询语言
Flux 是 InfluxDB 2.x 的原生查询语言,功能强大且灵活。
### 3.1 基本查询语法
// 基本查询结构
from(bucket: "mybucket")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "cpu_usage")
// 完整查询示例
from(bucket: "mybucket")
|> range(start: -1h, stop: now())
|> filter(fn: (r) => r["_measurement"] == "cpu_usage")
|> filter(fn: (r) => r["_field"] == "value")
|> filter(fn: (r) => r["host"] == "server01")
### 3.2 常用查询操作
#### 时间范围查询
// 过去 1 小时
from(bucket: "mybucket")
|> range(start: -1h)
// 指定时间范围
from(bucket: "mybucket")
|> range(start: 2024-01-01T00:00:00Z, stop: 2024-01-01T23:59:59Z)
// 相对时间
from(bucket: "mybucket")
|> range(start: -24h, stop: -12h)
#### 数据过滤
// 过滤 measurement
from(bucket: "mybucket")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "cpu_usage")
// 过滤字段
from(bucket: "mybucket")
|> range(start: -1h)
|> filter(fn: (r) => r["_field"] == "value")
// 过滤标签
from(bucket: "mybucket")
|> range(start: -1h)
|> filter(fn: (r) => r["host"] == "server01")
// 复合条件
from(bucket: "mybucket")
|> range(start: -1h)
|> filter(fn: (r) => r["host"] == "server01" or r["host"] == "server02")
|> filter(fn: (r) => r["region"] == "us-west")
#### 聚合计算
// 平均值
from(bucket: "mybucket")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "cpu_usage")
|> aggregateWindow(every: 5m, fn: mean)
// 最大值
from(bucket: "mybucket")
|> range(start: -1h)
|> aggregateWindow(every: 5m, fn: max)
// 最小值
from(bucket: "mybucket")
|> range(start: -1h)
|> aggregateWindow(every: 5m, fn: min)
// 求和
from(bucket: "mybucket")
|> range(start: -1h)
|> aggregateWindow(every: 1h, fn: sum)
// 计数
from(bucket: "mybucket")
|> range(start: -1h)
|> count()
#### 数据转换
// 单位转换
from(bucket: "mybucket")
|> range(start: -1h)
|> map(fn: (r) => ({
r with
_value: r._value / 1024,
unit: "GB"
}))
// 重命名字段
from(bucket: "mybucket")
|> range(start: -1h)
|> rename(columns: {_value: "cpu_value"})
// 保留/删除列
from(bucket: "mybucket")
|> range(start: -1h)
|> keep(columns: ["_time", "_value", "host"])
from(bucket: "mybucket")
|> range(start: -1h)
|> drop(columns: ["_start", "_stop", "_measurement"])
#### 排序和限制
// 按时间排序
from(bucket: "mybucket")
|> range(start: -1h)
|> sort(columns: ["_time"])
// 按值排序
from(bucket: "mybucket")
|> range(start: -1h)
|> sort(columns: ["_value"], desc: true)
// 限制返回行数
from(bucket: "mybucket")
|> range(start: -1h)
|> limit(n: 100)
### 3.3 高级查询
#### 窗口查询
// 按时间窗口聚合
from(bucket: "mybucket")
|> range(start: -24h)
|> aggregateWindow(
every: 1h,
fn: mean,
createEmpty: false
)
// 按标签分组
from(bucket: "mybucket")
|> range(start: -1h)
|> group(columns: ["host"])
|> mean()
// 分组后窗口聚合
from(bucket: "mybucket")
|> range(start: -24h)
|> group(columns: ["host", "region"])
|> aggregateWindow(every: 1h, fn: mean)
#### 关联查询
// 关联两个 measurement
cpu = from(bucket: "mybucket")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "cpu")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
memory = from(bucket: "mybucket")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "memory")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
join(tables: {cpu: cpu, memory: memory}, on: ["_time", "host"])
#### 多表查询
// union 合并多个查询
query1 = from(bucket: "mybucket")
|> range(start: -1h)
|> filter(fn: (r) => r["host"] == "server01")
query2 = from(bucket: "mybucket")
|> range(start: -1h)
|> filter(fn: (r) => r["host"] == "server02")
union(tables: [query1, query2])
### 3.4 实用查询示例
#### CPU 使用率趋势
from(bucket: "mybucket")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "cpu_usage")
|> filter(fn: (r) => r["_field"] == "value")
|> aggregateWindow(every: 5m, fn: mean)
|> yield(name: "avg_cpu")
#### 主机资源使用对比
from(bucket: "mybucket")
|> range(start: -24h)
|> filter(fn: (r) => r["_measurement"] == "cpu_usage" or r["_measurement"] == "memory_usage")
|> aggregateWindow(every: 1h, fn: mean)
|> pivot(rowKey:["_time"], columnKey: ["_measurement", "_field"], valueColumn: "_value")
|> yield(name: "resource_comparison")
#### 异常检测
// 查找超过阈值的 CPU 使用率
from(bucket: "mybucket")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "cpu_usage")
|> filter(fn: (r) => r["_field"] == "value")
|> filter(fn: (r) => r._value > 90.0)
|> yield(name: "high_cpu_alert")
// 计算标准差检测异常
from(bucket: "mybucket")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "cpu_usage")
|> filter(fn: (r) => r["_field"] == "value")
|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
|> sd(column: "_value")
|> map(fn: (r) => ({
r with
threshold: r._value * 2.0
}))
## 四、最佳实践
### 4.1 数据模型设计
// 推荐:使用 tag 作为维度
cpu_usage,host=server01,region=us-west,env=prod value=75.5
// 不推荐:将动态数据作为 tag
cpu_usage,host=server01,current_user=admin value=75.5 // current_user 应该是 field
// 推荐:合理命名
system_cpu,host=server01 value=75.5
system_memory,host=server01 used=8.5
// 不推荐:过于通用的名称
metric,host=server01 value=75.5
### 4.2 写入优化
# 批量写入(推荐 5000-10000 点/批次)
curl -XPOST 'http://localhost:8086/api/v2/write?org=myorg&bucket=mybucket' \
--header 'Authorization: Token your_api_token' \
--data-raw '...thousands of lines...'
# 使用一致的精度
# 全部使用纳秒级时间戳
point.time(time.time_ns(), WritePrecision.NS)
# 合理设置 tag
# tag 数量控制在合理范围,避免高基数
### 4.3 查询优化
// 使用 filter 而不是 map 过滤数据
// 好
from(bucket: "mybucket")
|> range(start: -1h)
|> filter(fn: (r) => r["host"] == "server01")
// 差(先查询再过滤)
from(bucket: "mybucket")
|> range(start: -1h)
|> map(fn: (r) => if r["host"] == "server01" then r else dropRecord(r: r))
// 合理设置时间范围
// 查询所需的最小时间范围,不要用 -1y 如果只需要 -1h
## 五、总结
本教程涵盖了:
- InfluxDB 数据模型的核心概念
- 多种数据写入方式
- Flux 查询语言的基础和高级用法
- 实用查询示例和最佳实践
掌握这些知识后,你可以开始构建实际的应用。在下一篇教程中,我们将学习 InfluxDB 的数据备份、恢复和运维管理。






