查看原文
其他

ML&DEV[15] | pyspark杂记

机智的叉烧 CS的陋室 2022-08-08

【ML&DEV】


这是大家没有看过的船新栏目!ML表示机器学习,DEV表示开发,本专栏旨在为大家分享作为算法工程师的工作,机器学习生态下的有关模型方法和技术,从数据生产到模型部署维护监控全流程,预备知识、理论、技术、经验等都会涉及,近期内容以入门线路为主,敬请期待!


往期回顾:

pyspark杂记

pyspark最近才get起来,和很多人相比应该算晚的哈哈哈,看了很多资料,但是总感觉现在的教程什么的都比较局限,几乎都只是在聊wordcount,导致很多东西没谈到精髓,这些记录是基础教程的一些补充,主要集中在rdd上,按照我的风格,依旧是尽可能少写一些和网络上重复的东西吧,所以单独做教程效果可能不好,结合着作为参考可能效果更好。

常用的几种rdd函数

我打算用表格的方式给大家列举。下面的语言会比较直白,这里并不全,想要看完整版建议大家还是直接去看API文档。

分类函数解释
基操map遍历每行做处理,返回的是每行整行的结果。
基操reducemapreduce经典搭档,map做合并,reduce做聚合。
基操flatmap一行变多行,返回结果会和其他行的做concat。
基操filter过滤,括号里是满足条件的留下
基操collect聚集,想要print的话必须得要这步。
集合操作union并集。
集合操作intersection交集。
集合操作substract差集。
bykey系列reduceByKey合并具有相同key的值
bykey系列combineByKey合并具有相同key的值,但是灵活性会更高
bykey系列groupByKey相同key进行合并
rdd合并substractByKey差集
rdd合并join内连接
rdd合并rightOuterJoin右外连接
rdd合并leftOuterJoin左外连接

必要的记录

基本使用范式

pyspark内rdd大部分操作内部传入的其实都是函数,一般的使用方法都是lambda,举个例子其实就是这样

rdd_process.map(lambda s: [s[0],s[1]+1,s[2]])

这种方式对简单操作来说是比较方便,但是对于复杂函数就比较痛苦了,所以我的习惯一般是这样子。

def add_1(s):
    return [s[0],s[1]+1,s[2]]

rdd_process.map(add_1)

传入的函数插件化操作起来会更快,不影响流水线的操作,另外当函数多的时候,其实可以单独建立工具类来统一管理。

bykey的理解

pyspark的bykey,是要求输入的rdd是二维的,即第一列是key,第二列是value,所谓的bykey实质上就是根据这个key来进行的操作,至于value,就是bykey里面操作的主体,也就是输入。来一个简单例子的说一下。

rdd应该是这个格式:rdd=[[1,{"1":10}],[1,{"1":20,"2":30}],[2,{"1":20}]],这里一共有3行数据,key分别是1,1,2,而value是一个在python里面是dict的结构。以reducebykey为例,看看可以怎么玩:

def add_key(s1,s2):
    for key2 in s2:
        if key2 in s1:
            s1[key2] = s1[key2] + s2[key2]
        else:
            s1[key2] = s2[key2]
    return s1
rdd.reduceByKey(add_key)

最终的输出就是rdd=[[1,{"1":30,"2":30}],[2,{"1":20}]],可以自己看看里面的操作哈~

combinebykey详解

combine可以说是pyspark从api层面最难理解的东西了,毕竟这里面有3个函数,先来看看API文档是怎么说的:

combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None, partitionFunc=)

  • createCombiner, which turns a V into a C (e.g., creates a one-element list)
  • mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
  • mergeCombiners, to combine two C’s into a single one (e.g., merges the lists)

3个参数,3个都是函数类型的函数。但其实这3个非常好理解,我来这么说看大家能不能明白。

combinebykey,就是按照key来进行聚合,问题是,value的格式是用户自定义的,因此要定义好怎么怎么聚合。

  • createCombiner。聚合第一步,需要初始化最终输出的结构,没错这里其实就定义了一个“最终输出结构”。
  • mergeValue。在初始化的结构基础上,需要定义每一条数据要合并到这个“最终输出结构”的规则。
  • mergeCombiners。mergeValue是指上可以理解为每一个分片下处理的,那么每个分片的合并,其实就是这个“最终输出结构”的合并了,这里定义的是两个“最终输出结构”的合并。

好了,开始看看,例如现在我们要统计文章的词频,但是我们拿到的rdd结构是这样的:

[
    ["1","的"],
    ["1","的"],
    ["2","今天"],
    ["1","你"]
]

现在我们需要进行合并,于是可以这么定义:

def createCombiner(item):
    return {item:1}

def mergeValue(aggregated,item1):
    if item1 in aggregated:
        aggregated[item1] = aggregated[item1] + 1
    else:
        aggregated[item1] = 1
    return aggregated

def mergeCombiners(aggr1,aggr2):
    for item2 in aggr2:
        if item2 in aggr1:
            aggr1[item2] = aggr1[item2] + aggr2[item2]
        else:
            aggr1[item2] = aggr2[item2]
    return aggr1

rdd.combineByKey(createCombiner,mergeValue,mergeCombiners)

可算琢磨明白了。

reduce/combine/group

这3个ByKey其实非常接近,有很多人尝试讨论他的区别,这里我不赘述,把有关文章放在这里,有兴趣的可以传送过去。

  • https://www.cnblogs.com/wwcom123/p/10398392.html

简单说说结论,前两者速度比group快,而reduce比combine在开发上更简单,但是combine灵活性更高。

学习小结

由于项目的需求,我也是紧急学紧急用,其实学的时间总共不够一天,但是好在一方面有前人的代码,可以参照着写,另一方面就是靠边干边学了,网络的资料很多都不是很靠谱,抄袭和重复的很多,只能靠自己去尝试,这点非常不容易。这里提几个点吧:

  • 多看官方文档。
  • 博客协助理解还是有点用的,多看几篇,百度谷歌必应我都会去翻。
  • 自己多动手,站在岸上学不会游泳。

剩下的就加油吧,奥利给~



您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存