查看原文
其他

来看看一个大二学生的Spark练习题

大数据技术与架构点击右侧关注,大数据开发领域最强公众号!

暴走大数据点击右侧关注,暴走大数据!


今天查资料的时候看到一个朋友的博客写的很好,加了好友,对方表示大二的学生,写的Spark的练习题非常接地气并且很适合练手,大家可以看看。


一、基础练习题

首先让我们准备好该题所需的数据 test.txt

数据结构如下依次是:班级 姓名 年龄 性别 科目 成绩

12 宋江 25 男 chinese 5012 宋江 25 男 math 6012 宋江 25 男 english 7012 吴用 20 男 chinese 5012 吴用 20 男 math 5012 吴用 20 男 english 5012 杨春 19 女 chinese 7012 杨春 19 女 math 7012 杨春 19 女 english 7013 李逵 25 男 chinese 6013 李逵 25 男 math 6013 李逵 25 男 english 7013 林冲 20 男 chinese 5013 林冲 20 男 math 6013 林冲 20 男 english 5013 王英 19 女 chinese 7013 王英 19 女 math 8013 王英 19 女 english 70

题目如下:

1. 读取文件的数据test.txt

2. 一共有多少个小于20岁的人参加考试?

3. 一共有多少个等于20岁的人参加考试?

4. 一共有多少个大于20岁的人参加考试?

5. 一共有多个男生参加考试?

6. 一共有多少个女生参加考试?

7. 12班有多少人参加考试?

8. 13班有多少人参加考试?

9. 语文科目的平均成绩是多少?

10. 数学科目的平均成绩是多少?

11. 英语科目的平均成绩是多少?

12. 每个人平均成绩是多少?

13. 12班平均成绩是多少?

14. 12班男生平均总成绩是多少?

15. 12班女生平均总成绩是多少?

16. 13班平均成绩是多少?

17. 13班男生平均总成绩是多少?

18. 13班女生平均总成绩是多少?

19. 全校语文成绩最高分是多少?

20. 12班语文成绩最低分是多少?

21. 13班数学最高成绩是多少?

22. 总成绩大于150分的12班的女生有几个?

23. 总成绩大于150分,且数学大于等于70,且年龄大于等于19岁的学生的平均成绩是多少?

答案在这里:

