Python教程-Python中的Kafka教程
在以下教程中,我们将讨论Apache Kafka以及它在Python编程语言中的应用。
了解Apache Kafka
Apache Kafka是一个开源的流平台,最初由LinkedIn设计。后来,它移交给了Apache基金会并于2011年开源。
根据维基百科的定义:
Apache Kafka是由Apache软件基金会开发的开源平台,用于处理流数据。它是用Java和Scala编写的。该项目的目标是提供一个高吞吐量、统一的、低延迟的平台,以处理实时数据流。Apache Kafka的存储层基本上是一个“大规模可扩展的发布/订阅消息队列,设计为分布式事务日志”,这使得它在处理流式数据方面非常有价值,特别是对于企业基础架构。此外,Kafka通过Kafka Connect连接到外部系统(用于导入和导出数据),并提供Kafka Streams,这是一个用于Java流处理的库。
我们可以将它视为一个巨大的提交日志,可以按照发生的顺序存储数据。此日志的用户可以根据其需求访问和使用它。
Apache Kafka的一些用例
我们可以在不同的地方使用Apache Kafka。让我们考虑一些Kafka的用例,这些用例可以帮助我们了解其用途:
- 活动监控: 我们可以使用Kafka来监控活动。活动可能属于物理传感器和设备,也可能属于网站。生产者可以从数据源发布原始数据,后续可以用于查找趋势和模式。
- 消息传递: 我们还可以将Kafka用作服务之间的消息代理。如果我们正在实施微服务架构,可以将一个微服务作为生产者,另一个作为消费者。例如,我们可以有一个负责创建新帐户并向与帐户创建相关的用户发送电子邮件的微服务。
- 日志汇总: 我们还可以利用Kafka从不同系统收集日志,并将它们存储在一个集中的系统中进行进一步处理。
- ETL: Kafka提供几乎实时的流处理功能,因此我们可以根据需要开发ETL。
- 数据库: 根据前面提到的内容,我们可以说Kafka也充当数据库。它不是具有数据查询功能的典型数据库,但Kafka可以存储数据,只要我们需要,而不会消耗它。
理解Kafka的核心概念
让我们讨论Kafka的核心概念。
- 主题(Topics): 系统中输入的每个消息都必须属于某个主题。主题是记录流。消息以键值对的格式存储。每条消息都被分配一个称为偏移(Offset)的序列,其中一条消息的结果可能成为其他消息的输入,以进行进一步处理。
- 生产者(Producers): 生产者是负责将数据发布到Kafka系统的应用程序。它们将数据发布到所选择的主题上。
- 消费者(Consumer): 有消费者应用程序使用发布到主题的消息。消费者订阅其首选主题并消费数据。
- 代理(Broker): 代理是Kafka的一个实例,负责消息交换。我们可以将Kafka作为集群的一部分或独立机器使用。
现在,让我们考虑一个简单的示例,假设有一家餐馆的库房,库房中存放了所有的原材料,如蔬菜、大米、面粉等。
这家餐厅供应各种不同种类的菜肴,如印度菜、意大利菜、中餐等。每种烹饪风格的厨师都可以参考库房,选择所需的原材料制作菜肴。还可能存在一种秘密成分,它用于每种类型的菜肴。在这种情况下,库房充当代理,商品的供应商充当生产者,商品和厨师制作的秘密成分充当主题,而厨师充当消费者。
如何在Python中访问Kafka?
Python编程语言中有各种库可用于使用Kafka。以下是其中一些库的描述:
序号 | 库 | 描述 |
---|---|---|
1 | Kafka-Python | 这是Python社区设计的开源库。 |
2 | PyKafka | 由Parsly维护的库,它声称具有Pythonic API。但是,与Kafka-Python不同,此库无法创建动态主题。 |
3 | Confluent Python Kafka | Confluent提供的库,作为librdkafka的薄包装提供。因此,它的性能优于前两者。 |
安装依赖项
我们将在此项目中使用Kafka-Python。因此,我们可以使用pip安装它,如下所示:
语法:
$ pip install kafka-python
现在,让我们开始构建项目。
项目代码
在以下示例中,我们将创建一个生产者,该生产者会生成从1到500的数字,并将它们发送到Kafka代理。稍后,消费者将从代理中读取数据,并将其存储在MongoDB集合中。
利用Kafka的一个好处是,如果消费者崩溃,另一个或修复的消费者将继续从先前的位置继续读取。这是一种良好的方法,以确保所有数据都被输入到数据库中,而不会丢失数据或产生重复。
在下面的示例中,让我们创建一个新的Python程序文件,命名为produce.py
,并开始导入一些所需的库和模块。
文件:produce.py
# importing the required libraries
from time import sleep
from json import dumps
from kafka import KafkaProducer
解释:
在上面的代码片段中,我们已经导入了所需的库和模块。现在,让我们初始化一个新的Kafka生产者。请注意以下参数:
- bootstrap_servers = ['localhost:9092']: 此参数设置与生产者联系以引导初始集群元数据的主机和端口。在这里设置这个参数并不是强制的,因为默认的主机和端口是localhost:9092。
- value_serializer = lambda x: dumps(x).encode('utf-8'): 此参数用于在将数据发送到代理之前对数据进行序列化。在这里,我们将数据转换为JSON文件并将其编码为UTF-8。
让我们考虑以下代码片段以了解详细信息。
文件:produce.py
# initializing the Kafka producer
my_producer = KafkaProducer(
bootstrap_servers = ['localhost:9092'],
value_serializer = lambda x:dumps(x).encode('utf-8')
)
解释:
在上面的代码片段中,我们使用KafkaProducer()函数初始化了Kafka生产者,其中包含我们之前学习的参数。
现在,我们需要生成从1到500的数字。我们可以使用一个for循环来执行此操作,在其中将每个数字作为一个具有一个键的字典的值。此键仅用作数据的键,不用作主题的键。在同一个循环内,我们还将数据发送到代理。
我们可以通过调用生产者的send方法并指定主题和数据来执行此操作。
注意:值序列化器将自动转换和编码数据。
让我们考虑以下代码片段以了解详细信息。
文件:produce.py
# generating the numbers ranging from 1 to 500
for n in range(500):
my_data = {'num' : n}
my_producer.send('testnum', value = my_data)
sleep(5)
解释:
在上面的代码片段中,我们使用for循环来迭代从一到500的数字。我们还在每次迭代之间添加了五秒的间隔。
如果有人想测试代码,建议创建一个新的主题并将数据发送到该新生成的主题。这种方法可以避免在同时测试生产者和消费者时出现重复的值和testnum主题中可能导致混淆的情况。
消费数据
在开始消费者的编码部分之前,让我们创建一个新的Python程序文件并命名为consume.py
。我们将导入一些模块,例如json.loads、MongoClient和KafkaConsumer。由于本教程不涵盖PyMongo的范围,我们不会深入研究其代码。
此外,用户还可以根据需要将Mongo代码替换为其他代码。我们可以编写此代码以将数据插入到另一个数据库中,处理数据或任何其他用户可以想到的操作。
让我们考虑以下代码片段,以开始。
文件:consume.py
# importing the required modules
from json import loads
from kafka import KafkaConsumer
from pymongo import MongoClient
解释:
在上面的代码片段中,我们从各自的库中导入了所需的模块。
让我们创建Kafka消费者。我们将使用KafkaConsumer()函数进行此工作;因此,让我们仔细查看此函数中使用的参数。
- 主题(Topic): KafkaConsumer()函数的第一个参数是主题。在此示例中,它是testnum。
- bootstrap_servers = ['localhost:9092']: 此参数与生产者相同。
- auto_offset_reset = 'earliest': 这个参数是其他重要参数之一。它处理消费者在被关闭或中断后重新开始读取的位置,我们可以将其设置为最新或最早。当我们将其设置为最早时,消费者将从最新提交的偏移开始读取。当我们将其设置为最新时,消费者将从日志的末尾开始读取。这正是我们在这里需要的。
- enable_auto_commit = True: 此参数确认消费者是否在每个间隔内提交其读取的偏移。
- auto_commit_interval_ms = 1000ms: 此参数用于设置两次提交之间的间隔。由于消息每五秒钟传入一次,每秒提交一次似乎是合理的。
- group_id = 'counters': 此参数是消费者所属的消费者组。请注意,为了使它们自动提交,消费者必须是消费者组的一部分。
- value_deserializer用于将数据反序列化为通用的JSON格式,这是值序列化器的反向工作方式。
让我们考虑以下代码片段以了解详细信息。
文件:consume.py
# generating the Kafka Consumer
my_consumer = KafkaConsumer(
'testnum',
bootstrap_servers = ['localhost : 9092'],
auto_offset_reset = 'earliest',
enable_auto_commit = True,
group_id = 'my-group',
value_deserializer = lambda x : loads(x.decode('utf-8'))
)
解释:
在上面的代码片段中,我们使用KafkaConsumer()函数创建了Kafka消费者。我们还在函数内部添加了我们之前学习的参数。
现在,让我们考虑以下代码片段,以连接到MongoDB数据库的testnum集合(此集合类似于关系数据库中的表)。
文件:consume.py
my_client = MongoClient('localhost : 27017')
my_collection = my_client.testnum.testnum
解释:
在上面的代码片段中,我们定义了一个名为my_client的变量,该变量使用指定的主机和端口MongoClient()函数。然后,我们定义了另一个名为my_collection的变量,该变量使用my_client变量访问testnum主题中的数据。
可以通过循环遍历从消费者中提取数据(在这里,可以将消费者视为可迭代对象)来提取数据。消费者将继续监听,直到代理不再响应。可以使用值属性访问消息的值。在这里,我们将消息覆盖为消息值。
下一行将数据插入数据库集合。最后一行将打印消息已添加到我们的集合的确认。
注意:可以向此循环中的所有操作插入回调。
文件:consume.py
for message in my_consumer:
message = message.value
collection.insert_one(message)
print(message + " added to " + my_collection)
解释:
在上面的代码片段中,我们使用for循环遍历消费者以提取数据。为了测试代码,可以首先执行produce.py文件,然后执行consume.py。