2024 桐庐半程马拉松
00:00:00
时间
0.00
距离(公里)
--:--
配速
--
步频
--
心率 (bpm)
--
配速
步频
|
share-image
ESC

Python +fastapi +InfluxDB 构建心率数据查看系统

之前买了一根心率带,厂商配的 APP 很难用,于是决定自己动手,研究如何通过 Python 获取实时心率数据。

先看效果

实时展示最新是心率数据以及历史数据

本文将介绍如何使用 Python 的构建一个这样的数据查看页面,我们将使用 bleak + fastapi + InfluxDB 连接蓝牙心率带(BLE),并解析标准心率服务的数据。

准备工作

我们需要使用 bleak 库,它是一个跨平台的蓝牙低功耗(BLE)客户端封装。

pip install bleak

核心原理

大多数心率带都遵循 蓝牙心率服务标准 (Heart Rate Service)

  • 服务 UUID: 0000180d-0000-1000-8000-00805f9b34fb
  • 特征值 UUID: 00002a37-0000-1000-8000-00805f9b34fb (用于订阅通知)

数据的核心在于解析通知回调中的字节数据:

  1. Flags (第1字节): 决定了数据格式。
  2. Heart Rate Value: 根据 Flags 的第 0 位,心率值可能是 UINT8 (1字节) 或 UINT16 (2字节)。

InfluxDB 安装与配置

为了持久化存储心率数据,我们需要安装时序数据库 InfluxDB。

安装 InfluxDB

如果是 Linux 环境(如 Ubuntu/Debian),可以使用以下命令安装:

# Ubuntu/Debian
wget https://dl.influxdata.com/influxdb/releases/influxdb2_2.7.1-1_amd64.deb
sudo dpkg -i influxdb2_2.7.1-1_amd64.deb
sudo systemctl start influxdb

安装完成后,InfluxDB 默认运行在 8086 端口。你可以通过浏览器访问 http://localhost:8086 进行初始化设置,创建一个初始用户、组织(Organization)和存储桶(Bucket)。

或者使用 Docker 快速启动:

docker run -d -p 8086:8086 \
-v $PWD/data:/var/lib/influxdb2 \
-v $PWD/config:/etc/influxdb2 \
--name influxdb \
influxdb:2

安装 Python 客户端

为了在 Python 中操作 InfluxDB,我们需要安装官方客户端库:

pip install influxdb-client

完整代码实现:蓝牙读取 + 数据存储

下面的代码展示了如何连接蓝牙心率带,解析数据,并将其写入 InfluxDB。

import asyncio
import os
import sys
from datetime import datetime, timezone
from bleak import BleakScanner, BleakClient
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

# InfluxDB 配置
# 建议将 Token 等敏感信息放在环境变量中,或者直接替换为你的配置
INFLUX_TOKEN = os.environ.get("INFLUXDB_TOKEN", "你的InfluxDB Token")
INFLUX_ORG = os.environ.get("INFLUXDB_ORG", "awen")
INFLUX_URL = os.environ.get("INFLUXDB_URL", "http://192.168.1.28:8086")
INFLUX_BUCKET = os.environ.get("INFLUXDB_BUCKET", "heart_rate")

# 初始化 InfluxDB 客户端
# url: InfluxDB 的地址
# token: 用于认证的 Token
# org: 组织名称
influx_client = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)

# 初始化写 API,使用同步模式 (SYNCHRONOUS) 确保数据写入后再继续
write_api = influx_client.write_api(write_options=SYNCHRONOUS)

# 标准蓝牙心率服务 UUID (Heart Rate Service)
HEART_RATE_SERVICE_UUID = "0000180d-0000-1000-8000-00805f9b34fb"
# 心率测量特征值 UUID (Heart Rate Measurement)
HEART_RATE_MEASUREMENT_UUID = "00002a37-0000-1000-8000-00805f9b34fb"

def notification_handler(sender, data):
"""处理收到的蓝牙通知数据"""
try:
# 蓝牙心率测量数据格式解析
# 第 1 个字节是 Flags,第 0 位决定了心率值格式
flag_byte = data[0]
is_uint16 = flag_byte & 0x01

if is_uint16:
# 如果是 16 位,读取第 2,3 个字节 (小端序)
heart_rate = int.from_bytes(data[1:3], byteorder='little')
else:
# 如果是 8 位,读取第 2 个字节
heart_rate = data[1]

print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 收到心率: {heart_rate} bpm")