object test {
def main(args: Array[String]): Unit = {
val config = new SparkConf().setMaster("local[*]").setAppName("test") val sc = new SparkContext(config)
// 1.读取文件的数据test.txt // 返回包含所有行数据的列表 val data: RDD[String] = sc.textFile("E:\\2020大数据新学年\\BigData\\05-Spark\\0403\\test.txt")
//val value: RDD[Array[String]] = sc.makeRDD(List("12 宋江 25 男 chinese 50")).map(x=>x.split(" "))
// 2. 一共有多少个小于20岁的人参加考试?2 val count1: Long = data.map(x=>x.split(" ")).filter(x=>x(2).toInt<20).groupBy(_(1)).count()

// 3. 一共有多少个等于20岁的人参加考试?2 val count2: Long = data.map(x=>x.split(" ")).filter(x=>x(2).toInt==20).groupBy(_(1)).count()
// 4. 一共有多少个大于20岁的人参加考试?2 val count3: Long = data.map(x=>x.split(" ")).filter(x=>x(2).toInt>20).groupBy(_(1)).count()
// 5. 一共有多个男生参加考试?4 val count4: Long = data.map(x=>x.split(" ")).filter(x=>x(3).equals("男")).groupBy(_(1)).count()
// 6. 一共有多少个女生参加考试?2 val count5: Long = data.map(x=>x.split(" ")).filter(x=>x(3).equals("女")).groupBy(_(1)).count()
// 7. 12班有多少人参加考试?3 val count6: Long = data.map(x=>x.split(" ")).filter(x=>x(0).equals("12")).groupBy(_(1)).count()
// 8. 13班有多少人参加考试?3 val count7: Long = data.map(x=>x.split(" ")).filter(x=>x(0).equals("13")).groupBy(_(1)).count()
// 9. 语文科目的平均成绩是多少?58.333333333333336 val mean1 = data.map(x=>x.split(" ")).filter(x=>x(4).equals("chinese")).map(x=>x(5).toInt).mean()
// 10. 数学科目的平均成绩是多少?63.333333333333336 val mean2 = data.map(x=>x.split(" ")).filter(x=>x(4).equals("math")).map(x=>x(5).toInt).mean()
// 11. 英语科目的平均成绩是多少?63.333333333333336 val mean3 = data.map(x=>x.split(" ")).filter(x=>x(4).equals("english")).map(x=>x(5).toInt).mean()
// 12. 每个人平均成绩是多少? //(王英,73) //(杨春,70) //(宋江,60) //(李逵,63) //(吴用,50) //(林冲,53) val every_socre: RDD[(String, Any)] = data.map(x=>x.split(" ")).map(x=>(x(1),x(5).toInt)).groupByKey().map(t=>(t._1,t._2.sum /t._2.size))
// 13. 12班平均成绩是多少?60.0 var mean5 = data.map(x => x.split(" ")).filter(x => x(0).equals("12")).map(x => x(5).toInt).mean()
// 14. 12班男生平均总成绩是多少?165.0 // (宋江,180) // (吴用,150) val boy12_avgsocre: Double = data.map(x=>x.split(" ")).filter(x=>x(0).equals("12") && x(3).equals("男")).map(x=>(x(1),x(5).toInt)).groupByKey().map(t=>(t._1,t._2.sum)).map(x=>x._2).mean()
// 15. 12班女生平均总成绩是多少?210.0 // (杨春,210) val girl12_avgsocre: Double = data.map(x=>x.split(" ")).filter(x=>x(0).equals("12") && x(3).equals("女")).map(x=>(x(1),x(5).toInt)).groupByKey().map(t=>(t._1,t._2.sum)).map(x=>x._2).mean()
// 16. 13班平均成绩是多少?63.333333333333336 var mean8 = data.map(x => x.split(" ")).filter(x => x(0).equals("13")).map(x => x(5).toInt).mean()
// 17. 13班男生平均总成绩是多少?175.0 //(李逵,190) //(林冲,160) val boy13_avgsocre: Double = data.map(x=>x.split(" ")).filter(x=>x(0).equals("13") && x(3).equals("男")).map(x=>(x(1),x(5).toInt)).groupByKey().map(t=>(t._1,t._2.sum)).map(x=>x._2).mean()
// 18. 13班女生平均总成绩是多少? //(王英,220) val girl13_avgsocre: Double = data.map(x=>x.split(" ")).filter(x=>x(0).equals("13") && x(3).equals("女")).map(x=>(x(1),x(5).toInt)).groupByKey().map(t=>(t._1,t._2.sum)).map(x=>x._2).mean()
// 19. 全校语文成绩最高分是多少?70 var max1 = data.map(x => x.split(" ")).filter(x => x(4).equals("chinese")).map(x => x(5).toInt).max()
// 20. 12班语文成绩最低分是多少?50 var max2 = data.map(x => x.split(" ")).filter(x => x(4).equals("chinese") && x(0).equals("12")).map(x => x(5).toInt).min()
// 21. 13班数学最高成绩是多少?80 var max3 = data.map(x => x.split(" ")).filter(x => x(4).equals("math") && x(0).equals("13")).map(x => x(5).toInt).max()
// 22. 总成绩大于150分的12班的女生有几个?1 //(杨春,210) val count12_gt150girl: Long = data.map(x=>x.split(" ")).filter(x=>x(0).equals("12") && x(3).equals("女")).map(x=>(x(1),x(5).toInt)).groupByKey().map(t=>(t._1,t._2.sum)).filter(x=>x._2>150).count()
// 23. 总成绩大于150分,且数学大于等于70,且年龄大于等于19岁的学生的平均成绩是多少? //val countall: Long = data.map(x=>x.split(" ")).filter(x=>x(2).toInt>=19 && x(3).equals("女")).map(x=>(x(1),x(5).toInt)).groupByKey().map(t=>(t._1,t._2.sum)).filter(x=>x._2>150).count() val complex1 = data.map(x => {val line = x.split(" "); (line(0)+","+line(1)+","+line(3),line(5).toInt)}) //(13,李逵,男 , 60) val complex2 = data.map(x => {val line = x.split(" "); (line(0)+","+line(1)+","+line(2)+","+line(3)+","+line(4),line(5).toInt)}) //(12,宋江,男,chinese , 50)
// 过滤出总分大于150的,并求出平均成绩 (13,李逵,男,(60,1)) (13,李逵,男,(190,3)) 总成绩大于150 (13,李逵,男,63) val com1: RDD[(String, Int)] = complex1.map(x=>(x._1,(x._2,1))).reduceByKey((a, b)=>(a._1+b._1,a._2+b._2)).filter(a=>(a._2._1>150)).map(t=>(t._1,t._2._1/t._2._2)) // 注意:reduceByKey 自定义的函数 是对同一个key值的value做聚合操作 //(12,杨春,女 , 70) //(13,王英,女 , 73) //(12,宋江,男 , 60) //(13,林冲,男 , 53) //(13,李逵,男 , 63)
//过滤出 数学大于等于70,且年龄大于等于19岁的学生 filter方法返回一个boolean值 【数学成绩大于70并且年龄>=19】 为了将最后的数据集与com1做一个join,这里需要对返回值构造成com1格式的数据 val com2: RDD[(String, Int)] = complex2.filter(a=>{val line = a._1.split(",");line(4).equals("math") && a._2>=70 && line(2).toInt>=19}).map(a=>{val line2 = a._1.split(",");(line2(0)+","+line2(1)+","+line2(3),a._2.toInt)}) //(12,杨春,女 , 70) //(13,王英,女 , 80)
// val common: RDD[(String, (Int, Int))] = com1.join(com2) // common.foreach(println) // (12,杨春,女 , (70,70)) // (13,王英,女 , (73,80))
// 使用join函数聚合相同key组成的value元组 // 再使用map函数格式化元素 val result = com1.join(com2).map(a =>(a._1,a._2._1)) //(12,杨春,女,70) //(13,王英,女,73) //到这里就大功告成了!!!!!!!!!!
}}


