领取MOLI红包
栏目分类
StorageChain中文网

当前位置:ESE中文网 > StorageChain中文网 >

热点资讯

Spark处理trick总结分析

发布日期:2025-01-03 18:26    点击次数:139

前言 最近做了很多数据清洗以及摸底的工作,由于处理的数据很大,所以采用了spark进行辅助处理,期间遇到了很多问题,特此记录一下,供大家学习。 由于比较熟悉python, 所以笔者采用的是pyspark,所以下面给的demo都是基于pyspark,其实其他语言脚本一样,重在学习思想,具体实现改改对应的API即可。 这里尽可能的把一些坑以及实现技巧以demo的形式直白的提供出来,顺序不分先后。有了这些demo,大家在实现自己各种各样需求尤其是一些有难度需求的时候,就可以参考了,当然了有时间笔者后续还会更新一些demo,感兴趣的同学可以关注下。 trick 首先说一个最基本思想:能map绝不reduce。 换句话说当在实现某一需求时,要尽可能得用map类的算子,这是相当快的。但是聚合类的算子通常来说是相对较慢,如果我们最后不得不用聚合类算子的时候,我们也要把这一步逻辑看看能不能尽可能的往后放,而把一些诸如过滤什么的逻辑往前放,这样最后的数据量就会越来越少,再进行聚合的时候就会快很多。如果反过来,那就得不偿失了,虽然最后实现的效果是一样的,但是时间差却是数量级的。 常用API 这里列一下我们最常用的算子 filter: 过滤,满足条件的返回True, 需要过滤的返回False。 map: 每条样本做一些共同的操作。 flatMap: 一条拆分成多条返回,具体的是list。 reduceByKey: 根据key进行聚合。 聚合 一个最常见的场景就是需要对某一个字段进行聚合:假设现在我们有一份流水表,其每一行数据就是一个用户的一次点击行为,那现在我们想统计一下每个用户一共点击了多少次,更甚至我们想拿到每个用户点击过的所有item集合。伪代码如下: 首先我们先通过get_key_value函数将每条数据转化成(key, value)的形式,然后通过reduceByKey聚合算子进行聚合,它就会把相同key的数据聚合在一起,说到这里,大家可能不觉得有什么?这算什么trick!其实笔者这里想展示的是get_key_value函数返回形式:[item] 。 为了对比,这里笔者再列一下两者的区别: 可以看到第一个的value是一个列表,而第二个就是单纯的item,我们看reduceByKey这里我们用的具体聚合形式是相加,列表相加就是得到一个更大的列表即: 所以最后我们就拿到了:每个用户点击过的所有item集合,具体的是一个列表。 抽样、分批 在日常中我们需要抽样出一部分数据进行数据分析或者实验,甚至我们需要将数据等分成多少份,一份一份用(后面会说),这个时候怎么办呢? 当然了spark也有类似sample这样的抽样算子 那其实我们也可以实现,而且可以灵活控制等分等等且速度非常快,如下: 假设我们需要抽取1/10的数据出来,总的思路就是先给每个样本打上一个[1,10]的随机数,然后只过滤出打上1的数据即可。 以此类推,我们还可以得到3/10的数据出来,那就是在过滤的时候,取出打上[1,2,3]的即可,当然了[4,5,6]也行,只要取三个就行。 笛卡尔积 有的时候需要在两个集合之间做笛卡尔积,假设这两个集合是A和B即两个rdd。 首先spark已经提供了对应的API即cartesian,具体如下: 其更具体的用法和返回形式大家可以找找相关博客,很多,笔者这里不再累述。 但是其速度非常慢 尤其当rdd_A和rdd_B比较大的时候,这个时候怎么办呢? 这个时候我们可以借助广播机制,其实已经有人也用了这个trick: https://www.jb51.net/article/203197.htm 首先说一下spark中的广播机制,假设一个变量被申请为了广播机制,那么其实是缓存了一个只读的变量在每台机器上,假设当前rdd_A比较小,rdd_B比较大,那么我可以把rdd_A转化为广播变量,然后用这个广播变量和每个rdd_B中的每个元素都去做一个操作,进而实现笛卡尔积的效果,好了,笔者给一下pyspark的实现: 可以看到我们先把rdd_A转化为广播变量,然后通过flatMap,将rdd_A和所有rdd_B中的单个元素进行操作,具体是什么操作大家可以在ops函数中自己定义自己的逻辑。 关于spark的广播机制更多讲解,大家也可以找找文档,很多的,比如: https://www.cnblogs.com/Lee-yl/p/9777857.html 但目前为止,其实还没有真真结束,从上面我们可以看到,rdd_A被转化为了广播变量,但是其有一个重要的前提:那就是rdd_A比较小。但是当rdd_A比较大的时候,我们在转化的过程中,就会报内存错误,当然了可以通过增加配置: 但是如果rdd_A还是极其大呢?换句话说rdd_A和rdd_B都是非常大的,哪一个做广播变量都是不合适的,怎么办呢? 其实我们一部分一部分的做。假设我们把rdd_A拆分成10份,这样的话,每一份的量级就降下来了,然后把每一份转化为广播变量且都去和rdd_B做笛卡尔积,最后再汇总一下就可以啦。 有了想法,那么怎么实现呢? 分批大家都会了,如上。但是这里面会有另外一个问题,那就是这个广播变量名会被重复利用,在进行下一批广播变量的时候,需要先销毁,再创建,demo如下: 可以看到,最主要的就是unpersist() 广播变量应用之向量索引 说到广播机制,这里就再介绍一个稍微复杂的demo,乘热打铁。 做算法的同学,可能经常会遇到向量索引这一场景:即每一个item被表征成一个embedding,然后两个item的相似度便可以基于embedding的余弦相似度进行量化。向量索引是指假设来了一个query,候选池子里面假设有几百万的doc,最终目的就是要从候选池子中挑选出与query最相似的n个topk个doc。 关于做大规模数量级的索引已经有很多现成好的API可以用,最常见的包比如有faiss。如果还不熟悉faiss的同学,可以先简单搜一下其基本用法,看看demo,很简单。 好啦,假设现在query的量级是10w,doc的量级是100w,面对这么大的量级,我们当然是想通过spark来并行处理,加快计算流程。那么该怎么做呢? 这时我们便可以使用spark的广播机制进行处理啦,而且很显然doc应该是广播变量,因为每一个query都要和全部的doc做计算。 废话不多说,直接看实现 首先建立doc索引: 这里的index_embedding_rdd就是doc的embedding,可以看到先要collect,然后建立索引。 建立完索引后,就可以开始计算了,但是这里会有一个问题就是query的量级也是比较大的,如果一起计算可能会OM,所以我们分批次进行即batch: 假设query_embedding_rdd是全部query的embedding,为了实现batch,我们先将query_embedding_rdd进行分区repartition,然后每个batch进行,可以看到核心就是batch_get_nearest_ids这个函数: 从这里可以清楚的看到就是组batch,组够一个batch后就可以给当前这个batch内的query进行计算最相似的候选啦即__batch_get_nearest_ids这个核心函数: 这里就是真真的调用faiss的索引API进行召回啦,当然了batch_res这个就是结果,自己可以想怎么定义都行,笔者这里不仅返回了召回的item,还返回了query自身的一些信息。 注意点 在map的时候,不论是self的类成员还是类方法都要放到外面,不要放到类里面,不然会报错 总结 总之,在用spark做任何需求之前,一定要牢记能map就map,尽量不要聚合算子,实在不行就尽可能放到最后。 以上就是Spark处理trick总结分析的详细内容,更多关于Spark处理trick的资料请关注脚本之家其它相关文章!

友情链接:

Powered by ESE中文网 @2013-2022 RSS地图 HTML地图

Copyright Powered by站群系统 © 2013-2024