100字范文,内容丰富有趣,生活中的好帮手!
100字范文 > Flink常见流处理API

Flink常见流处理API

时间:2022-09-27 08:24:24

相关推荐

Flink常见流处理API

Flink 流处理API的编程可以分为environment,source,transform,sink四大部分

1 Flink支持的数据类型

在Flink底层因为要对所有的数据序列化,反序列化对数据进行传输,以便通过网络传送它们,或者从状态后端、检查点和保存点读取它们。所以Flink要有一套自己的类型提取系统,就是TypeInformation机制。Flink使用类型信息的概念来表示数据类型,并为每个数据类型生成特定的序列化器、反序列化器和比较器。这里其实就是说在转换过程中必须是他支持的数据类型才能转换成TypeInformation。

基本上我们一般能够用到的数据类型常见的都支持,如下:

(1)Flink支持所有的Java和Scala基础数据类型,Int, Double, Long, String, …

(2)Java和Scala元组(Tuples),最多25个字段,不支持空字段

(3)cala样例类(case classes),最多22个字段,不支持空字段

(4) Java简单对象(POJOs)

(5)Row具有任意数量字段的元组并支持空字段

(6)Arrays, Lists, Maps, Enums, 等等

2 执行环境Environment

Flink编程的第一步首先是创建一个执行环境,表示当前执行程序的上下文。Environment可以通过以下几种方式构建

(1)getExecutionEnvironment

val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment或val env: ExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。如果没有设置并行度,会以flink-conf.yaml中的配置为准,默认是1。

(2)createLocalEnvironment

val env = StreamExecutionEnvironment.createLocalEnvironment(1)

返回本地执行环境,需要在调用时指定默认的并行度。

(3)createRemoteEnvironment

val env = ExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", YOURJobManagerHOST,"YOURPATH//wordcount.jar")

返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。

3 Source

(1)从集合读取数据

val stream = env.fromCollection(List("a","b","c"))

(2)从文件读取数据

val stream = env.readTextFile("YOUR_FILE_PATH")

(3)kafka消息队列的数据作为来源

需要引入kafka连接器的依赖:

<!-- /artifact/org.apache.flink/flink-connector-kafka-0.11 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.11</artifactId><version>1.10.0</version></dependency>

具体代码如下:

val properties = new Properties()properties.setProperty("bootstrap.servers", "localhost:9092")properties.setProperty("group.id", "consumer-group")properties.setProperty("key.deserializer", "org.mon.serialization.StringDeserializer")properties.setProperty("value.deserializer", "org.mon.serialization.StringDeserializer")properties.setProperty("auto.offset.reset", "latest")val stream = env.addSource(new FlinkKafkaConsumer011[String]("topic_name", new SimpleStringSchema(), properties))

(4)自定义Source

除了以上的source数据来源,我们还可以自定义source。需要做的,只是传入一个SourceFunction就可以。具体调用如下:

