InfluxDB入门教程2:数据模型、写入与Flux查询

# InfluxDB入门教程2:数据模型、写入与Flux查询

在上一篇教程中,我们完成了 InfluxDB 的安装配置。本教程将深入讲解 InfluxDB 的数据模型、数据写入方式以及强大的 Flux 查询语言。

## 一、InfluxDB 数据模型

InfluxDB 使用独特的时序数据模型,理解这个模型是高效使用 InfluxDB 的关键。

### 1.1 核心概念

#### Measurement(测量)
相当于关系型数据库中的表,是数据的容器:

measurement: cpu_usage

#### Tag(标签)
索引字段,用于快速查询和过滤:
- 只能存储字符串
- 自动索引
- 适合作为维度(如主机名、区域、环境)

tags:
  - host: server01
  - region: us-west
  - env: production

#### Field(字段)
实际的数据值:
- 支持多种数据类型(float, int, string, boolean)
- 不被索引
- 适合存储测量值

fields:
  - value: 75.5 (float)
  - count: 100 (int)
  - status: "active" (string)
  - enabled: true (boolean)

#### Timestamp(时间戳)
每条数据必须有时间戳:
- 纳秒级精度
- 如果不指定,使用服务器当前时间

### 1.2 数据模型示例

# 完整的数据点结构
cpu_usage,host=server01,region=us-west,env=production value=75.5 1609459200000000000

# 解析:
# measurement: cpu_usage
# tags: host=server01, region=us-west, env=production
# field: value=75.5
# timestamp: 1609459200000000000

## 二、数据写入方式

### 2.1 行协议(Line Protocol)

InfluxDB 使用行协议格式写入数据:

# 基本语法
measurement,tag_key1=tag_value1,tag_key2=tag_value2 field_key1=field_value1,field_key2=field_value2 [timestamp]

# 多个数据点(用换行分隔)
cpu,host=server01 value=75.5 1609459200000000000
cpu,host=server02 value=82.3 1609459201000000000
memory,host=server01 used=8.5,total=16 1609459202000000000

### 2.2 使用 CLI 写入

# 进入 influx CLI
influx -c default

# 写入数据
write cpu_usage,host=server01 value=75.5 1609459200000000000

# 批量写入
write -b -p s <

### 2.3 使用 HTTP API 写入

# 单条写入
curl -XPOST 'http://localhost:8086/api/v2/write?org=myorg&bucket=mybucket' \
  --header 'Authorization: Token your_api_token' \
  --header 'Content-Type: text/plain; charset=utf-8' \
  --data 'cpu_usage,host=server01 value=75.5'

# 批量写入
curl -XPOST 'http://localhost:8086/api/v2/write?org=myorg&bucket=mybucket' \
  --header 'Authorization: Token your_api_token' \
  --header 'Content-Type: text/plain; charset=utf-8' \
  --data-raw 'cpu,host=server01 value=75.5
cpu,host=server02 value=82.3
memory,host=server01 used=8.5,total=16'

### 2.4 使用客户端库写入

#### Python 示例

from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
import time

# 初始化客户端
client = InfluxDBClient(
    url="http://localhost:8086",
    token="your_api_token",
    org="myorg"
)

# 创建写入 API
write_api = client.write_api(write_options=SYNCHRONOUS)

# 创建数据点
point = Point("cpu_usage") \
    .tag("host", "server01") \
    .tag("region", "us-west") \
    .field("value", 75.5) \
    .time(time.time_ns(), WritePrecision.NS)

# 写入单点
write_api.write(bucket="mybucket", record=point)

# 批量写入
points = [
    Point("cpu").tag("host", "server01").field("value", 75.5),
    Point("cpu").tag("host", "server02").field("value", 82.3),
    Point("memory").tag("host", "server01").field("used", 8.5)
]
write_api.write(bucket="mybucket", record=points)

# 关闭客户端
client.close()

## 三、Flux 查询语言

Flux 是 InfluxDB 2.x 的原生查询语言,功能强大且灵活。

### 3.1 基本查询语法

// 基本查询结构
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "cpu_usage")

// 完整查询示例
from(bucket: "mybucket")
  |> range(start: -1h, stop: now())
  |> filter(fn: (r) => r["_measurement"] == "cpu_usage")
  |> filter(fn: (r) => r["_field"] == "value")
  |> filter(fn: (r) => r["host"] == "server01")

### 3.2 常用查询操作

#### 时间范围查询

// 过去 1 小时
from(bucket: "mybucket")
  |> range(start: -1h)

// 指定时间范围
from(bucket: "mybucket")
  |> range(start: 2024-01-01T00:00:00Z, stop: 2024-01-01T23:59:59Z)

// 相对时间
from(bucket: "mybucket")
  |> range(start: -24h, stop: -12h)

#### 数据过滤

// 过滤 measurement
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "cpu_usage")

// 过滤字段
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_field"] == "value")

// 过滤标签
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r["host"] == "server01")

// 复合条件
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r["host"] == "server01" or r["host"] == "server02")
  |> filter(fn: (r) => r["region"] == "us-west")

#### 聚合计算

// 平均值
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "cpu_usage")
  |> aggregateWindow(every: 5m, fn: mean)

// 最大值
from(bucket: "mybucket")
  |> range(start: -1h)
  |> aggregateWindow(every: 5m, fn: max)

