虽然自己刚解决掉一个TB级数据导入“大项目”,但是感觉自己对大数据流程还是缺乏一定对认识,所以想通过一个完整对项目体会一下大数据如何落地再实际产出对,正好@志斌 提供了一个硬盘给我,里面有12年微博对数据,数据量在3TB左右,微博量为65亿,所以就借助这个数据来捣鼓一下推荐系统,我把自己捣鼓的过程写下来,希望能够给其他想了解推荐系统的人一点帮助。
什么是推荐系统 #
其实从名字上面我们就知道,我们其实就是要做一件事,“推荐”,并把这个东西做成一个系统,推荐很好理解,但是怎么来做呢,我们拿买东西作为例子,我很喜欢吃苹果,所以我经常去超市买苹果,售货员大妈知道我喜欢吃苹果,所以经常推荐我一些好吃苹果,这个就是推荐,他是根据我们历史的消费记录来给我们推荐的
这算一种非常简单的推荐系统,你喜欢啥我就给你推荐啥,但是有一个问题,假如你喜欢Apple,你买了一个IPhone X,但是如果我再给你推荐IPhone X其实你不会买的(当然也有可能,但是可能性非常低),那一个“高级”的推荐系统应该怎么做呢,你买了IPhone X,你就很有可能会买AirPod、会买IPod、IMac这些,你脑子里面可能会这样想,做一个归类把一类商品都归到一起,比如IPhone,Mac这些,然后只要他买了其中一个,我们就推荐其他,当然这是最容易想到的一个方法,但是这个方法成本太高而且只适用了Apple产品,假如你买小米,你的手机可能是小米的,但是你的贴膜可能不是小米的。
得亏于大数据时代的到来,我们记录了很多用户的购买记录,这就相当于我们知道这些商品存在一些联系,假如我们能把这种联系找出来,再用一些算法搅拌一下数据,我们就能产生香喷喷的“推荐数据”
推荐系统原理 #
首先我们来看一下我们有什么数据,最主要的就三张表,第一张用户表(User表),第二张商品表(Item表),第三张用户和商品交互记录表,这个交互记录可能是评分可能是浏览记录可能是购买记录,我们简称为Rank表
我们把这三个维度组合起来就是一个二维数组,我们可以用一个矩阵来代表这个,如下面所示
$$\begin{bmatrix} 5 & 4 & 2 & 1 & 0 \\ 4 & 3 & 1 & 2 & 5 \\ 1 & 1 & 5 & 0 & 2 \end{bmatrix}$$
有三个用户,5件商品,我们从上面伪造的数据可以看出,用户1、2品味很近,而用户三品味与他们完全不同,所以对于最后一件商品,虽然用户1没有评价或者没有购买,我们可以预测他的评价和用户2 差不多,所以我们可以把东西给推荐给用户1,这个是从我们直觉上来感受的,但是计算机可是没有知觉的,我们必须把这些东西量化才能实现智能推荐
我们用一个欧几里德距离或者皮尔逊相关度来衡量这种品味,其实你可以把它想象为一个N维空间的上每个点的距离,这里我们就不谈公式,我们直接给结果,对于每个用户来说,自己跟自己的品味是100%相近,其他人则有的相同,有的不相同,所以对于用户1,我们可以算出一个“品味值”(我们其实可以不跟自己算因为绝对是1,但是在大型矩阵运算的过程中,其实少算一个比多算一个要划得来)
$$\begin{bmatrix} 1 & 0.8 & 0.1 \end{bmatrix}$$
我们得到上面的品味值(加权值),所以对于最后一个商品,我们把其他用户评价与他们与用户一的值相乘,我们就把他们对数据的评价可靠性给量化下来了,所以就可以下一步排序输出我们的推荐。
我们知道现在的大数据强大在于能给所有用户都推荐他们想要的数据,但是从我们上面的描述来看,我们为了获取一个用户的品味值就得对数据进行大量运算,如果一个用户要1s的,一亿个用户就得要3年才能算完,所以我们得将我们的for循环改成矩阵运算,才能通过大数据将所以用户的推荐给算出来
现在我们来谈谈怎么用矩阵运算来加速这个过程,我们用$$R_{UI}$$ 来代替上面的评价矩阵,我们首先要把$$R_{UI}$$ 给倒置,然后让两个做矩阵乘法,得到”品味矩阵“ $$W_{UU}$$
$$R_{UI}$$ * $$R_{UI}^T $$ = $$W_{UU}$$
这个$$W_{UU}$$ 是什么呢,他是一个行数为用户总数,列数也为用户总数的方阵,每一行代表,第i个用户,他与其他用户的品味差距,有了这个我们可以将$$W_{UU}$$ 与 $$R_{UI}$$ 做一个矩阵乘法,这样我们就得到一个$$V_{UI}$$矩阵,这个矩阵就是我们对$$R_{UI}$$的一个预测值,我们就把一些用户没有评价的值给预测出现,接下来就对每一行先做一个过滤,取出用户没有购买的,然后把没有购买的按照大小排序输出
PS:也可以用点乘一下矩阵进行过滤,就是将没有将用户与商品购买记录生产一个UI矩阵,购买过置0,每购买过置1,这样点乘后购买过无论分多高都变成0了,把0过滤就ok了
我们来总结一下上面用到的矩阵公式
- $$R_{UI}$$ * $$R_{UI}^T $$ = $$W_{UU}$$
- $$W_{UU}$$ * $$R_{UI}$$ = $$V_{UI}$$
初略一看没有啥问题,但是你要想一下假如用户数非常大,有一千万(很容易达到,QQ微信都10几亿了),那个$$W_{UU}$$ 矩阵的大小有 1千万 * 一千万 = 1e+14 ,我们根本没有足够的内存来存放这个矩阵,而且有一个问题,就是其实假如我们有一千万件商品,但是一个用户一辈子可能就买了几十件,这是一个巨大的稀疏矩阵
所以虽然我们这个矩阵算法很高效,能通过两次运算就把用户推荐给全部输出,但是假如用户量多或者商品多第一内存不够,第二个矩阵太稀疏了,在没有大数据之前或者计算机还没有现在这么强的时候,大家的解决方法是对数据进行抽样,把大数据分解成为小数据,但是大数据的魅力就是使用全部数据而不是部分,所以我们要使用一些抽象的算法来帮助我们把这个问题给解决了
首先我们还是要用所以的用户数据,所以的商品数据,但是现在我们做一个抽象,我们假设用户身上有几十种特征来决定他们的品味(比如年龄,性别,性格等),我们直接跳过前面两个计算步骤,我们假设用户特征矩阵$$U_{UP}$$ (每个用户有P个特征,决定他们品味),对于商品,我们假设就是因为这些特征导致的一个商品特征矩阵$$I_{PI}$$
最后我们假设,我们这个$$U_{UP}$$ 矩阵乘法$$I_{PI}$$就是 我们上面得到的$$V_{UI}$$ ,也就是公式为
$$U_{UP}$$ *$$I_{PI}$$ = $$V_{UI}$$
假如我们能计算出来这个 $$U_{UP}$$和 $$I_{PI}$$,那么我们就基本上算出所以用户的评价了,这个就是ALS的原理,我们利用这个抽象能将原来的亿亿相乘的计算,改成亿与常数P的相乘的计算
当然具体的原理没有这么简单,涉及到很多矩阵运算的知识,我这里就不多说了,具体可以看下面给的论文《Collaborative Filtering for Implicit Feedback Datasets》,Spark的Mlib中的ALS实现就是基于这篇论文的,也可以参考一下Python的实现,公式推导比较复杂,但是实现还是非常好理解的
Python实战 #
首先我们来先试试使用Python来快速搭建我们的模型出来
{'account': '1920286160', 'content': '你的笑容灿烂了一整个夏天!', 'source': 'http://wp1.sina.cn/wap240/5d738637jw1dpmonay1thj.jpg', 'time': '2012-02-01 13:48:39', 'from': '网页版', 'comment': 0, 'forward': 0, 'forward_account': '1567852087', 'forward_content': '民浩君!! ','forward_id': 'xdasfd21x-s', 'forword_nickname': '全球流行风尚', 'is_forward_followed': '0'}
我们原始数据全部都是这种字符串,我们没有用户点赞、浏览数据,我们只有用户所发的所以微博数据,而且假如后面有forward_account
的值的话,说明这条微博是用户转发的,所以这里我们把用户转发当做一个用户“喜欢”这篇微博,我们系统就是要推荐更多微博给用户“转发”,所以我们把用户账号,和转发ID提取出来当做一次“记录”
我们把数据全部变成,user-id:forward_id
,字符对,然后我们导入到一个pandas
的DataFrame
里面去
我们这里使用一个Python 实现的ALS包implicit
,只需要把我们生成的DataFrame
转换成为一个压缩矩阵OK了,但是这里要注意的一点是我们所以的用户id和转发id都得变成Int型整数,最好的做法使用
df.user.astype('category').cat.codes
比如上面,我们把user
这个字符型数字转换成一个整数序号,这样我们得到的就是每个user
id的序号
完整的代码为下面
origin_data = [(x['forward_id'], x['account'], 1) for x in data if x.get('forward_id')] # data 为json数组
df = pd.DataFrame(origin_data, columns=['item', 'user', 'num'])
df['item'] = df.item.astype('category').cat.codes.astype('int64')
df['user'] = df.user.astype('category').cat.codes.astype('int64')
from scipy import sparse
# convert df to martrix
item_user_data = sparse.csr_matrix(df.values)
# train
import implicit
model = implicit.als.AlternatingLeastSquares(factors=50)
model.fit(item_user_data)
# recommend items for a user
user_items = item_user_data.T.tocsr()
# recommend user
model.recommend(1, user_items=user_items, N = 5)
只要简单的几行代码我们就实现了一个协同过滤的算法实现,从Python代码我们可以知道,最关键的地方就是生成训练数据,然后其他都交给算法了,接下来我们来尝试实现一个使用Spark加载全部数据,来实现对几十亿微博数据的推荐
Spark实战 #
在前面我们把3个T的微博数据的用户转发数据抽取出来,最后数据量在35亿左右,数据量在100G左右,这个数据量还是有的大的,首先内存肯定放不下,假如我们使用前面Python的方式来实现的话,我们的内存至少要200个G,所以我们得使用Spark的内存加磁盘运算来实现这个推荐系统
生成用户和转发ID的序列值 #
前面Python的实现100个G的数据排序,基本的原理就是Hash表进行排序,然而我们内存不足以放下这100G的数据,所以我们要使用MapReduce来“消化”这个数据,所以我们要借助Hive来进行一个排序,一方面我们可以生成用户和物品的唯一ID,等我们产生推荐结果后也可以通过HIVE来生成对应的数据
第一步我们将数据导入到HIVE中
首先创表
create table record(item string, account int, time int) row format delimited fields terminated by ',' LINES TERMINATED BY '\n' stored as textfile;
这里我还额外导入了一个时间进去,为的是方便以后做线上预测
然后我们把文件导入进去(文件存贮在/weibo/all_data.txt
中)
load data local inpath '/weibo/all_data.txt' into table record;
接下来我们生成用户表和物品表
create table items as select row_number() over (order by a.item) as id ,a.item from (select distinct item from record) a ;
create table users as select row_number() over (order by a.account) as id ,a.account from (select distinct account from record) a ;
我们使用HIVE的row_number
函数来生成一个唯一序列自增值,每一个值对应一个用户,最后我们生成我们训练表,最好使用HIVE2
来进行导入,因为HIVE1的row_number
会存贮所以的id值,由于我们数据量有35个亿,内存会爆炸,HIVE2
优化了使用一个流来存贮
create table train as
select account_id, item_id, count(*) as num from (select a.id as item_id, b.id as account_id from record
left join items a on record.item = a.item
left join users b on record.account = b.account) a group by item_id, account_id ;
接下来我们在Spark里面调用我们Mlib的ALS
模型,首先我们从HIVE中获取数据
val df = sql("select * from train")
然后我们从将数据导入ALS模型进行训练,同Python比较类似,我就直接给出代码
val ratings = df.map(row => Rating(row.getAs[Int]("item_id").toInt, row.getAs[Int]("account_id").toInt, row.getAs[Long]("num").toDouble)).rdd
val rank = 10
val numIterations = 50
val model = ALS.train(ratings, rank, numIterations, 0.01)
val usersProducts = ratings.map { case Rating(user, product, rate) =>
(user, product)
}
val predictions =
model.predict(usersProducts).map { case Rating(user, product, rate) =>
((user, product), rate)
}
val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
((user, product), rate)
}.join(predictions)
val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
val err = (r1 - r2)
err * err
}.mean()
println(s"Mean Squared Error = $MSE")
最终我们这35亿数据用时4个半小时跑完模型并计算出MSE (均方误差),当然由于我的核心比较小,假如在一个大集群上面的话速度会快很多,值得注意的是,在Spark运行这35亿数据的时候,物理磁盘占用达到800多个G,所以在哪里省下来(内存)的就得从哪里还回来(磁盘)O(∩_∩)O哈哈~
总结 #
我们这次只是完成了一个非常简陋的推荐系统,只是具备简单的推荐功能,我们还没有考虑到一些“冷启动”问题,一个“智能”的推荐系统需要考虑到很多方面,我的推荐系统还有很多地方需要完善,但是从我的这个Demo里面也可以看到其实大数据也是非常“简单”的,由于数据量太大,你只能使用最简洁的代码来训练你的模型,你的算法要最大可能性满足分布式要求,推荐系统最核心的地方还是如何将你的推荐算法变成一个优秀的分布式算法,我们这篇博客只是走马观花的谈了一下它的原理,看看有空好好研究一下Spark分布式算法是怎么实现的。