二、基础练习题


题目如下:

以下是RNG S8 8强赛失败后,官微发表道歉微博下一级评论:

题目如下:

1. 在kafak中创建rng_comment主题,设置2个分区2个副本

2. 数据预处理,把空行和缺失字段的行过滤掉

3. 请把给出的文件写入到kafka中,根据数据id进行分区,id为奇数的发送到一个分区中,偶数的发送到另一个分区

4. 使用Spark Streaming对接kafka

5. 使用Spark Streaming对接kafka之后进行计算


在mysql中创建一个数据库rng_comment

在数据库rng_comment创建vip_rank表,字段为数据的所有字段

在数据库rng_comment创建like_status表,字段为数据的所有字段

在数据库rng_comment创建count_conmment表,字段为 时间,条数


6. 查询出微博会员等级为5的用户,并把这些数据写入到mysql数据库中的vip_rank表中

7. 查询出评论赞的个数在10个以上的数据,并写入到mysql数据库中的like_status表中

8. 分别计算出2018/10/20 ,2018/10/21,2018/10/22,2018/10/23这四天每一天的评论数是多少,并写入到mysql数据库中的count_conmment表中

答案在这里:

1. 创建Topic

在命令行窗口执行Kafka创建Topic的命令,并指定对应的分区数和副本数

/export/servers/kafka_2.11-1.0.0/bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 2 --partitions 2 --topic rng_comment 

2. 读取文件,并对数据做过滤并输出到新文件

object test01_filter { def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("demo01").getOrCreate()
val sc: SparkContext = spark.sparkContext
// 读取数据 //testFile是多行数据 val rddInfo: RDD[String] = sc.textFile("E:\\rng_comment.txt")
// 对数据进行一个过滤 val RNG_INFO: RDD[String] = rddInfo.filter(data => {
// 判断长度:将每行的内容用tab键切割,判断最后的长度 // 判读是否为空字符: trim之后不为empty data.split("\t").length == 11 && !data.trim.isEmpty
})