// 最小值
from(bucket: "mybucket")
  |> range(start: -1h)
  |> aggregateWindow(every: 5m, fn: min)

// 求和
from(bucket: "mybucket")
  |> range(start: -1h)
  |> aggregateWindow(every: 1h, fn: sum)

// 计数
from(bucket: "mybucket")
  |> range(start: -1h)
  |> count()

#### 数据转换

// 单位转换
from(bucket: "mybucket")
  |> range(start: -1h)
  |> map(fn: (r) => ({
      r with
      _value: r._value / 1024,
      unit: "GB"
    }))

// 重命名字段
from(bucket: "mybucket")
  |> range(start: -1h)
  |> rename(columns: {_value: "cpu_value"})

// 保留/删除列
from(bucket: "mybucket")
  |> range(start: -1h)
  |> keep(columns: ["_time", "_value", "host"])

from(bucket: "mybucket")
  |> range(start: -1h)
  |> drop(columns: ["_start", "_stop", "_measurement"])

#### 排序和限制

// 按时间排序
from(bucket: "mybucket")
  |> range(start: -1h)
  |> sort(columns: ["_time"])

// 按值排序
from(bucket: "mybucket")
  |> range(start: -1h)
  |> sort(columns: ["_value"], desc: true)

// 限制返回行数
from(bucket: "mybucket")
  |> range(start: -1h)
  |> limit(n: 100)

### 3.3 高级查询

#### 窗口查询

// 按时间窗口聚合
from(bucket: "mybucket")
  |> range(start: -24h)
  |> aggregateWindow(
      every: 1h,
      fn: mean,
      createEmpty: false
    )

// 按标签分组
from(bucket: "mybucket")
  |> range(start: -1h)
  |> group(columns: ["host"])
  |> mean()

// 分组后窗口聚合
from(bucket: "mybucket")
  |> range(start: -24h)
  |> group(columns: ["host", "region"])
  |> aggregateWindow(every: 1h, fn: mean)

#### 关联查询

// 关联两个 measurement
cpu = from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")

memory = from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "memory")
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")

join(tables: {cpu: cpu, memory: memory}, on: ["_time", "host"])

#### 多表查询

// union 合并多个查询
query1 = from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r["host"] == "server01")

query2 = from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r["host"] == "server02")

union(tables: [query1, query2])

### 3.4 实用查询示例

#### CPU 使用率趋势

from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "cpu_usage")
  |> filter(fn: (r) => r["_field"] == "value")
  |> aggregateWindow(every: 5m, fn: mean)
  |> yield(name: "avg_cpu")

#### 主机资源使用对比

from(bucket: "mybucket")
  |> range(start: -24h)
  |> filter(fn: (r) => r["_measurement"] == "cpu_usage" or r["_measurement"] == "memory_usage")
  |> aggregateWindow(every: 1h, fn: mean)
  |> pivot(rowKey:["_time"], columnKey: ["_measurement", "_field"], valueColumn: "_value")
  |> yield(name: "resource_comparison")

#### 异常检测

// 查找超过阈值的 CPU 使用率
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "cpu_usage")
  |> filter(fn: (r) => r["_field"] == "value")
  |> filter(fn: (r) => r._value > 90.0)
  |> yield(name: "high_cpu_alert")

// 计算标准差检测异常
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "cpu_usage")
  |> filter(fn: (r) => r["_field"] == "value")
  |> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
  |> sd(column: "_value")
  |> map(fn: (r) => ({
      r with
      threshold: r._value * 2.0
    }))

## 四、最佳实践

### 4.1 数据模型设计

// 推荐:使用 tag 作为维度
cpu_usage,host=server01,region=us-west,env=prod value=75.5

// 不推荐:将动态数据作为 tag
cpu_usage,host=server01,current_user=admin value=75.5  // current_user 应该是 field

// 推荐:合理命名
system_cpu,host=server01 value=75.5
system_memory,host=server01 used=8.5

// 不推荐:过于通用的名称
metric,host=server01 value=75.5

### 4.2 写入优化

# 批量写入(推荐 5000-10000 点/批次)
curl -XPOST 'http://localhost:8086/api/v2/write?org=myorg&bucket=mybucket' \
  --header 'Authorization: Token your_api_token' \
  --data-raw '...thousands of lines...'

# 使用一致的精度
# 全部使用纳秒级时间戳
point.time(time.time_ns(), WritePrecision.NS)

# 合理设置 tag
# tag 数量控制在合理范围,避免高基数

### 4.3 查询优化

// 使用 filter 而不是 map 过滤数据
// 好
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r["host"] == "server01")

// 差(先查询再过滤)
from(bucket: "mybucket")
  |> range(start: -1h)
  |> map(fn: (r) => if r["host"] == "server01" then r else dropRecord(r: r))

// 合理设置时间范围
// 查询所需的最小时间范围,不要用 -1y 如果只需要 -1h

## 五、总结

本教程涵盖了:
- InfluxDB 数据模型的核心概念
- 多种数据写入方式
- Flux 查询语言的基础和高级用法
- 实用查询示例和最佳实践

掌握这些知识后,你可以开始构建实际的应用。在下一篇教程中,我们将学习 InfluxDB 的数据备份、恢复和运维管理。

发表回复

后才能评论