在搜索用于监视基础架构或第三方应用程序时,Telegraf的内置插件成为了一个很好的选择。或者,我们可能正在查看系统资源,如磁盘和网络利用率,或MySQL数据库的性能。

但是,如果我们正在创建一个应用程序,其中我们希望将用户的数据存储在时间序列数据库中,该怎么办?也许我们可以将其视为物联网(IoT)或基于智能家居的应用程序,每个用户都需要访问智能牙刷的读数。我们想要存储刷牙会话的时间和持续时间,可以发送提醒孩子刷牙的提醒,以及跟踪事项,如电池的健康状况,以及刷头已使用的时间。

无论是为面向用户的应用程序还是为插件Telegraf尚未覆盖的基础架构需求,都可能需要编写新的代码块来收集自定义数据。

让我们考虑一个以智能牙刷为例的情况,我们有一个运行嵌入式Linux并通过蓝牙与牙刷通信的基站。我们已经编写了用于监听传入数据的代码块,而且似乎工作得很好;现在,我们希望将其存储在InfluxDB中。

一种方法是将Telegraf与应用程序一起运行,并通过UDP、Unix或TCP套接字发送数据,让Telegraf处理与InfluxDB的连接以及批处理和写入点。

如果我们只需要数据收集,这种方法完全没问题;但是,如果我们想要查询并获取用户的数据,我们可能需要利用不同语言中可用的InfluxDB库之一,以便在应用程序本身内部处理与InfluxDB的交互。

已经存在多种语言的库具有InfluxDB库,其中许多由其社区维护。我们将了解influxdb-python库的使用。

所以,让我们开始吧。

了解InfluxDB Python客户端库

InfluxDB是由名为InfluxData的公司设计和开发的开源时间序列数据库或TSDB,它是用Go编程语言编写的,用于存储和检索字段中的时间序列数据,例如操作监控、物联网传感器数据、应用程序指标和实时分析。它还支持从Graphite进行数据处理。

Influxdb-python库作为Python客户端与InfluxDB进行交互。该库由InfluxDB的GitHub帐户托管,并由三名社区志愿者维护。

安装库

我们可以使用pip安装程序来安装Python中的库,这也是Python中安装库的最简单方法。安装influxdb库的语法如下:

语法:

$ pip install influxdb  

安装完成后,我们可以通过简单创建一个新的Python程序文件并键入以下代码片段来验证它。

语法:

from influxdb import InfluxDBClient  

现在,让我们保存文件并尝试执行它。如果没有引发错误,说明库已正确安装。但是,如果出现任何异常,请尝试重新安装或参考官方文档以获取帮助。

创建连接

接下来,我们将创建一个新的InfluxDBClient实例,使用有关服务器的信息,以便我们访问。我们可以使用以下代码片段,其中将值hostport替换为InfluxDB主机的适当URL/IP地址和端口:

示例:

# importing the required module  
from influxdb import InfluxDBClient  
# defining the host and port  
my_Client = InfluxDBClient(host = 'localhost', port = 8086)  

在上面的代码片段中,我们从influxdb库导入了InfluxDBClient模块。然后,我们使用InfluxDBClient()函数为名为my_Client的变量定义了主机和端口,分别定义了主机和端口的值。

InfluxDBClient构造函数还提供其他参数,包括用户名和密码、需要连接的数据库、是否使用SSL、超时和UDP参数。如果我们需要连接到主机somedomain.com上的端口8086,使用用户名(例如anonymous)和密码(例如somepass),以及使用SSL,我们可以使用以下代码片段,而不是启用SSL和SSL验证,还可以使用另外两个参数ssl = Truessl_verify = True

示例:

# importing the required module  
from influxdb import InfluxDBClient  
# defining different entities  
my_Client = InfluxDBClient(  
    host = 'somedomain.com',  
    port = 8086,  
    username = 'anonymous',  
    password = "somepass",  
    ssl = True,  
    verify_ssl = True  
    )  

在上面的代码片段中,我们从influxdb库导入了InfluxDBClient模块。然后,我们使用该模块来定义了hostportusernamepasswordsslverify_ssl等值,并将这些值存储在名为my_Client的变量中。

现在,让我们创建一个新的名为mydatabase的数据库,以便将数据存储到其中,如下所示:

示例:

# importing the required module  
from influxdb import InfluxDBClient  
# defining the host and port  
my_Client = InfluxDBClient(  
    host = 'somedomain.com',  
    port = 8086,  
    username = 'anonymous',  
    password = "somepass",  
    ssl = True,  
    verify_ssl = True  
    )  
# creating a database  
my_Client.create_database('mydatabase')  

在上面的代码片段中,我们使用my_Clientcreate_database来创建一个名为mydatabase的新数据库。

我们可以使用my_Clientget_list_database()函数来检查数据库是否已创建,如下所示:

示例:

# importing the required module  
from influxdb import InfluxDBClient  
# defining the host and port  
my_Client = InfluxDBClient(  
    host = 'somedomain.com',  
    port = 8086,  
    username = 'anonymous',  
    password = "somepass",  
    ssl = True,  
    verify_ssl = True  
    )  
# creating a database  
my_Client.create_database('mydatabase')  
# verifying if the database is created or not  
my_Client.get_list_database()  

输出:

[{'name': 'telegraf'}, {'name': '_internal'}, {'name': 'mydatabase'}]

在上面的代码片段中,我们使用get_list_database()函数来验证数据库是否已创建。结果显示,数据库名为mydatabase,还有我们安装的telegraf_internal数据库。