val stream = env.addSource( new MySource() )case class CustomSource(id:String,times:String)class MySource extends SourceFunction[CustomSource]{// running表示数据源是否还在正常运行var running: Boolean = trueoverride def cancel(): Unit = {running = false}override def run(ctx: SourceFunction.SourceContext[CustomSource]): Unit = {while(running){ctx.collect(CustomSource(UUID.randomUUID().toString,System.currentTimeMillis().toString))Thread.sleep(100)}}}

4 Transform

(1)map:输入一个元素,输出一个元素,可以用来做一些清洗,转换工作。DataStream → DataStream

val streamMap = stream.map { x => x * 2 }

(2)flatMap:和Map相似,可以理解为将输入的元素压平,从而对输出结果的数量不做要求,可以为0、1或者多个,多用于拆分操作。DataStream → DataStream

val streamFlatMap = stream.flatMap{x => x.split(" ")}

(3)filter:过滤筛选,将所有符合判断条件的结果集输出,DataStream → DataStream

val streamFilter = stream.filter{x => x > 1}

(4)KeyBy:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的,返回KeyedStream。DataStream -> KeyedStream

注意:以下类型无法作为key①POJO类,且没有实现hashCode函数②任意形式的数组类型

dataStream.keyBy("someKey") // Key by field "someKey"dataStream.keyBy(0) // Key by the first element of a Tuple

(5)滚动聚合算子(Rolling Aggregation)

对KeyedStream按指定字段滚动聚合并输出每一次滚动聚合后的结果,常见的有sum(),min(),max(),minBy(),maxBy()等,KeyedStream → DataStream

min(),max(), minBy(),maxBy()这些算子可以针对KeyedStream的每一个支流做聚合

keyedStream.sum(0)keyedStream.sum("key")keyedStream.min(0)keyedStream.min("key")keyedStream.max(0)keyedStream.max("key")keyedStream.minBy(0)keyedStream.minBy("key")keyedStream.maxBy(0)keyedStream.maxBy("key")

min和minBy的区别是min返回的是一个最小值,而minBy返回的是其字段中包含的最小值的元素(同样元原理适用于max和maxBy)

(6)fold:用一个初始的一个值,与其每个元素进行滚动合并操作。KeyedStream → DataStream

val result: DataStream[String] =keyedStream.fold("start")((str, i) => { str + "-" + i })

当应用于序列(1,2,3,4,5)时,发出序列“start-1”、“start-1-2”、“start-1-2”,…

(6)reduce:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。KeyedStream → DataStream

case class WC(val word: String, val count: Int)val wordCounts = stream.groupBy("word").reduce {(w1, w2) => new WC(w1.word, w1.count + w2.count)}

(7) Split 和 Select

Split :根据某些特征把一个DataStream拆分成两个或者多个DataStream。DataStream → SplitStream

Select:从一个SplitStream中获取一个或者多个DataStream。SplitStream→DataStream

val split = someDataStream.split((num: Int) =>(num % 2) match {case 0 => List("even")case 1 => List("odd")})val even = split select "even"val odd = split select "odd"val all = split.select("even","odd")

(8) Connect和 CoMap、CoFlatMap

Connect:连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了同一个流中,内部依然保持各自的数据形式不发生任何变化,两个流相互独立。DataStream,DataStream → ConnectedStreams

CoMap,CoFlatMap:作用于ConnectedStreams上,功能与map和flatMap一样,对ConnectedStreams中的每一个Stream分别进行map和flatMap处理。ConnectedStreams → DataStream

someStream : DataStream[Int] = ...otherStream : DataStream[String] = ...val connectedStreams = someStream.connect(otherStream)connectedStreams.map((_ : Int) => true,(_ : String) => false)connectedStreams.flatMap((_ : Int) => true,(_ : String) => false)

Connect与 Union 区别:①Union之前两个流的类型必须是一样,Connect可以不一样,在之后的coMap中再去调整成为一样的。② Connect只能操作两个流,Union可以操作多个。

(9)iterate

在流程中创建一个反馈循环,将一个操作的输出重定向到之前的操作。DataStream --> IterativeStream --> DataStream

initialStream.iterate {iteration => {val iterationBody = iteration.map {/*do something*/}(iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))}}

(10)extract timestamps

提取记录中的时间戳来跟需要事件时间的window一起发挥作用。DataStream --> DataStream

stream.assignTimestamps { timestampExtractor }

5 Sink

官方提供了一部分的框架的sink。除此以外,需要用户自定义实现sink。

Apache Kafka (source/sink)Apache Cassandra (sink)Amazon Kinesis Streams (source/sink)Elasticsearch (sink)Hadoop FileSystem (sink)RabbitMQ (source/sink)Apache NiFi (source/sink)Twitter Streaming API (source)Apache ActiveMQ (source/sink)Apache Flume (sink)Redis (sink)Akka (sink)Netty (source)

(1)kafka

需要添加依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.11</artifactId><version>1.10.0</version></dependency>

主函数中添加sink

datastream.addSink(new FlinkKafkaProducer011[String]("localhost:9092", "test", new SimpleStringSchema()))

(2)redis

添加依赖

<dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version></dependency>

定义一个redis的mapper类,用于定义保存到redis时调用的命令:

class RedisExampleMapper extends RedisMapper[(String, String)]{override def getCommandDescription: RedisCommandDescription = {new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME")}override def getKeyFromData(data: (String, String)): String = data._1override def getValueFromData(data: (String, String)): String = data._2}

在主函数中调用:

val conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build()stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))

(3)Elasticsearch

添加依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch6_2.11</artifactId><version>1.10.0</version></dependency>

在主函数中调用:

