Python教程-使用Python中的InfluxDB
在搜索用于监视基础架构或第三方应用程序时,Telegraf的内置插件成为了一个很好的选择。或者,我们可能正在查看系统资源,如磁盘和网络利用率,或MySQL数据库的性能。
但是,如果我们正在创建一个应用程序,其中我们希望将用户的数据存储在时间序列数据库中,该怎么办?也许我们可以将其视为物联网(IoT)或基于智能家居的应用程序,每个用户都需要访问智能牙刷的读数。我们想要存储刷牙会话的时间和持续时间,可以发送提醒孩子刷牙的提醒,以及跟踪事项,如电池的健康状况,以及刷头已使用的时间。
无论是为面向用户的应用程序还是为插件Telegraf尚未覆盖的基础架构需求,都可能需要编写新的代码块来收集自定义数据。
让我们考虑一个以智能牙刷为例的情况,我们有一个运行嵌入式Linux并通过蓝牙与牙刷通信的基站。我们已经编写了用于监听传入数据的代码块,而且似乎工作得很好;现在,我们希望将其存储在InfluxDB中。
一种方法是将Telegraf与应用程序一起运行,并通过UDP、Unix或TCP套接字发送数据,让Telegraf处理与InfluxDB的连接以及批处理和写入点。
如果我们只需要数据收集,这种方法完全没问题;但是,如果我们想要查询并获取用户的数据,我们可能需要利用不同语言中可用的InfluxDB库之一,以便在应用程序本身内部处理与InfluxDB的交互。
已经存在多种语言的库具有InfluxDB库,其中许多由其社区维护。我们将了解influxdb-python库的使用。
所以,让我们开始吧。
了解InfluxDB Python客户端库
InfluxDB是由名为InfluxData的公司设计和开发的开源时间序列数据库或TSDB,它是用Go编程语言编写的,用于存储和检索字段中的时间序列数据,例如操作监控、物联网传感器数据、应用程序指标和实时分析。它还支持从Graphite进行数据处理。
Influxdb-python库作为Python客户端与InfluxDB进行交互。该库由InfluxDB的GitHub帐户托管,并由三名社区志愿者维护。
安装库
我们可以使用pip安装程序来安装Python中的库,这也是Python中安装库的最简单方法。安装influxdb库的语法如下:
语法:
$ pip install influxdb
安装完成后,我们可以通过简单创建一个新的Python程序文件并键入以下代码片段来验证它。
语法:
from influxdb import InfluxDBClient
现在,让我们保存文件并尝试执行它。如果没有引发错误,说明库已正确安装。但是,如果出现任何异常,请尝试重新安装或参考官方文档以获取帮助。
创建连接
接下来,我们将创建一个新的InfluxDBClient实例,使用有关服务器的信息,以便我们访问。我们可以使用以下代码片段,其中将值host
和port
替换为InfluxDB主机的适当URL/IP地址和端口:
示例:
# importing the required module
from influxdb import InfluxDBClient
# defining the host and port
my_Client = InfluxDBClient(host = 'localhost', port = 8086)
在上面的代码片段中,我们从influxdb
库导入了InfluxDBClient
模块。然后,我们使用InfluxDBClient()
函数为名为my_Client
的变量定义了主机和端口,分别定义了主机和端口的值。
InfluxDBClient构造函数还提供其他参数,包括用户名和密码、需要连接的数据库、是否使用SSL、超时和UDP参数。如果我们需要连接到主机somedomain.com
上的端口8086
,使用用户名(例如anonymous
)和密码(例如somepass
),以及使用SSL,我们可以使用以下代码片段,而不是启用SSL和SSL验证,还可以使用另外两个参数ssl = True
和ssl_verify = True
:
示例:
# importing the required module
from influxdb import InfluxDBClient
# defining different entities
my_Client = InfluxDBClient(
host = 'somedomain.com',
port = 8086,
username = 'anonymous',
password = "somepass",
ssl = True,
verify_ssl = True
)
在上面的代码片段中,我们从influxdb
库导入了InfluxDBClient
模块。然后,我们使用该模块来定义了host
、port
、username
、password
、ssl
和verify_ssl
等值,并将这些值存储在名为my_Client
的变量中。
现在,让我们创建一个新的名为mydatabase
的数据库,以便将数据存储到其中,如下所示:
示例:
# importing the required module
from influxdb import InfluxDBClient
# defining the host and port
my_Client = InfluxDBClient(
host = 'somedomain.com',
port = 8086,
username = 'anonymous',
password = "somepass",
ssl = True,
verify_ssl = True
)
# creating a database
my_Client.create_database('mydatabase')
在上面的代码片段中,我们使用my_Client
的create_database
来创建一个名为mydatabase
的新数据库。
我们可以使用my_Client
的get_list_database()
函数来检查数据库是否已创建,如下所示:
示例:
# importing the required module
from influxdb import InfluxDBClient
# defining the host and port
my_Client = InfluxDBClient(
host = 'somedomain.com',
port = 8086,
username = 'anonymous',
password = "somepass",
ssl = True,
verify_ssl = True
)
# creating a database
my_Client.create_database('mydatabase')
# verifying if the database is created or not
my_Client.get_list_database()
输出:
[{'name': 'telegraf'}, {'name': '_internal'}, {'name': 'mydatabase'}]
在上面的代码片段中,我们使用get_list_database()
函数来验证数据库是否已创建。结果显示,数据库名为mydatabase
,还有我们安装的telegraf
和_internal
数据库。
最后,我们可以设置客户端使用该数据库,如下所示:
示例:
# importing the required module
from influxdb import InfluxDBClient
# defining the host and port
my_Client = InfluxDBClient(
host = 'somedomain.com',
port = 8086,
username = 'anonymous',
password = "somepass",
ssl = True,
verify_ssl = True
)
# creating a database
my_Client.create_database('mydatabase')
# setting client to use specified database
my_Client.switch_database('mydatabase')
在上面的代码片段中,我们使用switch_database
来设置客户端使用指定的数据库,即mydatabase
。
插入数据
现在,我们有一个要写入数据的数据库,并且已正确配置客户端,现在是时候添加一些数据了。我们将使用客户端的write_points()
方法来执行此操作。该方法接受一系列点和一些其他参数,包括“批量大小”,它允许我们分批插入数据,而不是一次全部插入。我们可以使用这种方式插入大量数据。
write_points()
方法具有名为my_points
的参数,它是一个字典列表,包含要写入数据库的点。让我们现在创建一些示例数据并将其插入。首先,让我们将三个以JSON格式表示的点插入名为json_body
的变量中,如下所示:
示例:
json_body = [
{
"measurement": "brushEvents",
"tags": {
"user": "Derek",
"brushId": "6a89f539-71c6-490d-a28d-6c5d84c0ee2f"
},
"time": "2021-08-04T8:01:00Z",
"fields": {
"duration": 147
}
},
{
"measurement": "brushEvents",
"tags": {
"user": "Derek",
"brushId": "6a89f539-71c6-4xx90d-a28d-6c5d84c0ee2f"
},
"time": "2021-08-05T8:04:00Z",
"fields": {
"duration": 131
}
},
{
"measurement": "brushEvents",
"tags": {
"user": "Derek",
"brushId": "6a89f539-71c6-490d-a28d-6c5d84c0ee2f"
},
"time": "2021-08-06T8:02:00Z",
"fields": {
"duration": 124
}
}
]
上面的代码示例表示了智能牙刷的“刷牙事件”,每个事件大约发生在早上八点左右,带有使用牙刷的用户名以及刷头的ID(这有助于我们跟踪使用相同刷头的时间),并包含了用户使用牙刷的持续时间(以秒为单位)的字段。
由于我们已经设置了数据库,且write_points()
的默认输入为JSON,我们可以使用json_body
变量作为唯一参数来调用该方法,如下所示:
示例:
my_Client.write_points(json_body)
输出:
True
在上面的代码片段中,我们使用json_body
变量作为参数调用了write_points()
。结果是,如果写操作成功,该函数会返回一个布尔值为True
。如果我们创建一个应用程序,我们需要使此数据收集自动化,每当用户尝试与牙刷交互时,就会将点插入数据库。
查询数据
一旦我们将数据存储在数据库中,就可以尝试使用一些查询来将其提取出来。我们将使用与写入数据时相同的客户端对象,只是这一次我们将在InfluxDB上执行查询并使用客户端的query()
函数获取结果。
示例:
my_Client.query('SELECT "duration" FROM "mydatabase"."autogen"."brushEvents" WHERE time > now() - 4d GROUP BY "user"')
在上面的代码片段中,我们使用query()
函数返回一个包含输出数据的ResultSet对象以及一些方便的方法。我们的查询请求在mydatabase
数据库中检索所有的测量数据,按用户进行分组。我们可以使用名为.raw
的参数来访问来自InfluxDB的原始JSON响应。
示例:
results.raw
输出:
{'statement_id': 0, 'series': [{'name': 'brushEvents', 'tags': {'user': 'Derek'}, 'columns': ['time', 'duration'], 'values': [['2021-08-04T08:01:00Z', 147], ['2021-08-05T08:04:00Z', 131], ['2018-08-06T08:02:00Z', 124]]}]}
在上面的代码片段中,我们使用raw
参数来访问来自InfluxDB的原始JSON响应。在大多数情况下,我们不需要直接访问JSON。相反,我们可以使用ResultSet的get_points()
方法来获取来自请求的测量数据,通过标签或字段进行过滤。如果我们想要遍历Derek的所有刷牙会话,我们可以获取标签"user"
的值为"Derek"
的所有点,使用以下命令:
示例:
my_points = results.get_points(tags = {'user':'Derek'})
在上面的示例中,my_points
变量是一个Python生成器,这是一个类似迭代器的函数,我们可以使用for x in y
循环来迭代它,如下所示:
示例:
my_points = results.get_points(tags = {'user': 'Derek'})
for my_point in my_points:
print("Time: %s, Duration: %i" % (my_point['time'], my_point['duration']))
输出:
Time: 2021-08-04T08:01:00Z, Duration: 147
Time: 2021-08-05T08:04:00Z, Duration: 131
Time: 2021-08-06T08:02:00Z, Duration: 124
在上面的代码片段中,我们使用for循环打印用户的每次刷牙时间的时间和持续时间。根据应用程序,我们可能会遍历这些点以计算用户的平均刷牙时间,或者只是验证每天有X次刷牙事件。