
Spark Big Data Analysis: Spark Structured Streaming (21) Data Stream Processing

Posted: 2023-01-08 02:12:58


Contents

Creating a stream from files
Creating a stream from Kafka
Using Kafka as a source in batch mode
Creating a stream at a specified rate

Creating a stream from files

When subdirectories are named in the form "key=value", Structured Streaming traverses them automatically and uses the directory names for partition discovery, so the key becomes a partition column of the resulting stream.

package struct

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object StructStream02 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("Chapter9_4_1")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    // Schema of the incoming CSV records (the file source requires an explicit schema).
    val userSchema = StructType(List(
      StructField("name", StringType, nullable = false),
      StructField("sex", StringType, nullable = false),
      StructField("age", IntegerType, nullable = false)
    ))

    // Read CSV data as a stream; the path is normally a directory that new files are dropped into.
    val userDataFrame = spark.readStream
      .format("csv")
      .schema(userSchema)
      .load("D://a.txt")

    // Register the stream as a temporary view so it can be queried with SQL.
    userDataFrame.createTempView("t_user")

    val result = spark.sql("SELECT sex, AVG(age) FROM t_user GROUP BY sex")

    val query = result.writeStream
      .outputMode("complete")
      .trigger(Trigger.ProcessingTime(0))
      .format("console")
      .start()
    query.awaitTermination()
  }
}
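
To make the "key=value" partition discovery described above concrete, here is a minimal sketch assuming a hypothetical directory layout under /data/users; the path, object name, and schema are illustrative and not taken from the original example.

package struct

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object PartitionedFileStreamSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("PartitionedFileStreamSketch")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    // Hypothetical directory layout (not from the original article):
    //   /data/users/sex=male/part-0001.csv
    //   /data/users/sex=female/part-0002.csv
    // Spark treats the "sex=..." directory names as a partition column named "sex".
    val userSchema = StructType(List(
      StructField("name", StringType, nullable = false),
      StructField("age", IntegerType, nullable = false)
    ))

    val users = spark.readStream
      .format("csv")
      .schema(userSchema)
      .load("/data/users")

    // The discovered partition column "sex" is appended to the user-specified columns.
    users.printSchema()

    val query = users.writeStream
      .outputMode("append")
      .format("console")
      .start()
    query.awaitTermination()
  }
}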

Creating a stream from Kafka

With Kafka as the source, you do not need to manage offsets yourself. Without a checkpoint configured, reading starts from the latest offset by default; with a checkpoint configured, a restarted program resumes consuming from the previously committed offset.

package struct

import org.apache.spark.sql.SparkSession

object StructStream03 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("Chapter9_4_3")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    // Subscribe to the Kafka topic as a streaming source.
    val inputDataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "note01:9092,note02:9092,note03:9092")
      .option("subscribe", "StructStream03")
      .load()

    // Kafka key/value columns are binary; cast them to strings.
    val keyValueDataset = inputDataFrame
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]

    // Each value is "city,station_in,station_out"; emit one (city, station) row per station.
    val subwayDataFrame = keyValueDataset.flatMap(t => {
      val arr = t._2.split(",")
      Array((arr(0), arr(1)), (arr(0), arr(2)))
    }).toDF("city", "station_in_or_out")

    subwayDataFrame.createTempView("t_subway")
    val result = spark.sql(
      "SELECT city, station_in_or_out, count(1) AS hot FROM t_subway " +
      "GROUP BY city, station_in_or_out ORDER BY city, hot DESC")

    val query = result.writeStream
      .outputMode("complete")
      .format("console")
      .option("checkpointLocation", "./StructStream03")
      .start()
    query.awaitTermination()
  }
}
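
The offset behaviour described above can also be controlled explicitly on the reader. The following is a minimal sketch, reusing the article's brokers and topic as placeholders: startingOffsets only applies the first time a query starts, maxOffsetsPerTrigger limits how much data each micro-batch pulls, and once a checkpointLocation is set, a restarted query resumes from the checkpointed offsets rather than from startingOffsets.

package struct

import org.apache.spark.sql.SparkSession

object KafkaOffsetOptionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("KafkaOffsetOptionsSketch")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    val input = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "note01:9092,note02:9092,note03:9092")
      .option("subscribe", "StructStream03")
      // Only applies the first time the query runs; on restart the checkpoint wins.
      .option("startingOffsets", "earliest")
      // Cap how many offsets are consumed per micro-batch (simple rate limiting).
      .option("maxOffsetsPerTrigger", 1000)
      .load()

    val query = input.selectExpr("CAST(value AS STRING)")
      .writeStream
      .outputMode("append")
      .format("console")
      .option("checkpointLocation", "./KafkaOffsetOptionsSketch")
      .start()
    query.awaitTermination()
  }
}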

Using Kafka as a source in batch mode

In this mode you usually specify the starting and ending offsets to consume; if they are not set, the starting offset defaults to earliest and the ending offset to latest. This runs as a one-off job rather than processing data continuously.

package struct

import org.apache.spark.sql.SparkSession

object StructStream04 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("StructStream04")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    // Batch read: spark.read instead of spark.readStream.
    val inputDataFrame = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "note01:9092,note02:9092,note03:9092")
      .option("subscribe", "StructStream04")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()

    // Kafka key/value columns are binary; cast them to strings.
    val keyValueDataset = inputDataFrame
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]

    // Each value is "city,station_in,station_out"; emit one (city, station) row per station.
    val subwayDataFrame = keyValueDataset.flatMap(t => {
      val arr = t._2.split(",")
      Array((arr(0), arr(1)), (arr(0), arr(2)))
    }).toDF("city", "station_in_or_out")

    subwayDataFrame.createTempView("t_subway")
    val result = spark.sql(
      "SELECT city, station_in_or_out, count(1) AS hot FROM t_subway " +
      "GROUP BY city, station_in_or_out ORDER BY city, hot DESC")

    // Batch output: write once to the console and finish.
    result.write.format("console").save()
  }
}
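
Besides the string values earliest and latest, startingOffsets and endingOffsets also accept a JSON string that pins offsets per topic partition, where -2 stands for earliest and -1 for latest. Below is a minimal sketch; the topic name, partition ids, and offset values are chosen purely for illustration.

package struct

import org.apache.spark.sql.SparkSession

object KafkaBatchOffsetRangeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("KafkaBatchOffsetRangeSketch")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    // Per-partition offset ranges as JSON: -2 = earliest, -1 = latest.
    // Topic name and partition ids are illustrative.
    val ranged = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "note01:9092,note02:9092,note03:9092")
      .option("subscribe", "StructStream04")
      .option("startingOffsets", """{"StructStream04":{"0":100,"1":-2}}""")
      .option("endingOffsets", """{"StructStream04":{"0":500,"1":-1}}""")
      .load()

    // Show which partition/offset each record came from.
    ranged.selectExpr("partition", "offset", "CAST(value AS STRING)").show(20, truncate = false)
    spark.stop()
  }
}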

Creating a stream at a specified rate

package struct

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object StructStream05 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("Chapter9_4_5")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    val rows = spark.readStream
      .format("rate")
      // rows generated per second
      .option("rowsPerSecond", 10)
      // seconds to ramp up to the target rate
      .option("rampUpTime", 2)
      // number of partitions for the generated rows
      .option("numPartitions", 2)
      .load()

    val query = rows.writeStream
      .outputMode("append")
      .trigger(Trigger.ProcessingTime(2000))
      .format("console")
      .start()
    query.awaitTermination()
  }
}
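
For testing without an external system, the rate source emits rows with two columns, timestamp and value (a monotonically increasing Long). As a minimal sketch of using it to smoke-test an aggregation, the generated rows can be counted per event-time window; the 10-second window length here is an arbitrary choice, not from the original article.

package struct

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

object RateSourceWindowSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("RateSourceWindowSketch")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    // The rate source generates (timestamp: Timestamp, value: Long) rows.
    val rows = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 10)
      .load()

    // Count generated rows per 10-second event-time window (window length is arbitrary).
    val counts = rows
      .groupBy(window(rows("timestamp"), "10 seconds"))
      .count()

    val query = counts.writeStream
      .outputMode("complete")
      .format("console")
      .option("truncate", "false")
      .start()
    query.awaitTermination()
  }
}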
