业界主流的大数据处理利器:Spark

5、特征处理C-3创建:刘锐, 最后修改: 刘锐今天 18:35

业界主流的大数据处理利器:Spark

架构。从下面 Spark 的架构图中我们可以看到,Spark 程序由 Manager Node(管理节点)进行调度组织,由 Worker Node(工作节点)进行具体的计算任务执行,最终将结果返回给 Drive Program(驱动程序)。在物理的 Worker Node 上,数据还会分为不同的 partition(数据分片),可以说 partition 是 Spark 的基础数据单元。

Spark 是怎么协同这么多的工作节点,通过并行计算得出最终的结果呢?

最关键的过程是我们要理解哪些是可以纯并行处理的部分,哪些是必须 shuffle(混洗)和 reduce 的部分。


shuffle 操作需要在不同计算节点之间进行数据交换,非常消耗计算、通信及存储资源,因此 shuffle 操作是 spark 程序应该尽量避免的。
Stage 内部数据高效并行计算,Stage 边界处进行消耗资源的 shuffle 操作或者最终的 reduce 操作。

如何利用 One-hot 编码处理类别型特征

广义上来讲,所有的特征都可以分为两大类。
第一类是类别、ID 型特征(以下简称类别型特征)。拿电影推荐来说,电影的风格、ID、标签、导演演员等信息,用户看过的电影 ID、用户的性别、地理位置信息、当前的季节、时间(上午,下午,晚上)、天气等等,这些无法用数字表示的信息全都可以被看作是类别、ID 类特征。
第二类是数值型特征,能用数字直接表示的特征就是数值型特征,典型的包括用户的年龄、收入、电影的播放时长、点击量、点击率等。
这里我们就要用到 One-hot 编码(也被称为独热编码),它是将类别、ID 型特征转换成数值向量的一种最典型的编码方式。它通过把所有其他维度置为 0,单独将当前类别或者 ID 对应的维度置为 1 的方式生成特征向量。这怎么理解呢?我们举例来说,假设某样本有三个特征,分别是星期、性别和城市,我们用 [Weekday=Tuesday, Gender=Male, City=London] 来表示,用 One-hot 编码对其进行数值化的结果。

我们使用 Spark 的机器学习库 MLlib 来完成 One-hot 特征的处理。
def oneHotEncoderExample(samples:DataFrame): Unit ={
//samples样本集中的每一条数据代表一部电影的信息,其中movieId为电影id
val samplesWithIdNumber = samples.withColumn("movieIdNumber", col("movieId").cast(sql.types.IntegerType))
//利用Spark的机器学习库Spark MLlib创建One-hot编码器
val oneHotEncoder = new OneHotEncoderEstimator()
.setInputCols(Array("movieIdNumber"))
.setOutputCols(Array("movieIdVector"))
.setDropLast(false)
//训练One-hot编码器,并完成从id特征到One-hot向量的转换
val oneHotEncoderSamples = oneHotEncoder.fit(samplesWithIdNumber).transform(samplesWithIdNumber)
//打印最终样本的数据结构
oneHotEncoderSamples.printSchema()
//打印10条样本查看结果
oneHotEncoderSamples.show(10)
(参考 com.wzhe.sparrowrecsys.offline.spark.featureeng.FeatureEngineering中的oneHotEncoderExample函数)
 

数值型特征的处理 - 归一化和分桶

我们经常会用分桶的方式来解决特征值分布极不均匀的问题。所谓"分桶(Bucketing)",就是将样本按照某特征的值从高到低排序,然后按照桶的数量找到分位数,将样本分到各自的桶中,再用桶 ID 作为特征值。
在 Spark MLlib 中,分别提供了两个转换器 MinMaxScaler 和 QuantileDiscretizer,来进行归一化和分桶的特征处理。它们的使用方法和之前介绍的 OneHotEncoderEstimator 一样,都是先用 fit 函数进行数据预处理,再用 transform 函数完成特征转换。下面的代码就是 SparrowRecSys 利用这两个转换器完成特征归一化和分桶的过程。
def ratingFeatures(samples:DataFrame): Unit ={
samples.printSchema()
samples.show(10)
//利用打分表ratings计算电影的平均分、被打分次数等数值型特征
val movieFeatures = samples.groupBy(col("movieId"))
.agg(count(lit(1)).as("ratingCount"),
avg(col("rating")).as("avgRating"),
variance(col("rating")).as("ratingVar"))
.withColumn("avgRatingVec", double2vec(col("avgRating")))
movieFeatures.show(10)
//分桶处理,创建QuantileDiscretizer进行分桶,将打分次数这一特征分到100个桶中
val ratingCountDiscretizer = new QuantileDiscretizer()
.setInputCol("ratingCount")
.setOutputCol("ratingCountBucket")
.setNumBuckets(100)
//归一化处理,创建MinMaxScaler进行归一化,将平均得分进行归一化
val ratingScaler = new MinMaxScaler()
.setInputCol("avgRatingVec")
.setOutputCol("scaleAvgRating")
//创建一个pipeline,依次执行两个特征处理过程
val pipelineStage: Array[PipelineStage] = Array(ratingCountDiscretizer, ratingScaler)
val featurePipeline = new Pipeline().setStages(pipelineStage)
val movieProcessedFeatures = featurePipeline.fit(movieFeatures).transform(movieFeatures)
//打印最终结果
movieProcessedFeatures.show(
(参考 com.wzhe.sparrowrecsys.offline.spark.featureeng.FeatureEngineering中的ratingFeatures函数)
当然,对于数值型特征的处理方法还远不止于此,在经典的 YouTube 深度推荐模型中,我们就可以看到一些很有意思的处理方法。比如,在处理观看时间间隔(time since last watch)和视频曝光量(#previous impressions)这两个特征的时,YouTube 模型对它们进行归一化后,又将它们各自处理成了三个特征(图 6 中红框内的部分),分别是原特征值 x,特征值的平方x^2
无论是平方还是开方操作,改变的还是这个特征值的分布,这些操作与分桶操作一样,都是希望通过改变特征的分布,让模型能够更好地学习到特征内包含的有价值信息

 

  • 无标签