查看原文
其他

大数据入门:Spark+Kudu的广告业务项目实战笔记(三)

点击上方蓝色字体,选择“设为星标

回复”资源“获取更多资源

大数据技术与架构点击右侧关注,大数据开发领域最强公众号!

暴走大数据点击右侧关注,暴走大数据!

Spark+Kudu的广告业务项目实战系列:
Spark+Kudu的广告业务项目实战笔记(一)

这章的主要目标是对前两个需求的代码进行重构。

1.重构思路

2.trait

Scala Trait(特征) 相当于 Java 的接口,实际上它比接口还功能强大。

与接口不同的是,它还可以定义属性和方法的实现。

一般情况下Scala的类只能够继承单一父类,但是如果是 Trait(特征) 的话就可以继承多个,从结果来看就是实现了多重继承。

package com.imooc.bigdata.cp08.`trait`
import org.apache.spark.sql.SparkSession
//顶层数据处理接口trait DataProcess {
def process(spark:SparkSession)
}

3.Processor

3.1 需求一:ETL的Processor

package com.imooc.bigdata.cp08.business
import com.imooc.bigdata.cp08.`trait`.DataProcessimport com.imooc.bigdata.cp08.utils.{IPUtils, KuduUtils, SQLUtils, SchemaUtils}import org.apache.spark.sql.SparkSession
object LogETLProcessor extends DataProcess{ override def process(spark: SparkSession): Unit = {
//使用DataSourceAPI直接加载json数据 var jsonDF = spark.read.json("D:\\Hadoop基础与电商行为日志分析\\spark\\coding385\\sparksql-train\\data\\data-test.json") //jsonDF.printSchema() //jsonDF.show(false)
//导入隐式转换 import spark.implicits._ //加载IP库,建议将RDD转成DF val ipRowRDD = spark.sparkContext.textFile("D:\\Hadoop基础与电商行为日志分析\\spark\\coding385\\sparksql-train\\data\\ip.txt") val ipRuleDF = ipRowRDD.map(x => { val splits = x.split("\\|") val startIP = splits(2).toLong val endIP = splits(3).toLong val province = splits(6) val city = splits(7) val isp = splits(9)
(startIP, endIP, province, city, isp) }).toDF("start_ip", "end_ip", "province", "city", "isp") //ipRuleDF.show(false)
//利用Spark SQL UDF转换json中的ip import org.apache.spark.sql.functions._ def getLongIp() = udf((ip:String)=>{ IPUtils.ip2Long(ip) })
//添加字段传入十进制IP jsonDF = jsonDF.withColumn("ip_long", getLongIp()($"ip"))
//将日志每一行的ip对应省份、城市、运行商进行解析 //两个DF进行join,条件是:json中的ip在规则ip中的范围内 // val result = jsonDF.join(ipRuleDF, jsonDF("ip_long") // .between(ipRuleDF("start_ip"), ipRuleDF("end_ip"))) // //.show(false)
//用SQL的方式完成 jsonDF.createOrReplaceTempView("logs") ipRuleDF.createOrReplaceTempView("ips") val sql = SQLUtils.SQL val result = spark.sql(sql) //.show(false)
//Kudu val masterAddresses = "hadoop000" val tableName = "ods" val partitionId = "ip" val schema = SchemaUtils.ODSSchema
KuduUtils.sink(result,tableName,masterAddresses,schema,partitionId) spark.read.format("org.apache.kudu.spark.kudu") .option("kudu.master",masterAddresses) .option("kudu.table",tableName) .load().show()
}
}

3.2 需求二:ProvinceCityStatProcessor

package com.imooc.bigdata.cp08.business
import com.imooc.bigdata.cp08.`trait`.DataProcessimport com.imooc.bigdata.cp08.utils.{KuduUtils, SQLUtils, SchemaUtils}import org.apache.spark.sql.SparkSession
object ProvinceCityStatProcessor extends DataProcess{ override def process(spark: SparkSession): Unit = {
//从Kudu的ods表中读取数据,然后按照省份和城市分组即可 val sourceTableName = "ods" val masterAddress = "hadoop000"
val odsDF = spark.read.format("org.apache.kudu.spark.kudu") .option("kudu.table", sourceTableName) .option("kudu.master", masterAddress) .load() //odsDF.show(false)
odsDF.createOrReplaceTempView("ods") val result = spark.sql(SQLUtils.PROVINCE_CITY_SQL) //result.show(false)
//Kudu val sinkTableName = "province_city_stat" val partitionId = "provincename" val schema = SchemaUtils.ProvinceCitySchema

KuduUtils.sink(result,sinkTableName,masterAddress,schema,partitionId) spark.read.format("org.apache.kudu.spark.kudu") .option("kudu.master",masterAddress) .option("kudu.table",sinkTableName) .load().show() }}

4.项目入口

package com.imooc.bigdata.cp08
import com.imooc.bigdata.cp08.business.{LogETLProcessor, ProvinceCityStatProcessor}import org.apache.spark.sql.SparkSession
//整个项目的入口object SparkApp {
def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .master("local[2]") .appName("SparkApp") .getOrCreate()
//ETL LogETLProcessor.process(spark) //省份 ProvinceCityStatProcessor.process(spark)
spark.stop() }
}

版权声明:

本文为大数据技术与架构整理,原作者独家授权。未经原作者允许转载追究侵权责任。

本文编辑:冷眼丶

微信公众号|import_bigdata


欢迎点赞+收藏+转发朋友圈素质三连



文章不错?点个【在看】吧! 👇

: . Video Mini Program Like ,轻点两下取消赞 Wow ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存