Python教程-Dask Python(第二部分)
在上一个教程中,我们已经了解了分布式计算的概念以及Dask的介绍。我们还了解了什么是Dask集群以及如何安装Dask,以及Dask界面的介绍。
Dask界面
正如我们已经讨论过的,Dask界面具有各种用于分布式计算的并行算法集合。数据科学从业者正在使用一些重要的用户界面来扩展NumPy、Pandas和scikit-learn:
- 数组: 并行NumPy
- 数据帧: 并行Pandas
- 机器学习: 并行Scikit-Learn
我们已经在前一个教程中介绍了Dask数组,现在让我们直接进入Dask数据帧。
Dask数据帧
我们已经观察到,需要将多个NumPy数组组合在一起才能形成一个Dask数组。类似地,Dask数据帧包含多个较小的Pandas数据帧。Pandas的大型数据帧按行分割以形成多个较小的数据帧。这些较小的数据帧可以在单个系统或多个系统上使用(因此,允许我们存储比内存更大的数据集)。Dask数据帧的每个计算都会在当前的Pandas数据帧上并行化执行函数。
下面是一个表示Dask数据帧结构的图像:
Dask数据帧还提供了与Pandas数据帧非常相似的应用程序编程接口(API)。
现在,让我们考虑一些使用Dask数据帧执行基本函数的示例。
示例1:读取CSV文件
使用Pandas读取文件
# reading the file using pandas
import pandas as pd
my_pdfile = pd.read_csv("covid_19_india.csv")
print(my_pdfile)
输出:
Sno Date Time State/UnionTerritory ConfirmedIndianNational ConfirmedForeignNational Cured Deaths Confirmed
0 1 30/01/20 6:00 PM Kerala 1 0 0 0 1
1 2 31/01/20 6:00 PM Kerala 1 0 0 0 1
2 3 01/02/20 6:00 PM Kerala 2 0 0 0 2
3 4 02/02/20 6:00 PM Kerala 3 0 0 0 3
4 5 03/02/20 6:00 PM Kerala 3 0 0 0 3
... ... ... ... ... ... ... ... ... ...
9286 9287 09/12/20 8:00 AM Telengana - - 266120 1480 275261
9287 9288 09/12/20 8:00 AM Tripura - - 32169 373 32945
9288 9289 09/12/20 8:00 AM Uttarakhand - - 72435 1307 79141
9289 9290 09/12/20 8:00 AM Uttar Pradesh - - 528832 7967 558173
9290 9291 09/12/20 8:00 AM West Bengal - - 475425 8820 507995
[9291 rows x 9 columns]
使用Dask读取文件
# reading the file using dask
import dask.dataframe as ddf
my_ddfile = ddf.read_csv("covid_19_india.csv")
print(my_ddfile.compute())
输出:
Sno Date Time State/UnionTerritory ConfirmedIndianNational ConfirmedForeignNational Cured Deaths Confirmed
0 1 30/01/20 6:00 PM Kerala 1 0 0 0 1
1 2 31/01/20 6:00 PM Kerala 1 0 0 0 1
2 3 01/02/20 6:00 PM Kerala 2 0 0 0 2
3 4 02/02/20 6:00 PM Kerala 3 0 0 0 3
4 5 03/02/20 6:00 PM Kerala 3 0 0 0 3
... ... ... ... ... ... ... ... ... ...
9286 9287 09/12/20 8:00 AM Telengana - - 266120 1480 275261
9287 9288 09/12/20 8:00 AM Tripura - - 32169 373 32945
9288 9289 09/12/20 8:00 AM Uttarakhand - - 72435 1307 79141
9289 9290 09/12/20 8:00 AM Uttar Pradesh - - 528832 7967 558173
9290 9291 09/12/20 8:00 AM West Bengal - - 475425 8820 507995
[9291 rows x 9 columns]
解释:
在上面的示例中,我们创建了两个不同的程序。在第一个程序中,我们导入了pandas库,并使用read_csv()函数来读取CSV文件。相比之下,在第二个程序中,我们导入了dask库的dataframe模块,并使用read_csv()函数来读取CSV文件。
这两个程序的结果将是相同的,但处理时间不同。与Pandas相比,Dask数据帧在执行函数时提供更快的速度。在实际使用中,一旦使用,这一点就会变得明显。
示例2:查找特定列的值计数
import dask.dataframe as ddf
my_ddfile = ddf.read_csv("covid_19_india.csv")
print(my_ddfile.State.value_counts().compute())
输出:
Kerala 315
Delhi 283
Rajasthan 282
Haryana 281
Uttar Pradesh 281
Tamil Nadu 278
Ladakh 278
Jammu and Kashmir 276
Karnataka 276
Punjab 275
Maharashtra 275
Andhra Pradesh 273
Uttarakhand 270
Odisha 269
West Bengal 267
Puducherry 267
Chhattisgarh 266
Gujarat 265
Chandigarh 265
Madhya Pradesh 264
Himachal Pradesh 264
Bihar 263
Manipur 261
Mizoram 260
Andaman and Nicobar Islands 259
Goa 259
Assam 253
Jharkhand 253
Arunachal Pradesh 251
Tripura 247
Meghalaya 240
Telengana 236
Nagaland 207
Sikkim 200
Dadra and Nagar Haveli and Daman and Diu 181
Cases being reassigned to states 60
Telangana 45
Dadar Nagar Haveli 37
Unassigned 3
Telangana*** 1
Maharashtra*** 1
Telengana*** 1
Chandigarh*** 1
Daman & Diu 1
Punjab*** 1
Name: State, dtype: int64
解释:
在上面的示例中,我们导入了dask库的dataframe模块,并使用read_csv()函数来从CSV文件中读取内容。然后,我们使用列名“States”后面跟着value_counts()方法来计算该特定列中每个值的总数。结果是我们得到了该列中出现的所有州的名称以及它们出现的总次数。
示例3:在Dask数据帧上使用groupby函数
import dask.dataframe as ddf
my_ddfile = ddf.read_csv("covid_19_india.csv")
print(my_ddfile.groupby(my_ddfile.State).Cured.max().compute())
输出:
State
Andaman and Nicobar Islands 4647
Andhra Pradesh 860368
Arunachal Pradesh 15690
Assam 209447
Bihar 232563
Cases being reassigned to states 0
Chandigarh 16981
Chandigarh*** 14381
Chhattisgarh 227158
Dadar Nagar Haveli 2
Dadra and Nagar Haveli and Daman and Diu 3330
Daman & Diu 0
Delhi 565039
Goa 46924
Gujarat 203111
Haryana 232108
Himachal Pradesh 37871
Jammu and Kashmir 107282
Jharkhand 107898
Karnataka 858370
Kerala 582351
Ladakh 8056
Madhya Pradesh 200664
Maharashtra 1737080
Maharashtra*** 1581373
Manipur 23166
Meghalaya 11686
Mizoram 3772
Nagaland 10781
Odisha 316970
Puducherry 36308
Punjab 145093
Punjab*** 130406
Rajasthan 260773
Sikkim 4735
Tamil Nadu 770378
Telangana 41332
Telangana*** 40334
Telengana 266120
Telengana*** 42909
Tripura 32169
Unassigned 0
Uttar Pradesh 528832
Uttarakhand 72435
West Bengal 475425
Name: Cured, dtype: int64
解释:
在上面的程序中,我们再次导入了dask库的dataframe模块,并使用read_csv从指定的CSV文件中读取内容。然后,我们使用dask数据帧的groupby函数和max()函数来查找每个州中治愈人数的最大值。
现在,让我们了解另一个Dask界面,即Dask机器学习。
Dask机器学习
Dask机器学习提供了可扩展的Python中的机器学习算法,与scikit-learn兼容。让我们首先了解如何使用scikit-learn处理计算,然后更深入地了解Dask如何以不同的方式执行这些函数。
用户可以通过将参数njobs = -1放置在scikit-learn(在单个系统上)中来执行并行计算。Scikit-learn使用Joblib来执行这些并行计算。Joblib是一个提供并行支持的Python库。当我们调用fit()函数时,根据要执行的任务(无论是超参数搜索还是拟合模型),Joblib会将任务分发到可用的核心上。
然而,我们可以使用scikit-learn库将并行计算扩展到多台机器。与此不同,Dask既在单个系统上表现良好,又可以轻松扩展到一组系统。
Dask提供了一个中央任务调度程序和一组工作程序。调度程序将任务分配给每个工作程序。然后,这些工作程序被分配了可以执行计算的核心数量。工作程序提供两个功能:
- 计算分配的任务
- 根据请求为其他工作程序提供结果
让我们考虑一个演示调度程序和工作程序之间对话方式的示例(这个示例是由Dask的开发人员Matthew Rocklin提供的):
中央任务调度程序以Python函数的形式将工作发送给工作程序以在同一台计算机或集群上执行。
- Worker A,请计算x = f(1),Worker B请计算y = g(2)
- Worker A,一旦g(2)函数完成,请从Worker B获取y,并执行z = h(x, y)
上面的示例应该为我们展示了Dask的工作方式提供了清晰的演示。现在让我们了解机器学习模型和Dask-search CV。
机器学习模型
Dask机器学习(也称为Dask-ML)提供了Python中的可扩展机器学习。但在开始之前,让我们遵循以下Dask-ML安装步骤:
使用conda安装
conda install -c conda-forge dask-ml
使用pip安装
$ pip install dask-ml
让我们继续了解如何直接并行化scikit-learn以及如何使用Dask Array重新实现算法。
1. 直接并行化scikit-learn
正如我们已经讨论过的,Scikit-Learn(也称为sklearn)通过在后台使用Joblib来提供在单个CPU上的并行计算。我们可以直接使用Dask来并行化多个sklearn估算器,而无需修改当前代码。
第一步是从dask库的分布模块导入client。此命令将在系统上生成一个本地调度程序和工作程序。
from dask.distributed import Client
# starting a local Dask client
my_client = Client()
接下来的步骤是在后台实例化dask的joblib。我们必须从sklearn库的joblib中导入parallel_backend,如下所示:
import dask_ml.joblib
from sklearn.externals.joblib import parallel_backend
with parallel_backend('dask'):
# Normal scikit-learn code goes here
from sklearn.ensemble import RandomForestClassifier
my_model = RandomForestClassifier()
2. 使用Dask Array重新实现算法
Dask-ML重新实现了简单的机器学习算法,以使用NumPy数组。NumPy数组由Dask Array替换,以实现可扩展的算法。此替换有助于实现以下内容:
- 线性模型(例如,线性回归、泊松回归、逻辑回归等)
- 预处理(例如,标定器、转换器等)
- 聚类(例如,K均值、谱聚类等)
A. 线性模型示例
from dask_ml.linear_model import LogisticRegression
mymodel = LogisticRegression()
mymodel.fit(data, labels)
B. 预处理示例
from dask_ml.preprocessing import OneHotEncoder
myencoder = OneHotEncoder(sparse=True)
myresult = myencoder.fit(data)
C. 聚类示例
from dask_ml.cluster import KMeans
mymodel = KMeans()
mymodel.fit(data)
Dask-Search CV
超参数调整 在构建模型中被认为是一个重要的步骤,可以在很大程度上改变模型的性能。机器学习模型有各种各样的超参数,很难确定在特定情况下哪个参数表现更好。手动执行此任务是相当繁琐的工作。但是,Scikit-Learn库提供了Gridsearch来简化超参数调整的任务。用户必须提供参数,而Gridsearch将提供这些参数的最佳组合。
让我们考虑一个例子,我们需要选择一个随机森林技术来拟合数据集。该模型有三个重要的可调参数 - 第一个参数,第二个参数和第三个参数,分别如下设置:
第一个参数 - Bootstrap = True
第二个参数 - max_depth - [8, 9]
第三个参数 - n_estimators:[50, 100 , 200]
1. sklearn Gridsearch:对于每个参数组合,Scikit-learn Gridsearch将执行任务,有时会重复执行同一个任务多次。下面的图表演示了这不是最有效的方法:
2. Dask-Search CV:与scikit-learn的Gridsearch CV不同,Dask提供了一个名为Dask-Search CV的库。Dask-Search CV合并了步骤以减少重复。我们可以使用以下步骤安装Dask-search:
使用conda安装Dask-Search CV
conda install dask-searchcv -c conda-forge
使用pip安装Dask-Search CV
$ pip install dask-searchcv
下面的图表显示了Dask-Search CV的工作方式:
Spark和Dask之间的区别
在Dask的帮助下,Python机器学习使用了可扩展的库,这是Dask-ML。与此不同,Spark使用Scala或Java中的机器学习库。Dask旨在在Python中的单台或多台机器上提供计算。另一方面,Spark旨在在分布式计算环境中提供计算。这两者都在不同的方面具有各自的优势和用途。
这就是Dask Python的第二部分教程的全部内容。我希望这些示例和解释对你有所帮助,以更好地理解Dask在Python中的用法。如果你有任何问题或疑虑,请随时提问。在Dask中继续学习并提高你的Python技能!