// // 如果想直接将数据写入到Kafka,而不通过输出文件的方式// val kafkaProducer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)//// def saveToKafka(INFO:RDD[String]): Unit ={//// try {//// INFO.foreach(x=>{// val record: ProducerRecord[String, String] = new ProducerRecord[String,String]("rng_test",x.split("\t")(0),x.toString)//// kafkaProducer.send(record)// })//// }catch {// case e:Exception => println("发送数据出错:"+e)// }//// }
// 导入隐式转换 // 将RDD转换成DF import spark.implicits._ val df: DataFrame = RNG_INFO.toDF()
// 输出数据【默认分区数为2,这里我们指定分区数为1】 df.repartition(1).write.text("E:\\outputtest")
// 关闭资源 sc.stop() spark.stop()
}}


3. 读取新文件,将数据按照题意发送到Kafka的不同分区

需要先写一个实现自定义分区逻辑的java类

/*编写自定义分区逻辑 */public class ProducerPartition implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
/* 编写自定义分区代码 */ //System.out.println(value.toString()); String[] str = value.toString().split("\t");
// 由题意可得,id为奇数的发送到一个分区中,偶数的发送到另一个分区 if (Integer.parseInt(str[0]) % 2 == 0){ return 0; }else { return 1; }
}
@Override public void close() {
}
@Override public void configure(Map<String, ?> configs) {
}}

然后在下面的程序中引用分区类的类路径

public class test02_send {
/* 程序的入口 */ public static void main(String[] args) throws IOException {
//编写生产数据的程序
//1、配置kafka集群环境(设置) Properties props = new Properties(); //kafka服务器地址 props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092"); //消息确认机制 props.put("acks", "all"); //重试机制 props.put("retries", 0); //批量发送的大小 props.put("batch.size", 16384); //消息延迟 props.put("linger.ms", 1); //批量的缓冲区大小 props.put("buffer.memory", 33554432); // kafka key 和value的序列化 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 根据题意得,需要自定义分区 props.put("partitioner.class", "com.czxy.scala.demo12_0415.han.ProducerPartition");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
// 指定需要读取的文件 File file = new File("E:\\outputtest\\part-00000-fe536dc7-523d-4fdd-b0b5-1a045b8cb1ab-c000.txt");
// 创建对应的文件流,进行数据的读取 FileInputStream fileInputStream = new FileInputStream(file); // 指定编码格式进行读取 InputStreamReader inputStreamReader = new InputStreamReader(fileInputStream, "UTF-8"); // 创建缓冲流 BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
// 创建一个变量,用来保存每次读取的数据 String tempString = null;
// 循环遍历读取文件内容 while ((tempString = bufferedReader.readLine()) != null) {
// 利用kafka对象发送数据 kafkaProducer.send(new ProducerRecord<>("rng_comment", tempString));
// 发送完成之后打印数据 System.out.println("已发送:" + tempString); }
System.out.println("数据发送完毕!");
// 关闭kafka数据生产者 kafkaProducer.close();
}}


4. 先在数据库中创建好接收数据需要用到的表

create table vip_rank( `index` varchar(100) null comment '数据id', child_comment varchar(100) null comment '回复数量', comment_time DATE null comment '评论时间', content TEXT null comment '评论内容', da_v varchar(100) null comment '微博个人认证', like_status varchar(100) null comment '赞', pic varchar(100) null comment '图片评论url', user_id varchar(100) null comment '微博用户id', user_name varchar(100) null comment '微博用户名', vip_rank int null comment '微博会员等级', stamp varchar(100) null comment '时间戳');create table like_status( `index` varchar(100) null comment '数据id', child_comment varchar(100) null comment '回复数量', comment_time DATE null comment '评论时间', content varchar(10000) null comment '评论内容', da_v varchar(100) null comment '微博个人认证', like_status varchar(100) null comment '赞', pic varchar(100) null comment '图片评论url', user_id varchar(100) null comment '微博用户id', user_name varchar(100) null comment '微博用户名', vip_rank int null comment '微博会员等级', stamp varchar(100) null comment '时间戳');create table count_comment( time DATE null comment '时间', count int null comment '出现的次数', constraint rng_comment_pk primary key (time));

