从用户的评分历史中,我们其实可以提取出非常多有价值的特征。比如,我们可以根据 ratings 表的历史联合 movies 表的电影信息,提取出用户统计类特征,它包括用户评分总数、用户平均评分、用户评分标准差、用户好评电影的发布年份均值、用户好评电影的发布年份标准差、用户最喜欢的电影风格,以及用户好评电影 ID 等等。最后是“场景特征”。我们可用的场景特征就一个,那就是评分的时间戳,我们把它作为代表时间场景的特征放到特征工程中。
用 Spark 来提取这些特征的总体实现会比较琐碎,所以我就不把全部代码贴在这里了,你可以参考 SparrowRecsys 项目中的 com.wzhe.sparrowrecsys.offline.spark.featureeng.FeatureEngForRecModel 对象,里面包含了所有特征工程的代码。这里,我们只讲几个有代表性的统计型特征的处理方法。
val movieRatingFeatures = samplesWithMovies3.groupBy(col("movieId"))
.agg(count(lit(1)).as("movieRatingCount"),
avg(col("rating")).as("movieAvgRating"),
stddev(col("rating")).as("movieRatingStddev"))
计算统计型特征的典型方法,就是利用 Spark 中的 groupBy 操作,将原始评分数据按照 movieId 分组,然后用 agg 聚合操作来计算一些统计型特征。比如,在上面的代码中,我们就分别使用了 count 内置聚合函数来统计电影评价次数(movieRatingCount),用 avg 函数来统计评分均值(movieAvgRating),以及使用 stddev 函数来计算评价分数的标准差(movieRatingStddev)。
一般来说,我们不会人为预设哪个特征有用,哪个特征无用,而是让模型自己去判断,如果一个特征的加入没有提升模型效果,我们再去除这个特征。就像我刚才虽然提取了不少特征,但并不是说每个模型都会使用全部的特征,而是根据模型结构、模型效果有针对性地部分使用它们
对于一个推荐模型来说,它的根本任务是预测一个用户 U 对一个物品 I 在场景 C 下的喜好分数。所以在训练时,我们要为模型生成一组包含 U、I、C 的特征,以及最终真实得分的样本。在 SparrowRecsys 中,这样的样本就是基于评分数据 ratings,联合用户、物品特征得来的。
//读取原始ratings数据
val ratingSamples = spark.read.format("csv").option("header", "true").load(ratingsResourcesPath.getPath)
//添加样本标签
val ratingSamplesWithLabel = addSampleLabel(ratingSamples)
//添加物品(电影)特征
val samplesWithMovieFeatures = addMovieFeatures(movieSamples, ratingSamplesWithLabel)
//添加用户特征
val samplesWithUserFeatures = addUserFeatures(samplesWithMovieFeatures)
接着,我们来看第二个关键问题,也就是样本的标签是什么,对于 MovieLens 数据集来说,用户对电影的评分是最直接的标签数据,因为它就是我们想要预测的用户对电影的评价,所以 ratings 表中的 0-5 的评分数据自然可以作为样本的标签。
为了让我们的实践过程更接近真实的应用场景,我也对 MovieLens 数据集进行了进一步处理。具体来说就是,把评分大于等于 3.5 分的样本标签标识为 1,意为“喜欢”,评分小于 3.5 分的样本标签标识为 0,意为“不喜欢”。这样一来,我们可以完全把推荐问题转换为 CTR 预估问题。
那在 Spark 中,我们应该如何处理这些跟历史行为相关的特征呢?这就需要用到 window 函数了。比如说,我在生成 userAvgRating 这个特征的时候,是使用下面的代码生成的:
withColumn("userAvgRating", avg(col("rating"))
.over(Window.partitionBy("userId")
.orderBy(col("timestamp")).rowsBetween(-100, -1)))
over(Window.partitionBy("userId").orderBy(col("timestamp")))操作,它的意思是,在做 rating 平均这个操作的时候,我们不要对这个 userId 下面的所有评分取平均值,而是要创建一个滑动窗口,先把这个用户下面的评分按照时间排序,再让这个滑动窗口一一滑动,滑动窗口的位置始终在当前 rating 前一个 rating 的位置。这样,我们再对滑动窗口内的分数做平均,就不会引入未来信息了。
好在,特征数据库 Redis 已经为我们提供了解决办法。我们把用户特征和物品特征分别存入 Redis,线上推断的时候,再把所需的用户特征和物品特征分别取出,拼接成模型所需的特征向量就可以了。FeatureEngForRecModel 中的 extractAndSaveUserFeaturesToRedis 函数给出了详细的 Redis 操作,我把其中的关键操作放在了下面。
val userKey = userFeaturePrefix + sample.getAs[String]("userId")
val valueMap = mutable.Map[String, String]()
valueMap("userRatedMovie1") = sample.getAs[String]("userRatedMovie1")
valueMap("userRatedMovie2") = sample.getAs[String]("userRatedMovie2")
...
valueMap("userAvgRating") = sample.getAs[String]("userAvgRating")
valueMap("userRatingStddev") = sample.getAs[String]("userRatingStddev")
redisClient.hset(userKey, JavaConversions.mapAsJavaMap(valueMap))