# 构建 InfluxDB 数据点
# Measurement: heart_rate (类似表名)
# Tag: device=ble_hr_monitor (索引列,用于快速查询)
# Field: bpm=heart_rate (实际数值)
# Time: 当前 UTC 时间
point = Point("heart_rate") \
.tag("device", "ble_hr_monitor") \
.field("bpm", heart_rate) \
.time(datetime.now(timezone.utc))

# 写入数据
write_api.write(bucket=INFLUX_BUCKET, org=INFLUX_ORG, record=point)
except Exception as e:
print(f"数据处理或写入失败: {e}", file=sys.stderr)

async def scan_for_device():
print("正在扫描蓝牙设备...")
# 扫描 5 秒
devices = await BleakScanner.discover(return_adv=True, timeout=5.0)

# 优先通过 Service UUID 匹配
for device, adv_data in devices.values():
if "0000180d-0000-1000-8000-00805f9b34fb" in adv_data.service_uuids:
print(f"*** 找到心率设备 (UUID): {device.name} ({device.address}) ***")
return device

# 如果没有通过 UUID 自动找到,尝试通过名称模糊匹配
for device, adv_data in devices.values():
if device.name and ("Heart" in device.name or "HRM" in device.name or "Polar" in device.name or "Garmin" in device.name):
print(f"*** 找到心率设备 (名称): {device.name} ({device.address}) ***")
return device

return None

async def run_ble_client():
"""主循环,负责扫描、连接和保持连接"""
while True:
target_device = await scan_for_device()

if not target_device:
print("未找到心率设备,5秒后重试...")
await asyncio.sleep(5)
continue

print(f"正在连接到 {target_device.name} ({target_device.address})...")

# 定义断开连接事件
disconnected_event = asyncio.Event()

def on_disconnect(client):
print(f"设备已断开连接: {client.address}")
disconnected_event.set()

try:
async with BleakClient(target_device.address, disconnected_callback=on_disconnect) as client:
print(f"已连接: {client.is_connected}")

# 订阅通知,收到数据时触发 notification_handler
await client.start_notify(HEART_RATE_MEASUREMENT_UUID, notification_handler)
print("正在监听心率数据... (保持连接中)")

# 阻塞在这里,直到连接断开
await disconnected_event.wait()

except Exception as e:
print(f"连接发生错误: {e}")

print("连接丢失或断开,5秒后重新开始扫描...")
await asyncio.sleep(5)

if __name__ == "__main__":
try:
asyncio.run(run_ble_client())
except KeyboardInterrupt:
print("用户手动停止程序")
finally:
print("关闭数据库连接...")
write_api.close()
influx_client.close()

运行脚本后,数据就会源源不断地写入 InfluxDB。

数据可视化接口 (FastAPI)

有了数据,我们可以写一个简单的 API 接口来给前端提供数据。这里使用 FastAPI

pip install fastapi uvicorn

创建一个 main.py 文件:

import os
from typing import List
from datetime import datetime, timedelta, timezone
from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
from influxdb_client import InfluxDBClient
from pydantic import BaseModel

app = FastAPI()
# 挂载静态文件目录,用于存放 js/css 等
app.mount("/static", StaticFiles(directory="."), name="static")

# InfluxDB 配置 (需与写入端保持一致)
INFLUX_TOKEN = os.environ.get("INFLUXDB_TOKEN", "你的InfluxDB Token")
INFLUX_ORG = os.environ.get("INFLUXDB_ORG", "awen")
INFLUX_URL = os.environ.get("INFLUXDB_URL", "http://192.168.1.28:8086")
INFLUX_BUCKET = os.environ.get("INFLUXDB_BUCKET", "heart_rate")

# 初始化查询客户端
influx_client = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
query_api = influx_client.query_api()

class HeartRatePoint(BaseModel):
time: str
bpm: int

@app.get("/api/heart-rate/latest", response_model=HeartRatePoint)
def get_latest_heart_rate():
"""从 InfluxDB 获取最新的心率数据"""
# 使用 Flux 语言查询
# range(start: -1h): 查找过去1小时的数据
# last(): 取最后一条
query = f'''
from(bucket: "{INFLUX_BUCKET}")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "heart_rate")
|> filter(fn: (r) => r["_field"] == "bpm")
|> last()
'''
result = query_api.query(org=INFLUX_ORG, query=query)

if not result or len(result) == 0:
raise HTTPException(status_code=404, detail="No data found")

for table in result:
for record in table.records:
return HeartRatePoint(
time=record.get_time().isoformat(),
bpm=int(record.get_value())
)

raise HTTPException(status_code=404, detail="No data found")