5. 使用Spark Streaming对接kafka之后进行计算

下面的代码完成了:

查询出微博会员等级为5的用户,并把这些数据写入到mysql数据库中的vip_rank表中

查询出评论赞的个数在10个以上的数据,并写入到mysql数据库中的like_status表中

object test03_calculate {

/* 将数据从kafka集群中读取,并将数据做进一步的处理过后,写入到mysql数据库中 */ def ConnectToMysql() ={
// 连接驱动,设置需要连接的MySQL的位置以及数据库名 + 用户名 + 密码 DriverManager.getConnection("jdbc:mysql://localhost:3306/rng_comment?characterEncoding=UTF-8", "root", "root") }
/** * 将数据写入到MySQL的方法 * @param tableName 表名 * @param data List类型的数据 */ def saveDataToMysql(tableName:String,data:List[String]): Unit ={
// 获取连接 val connection: Connection = ConnectToMysql() // 创建一个变量用来保存sql语句 val sql = s"insert into ${tableName} (`index`, child_comment, comment_time, content, da_v,like_status,pic,user_id,user_name,vip_rank,stamp) values (?,?,?,?,?,?,?,?,?,?,?)" // 将数据存入到mysql中 val ps: PreparedStatement = connection.prepareStatement(sql) ps.setString(1,data.head) ps.setString(2,data(1)) ps.setString(3,data(2)) ps.setString(4,data(3)) ps.setString(5,data(4)) ps.setString(6,data(5)) ps.setString(7,data(6)) ps.setString(8,data(7)) ps.setString(9,data(8)) ps.setString(10,data(9)) ps.setString(11,data(10))
// 提交[因为是插入数据,所以这里需要更新] ps.executeUpdate() // 关闭连接 connection.close()
}

def main(args: Array[String]): Unit = {
//1 创建sparkConf var conf = new SparkConf().setMaster("local[*]").setAppName("SparkStremingDemo1") //2 创建一个sparkcontext var sc = new SparkContext(conf) sc.setLogLevel("WARN") //3 创建streamingcontext var ssc = new StreamingContext(sc,Seconds(3))
//设置kafka对接参数 var kafkaParams= Map[String, Object]( "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "SparkKafkaDemo", //earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 //latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 //none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 //这里配置latest自动重置偏移量为最新的偏移量,即如果有偏移量从偏移量位置开始消费,没有偏移量从新来的数据开始消费 "auto.offset.reset" -> "earliest", //false表示关闭自动提交.由spark帮你提交到Checkpoint或程序员手动维护 "enable.auto.commit" -> (false: java.lang.Boolean) )
// 设置检查点的位置 ssc.checkpoint("sparkstreaming/")
//kafkaDatas 含有key和value //key是kafka成产数据时指定的key(可能为空) //value是真实的数据(100%有数据) val kafkaDatas: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String]( ssc, //设置位置策略 均衡 LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Array("rng_comment"), kafkaParams))
kafkaDatas.foreachRDD(rdd=>rdd.foreachPartition(line=>{
// 遍历每一个分区的数据 for (row <- line){
// 获取到行数据组成的array数组 val str: Array[String] = row.value().split("\t")
// 将数据转成List集合 val list: List[String] = str.toList
/* 查询出微博会员等级为5的用户,并把这些数据写入到mysql数据库中的vip_rank表中 */ if (list(9).equals("5")){ // 调用方法,将集合数据写入到指定的表中 saveDataToMysql("vip_rank",list) }
/* 查询出评论赞的个数在10个以上的数据,并写入到mysql数据库中的like_status表中 */ if (Integer.parseInt(list(5))>10){ saveDataToMysql("like_status",list) }
}
}))

//5 开启计算任务 ssc.start() //6 等待关闭 ssc.awaitTermination() } }

运行成功后的效果

vip_rank

like_status

下面的代码完成了:
分别计算出2018/10/20 ,2018/10/21,2018/10/22,2018/10/23这四天每一天的评论数是多少,并写入到mysql数据库中的count_conmment表中

object test04_count {

def ConnectToMysql() ={
// 连接驱动,设置需要连接的MySQL的位置以及数据库名 + 用户名 + 密码 DriverManager.getConnection("jdbc:mysql://localhost:3306/rng_test?characterEncoding=UTF-8", "root", "root")
}
/** * 将数据存入到mysql中 * * @param time 时间 * @param count 数量 */ def saveDataToMysql(time: String, count: Int): Unit = { println(s"$time\t $count") if (time.contains("2018/10/20") || time.contains("2018/10/21") || time.contains("2018/10/22") || time.contains("2018/10/23")) { //获取连接 val connection: Connection = ConnectToMysql() //创建一个变量用来保存sql语句 val sql: String = "INSERT INTO count_comment (time,count) VALUES (?,?) ON DUPLICATE KEY UPDATE count = ?" //将一条数据存入到mysql val ps: PreparedStatement = connection.prepareStatement(sql) ps.setString(1, time) ps.setInt(2, count) ps.setInt(3, count)
//提交 ps.executeUpdate() //关闭连接 connection.close() } }

def main(args: Array[String]): Unit = {

//1 创建sparkConf var conf: SparkConf =new SparkConf().setMaster("local[*]").setAppName("SparkStremingDemo1") //2 创建一个sparkcontext var sc: SparkContext =new SparkContext(conf) sc.setLogLevel("WARN") //3 创建StreamingContext var ssc: StreamingContext =new StreamingContext(sc,Seconds(5)) //设置缓存数据的位置 ssc.checkpoint("./TmpCount")
// 设置kafka的参数 var kafkaParams: Map[String, Object] = Map[String, Object]( "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092", // 集群位置 "key.deserializer" -> classOf[StringDeserializer], // key序列化标准 "value.deserializer" -> classOf[StringDeserializer], // value序列化标准 "group.id" -> "SparkKafkaDemo", // 分组id //earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 //latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 //none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 //这里配置latest自动重置偏移量为最新的偏移量,即如果有偏移量从偏移量位置开始消费,没有偏移量从新来的数据开始消费 "auto.offset.reset" -> "earliest", //false表示关闭自动提交.由spark帮你提交到Checkpoint或程序员手动维护 "enable.auto.commit" -> (false: java.lang.Boolean)
)
// 接收Kafka的数据并根据业务逻辑进行计算 val kafkaDatas: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String,String]( ssc, // StreamingContext对象 LocationStrategies.PreferConsistent, // 位置策略 ConsumerStrategies.Subscribe[String,String](Array("rng_comment"),kafkaParams) // 设置需要消费的topic和kafka参数
)
// 2018/10/23 16:09 需要先获取到下标为2的数据,再按照空格进行切分,获取到年月日即可 val kafkaWordOne: DStream[(String, Int)] = kafkaDatas.map(z=>z.value().split("\t")(2).split(" ")(0)).map((_,1))
// 更新数据 val wordCounts: DStream[(String, Int)] = kafkaWordOne.updateStateByKey(updateFunc)
// 遍历RDD wordCounts.foreachRDD(rdd=>rdd.foreachPartition(line=>{
for(row <- line){
saveDataToMysql(row._1,row._2) //println("保存成功!") }
}))
println("完毕!")
// 开启计算任务 ssc.start()
// 等待关闭 ssc.awaitTermination()
}
//currentValues:当前批次的value值,如:1,1,1 (以测试数据中的hadoop为例) //historyValue:之前累计的历史值,第一次没有值是0,第二次是3 //目标是把当前数据+历史数据返回作为新的结果(下次的历史数据) def updateFunc(currentValues:Seq[Int], historyValue:Option[Int] ):Option[Int] ={ // currentValues当前值 // historyValue历史值 val result: Int = currentValues.sum + historyValue.getOrElse(0)
Some(result)
}}


运行成功后的效果

count_comment


欢迎点赞+收藏+转发朋友圈素质三连


文章不错?点个【在看】吧! 👇

: . Video Mini Program Like ,轻点两下取消赞 Wow ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存