Python教程-Dask Python
在现代的机器学习和数据科学世界中,使用独特的Python工具变得非常容易。这些包括scikit-learn、NumPy或Pandas,但它们在内存使用或处理时间方面不适合处理大规模数据。
预期会有一些需要转向分布式计算工具(传统上是Apache Spark)的情况。然而,这可能意味着要为全新的系统重新调整工作流程,从熟悉的Python生态系统导航到不同的Java虚拟机(JVM)世界,从而显著复杂化开发工作流程。
Dask库用于将分布式计算能力与数据科学的Python开发灵活性相结合,可以与Python的标准数据工具无缝集成。
理解分布式计算
让我们考虑一个场景:我们有一个数据集,可能是一组文本文件,它们非常大,无法装入计算机的内存中。我们可以利用Python中的文件流和其他生成器工具来迭代处理数据集,而无需将其加载到内存中。然而,仍然存在另一个问题,因为程序仍然在单个线程上运行,这最终会限制速度,即使在内存管理之后也是如此。
因此,Python提供了一项称为全局解释器锁(又称大多数开发人员使用CPython)的安全功能,用于在Python中编写并行代码,但这可能有点棘手。
因此,有一些解决方案可供选择。这些解决方案包括在GIL之外使用较低级别的工具(例如NumPy在编译代码中执行多线程工作)或在Python代码包(如multiprocessing或joblib)中利用多个进程/线程。
然而,尝试并行化以加速代码变得困难,结果可能是即使过程正确执行,也会生成可读性较差的代码,需要开发人员完全重新架构过程,但可能在系统上有限的资源。
对于像上面所述的大规模困难问题,分布式计算可以被认为是一种主要的关键。工作被分配给分布式系统中的多个独立工作机器,而不仅仅是在单个设备上的多线程工作。
这些独立工作机器在其处理器上处理数据集的块,在其磁盘空间或内存中保存数据。这些工作机器仅通过相对简单的消息传递与彼此或中央调度器通信,而不是像多线程代码中那样共享磁盘空间和内存。
分布式计算系统还允许开发人员扩展代码以并行在任意数量的工作机器上运行非常大的数据集,而不必复杂地设计设置集中式调度器并将工作机器与它们分开。
让我们了解一下Dask是什么以及它是如何工作的。
理解Dask
Dask是一个与其他社区项目(例如Pandas、NumPy和scikit-learn)协调开发和设计的免费开源库。它是一个并行计算库,通过任务工作机器和任务调度程序将更大的计算分解为更小的计算。Dask库在内存大小之外提供分布式并行和多核执行的功能。
Dask通过其低级别调度程序和高级集合提供不同的实用工具。
- 低级别调度程序:Dask提供了动态处理任务图的任务调度程序,这些执行机器控制高级别集合。但是,我们可以使用它们来为用户定义的工作负载提供动力。这些调度程序具有较低的延迟(约为1ms),并努力以较小的内存占用处理计算。Dask中的调度程序是在复杂情况下指导使用多进程和线程库或其他任务调度系统(如IPython parallel或Luigi)的选择。
- 高级集合:Dask提供了高级别的数组、数据帧和包集合,模仿了Pandas、列表和NumPy。但是,我们可以并行在不适合内存的数据集上操作这些高级集合。Dask的高级集合是大型数据集的Pandas和NumPy的替代品。
Dask的用例提供了几个示例工作流程,可以考虑Dask作为理想选择的情况。
Dask调度程序的类型
Dask提供主要两种类型的调度程序:单机调度程序和分布式调度程序。
- 单机调度程序: 单机调度程序针对大于内存使用的情况进行了优化。此调度程序易于使用,成本较低,但由于在单个计算机上工作,无法扩展。
- 分布式调度程序: 分布式调度程序比单机调度程序更复杂,完全是异步的(持续非阻塞对话),与单机调度程序相比更加强大。
在大多数情况下,建议使用分布式调度程序,因为它提供了一个灵活且交互式的仪表板,包括多个表格和带有实时信息的图形。默认情况下,在初始化集群时,它位于端口8787上。
在我们进入安装部分之前,让我们了解一下Dask集群。
理解Dask集群
集群是一个分布式或并行处理系统,包括一组相互连接的独立计算机,它们协同工作,形成一个单一的、集成的计算资源。集群中的一个节点可以被视为一个单处理器或多处理器系统,例如个人计算机(PC)、工作站,甚至SMP。
在集群的世界中,有各种不同的架构形式,用于确定如何将工作精确地分配给计算机。让我们了解在Dask中如何组织集群的方式。
Dask网络由三个部分组成:
- 集中式调度程序: 集中式调度程序管理工作机器并分配需要完成的任务。
- 多个工作机器: 多个工作机器执行计算,保存结果,并与其他工作机器通信。
- 一个或多个客户端: 一个或多个客户端可以与用户从Jupyter笔记本或脚本进行交互。这些客户端还将工作提交给调度程序以在工作机器上处理。
客户端会将请求发送给调度程序,描述计算的代码类型。一旦收到请求,调度程序将工作分配给工作机器以满足请求,最终工作机器完成计算工作。
正如我们所看到的,Dask将这些庞大的数据计算分解为多个小计算。
值得注意的是,Dask还可以部署在基于集群的各种技术上,例如:
- Kubernetes集群
- HPC集群,使用作业管理器,如LSF、PBS、SGE、SLURM或科学和学术实验室中常见的其他作业管理器。
- 处理YARN的Spark或Hadoop集群。
如何安装Dask Python
我们可以使用Anaconda或pip来安装Dask。
通过Anaconda安装Dask的语法如下:
conda install dask
或者
我们可以在终端或命令提示符中使用以下命令通过pip安装Dask:
$ pip install dask[complete]
一旦我们成功安装了Dask库,让我们了解Dask界面。
理解Dask界面
Dask提供不同的用户界面。这些界面包含一组不同的用于分布式计算的并行算法。以下是数据科学从业者寻求扩展NumPy、Pandas和scikit-learn的一些重要用户界面。
- 数组: 并行NumPy
- 数据框: 并行Pandas
- 机器学习: 并行Scikit-Learn
Dask数组
Dask中的数组使用基于块的算法提供了一个大于内存、并行和n维数组,换句话说,它是NumPy数组的分布形式。
以下是一个帮助我们了解Dask数组的图像:
正如我们所看到的,多个NumPy数组被组织成网格,以形成一个Dask数组。当我们创建一个Dask数组时,我们可以指定块的大小,该大小定义了NumPy数组的大小。例如,如果数组中有十个值,并且我们提供了块大小为五,它将返回两个包含五个值的NumPy数组。
Dask数组提供了一些重要的功能,如下所述:
- 大于内存: Dask数组允许我们处理比可用内存更大的数据集。Dask帮助将数组分解为许多较小的片段,对这些片段进行操作以减少计算的内存占用,并有效地从磁盘流式传输数据。
- 并行: Dask数组利用所有核心进行并行计算。
- 块算法: Dask数组还提供了块算法,以在块或子矩阵上操作,而不是在整个数组的行或列上运行。这个功能有助于通过执行许多小计算来执行大型计算。
以下是使用Dask创建数组的一些简单示例。
示例1:使用Dask数组创建随机数组
import dask.array as darray
# using arange for creating an array with values from 0 to 15
my_array = darray.arange(16, chunks = 5)
print( my_array.compute())
# using chunks for checking the size of each chunk
print(my_array.chunks)
输出:
[ 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15]
((5, 5, 5, 1),)
说明:
在上面的程序中,我们从dask库中导入了数组模块,并使用arange()方法创建了一个包含16个值的数组,并将块大小定义为5。然后,我们使用compute()方法打印数组。我们还使用chunks函数检查每个块的大小。结果是生成的数组,我们还可以看到数组被分为四个块,其中第一、第二和第三块每个包含五个值,第四块只包含一个值。
示例2:将NumPy数组转换为Dask数组
import numpy as np
import dask.array as darray
first_array = np.arange(15)
second_array = darray.from_array(first_array, chunks = 5)
# resulting in a dask array
print(second_array.compute())
输出:
[ 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14]
说明:
在上面的示例中,我们导入了NumPy库和Dask库的数组模块,并使用arange()方法创建了一个包含15个值的NumPy数组。然后,我们使用from_array()方法将first_array转换为Dask数组,并定义了块大小为5。然后,我们使用compute()函数打印数组。结果是前100个数字的总和。
我们已经讨论了Dask Python的基本介绍,但还有一些重要的概念需要讨论。剩下的教程将在第二部分中介绍。