最后,我们可以设置客户端使用该数据库,如下所示:

示例:

# importing the required module  
from influxdb import InfluxDBClient  
# defining the host and port  
my_Client = InfluxDBClient(  
    host = 'somedomain.com',  
    port = 8086,  
    username = 'anonymous',  
    password = "somepass",  
    ssl = True,  
    verify_ssl = True  
    )  
# creating a database  
my_Client.create_database('mydatabase')  
# setting client to use specified database  
my_Client.switch_database('mydatabase') 

在上面的代码片段中,我们使用switch_database来设置客户端使用指定的数据库,即mydatabase

插入数据

现在,我们有一个要写入数据的数据库,并且已正确配置客户端,现在是时候添加一些数据了。我们将使用客户端的write_points()方法来执行此操作。该方法接受一系列点和一些其他参数,包括“批量大小”,它允许我们分批插入数据,而不是一次全部插入。我们可以使用这种方式插入大量数据。

write_points()方法具有名为my_points的参数,它是一个字典列表,包含要写入数据库的点。让我们现在创建一些示例数据并将其插入。首先,让我们将三个以JSON格式表示的点插入名为json_body的变量中,如下所示:

示例:

json_body = [  
    {  
        "measurement": "brushEvents",  
        "tags": {  
            "user": "Derek",  
            "brushId": "6a89f539-71c6-490d-a28d-6c5d84c0ee2f"  
        },  
        "time": "2021-08-04T8:01:00Z",  
        "fields": {  
            "duration": 147  
        }  
    },  
    {  
        "measurement": "brushEvents",  
        "tags": {  
            "user": "Derek",  
            "brushId": "6a89f539-71c6-4xx90d-a28d-6c5d84c0ee2f"  
        },  
        "time": "2021-08-05T8:04:00Z",  
        "fields": {  
            "duration": 131  
        }  
    },  
    {  
        "measurement": "brushEvents",  
        "tags": {  
            "user": "Derek",  
            "brushId": "6a89f539-71c6-490d-a28d-6c5d84c0ee2f"  
        },  
        "time": "2021-08-06T8:02:00Z",  
        "fields": {  
            "duration": 124  
        }  
    }  
]  

上面的代码示例表示了智能牙刷的“刷牙事件”,每个事件大约发生在早上八点左右,带有使用牙刷的用户名以及刷头的ID(这有助于我们跟踪使用相同刷头的时间),并包含了用户使用牙刷的持续时间(以秒为单位)的字段。

由于我们已经设置了数据库,且write_points()的默认输入为JSON,我们可以使用json_body变量作为唯一参数来调用该方法,如下所示:

示例:

my_Client.write_points(json_body)  

输出:

True

在上面的代码片段中,我们使用json_body变量作为参数调用了write_points()。结果是,如果写操作成功,该函数会返回一个布尔值为True。如果我们创建一个应用程序,我们需要使此数据收集自动化,每当用户尝试与牙刷交互时,就会将点插入数据库。

查询数据

一旦我们将数据存储在数据库中,就可以尝试使用一些查询来将其提取出来。我们将使用与写入数据时相同的客户端对象,只是这一次我们将在InfluxDB上执行查询并使用客户端的query()函数获取结果。

示例:

my_Client.query('SELECT "duration" FROM "mydatabase"."autogen"."brushEvents" WHERE time > now() - 4d GROUP BY "user"')  

在上面的代码片段中,我们使用query()函数返回一个包含输出数据的ResultSet对象以及一些方便的方法。我们的查询请求在mydatabase数据库中检索所有的测量数据,按用户进行分组。我们可以使用名为.raw的参数来访问来自InfluxDB的原始JSON响应。

示例:

results.raw  

输出:

{'statement_id': 0, 'series': [{'name': 'brushEvents', 'tags': {'user': 'Derek'}, 'columns': ['time', 'duration'], 'values': [['2021-08-04T08:01:00Z', 147], ['2021-08-05T08:04:00Z', 131], ['2018-08-06T08:02:00Z', 124]]}]}

在上面的代码片段中,我们使用raw参数来访问来自InfluxDB的原始JSON响应。在大多数情况下,我们不需要直接访问JSON。相反,我们可以使用ResultSetget_points()方法来获取来自请求的测量数据,通过标签或字段进行过滤。如果我们想要遍历Derek的所有刷牙会话,我们可以获取标签"user"的值为"Derek"的所有点,使用以下命令:

示例:

my_points = results.get_points(tags = {'user':'Derek'})  

在上面的示例中,my_points变量是一个Python生成器,这是一个类似迭代器的函数,我们可以使用for x in y循环来迭代它,如下所示:

示例:

my_points = results.get_points(tags = {'user': 'Derek'})  
for my_point in my_points:  
    print("Time: %s, Duration: %i" % (my_point['time'], my_point['duration']))  

输出:

Time: 2021-08-04T08:01:00Z, Duration: 147
Time: 2021-08-05T08:04:00Z, Duration: 131
Time: 2021-08-06T08:02:00Z, Duration: 124

在上面的代码片段中,我们使用for循环打印用户的每次刷牙时间的时间和持续时间。根据应用程序,我们可能会遍历这些点以计算用户的平均刷牙时间,或者只是验证每天有X次刷牙事件。

标签: Tkinter教程, Tkinter安装, Tkinter库, Tkinter入门, Tkinter学习, Tkinter入门教程, Tkinter, Tkinter进阶, Tkinter指南, Tkinter学习指南, Tkinter进阶教程, Tkinter编程