
Flink Source and Sink Operations in Stream Processing / Flink Sink to Kafka

Date: 2020-12-30 10:17:37


I. Common Source and Sink Operations in Flink Stream Processing

Flink's sources for stream processing are essentially the same as its sources for batch processing. They fall into roughly four categories:

1. Collection-based source (Collection-based-source)

2. File-based source (File-based-source)

3. Socket-based source (Socket-based-source)

4. Custom source (Custom-source), such as Kafka

1. Collection-based source

import org.apache.flink.streaming.api.scala._
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer, Queue, Stack}

object DataSource001 {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    //0. Create a DataStream from individual elements (fromElements)
    val ds0: DataStream[String] = senv.fromElements("spark", "flink")
    ds0.print()
    //1. Create a DataStream from tuples (fromElements)
    val ds1: DataStream[(Int, String)] = senv.fromElements((1, "spark"), (2, "flink"))
    ds1.print()
    //2. Create a DataStream from an Array
    val ds2: DataStream[String] = senv.fromCollection(Array("spark", "flink"))
    ds2.print()
    //3. Create a DataStream from an ArrayBuffer
    val ds3: DataStream[String] = senv.fromCollection(ArrayBuffer("spark", "flink"))
    ds3.print()
    //4. Create a DataStream from a List
    val ds4: DataStream[String] = senv.fromCollection(List("spark", "flink"))
    ds4.print()
    //5. Create a DataStream from a ListBuffer
    val ds5: DataStream[String] = senv.fromCollection(ListBuffer("spark", "flink"))
    ds5.print()
    //6. Create a DataStream from a Vector
    val ds6: DataStream[String] = senv.fromCollection(Vector("spark", "flink"))
    ds6.print()
    //7. Create a DataStream from a Queue
    val ds7: DataStream[String] = senv.fromCollection(Queue("spark", "flink"))
    ds7.print()
    //8. Create a DataStream from a Stack
    val ds8: DataStream[String] = senv.fromCollection(Stack("spark", "flink"))
    ds8.print()
    //9. Create a DataStream from a Stream (a Stream is a lazy List that avoids building unnecessary intermediate collections)
    val ds9: DataStream[String] = senv.fromCollection(Stream("spark", "flink"))
    ds9.print()
    //10. Create a DataStream from a Seq
    val ds10: DataStream[String] = senv.fromCollection(Seq("spark", "flink"))
    ds10.print()
    //11. Create a DataStream from a Set (not supported)
    //val ds11: DataStream[String] = senv.fromCollection(Set("spark", "flink"))
    //ds11.print()
    //12. Create a DataStream from an Iterable (not supported)
    //val ds12: DataStream[String] = senv.fromCollection(Iterable("spark", "flink"))
    //ds12.print()
    //13. Create a DataStream from an ArraySeq
    val ds13: DataStream[String] = senv.fromCollection(mutable.ArraySeq("spark", "flink"))
    ds13.print()
    //14. Create a DataStream from an ArrayStack
    val ds14: DataStream[String] = senv.fromCollection(mutable.ArrayStack("spark", "flink"))
    ds14.print()
    //15. Create a DataStream from a Map (not supported)
    //val ds15: DataStream[(Int, String)] = senv.fromCollection(Map(1 -> "spark", 2 -> "flink"))
    //ds15.print()
    //16. Create a DataStream from a Range
    val ds16: DataStream[Int] = senv.fromCollection(Range(1, 9))
    ds16.print()
    //17. Create a DataStream with generateSequence
    val ds17: DataStream[Long] = senv.generateSequence(1, 9)
    ds17.print()
    senv.execute(this.getClass.getName)
  }
}

2. File-based source (File-based-source)

//TODO 2. File-based source (File-based-source)
//0. Create the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
//TODO 1. Read a local file
val text1 = env.readTextFile("data2.csv")
text1.print()
//TODO 2. Read a file from HDFS
val text2 = env.readTextFile("hdfs://hadoop01:9000/input/flink/README.txt")
text2.print()
env.execute()

3. Socket-based source (Socket-based-source)

val source = env.socketTextStream("IP", PORT)
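To put this one-liner in context, below is a minimal sketch of a complete socket-based word count. The host localhost, port 9999, and the object name SocketWordCount are illustrative assumptions, not values from the original:

import org.apache.flink.streaming.api.scala._

object SocketWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //Read lines from a local socket (start one with: nc -lk 9999)
    val source: DataStream[String] = env.socketTextStream("localhost", 9999)
    //Split lines into words and count each word
    val counts = source
      .flatMap(_.toLowerCase.split("\\s+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .sum(1)
    counts.print()
    env.execute("socket-wordcount")
  }
}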

4. Custom source (Custom-source), using Kafka as an example

Basic Kafka commands:

● List all topics on the current server
bin/kafka-topics.sh --list --zookeeper hadoop01:2181
● Create a topic
bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 1 --topic test
● Delete a topic
sh bin/kafka-topics.sh --delete --zookeeper zk01:2181 --topic test
delete.topic.enable=true must be set in server.properties, otherwise the topic is only marked for deletion; alternatively restart the broker.
● Send messages from the shell producer
sh bin/kafka-console-producer.sh --broker-list hadoop01:9092 --topic test
● Consume messages from the shell consumer
bin/kafka-console-consumer.sh --zookeeper hadoop01:2181 --from-beginning --topic test1
● Check consumer offsets
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zk01:2181 --group testGroup
● Describe a topic
bin/kafka-topics.sh --topic test --describe --zookeeper zk01:2181
● Change the number of partitions of a topic
kafka-topics.sh --zookeeper zk01 --alter --partitions 15 --topic utopic

Consuming Kafka messages with Flink (not a rigorous setup; strictly speaking you would need to manage the offsets yourself):

import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object DataSource_kafka {
  def main(args: Array[String]): Unit = {
    //1. Kafka connection information
    val zkCluster = "hadoop01,hadoop02,hadoop03:2181"
    val kafkaCluster = "hadoop01:9092,hadoop02:9092,hadoop03:9092"
    val kafkaTopicName = "test"
    //2. Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //3. Create the Kafka data stream
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", kafkaCluster)
    properties.setProperty("zookeeper.connect", zkCluster)
    properties.setProperty("group.id", kafkaTopicName)
    val kafka09 = new FlinkKafkaConsumer09[String](kafkaTopicName, new SimpleStringSchema(), properties)
    //4. Add the data source: addSource(kafka09)
    val text = env.addSource(kafka09).setParallelism(4)
    /**
     * Sample record:
     * test#CS#request /B2C40/query/jaxb/direct/query.ao?t=S&c1=HLN&c2=CTU&d1=-07-12&at=2&ct=2&inf=1#CS#POST#CS#application/x-www-form-urlencoded#CS#t=S&json={'adultnum':'1','arrcity':'NAY','childnum':'0','depcity':'KHH','flightdate':'-07-12','infantnum':'2'}#CS#/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=R&c1=LZJ&c2=MZG&d1=-07-12&at=1&ct=2&inf=2#CS#123.235.193.25#CS#Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/21.0.1180.89 Safari/537.1#CS#-01-19T10:45:13:578+08:00#CS#106.86.65.18#CS#cookie
     */
    val values: DataStream[ProcessedData] = text.map { line =>
      val encrypted = line
      val values = encrypted.split("#CS#")
      val valuesLength = values.length
      val regionalRequest = if (valuesLength > 1) values(1) else ""
      val requestMethod = if (valuesLength > 2) values(2) else ""
      val contentType = if (valuesLength > 3) values(3) else ""
      //Body of the POST request
      val requestBody = if (valuesLength > 4) values(4) else ""
      //http_referrer
      val httpReferrer = if (valuesLength > 5) values(5) else ""
      //Client IP
      val remoteAddr = if (valuesLength > 6) values(6) else ""
      //Client user agent
      val httpUserAgent = if (valuesLength > 7) values(7) else ""
      //Server time in ISO 8601 format
      val timeIso8601 = if (valuesLength > 8) values(8) else ""
      //Server address
      val serverAddr = if (valuesLength > 9) values(9) else ""
      //Cookie string from the raw record
      val cookiesStr = if (valuesLength > 10) values(10) else ""
      ProcessedData(regionalRequest, requestMethod, contentType, requestBody,
        httpReferrer, remoteAddr, httpUserAgent, timeIso8601, serverAddr, cookiesStr)
    }
    values.print()
    val remoteAddr: DataStream[String] = values.map(line => line.remoteAddr)
    remoteAddr.print()
    //5. Trigger execution
    env.execute("flink-kafka-wordcount")
  }
}

//Case class holding the structured record
case class ProcessedData(regionalRequest: String,
                         requestMethod: String,
                         contentType: String,
                         requestBody: String,
                         httpReferrer: String,
                         remoteAddr: String,
                         httpUserAgent: String,
                         timeIso8601: String,
                         serverAddr: String,
                         cookiesStr: String)
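As the note above says, this example leaves offset handling to the consumer's defaults. One common approach is to enable checkpointing, so that FlinkKafkaConsumer09 commits its offsets as part of each completed checkpoint. The snippet below is only a sketch that would slot into the program above (reusing its env and kafka09 values); the 5000 ms interval is an arbitrary assumption:

//Sketch: enable checkpointing so the Kafka consumer's offsets are committed
//whenever a checkpoint completes (interval chosen arbitrarily for illustration)
env.enableCheckpointing(5000)
//This flag is on by default once checkpointing is enabled; shown here to make the behaviour explicit
kafka09.setCommitOffsetsOnCheckpoints(true)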

II. Flink Sink to Kafka

package com.flink.DataStream

import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer09, FlinkKafkaProducer09}
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.kafka.common.serialization.ByteArraySerializer

object DataSource_kafka {
  def main(args: Array[String]): Unit = {
    //1. Kafka connection information
    val zkCluster = "hadoop01,hadoop02,hadoop03:2181"
    val kafkaCluster = "hadoop01:9092,hadoop02:9092,hadoop03:9092"
    val kafkaTopicName = "test"
    val sinkKafka = "test2"
    //2. Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //3. Create the Kafka data stream
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", kafkaCluster)
    properties.setProperty("zookeeper.connect", zkCluster)
    properties.setProperty("group.id", kafkaTopicName)
    val kafka09 = new FlinkKafkaConsumer09[String](kafkaTopicName, new SimpleStringSchema(), properties)
    //4. Add the data source: addSource(kafka09)
    val text = env.addSource(kafka09).setParallelism(4)
    /**
     * Sample record:
     * test#CS#request /B2C40/query/jaxb/direct/query.ao?t=S&c1=HLN&c2=CTU&d1=-07-12&at=2&ct=2&inf=1#CS#POST#CS#application/x-www-form-urlencoded#CS#t=S&json={'adultnum':'1','arrcity':'NAY','childnum':'0','depcity':'KHH','flightdate':'-07-12','infantnum':'2'}#CS#/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=R&c1=LZJ&c2=MZG&d1=-07-12&at=1&ct=2&inf=2#CS#123.235.193.25#CS#Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/21.0.1180.89 Safari/537.1#CS#-01-19T10:45:13:578+08:00#CS#106.86.65.18#CS#cookie
     */
    val values: DataStream[ProcessedData] = text.map { line =>
      val encrypted = line
      val values = encrypted.split("#CS#")
      val valuesLength = values.length
      val regionalRequest = if (valuesLength > 1) values(1) else ""
      val requestMethod = if (valuesLength > 2) values(2) else ""
      val contentType = if (valuesLength > 3) values(3) else ""
      //Body of the POST request
      val requestBody = if (valuesLength > 4) values(4) else ""
      //http_referrer
      val httpReferrer = if (valuesLength > 5) values(5) else ""
      //Client IP
      val remoteAddr = if (valuesLength > 6) values(6) else ""
      //Client user agent
      val httpUserAgent = if (valuesLength > 7) values(7) else ""
      //Server time in ISO 8601 format
      val timeIso8601 = if (valuesLength > 8) values(8) else ""
      //Server address
      val serverAddr = if (valuesLength > 9) values(9) else ""
      //Cookie string from the raw record
      val cookiesStr = if (valuesLength > 10) values(10) else ""
      ProcessedData(regionalRequest, requestMethod, contentType, requestBody,
        httpReferrer, remoteAddr, httpUserAgent, timeIso8601, serverAddr, cookiesStr)
    }
    values.print()
    val remoteAddr: DataStream[String] = values.map(line => line.remoteAddr)
    remoteAddr.print()
    //TODO sink to Kafka
    val p: Properties = new Properties
    p.setProperty("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092")
    p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)
    p.setProperty("value.serializer", classOf[ByteArraySerializer].getName)
    //Write the remoteAddr stream to the test2 topic using the producer properties p
    val sink = new FlinkKafkaProducer09[String](sinkKafka, new SimpleStringSchema(), p)
    remoteAddr.addSink(sink)
    //5. Trigger execution
    env.execute("flink-kafka-wordcount")
  }
}

//Case class holding the structured record
case class ProcessedData(regionalRequest: String,
                         requestMethod: String,
                         contentType: String,
                         requestBody: String,
                         httpReferrer: String,
                         remoteAddr: String,
                         httpUserAgent: String,
                         timeIso8601: String,
                         serverAddr: String,
                         cookiesStr: String)
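To show the sink step on its own, here is a minimal, self-contained sketch that writes a few strings to the test2 topic; the object name SinkToKafkaDemo and the fromElements input are illustrative assumptions:

import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object SinkToKafkaDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //A toy input stream standing in for the parsed fields of the real job
    val data: DataStream[String] = env.fromElements("spark", "flink", "kafka")

    //The producer only needs the broker list; the schema handles serialization
    val props = new Properties()
    props.setProperty("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092")

    //Write every record of the stream to the test2 topic
    val producer = new FlinkKafkaProducer09[String]("test2", new SimpleStringSchema(), props)
    data.addSink(producer)

    env.execute("sink-to-kafka-demo")
  }
}

Once the job is running, the output can be verified with the console consumer from the Kafka command list above, pointed at topic test2.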
