Python教程-PySpark MLlib
机器学习是一种将数据与统计工具相结合以预测输出的数据分析技术。各种企业行业都使用这种预测来做出有利的决策。
PySpark提供了一个用于机器学习的API,称为 mllib。PySpark的mllib支持各种机器学习算法,如分类、回归、聚类、协同过滤和降维,以及底层的优化原语。以下是各种机器学习概念:
- 分类
pyspark.mllib库支持多种分类方法,如二元分类、多类分类和回归分析。对象可以属于不同的类别。分类的目标是基于信息区分数据。在分类中,随机森林、朴素贝叶斯、决策树是最常用的算法。
- 聚类
聚类是一种无监督的机器学习问题。当您不知道如何对数据进行分类时,我们需要算法来查找模式并相应地对数据进行分类。流行的聚类算法有 K均值聚类、高斯混合模型、层次聚类。
- 频繁模式匹配(Frequent Pattern Matching,FPM)
FPM用于挖掘各种项目、项目集、子序列或其他子结构。它主要用于大规模数据集。
- 线性代数(linalg)
mllib.linalg实用程序用于线性代数操作。
- 推荐系统
推荐系统用于定义用于做出推荐的相关数据。它能够预测未来的偏好并推荐排名靠前的项目。例如,在线娱乐平台 Netflix 拥有大量电影,有时人们在选择喜爱的内容时会遇到困难。这就是推荐系统发挥重要作用的领域。
- 回归(Regression)
回归用于找到变量之间的关系和依赖性。它找到数据的每个特征与其他特征之间的相关性,并预测未来值。
mllib包支持许多其他算法、类和函数。在这里,我们将了解 pyspark.mllib 的基本概念。
MLlib特性
PySpark mllib 对于迭代算法非常有用。其特性如下:
- 特征提取(Extraction): 从“行”数据中提取特征。
- 特征转换(Transformation): 用于缩放、转换或修改特征。
- 特征选择(Selection): 从较大的特征集中选择有用的子集。
- 局部敏感哈希(Locality Sensitive Hashing): 将特征转换与其他算法相结合。
让我们来看一下PySpark MLlib的基本库。
MLlib线性回归
线性回归用于找到变量之间的关系和依赖性。考虑以下代码:
frompyspark.sql import SparkSession
spark = SparkSession.builder.appName('Customer').getOrCreate()
frompyspark.ml.regression import LinearRegression
dataset = spark.read.csv(r'C:\Users\DEVANSH SHARMA\Ecommerce-Customers.csv')
dataset.show(10)
输出:
+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+
| _c0| _c1| _c2| _c3| _c4| _c5| _c6| _c7|
+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+
| Email| Address| Avatar|Avg Session Length| Time on App| Time on Website|Length of Membership|Yearly Amount Spent|
|mstephenson@ferna...|835 Frank TunnelW...| Violet| 34.49726772511229| 12.65565114916675| 39.57766801952616| 4.0826206329529615| 587.9510539684005|
| hduke@hotmail.com|4547 Archer Commo...| DarkGreen| 31.92627202636016|11.109460728682564|37.268958868297744| 2.66403418213262| 392.2049334443264|
| pallen@yahoo.com|24645 Valerie Uni...| Bisque|33.000914755642675|11.330278057777512|37.110597442120856| 4.104543202376424| 487.54750486747207|
|riverarebecca@gma...|1414 David Throug...| SaddleBrown| 34.30555662975554|13.717513665142507| 36.72128267790313| 3.120178782748092| 581.8523440352177|
|mstephens@davidso...|14023 Rodriguez P...|MediumAquaMarine| 33.33067252364639|12.795188551078114| 37.53665330059473| 4.446308318351434| 599.4060920457634|
|alvareznancy@luca...|645 Martha Park A...| FloralWhite|33.871037879341976|12.026925339755056| 34.47687762925054| 5.493507201364199| 637.102447915074|
|katherine20@yahoo...|68388 Reyes Light...| DarkSlateBlue| 32.02159550138701|11.366348309710526| 36.68377615286961| 4.685017246570912| 521.5721747578274|
| awatkins@yahoo.com|Unit 6538 Box 898...| Aqua|32.739142938380326| 12.35195897300293| 37.37335885854755| 4.4342734348999375| 549.9041461052942|
|vchurch@walter-ma...|860 Lee KeyWest D...| Salmon| 33.98777289568564|13.386235275676436|37.534497341555735| 3.2734335777477144| 570.2004089636196|
+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+
only showing top 10 rows
在下面的代码中,我们导入 VectorAssembler 库以创建新的列“Independent Feature”:
frompyspark.ml.linalg import Vectors
frompyspark.ml.feature import VectorAssembler
featureassembler = VectorAssembler(inputCols = ["Avg Session Length","Time on App","Time on Website"],outputCol = "Independent Features")
output = featureassembler.transform(dataset)
output.show()
输出:
+------------------+
Independent Feature
+------------------+
|34.49726772511229 |
|31.92627202636016 |
|33.000914755642675|
|34.30555662975554 |
|33.33067252364639 |
|33.871037879341976|
|32.02159550138701 |
|32.739142938380326|
|33.98777289568564 |
+------------------+
z = featureassembler.transform(dataset)
finlized_data = z.select("Indepenent feature", "Yearly Amount Spent",)
z.show()
输出:
+--------------------++-------------------+
|Independent Feature | Yearly Amount Spent|
+--------------------++-------------------+
|34.49726772511229 | 587.9510539684005 |
|31.92627202636016 | 392.2049334443264 |
|33.000914755642675 | 487.5475048674720 |
|34.30555662975554 | 581.8523440352177 |
|33.33067252364639 | 599.4060920457634 |
|33.871037879341976 | 637.102447915074 |
|32.02159550138701 | 521.5721747578274 |
|32.739142938380326 | 549.9041461052942 |
|33.98777289568564 | 570.2004089636196 |
+--------------------++-------------------+
PySpark提供了 LinearRegression() 函数来预测任何给定数据集。其语法如下:
regressor = LinearRegression(featureCol = 'column_name1', labelCol = 'column_name2 ')
MLlib K-Means聚类
K-Means聚类算法是最常用和广泛使用的算法之一。它用于将数据点聚类到预定义数量的簇中。以下示例展示了如何使用MLlib的K-Means聚类库:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
# Loads data.
dataset = spark.read.format("libsvm").load(r"C:\Users\DEVANSH SHARMA\Iris.csv")
# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)
# Make predictions
predictions = model.transform(dataset)
# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))
# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
print(center)
PySpark MLlib的参数
PySpark MLlib的一些重要参数如下:
- 评分(Ratings)
这是Ratings或(userID,productID,rating)元组的RDD。
- 排名(Rank)
表示计算特征矩阵的排名(特征数量)。
- 迭代次数(Iterations)
表示ALS的迭代次数。(默认:5)
- Lambda
这是正则化参数。(默认值:0.01)
- 块(Blocks)
用于并行计算一些块的数量。
协同过滤(mllib.recommendation)
协同过滤是一种通常用于推荐系统的技术。该技术专注于填充用户-项目的缺失条目。spark.ml 目前支持基于模型的协同过滤。在协同过滤中,用户和产品由一组小的隐藏因子描述,这些因子可用于预测缺失的条目。
正则化参数的缩放
正则化参数 regParam 缩放以解决最小二乘问题。当在更新用户因子时生成的评分数为用户生成的,或者在更新产品因子时,产品收到的评分数为产品生成的。
冷启动策略
ALS模型(交替最小二乘模型) 用于预测常见的预测问题。当在测试数据集中出现在模型训练期间可能不存在的用户或项目时,就会遇到问题。这可以发生在以下两种情况下:
- 在预测中,模型没有针对没有评分历史的用户和项目进行训练(这称为冷启动策略)。
- 在交叉验证期间,在训练和评估集之间分割数据。在评估集中遇到未在训练集中的用户和项目是很常见的。
考虑以下示例,我们从MovieLens数据集加载评分数据。每行包含一个用户、一个电影、评分和时间戳。
#importing the libraries
frompyspark.ml.evaluation import RegressionEvaluator
frompyspark.ml.recommendation import ALS
frompyspark.sql import Row
no_of_lines = spark.read.text(r"C:\Users\DEVANSH SHARMA\MovieLens.csv").rdd
no_of_parts = no_of_lines.map(lambda row: row.value.split("::"))
ratingsRDD = no_of_lines.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
rating=float(p[2]), timestamp=long(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])
# Develop the recommendation model using ALS on the training data
# Note we set cold start strategy to make sure that we don't get NaN evaluation metrics.
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
coldStartStrategy="drop")
model = als.fit(training)
# Calculate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))
# Evaluate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Evaluate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)
# Evaluate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Evalute top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)