import org.apache.mon.functions.RuntimeContextimport org.apache.flink.streaming.api.datastream.DataStreamimport org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunctionimport org.apache.flink.streaming.connectors.elasticsearch.RequestIndexerimport org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkimport org.apache.http.HttpHostimport org.elasticsearch.action.index.IndexRequestimport org.elasticsearch.client.Requestsimport java.util.ArrayListimport java.util.Listval input: DataStream[String] = ...val httpHosts = new java.util.ArrayList[HttpHost]httpHosts.add(new HttpHost("127.0.0.1", 9300, "http"))httpHosts.add(new HttpHost("10.2.3.1", 9300, "http"))val esSinkBuilder = new ElasticsearchSink.Builer[String](httpHosts,new ElasticsearchSinkFunction[String] {def createIndexRequest(element: String): IndexRequest = {val json = new java.util.HashMap[String, String]json.put("data", element)return Requests.indexRequest().index("my-index").type("my-type").source(json)}})// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be bufferedesSinkBuilder.setBulkFlushMaxActions(1)// provide a RestClientFactory for custom configuration on the internally created REST clientesSinkBuilder.setRestClientFactory(restClientBuilder -> {restClientBuilder.setDefaultHeaders(...)restClientBuilder.setMaxRetryTimeoutMillis(...)restClientBuilder.setPathPrefix(...)restClientBuilder.setHttpClientConfigCallback(...)})// finally, build and add the sink to the job's pipelineinput.addSink(esSinkBuilder.build)

(4)JDBC 自定义sink

以mysql为例,添加依赖

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.44</version></dependency>

添加MysqlJdbcSink

class MysqlJdbcSink() extends RichSinkFunction[(String, String)]{var conn: Connection = _var insertStmt: PreparedStatement = _var updateStmt: PreparedStatement = _// open 主要是创建连接override def open(parameters: Configuration): Unit = {super.open(parameters)conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root")insertStmt = conn.prepareStatement("INSERT INTO mysqljdbcsink (id, name) VALUES (?, ?)")updateStmt = conn.prepareStatement("UPDATE mysqljdbcsink SET id = ? WHERE name = ?")}// 调用连接,执行sqloverride def invoke(value: (String, String), context: SinkFunction.Context[_]): Unit = {updateStmt.setString(1, value._1)updateStmt.setString(2, value._2)updateStmt.execute()if (updateStmt.getUpdateCount == 0) {insertStmt.setString(1, value._1)insertStmt.setString(2, value._2)insertStmt.execute()}}override def close(): Unit = {insertStmt.close()updateStmt.close()conn.close()}}

主函数中调用

dataStream.addSink(new MysqlJdbcSink())

6 UDF函数

6.1 函数类(Function Classes)

函数类:就是在Flink里面每一步运算,转换,包括source和sink。每一个算子里面的参数都可以传入一个所谓的函数类。就提供了更多更灵活的实现自己功能的方法。Flink暴露了所有udf函数的接口(实现方式为接口或者抽象类)。例如MapFunction, FilterFunction, ProcessFunction等等

实现了FilterFunction接口如下:

class MyFilter extends FilterFunction[String] {override def filter(value: String): Boolean = {value.contains("flink")}}val filterStream = stream.filter(new FlinkFilter)

将函数实现成匿名类

val filterStream = stream.filter(new RichFilterFunction[String] {override def filter(value: String): Boolean = {value.contains("flink")}})

6.2 富函数(Rich Functions)

“富函数”是DataStream API提供的一个函数类的接口,所有Flink函数类都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。如RichMapFunction, RichFlatMapFunction,RichFilterFunction

Rich Function有一个生命周期的概念。典型的生命周期方法有:

①open()方法是rich function的初始化方法,当一个算子例如map或者filter被调用之前open()会被调用。

②close()方法是生命周期中的最后一个调用的方法,做一些清理工作。

③getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行的并行度,任务的名字,以及state状态

class MyFlatMap extends RichFlatMapFunction[Int, (Int, Int)] {var subTaskIndex = 0override def open(configuration: Configuration): Unit = {subTaskIndex = getRuntimeContext.getIndexOfThisSubtask// 以下可以做一些初始化工作,}override def flatMap(in: Int, out: Collector[(Int, Int)]): Unit = {if (in % 2 == subTaskIndex) {out.collect((subTaskIndex, in))}}override def close(): Unit = {// 以下做一些清理工作}}

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。