@app.get("/api/heart-rate/history", response_model=List[HeartRatePoint])
def get_heart_rate_history(minutes: int = 60):
"""获取最近一段时间的心率历史趋势"""
# aggregateWindow: 降采样,每10秒取一个平均值,防止前端点数过多
query = f'''
from(bucket: "{INFLUX_BUCKET}")
|> range(start: -{minutes}m)
|> filter(fn: (r) => r["_measurement"] == "heart_rate")
|> filter(fn: (r) => r["_field"] == "bpm")
|> aggregateWindow(every: 10s, fn: mean, createEmpty: false)
|> yield(name: "mean")
'''
result = query_api.query(org=INFLUX_ORG, query=query)

points = []
if result:
for table in result:
for record in table.records:
points.append(HeartRatePoint(
time=record.get_time().isoformat(),
bpm=int(record.get_value())
))
return points

@app.get("/")
def read_root():
"""返回简单的 HTML 页面"""
with open("index.html", "r", encoding="utf-8") as f:
html_content = f.read()
return HTMLResponse(content=html_content, status_code=200)

if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)

简单的 Web 监控页面

创建一个 index.html 文件,使用 Chart.js 绘制实时折线图。

<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>蓝牙心率实时监控</title>
<script src="/static/chart.js"></script>
<style>
body {
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
background-color: #f0f2f5;
margin: 0;
padding: 20px;
display: flex;
flex-direction: column;
align-items: center;
}
.container {
background-color: white;
border-radius: 10px;
box-shadow: 0 4px 6px rgba(0,0,0,0.1);
padding: 20px;
width: 90%;
max-width: 800px;
margin-bottom: 20px;
}
.header {
text-align: center;
margin-bottom: 30px;
}
.current-hr {
text-align: center;
margin-bottom: 20px;
}
.bpm-value {
font-size: 72px;
font-weight: bold;
color: #e74c3c;
}
.bpm-label {
font-size: 24px;
color: #7f8c8d;
}
.chart-container {
position: relative;
height: 400px;
width: 100%;
}
.status {
text-align: center;
color: #95a5a6;
font-size: 14px;
margin-top: 10px;
}
</style>
</head>
<body>

<div class="container">
<div class="header">
<h1>实时心率监控</h1>
</div>

<div class="current-hr">
<div class="bpm-value" id="currentBpm">--</div>
<div class="bpm-label">BPM</div>
</div>
</div>

<div class="container">
<h2>历史趋势 (近1小时)</h2>
<div class="chart-container">
<canvas id="hrChart"></canvas>
</div>
<div class="status" id="lastUpdated">等待数据...</div>
</div>

<script>
const ctx = document.getElementById('hrChart').getContext('2d');
let hrChart;

// 初始化图表
function initChart() {
hrChart = new Chart(ctx, {
type: 'line',
data: {
labels: [],
datasets: [{
label: '心率 (BPM)',
data: [],
borderColor: '#e74c3c',
backgroundColor: 'rgba(231, 76, 60, 0.2)',
borderWidth: 2,
tension: 0.4,
fill: true,
pointRadius: 2
}]
},
options: {
responsive: true,
maintainAspectRatio: false,
scales: {
x: {
type: 'category',
ticks: {
maxTicksLimit: 10
}
},
y: {
beginAtZero: false,
suggestedMin: 40,
suggestedMax: 200
}
},
animation: false
}
});
}

// 获取最新心率
async function fetchLatestHR() {
try {
const response = await fetch('/api/heart-rate/latest');
if (response.ok) {
const data = await response.json();
document.getElementById('currentBpm').textContent = data.bpm;
document.getElementById('lastUpdated').textContent = `最后更新: ${new Date(data.time).toLocaleString()}`;
}
} catch (error) {
console.error('获取最新心率失败:', error);
}
}

// 获取历史数据并更新图表
async function fetchHistoryHR() {
try {
const response = await fetch('/api/heart-rate/history?minutes=60');
if (response.ok) {
const data = await response.json();

const labels = data.map(point => {
const date = new Date(point.time);
return date.toLocaleTimeString([], {hour: '2-digit', minute:'2-digit'});
});
const values = data.map(point => point.bpm);

hrChart.data.labels = labels;
hrChart.data.datasets[0].data = values;
hrChart.update();
}
} catch (error) {
console.error('获取历史数据失败:', error);
}
}

// 页面加载完成后执行
document.addEventListener('DOMContentLoaded', () => {
initChart();
fetchLatestHR();
fetchHistoryHR();

// 定时刷新
setInterval(fetchLatestHR, 2000); // 每2秒更新最新值
setInterval(fetchHistoryHR, 10000); // 每10秒更新图表
});
</script>
</body>
</html>

##HRV 分析

睡眠

文章作者:阿文
文章链接: https://www.awen.me/post/2d388e82.html
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 阿文的博客