100字范文,内容丰富有趣,生活中的好帮手!
100字范文 > flink大数据处理流式计算详解

flink大数据处理流式计算详解

时间:2021-06-02 10:00:39

相关推荐

flink大数据处理流式计算详解

flink大数据处理

文章目录

flink大数据处理二、WebUI可视化界面(测试用)三、Flink部署3.1 JobManager3.2 TaskManager3.3 并行度的调整配置3.4 区分 TaskSolt和parallelism并行度配置 四、Source Operator(资源算子)五、Sink Operator(输出算子)六、Flink滑动-滚动时间窗和触发器6.1 窗口API6.2 AggregateFunction增量聚合函数6.3 全窗口函数6.4 **processWindowFunction全窗口函数知识** 七、link乱序延迟时间处理-Watermark7.1 数据延迟处理 八、Flink乱序延迟时间处理-多层保证措施九、Flink的状态State管理十、Flink 复杂事件处理 CEP讲解+案例实战十一、Flink项目打包插件讲解+部署阿里云实战11.1本地安装11.2 测试本地安装11.3 **maven常用插件介绍和本地Flink项目打包** \* 添加打包插件 十二、docker 安装十三、总结

flink官方文档地址

离线计算和实时计算 :是对数据处理的【延迟】不一样(一个实时和非实时)流式计算和批量计算: 是对数据处理的【方式】不一样(一个流式和一个批量)结论:离线和批量不等价,实时和流式不等价,因为不是同个维度的东西

二、WebUI可视化界面(测试用)

访问:ip:8081方式一:服务端部署Flink集群(生产环境)方式二:本地依赖添加(测试开发)

<!--Flink web ui--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.version}</artifactId><version>${flink.version}</version></dependency>

nc命令介绍 Linux nc命令用于设置网络路由的nc -lk 8888开启 监听模式,用于指定nc将处于监听模式, 等待客户端来链接指定的端口1 解压netcat-win32-1.11.zip2 配置解压好的目录路径到PATH环境变量3 测试

nc -l -p 8888

win | linux 需要安装 win 百度搜索博文参考不同系统安装 下载地址 /misc/netcat/netcat-win32-1.11.zip linux 安装 yum install -y netcatyum install -y nc

http://127.0.0.1:8081/

本地UI界面

三、Flink部署

运行流程 用户提交Flink程序到JobClient,JobClient的 解析、优化提交到JobManagerTaskManager运行task, 并上报信息给JobManager通俗解释 JobManager 包工头TaskManager 任务组长Task solt 工人 (并行去做事情)

Flink 是一个分布式系统,需要有效分配和管理计算资源才能执行流应用程序

* 运行时由两种类型的进程组成

* 一个JobManager

* 一个或者多个TaskManager

什么是JobManager(大Boss,包工头) 协调 Flink 应用程序的分布式执行的功能 它决定何时调度下一个 task(或一组 task)对完成的 task 或执行失败做出反应协调 checkpoint、并且协调从失败中恢复等等 什么是TaskManager (任务组长,搬砖的人) 负责计算的worker,还有上报内存、任务运行情况给JobManager等至少有一个 TaskManager,也称为 worker执行作业流的 task,并且缓存和交换数据流在 TaskManager 中资源调度的最小单位是 task slot

3.1 JobManager

JobManager进程由三个不同的组件组

ResourceManager

负责 Flink 集群中的资源提供、回收、分配 - 它管理task slots

Dispatcher

提供了一个 REST 接口,用来提交 Flink 应用程序执行

为每个提交的作业启动一个新的 JobMaster。

运行 Flink WebUI 用来提供作业执行信息

JobMaster

负责管理单个JobGraph的执行,Flink 集群中可以同时运行多个作业,每个作业都有自己的 JobMaster

至少有一个 JobManager,高可用(HA)设置中可能有多个 JobManager,其中一个始终是leader,其他的则是standby

3.2 TaskManager

TaskManager中 task slot 的数量表示并发处理 task 的数量

一个 task slot 中可以执行多个算子,里面多个线程

算子 opetator

source

transformation

sink

对于分布式执行,Flink 将算子的 subtasks _链接_成tasks,每个 task 由一个线程执行

图中source和map算子组成一个算子链,作为一个task运行在一个线程上

将算子链接成 task 是个有用的优化:它减少线程间切换、缓冲的开销,并且减少延迟的同时增加整体吞吐量

5 个 subtask 执行,因此有 5 个并行线程 Task 正好封装了一个 Operator 或者 Operator Chain 的 parallel instance。Sub-Task 强调的是同一个 Operator 或者 Operator Chain 具有多个并行的 Task图中source和map算子组成一个算子链,作为一个task运行在一个线程上 算子链接成 一个 task 它减少线程间切换、缓冲的开销,并且减少延迟的同时增加整体吞吐量

Task Slots 任务槽

Task Slot是Flink中的任务执行器,每个Task Slot可以运行多个subtask ,每个subtask会以单独的线程来运行每个 worker(TaskManager)是一个 JVM 进程,可以在单独的线程中执行一个(1个solt)或多个 subtask为了控制一个 TaskManager 中接受多少个 task,就有了所谓的 task slots(至少一个)每个 task slot 代表 TaskManager 中资源的固定子集注意 所有Task Slot平均分配TaskManger的内存, TaskSolt 没有 CPU 隔离当前 TaskSolt 独占内存空间,作业间互不影响一个TaskManager进程里有多少个taskSolt就意味着多少个并发task solt数量建议是cpu的核数,独占内存,共享CPU

/projects/flink/flink-docs-release-1.13/zh/docs/concepts/flink-architecture/#tasks-and-operator-chains

/projects/flink/flink-docs-release-1.13/zh/docs/concepts/glossary/

Flink 是分布式流式计算框架 程序在多节点并行执行,所以就有并行度ParallelismDataStream 就像是有向无环图(DAG),每一个 数据流(DataStream) 以一个或多个 source 开始,以一个或多个 sink 结束 流程 一个数据流( stream) 包含一个或多个分区,在不同的线程/物理机里并行执行每一个算子( operator) 包含一个或多个子任务( subtask),子任务在不同的线程/物理机里并行执行一个算子的子任务subtask 的个数就是并行度( parallelism)

3.3 并行度的调整配置

Flink流程序中不同的算子可能具有不同的并行度,可以在多个地方配置,有不同的优先级Flink并行度配置级别 (高到低) 算子 map( xxx ).setParallelism(2) 全局env env.setParallelism(2) 客户端cli ./bin/flink run -p 2 xxx.jar Flink配置文件 /conf/flink-conf.yaml 的 parallelism.defaul 默认值 某些算子无法设置并行度本地IDEA运行 并行度默认为cpu核数

3.4 区分 TaskSolt和parallelism并行度配置

task slot是静态的概念,是指taskmanager具有的并发执行能力;parallelism是动态的概念,是指 程序运行时实际使用的并发能力前者是具有的能力比如可以100个,后者是实际使用的并发,比如只要20个并发就行。Flink有3中运行模式

env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

STREAMING 流处理BATCH 批处理AUTOMATIC 根据source类型自动选择运行模式,基本就是使用这个

四、Source Operator(资源算子)

第一层是最底层的抽象为有状态实时流处理,抽象实现是 Process Function,用于底层处理第二层抽象是 Core APIs,许多应用程序不需要使用到上述最底层抽象的 API,而是使用 Core APIs 进行开发 例如各种形式的用户自定义转换(transformations)、联接(joins)、聚合(aggregations)、窗口(windows)和状态(state)操作等,此层 API 中处理的数据类型在每种编程语言中都有其对应的类。 Source来源 元素集合 env.fromElementsenv.fromColletionenv.fromSequence(start,end); 文件/文件系统 env.readTextFile(本地文件);env.readTextFile(HDFS文件); 基于Socket env.socketTextStream(“ip”, 8888) 自定义Source,实现接口自定义数据源,rich相关的api更丰富 并行度为1 SourceFunctionRichSourceFunction 并行度大于1 ParallelSourceFunctionRichParallelSourceFunction Connectors与第三方系统进行对接(用于source或者sink都可以) Flink本身提供Connector例如kafka、RabbitMQ、ES等注意:Flink程序打包一定要将相应的connetor相关类打包进去,不然就会失败 Apache Bahir连接器 里面也有kafka、RabbitMQ、ES的连接器更多

和外部系统进行读取写入的

第一种 Flink 里面预定义的 source 和 sink。第二种 Flink 内部也提供部分 Boundled connectors。第三种是第三方 Apache Bahir 项目中的连接器。第四种是通过异步 IO 方式 异步I/O是Flink提供的非常底层的与外部系统交互

设置不同的并行度

package cn.mesmile.flink.demo;import cn.mesmile.flink.jdkstream.VideoOrder;import org.apache.mon.functions.FilterFunction;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author zb* @date /8/21 16:56* @Description*/public class FlinkCustomSourceDemo04 {public static void main(String[] args) throws Exception {// 构建执行任务环境以及任务的启动的入口, 存储全局相关的参数// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建本地 UI 界面操作 127.0.0.1:8081final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(2);// 本机默认并行数为 12 ---> 本机配置为 6 核 12 线程VideoOrderSource videoOrderSource = new VideoOrderSource();DataStream<VideoOrder> videoOrderDataStream = env.addSource(videoOrderSource);videoOrderDataStream.filter(new FilterFunction<VideoOrder>() {@Overridepublic boolean filter(VideoOrder value) throws Exception {return value.getMoney() > 5;}}).setParallelism(3);videoOrderDataStream.print().setParallelism(4);//DataStream需要调用execute,可以取个名称env.execute("custom source job");}}

五、Sink Operator(输出算子)

Sink 输出源 预定义 printwriteAsText (过期) 自定义 SinkFunctionRichSinkFunction Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等 flink官方提供 Bundle Connector kafka、ES 等 Apache Bahir kafka、ES、Redis等

六、Flink滑动-滚动时间窗和触发器

/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/operators/windows/

Windows are at the heart of processing infinite streams(Window是处理无限数据量的核心)

数据流是一直源源不断产生,业务需要聚合统计使用,比如每10秒统计过去5分钟的点击量、成交额等

Windows 就可以将无限的数据流拆分为有限大小的“桶 buckets”,然后程序可以对其窗口内的数据进行计算

窗口认为是Bucket桶,一个窗口段就是一个桶,比如8到9点是一个桶,9到10点是一个桶

time Window 时间窗口,即按照一定的时间规则作为窗口统计

time-tumbling-window 时间滚动窗口 (用的多)

time-sliding-window 时间滑动窗口 (用的多)

session WIndow 会话窗口,即一个会话内的数据进行统计,相对少用

count Window 数量窗口,即按照一定的数据量作为窗口统计,相对少用

滑动窗口 Sliding Windows

窗口具有固定大小窗口数据有重叠例子:每10s统计一次最近1min内的订单数量 滚动窗口 Tumbling Windows 窗口具有固定大小窗口数据不重叠例子:每10s统计一次最近10s内的订单数量 【窗口大小size】 和 【滑动间隔 slide】 tumbling-window:滚动窗口: size=slide,如:每隔10s统计最近10s的数据sliding-window:滑动窗口: size>slide,如:每隔5s统计最近10s的数据size<slide的时候,如每隔15s统计最近10s的数据,那么中间5s的数据会丢失,所以开发中不用

6.1 窗口API

有keyBy 用 window() api没keyBy 用 windowAll() api ,并行度低方括号 ([…]) 中的命令是可选的,允许用多种不同的方式自定义窗口逻辑一个窗口内 的是左闭右开countWindow没过期,但timeWindow在1.12过期,统一使用window;窗口分配器 Window Assigners 定义了如何将元素分配给窗口,负责将每条数据分发到正确的 window窗口上window() 的参数是一个 WindowAssigner,flink本身提供了Tumbling、Sliding 等Assigner 窗口触发器 trigger 用来控制一个窗口是否需要被触发每个 窗口分配器WindowAssigner 都有一个默认触发器,也支持自定义触发器 窗口 window function ,对窗口内的数据做啥?

定义了要对窗口中收集的数据做的计算操作

增量聚合函数

aggregate(agg函数,WindowFunction(){ })

窗口保存临时数据,每进入一个新数据,会与中间数据累加,生成新的中间数据,再保存到窗口中常见的增量聚合函数有 reduceFunction、aggregateFunctionmin、max、sum 都是简单的聚合操作,不需要自定义规则

AggregateFunction<IN, ACC, OUT>

IN是输入类型,ACC是中间聚合状态类型,OUT是输出类型,是聚合统计当前窗口的数据

全窗口函数

apply(new processWindowFunction(){ })

窗口先缓存该窗口所有元素,等窗口的全部数据收集起来后再触发条件计算常见的全窗口聚合函数 windowFunction(未来可能弃用)、processWindowFunction(可以获取到窗口上下文 更多信息)

IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗

WindowFunction<IN, OUT, KEY, W extends Window>

如果想处理每个元素更底层的API的时候用

//对数据进行解析 ,process对每个元素进行处理,相当于 map+flatMap+filter

process(new KeyedProcessFunction(){processElement、onTimer}

基于数量的滚动窗口, 滑动计数窗口案例: 统计分组后同个key内的数据超过5次则进行统计 countWindow(5)只要有2个数据到达后就可以往后统计5个数据的值, countWindow(5, 2)

6.2 AggregateFunction增量聚合函数

定义了要对窗口中收集的数据做的计算操作

增量聚合函数

aggregate(agg函数,WindowFunction(){ })

窗口保存临时数据,每进入一个新数据,会与中间数据累加,生成新的中间数据,再保存到窗口中常见的增量聚合函数有 reduceFunction、aggregateFunctionmin、max、sum 都是简单的聚合操作,不需要自定义规则

AggregateFunction<IN, ACC, OUT>

IN是输入类型,ACC是中间聚合状态类型,OUT是输出类型,是聚合统计当前窗口的数据

6.3 全窗口函数

apply(new WindowFunction(){ })

窗口先缓存该窗口所有元素,等窗口的全部数据收集起来后再触发条件计算常见的全窗口聚合函数 windowFunction(未来可能弃用)、processWindowFunction(可以获取到窗口上下文 更多信息,包括窗口信息)

IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗

WindowFunction<IN, OUT, KEY, W extends Window>

6.4processWindowFunction全窗口函数知识

全窗口函数

process(new ProcessWindowFunction(){})

窗口先缓存该窗口所有元素,等窗口的全部数据收集起来后再触发条件计算

常见的全窗口聚合函数 windowFunction(未来可能弃用)、processWindowFunction(可以获取到窗口上下文 更多信息,包括窗口信息)

IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗

ProcessWindowFunction<IN, OUT, KEY, W extends Window>

窗口函数对比

增量聚合 aggregate(new AggregateFunction(){}); 全窗口聚合 apply(new WindowFunction(){})process(new ProcessWindowFunction(){}) //比上面apply强 * Flink里面定义窗口,可以引用不同的时间概念Flink里面时间分类 事件时间EventTime(重点关注) 事件发生的时间事件时间是每个单独事件在其产生进程上发生的时间,这个时间通常在记录进入 Flink 之前记录在对象中在事件时间中,时间值 取决于数据产生记录的时间,而不是任何Flink机器上的 进入时间 IngestionTime 事件到进入Flink 处理时间ProcessingTime 事件被flink处理的时间指正在执行相应操作的机器的系统时间是最简单的时间概念,不需要流和机器之间的协调,它提供最佳性能和最低延迟但是在分布式和异步环境中,处理时间有不确定性,存在延迟或乱序问题 事件时间(EventTime)已经能够解决所有的问题了,那为何还要用处理时间呢????处理时间(ProcessingTime)由于不用考虑事件的延迟与乱序,所以处理数据的速度高效如果一些应用比较重视处理速度而非准确性,那么就可以使用处理时间(ProcessingTime),但结果具有不确定性事件时间(EventTime)有延迟,但是能够保证处理的结果具有准确性,并且可以处理延迟甚至无序的数据

做了一个电商平台买 “超短男装衣服”,如果要统计10分钟内成交额,你认为是哪个时间比较好?

(EventTime) 下单支付时间是-11-11 01-01-01(IngestionTime ) 进入Flink时间-11-11 01-03-01(网络拥堵、延迟)(ProcessingTime)进入窗口时间-11-11 01-31-01(网络拥堵、延迟)

七、link乱序延迟时间处理-Watermark

一般我们都是用EventTime事件时间进行处理统计数据但数据由于网络问题延迟、乱序到达会导致窗口计算数据不准确需求:比如时间窗是 [12:01:01,12:01:10 ) ,但是有数据延迟到达 当 12:01:10 秒数据到达的时候,不立刻触发窗口计算而是等一定的时间,等迟到的数据来后再关闭窗口进行计算 每天10点后就是迟到,需要扣工资老王上班 路途遥远(延迟) 经常迟到 HR就规定迟到5分钟后就罚款100元(5分钟就是watermark)迟到30分钟就是上午事假处理 (5~30分就是 allowLateness )不请假都是要来的 (超过30分钟就是侧输出流,sideOutPut兜底) 超过5分钟就不用来了吗?还是要来的继续工作的,不然今天上午工资就没了那如果迟到30分钟呢? 也要来的,不然就容易产生更大的问题,缺勤开除。。。。Watermark 水位线介绍 由flink的某个operator操作生成后,就在整个程序中随event数据流转 With Periodic Watermarks(周期生成,可以定义一个最大允许乱序的时间,用的很多)With Punctuated Watermarks(标点水位线,根据数据流中某些特殊标记事件来生成,相对少) 衡量数据是否乱序的时间,什么时候不用等早之前的数据是一个全局时间戳,不是某一个key下的值是一个特殊字段,单调递增的方式,主要是和数据本身的时间戳做比较用来确定什么时候不再等待更早的数据了,可以触发窗口进行计算,忍耐是有限度的,给迟到的数据一些机会注意 Watermark 设置太小会影响数据准确性,设置太大会影响数据的实时性,更加会加重Flink作业的负担需要经过测试,和业务相关联,得出一个较合适的值即可 窗口触发计算的时机 watermark之前是按照窗口的关闭时间点计算的 [12:01:01,12:01:10 )watermark之后,触发计算的时机 窗口内有数据Watermaker >= Window EndTime窗口结束时间 触发计算后,其他窗口内数据再到达也被丢弃Watermaker = 当前计算窗口最大的事件时间 - 允许乱序延迟的时间 window大小为10s,窗口是W1 [23:12:00~23:12:10) 、 W2[23:12:10~23:12:20) 下面是数据的event time数据A 23:12:07数据B 23:12:11数据C 23:12:08数据D 23:12:17数据E 23:12:09 没加入watermark,由上到下进入flink 数据B到了之后,W1就进行了窗口计算,数据只有A数据C 迟到了3秒,到了之后,由于W1已经计算了,所以就丢失了数据C 加入watermark, 允许5秒延迟乱序,由上到下进入flink 数据A到达 watermark = 12:07 - 5 = 12:02 < 12:10 ,所以不触发W1计算, A属于W1 数据B到达 watermark = max{ 12:11, 12:07} - 5 = 12:06 < 12:10 ,所以不触发W1计算, B属于W2 数据C到达 watermark = max{12:08, 12:11, 12:07} - 5 = 12:06 < 12:10 ,所以不触发W1计算, C属于W1 数据D到达 watermark = max{12:17, 12:08, 12:11, 12:07} - 5 = 12:12 > 23:12:10 , 触发W1计算, D属于W2 数据E到达 watermark = max{12:09, 12:17, 12:08, 12:11, 12:07} - 5 = 12:12 > 23:12:10 , 之前已触发W1计算, 所以丢失了E数据, Watermaker 计算 = **当前计算窗口最大的事件时间 **- 允许乱序延迟的时间什么时候触发W1窗口计算 Watermaker >= Window EndTime窗口结束时间当前计算窗口最大的事件时间 - 允许乱序延迟的时间 >= Window EndTime窗口结束时间 测试数据 窗口 [23:12:00 ~ 23:12:10) | [23:12:10 ~ 23:12:20)触发窗口计算条件 窗口内有数据watermark >= 窗口endtime即 当前计算窗口最大的事件时间 - 允许乱序延迟的时间 >= Window EndTime窗口结束时间

java,-11-11 23:12:07,10

java,-11-11 23:12:11,10

java,-11-11 23:12:08,10

mysql,-11-11 23:12:13,10 // 触发 13 - 3 ≥ 10

java,-11-11 23:12:13,10

java,-11-11 23:12:17,10

java,-11-11 23:12:09,10

java,-11-11 23:12:20,10

java,-11-11 23:12:22,10

java,-11-11 23:12:23,10 // 触发 23 -3 ≥ 20

窗口时间并行度调整为1

7.1 数据延迟处理

Flink 最后的兜底延迟数据处理 测输出流实战

超过了watermark的等待后,还有延迟数据到达怎么办?watermark先输出,然后配置allowedLateness 再延长时间,然后到了后更新之前的窗口数据数据超过了allowedLateness 后,就丢失了吗?用侧输出流 SideOutput

八、Flink乱序延迟时间处理-多层保证措施

简介: Flink乱序延迟时间处理-多层保证措施介绍和归纳

面试题:如何保证在需要的窗口内获得指定的数据?数据有乱序延迟 flink采用watermark 、allowedLateness() 、sideOutputLateData()三个机制来保证获取数据watermark的作用是防止数据出现延迟乱序,允许等待一会再触发窗口计算,提前输出allowLateness,是将窗口关闭时间再延迟一段时间.设置后就像window变大了 那么为什么不直接把window设置大一点呢?或者把watermark加大点?watermark先输出数据,allowLateness会局部修复数据并主动更新窗口的数据输出这期间的迟到数据不会被丢弃,而是会触发窗口重新计算 sideOutPut是最后兜底操作,超过allowLateness后,窗口已经彻底关闭了,就会把数据放到侧输出流 测输出流 OutputTag tag = new OutputTag(){}, 由于泛型查除问题,需要重写方法,加花括号 应用场景:实时监控平台 可以用watermark及时输出数据allowLateness 做短期的更新迟到数据sideOutPut做兜底更新保证数据准确性 总结Flink的机制 第一层 窗口window 的作用是从DataStream数据流里指定范围获取数据。第二层 watermark的作用是防止数据出现乱序延迟,允许窗口等待延迟数据达到,再触发计算第三层 allowLateness 会让窗口关闭时间再延迟一段时间, 如果还有数据达到,会局部修复数据并主动更新窗口的数据输出第四层 sideOutPut侧输出流是最后兜底操作,在窗口已经彻底关闭后,所有过期延迟数据放到侧输出流,可以单独获取,存储到某个地方再批量更新之前的聚合的数据注意 Flink 默认的处理方式直接丢弃迟到的数据sideOutPut还可以进行分流功能DataStream没有getSideOutput方法,SingleOutputStreamOperator才有, 版本弃用API

新接口,WatermarkStrategyTimestampAssignerWatermarkGenerator因为其对时间戳和 watermark 等重点的抽象和分离很清晰,并且还统一了周期性和标记形式的 watermark 生成方式

新接口之前是用AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks ,现在可以弃用了

九、Flink的状态State管理

Flink的状态State介绍和应用场景解析

什么是State状态 数据流处理离不开状态管理,比如窗口聚合统计、去重、排序等是一个Operator的运行的状态/历史值,是维护在内存中流程:一个算子的子任务接收输入流,获取对应的状态,计算新的结果,然后把结果更新到状态里面

有状态和无状态介绍

无状态计算: 同个数据进到算子里面多少次,都是一样的输出,比如 filter有状态计算:需要考虑历史状态,同个输入会有不同的输出,比如sum、reduce聚合操作

状态管理分类

ManagedState(用的多) Flink管理,自动存储恢复细分两类 Keyed State 键控状态(用的多) 有KeyBy才用这个,仅限用在KeyStream中,每个key都有state ,是基于KeyedStream上的状态一般是用richFlatFunction,或者其他richfunction里面,在open()声明周期里面进行初始化ValueState、ListState、MapState等数据结构 Operator State 算子状态(用的少,部分source会用) ListState、UnionListState、BroadcastState等数据结构 RawState(用的少) 用户自己管理和维护存储结构:二进制数组

State数据结构(状态值可能存在内存、磁盘、DB或者其他分布式存储中)

ValueState 简单的存储一个值(ThreadLocal / String) ValueState.value()ValueState.update(T value) ListState 列表 ListState.add(T value)ListState.get() //得到一个Iterator MapState 映射类型 MapState.get(key)MapState.put(key, value)

State状态后端:存储在哪里

Flink 内置了以下这些开箱即用的 state backends :

(新版)HashMapStateBackend、EmbeddedRocksDBStateBackend 如果没有其他配置,系统将使用 HashMapStateBackend。 (旧版)MemoryStateBackend、FsStateBackend、RocksDBStateBackend 如果不设置,默认使用 MemoryStateBackend。

状态详解

HashMapStateBackend 保存数据在内部作为Java堆的对象。 键/值状态和窗口操作符持有哈希表,用于存储值、触发器等非常快,因为每个状态访问和更新都对 Java 堆上的对象进行操作但是状态大小受集群内可用内存的限制场景: 具有大状态、长窗口、大键/值状态的作业。所有高可用性设置。 EmbeddedRocksDBStateBackend 在RocksDB数据库中保存状态数据 该数据库(默认)存储在 TaskManager 本地数据目录中与HashMapStateBackend在java存储 对象不同,数据存储为序列化的字节数组RocksDB可以根据可用磁盘空间进行扩展,并且是唯一支持增量快照的状态后端。但是每个状态访问和更新都需要(反)序列化并可能从磁盘读取,这导致平均性能比内存状态后端慢一个数量级场景 具有非常大状态、长窗口、大键/值状态的作业。所有高可用性设置 旧版

MemoryStateBackend(内存,不推荐在生产场景使用)

FsStateBackend(文件系统上,本地文件系统、HDFS, 性能更好,常用)

RocksDBStateBackend (无需担心 OOM 风险,是大部分时候的选择)

配置

方式一:可以flink-conf.yaml使用配置键在 中配置默认状态后端state.backend

配置条目的可能值是hashmap (HashMapStateBackend)、rocksdb (EmbeddedRocksDBStateBackend)

或实现状态后端工厂StateBackendFactory的类的完全限定类名

#全局配置例子一

state.backend: hashmap

state.checkpoint-storage: jobmanager

#全局配置例子二

state.backend: rocksdb

state.checkpoints.dir: file:///checkpoint-dir/

state.checkpoint-storage: filesystem

方式二:代码 单独job配置例子

//代码配置一(基于内存)StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStateBackend(new HashMapStateBackend());env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());//代码配置二(基于磁盘)StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStateBackend(new EmbeddedRocksDBStateBackend());env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");//或者env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));- 备注:使用 RocksDBStateBackend 需要加依赖<dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId><version>1.13.1</version></dependency>

Flink的Checkpoint-SavePoint和端到端状态一致性介绍

什么是Checkpoint 检查点 Flink中所有的Operator的当前State的全局快照默认情况下 checkpoint 是禁用的Checkpoint是把State数据定时持久化存储,防止丢失手工调用checkpoint,叫savepoint,主要是用于flink集群维护升级等底层使用了Chandy-Lamport 分布式快照算法,保证数据在分布式环境下的一致性 开箱即用,Flink 捆绑了这些检查点存储类型: 作业管理器检查点存储 JobManagerCheckpointStorage文件系统检查点存储 FileSystemCheckpointStorage 配置

//全局配置checkpoints

state.checkpoints.dir: hdfs:///checkpoints/

//作业单独配置checkpoints

env.getCheckpointConfig().setCheckpointStorage(“hdfs:///checkpoints-data/”);

//全局配置savepoint

state.savepoints.dir: hdfs:///flink/savepoints

Savepoint 与 Checkpoint 的不同之处

类似于传统数据库中的备份与恢复日志之间的差异Checkpoint 的主要目的是为意外失败的作业提供【重启恢复机制】,Checkpoint 的生命周期由 Flink 管理,即 Flink 创建,管理和删除 Checkpoint - 无需用户交互Savepoint 由用户创建,拥有和删除, 主要是【升级 Flink 版本】,调整用户逻辑除去概念上的差异,Checkpoint 和 Savepoint 的当前实现基本上使用相同的代码并生成相同的格式

端到端(end-to-end)状态一致性

数据一致性保证都是由流处理器实现的,也就是说都是在Flink流处理器内部保证的

在真实应用中,了流处理器以外还包含了数据源(例如Kafka、Mysql)和输出到持久化系统(Kafka、Mysql、Hbase、CK)

端到端的一致性保证,是意味着结果的正确性贯穿了整个流处理应用的各个环节,每一个组件都要保证自己的一致性。

Source 需要外部数据源可以重置读取位置,当发生故障的时候重置偏移量到故障之前的位置 内部 依赖Checkpoints机制,在发生故障的时可以恢复各个环节的数据 Sink: 当故障恢复时,数据不会重复写入外部系统,常见的就是 幂等和事务写入(和checkpoint配合)

十、Flink 复杂事件处理 CEP讲解+案例实战

什么是FlinkCEP CEP全称 Complex event processing 复杂事件处理FlinkCEP 是在 Flink 之上实现的复杂事件处理(CEP)库擅长高吞吐、低延迟的处理,市场上有多种CEP的解决方案,例如Spark,但是Flink专门类库更方便使用地址:/projects/flink/flink-docs-release-1.13/docs/libs/cep/ 用途 检测和发现无边界事件流中多个记录的关联规则,得到满足规则的复杂事件允许业务定义要从输入流中提取的复杂模式序列 使用流程 定义patternpattern应用到数据流,得到模式流从模式流 获取结果

DataStream<Event> input = ...Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {@Overridepublic boolean filter(Event event) {return event.getId() == 42;}}).next("middle").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {@Overridepublic boolean filter(SubEvent subEvent) {return subEvent.getVolume() >= 10.0;}}).followedBy("end").where(new SimpleCondition<Event>() {@Overridepublic boolean filter(Event event) {return event.getName().equals("end");}});PatternStream<Event> patternStream = CEP.pattern(input, pattern);DataStream<Alert> result = patternStream.process(new PatternProcessFunction<Event, Alert>() {@Overridepublic void processMatch(Map<String, List<Event>> pattern,Context ctx,Collector<Alert> out) throws Exception {out.collect(createAlertFrom(pattern));}});- CEP并不包含在flink中,使用前需要自己导入<dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_${scala.version}</artifactId><version>${flink.version}</version></dependency>

模式(Pattern):定义处理事件的规则 三种模式PatternAPI 个体模式(Individual Patterns):组成复杂规则的每一个单独的模式定义,就是个体模式组合模式(Combining Patterns):很多个体模式组合起来,形成组合模式模式组(Groups of Patterns):将一个组合模式作为条件嵌套在个体模式里,就是模式组 近邻模式 严格近邻:期望所有匹配事件严格地一个接一个出现,中间没有任何不匹配的事件, API是.next()宽松近邻:允许中间出现不匹配的事件,API是.followedBy()非确定性宽松近邻:可以忽略已经匹配的条件,API是followedByAny()指定时间约束:指定模式在多长时间内匹配有效,API是within如果您不希望事件类型直接跟随另一个,notNext()如果您不希望事件类型介于其他两种事件类型之间,notFollowedBy()模式分类 单次模式:接收一次一个事件循环模式:接收一个或多个事件 其他参数 times:指定固定的循环执行次数greedy:贪婪模式,尽可能多触发oneOrMore:指定触发一次或多次timesOrMore:指定触发固定以上的次数optional:要么不触发要么触发指定的次数

十一、Flink项目打包插件讲解+部署阿里云实战

Flink 部署方式是灵活,主要是对Flink计算时所需资源的管理方式不同

* 文档:/projects/flink/flink-docs-release-1.13/zh/docs/deployment/overview/

* Local 本地部署,直接启动进程,适合调试使用

* 直接部署启动服务

* Standalone Cluster集群部署,flink自带集群模式

* Hadoop YARN 计算资源统一由Hadoop YARN管理资源进行调度,按需使用提高集群的资源利用率

* Kubernetes 部署

* Docker部署

11.1本地安装

* Flink下载地址

* /zh/downloads.html

* flink版本 1.13.1(课程安装包那边有提供)

* 步骤

* 解压 tar -zxvf

* 目录介绍

* conf

* flink-conf.yaml

#web ui 端口rest.port=8081​#调整jobmanager.memory.process.size: 1000mtaskmanager.memory.process.size: 1000m* bin* start-cluster.sh* stop-cluster.sh* yarn-session.sh* example* 启动 bin/start-cluster.sh* 停止 bin/stop-cluster.sh* 查看进程 jps* TaskManagerRunner* StandaloneSessionClusterEntrypoint* 网络安全组或者防火墙开放端口 8081* 访问地址 http://ip:8081

11.2 测试本地安装

flink测试官方案例

创建文件

cd /usr/local/software/flink/examples/sourcevim xdclass_source.txt​java xdclassspringboot springcloudhtml flinkspringboot redisjava flinkkafka flinkjava springboot

bin目录运行

./flink run /usr/local/software/flink/examples/batch/WordCount.jar --input /usr/local/software/flink/examples/source/xdclass_source.txt --output /usr/local/software/flink/examples/source/xdclass_result.txt

访问web UI (有个小bug,内存不够,页面访问失败则重新点击或者加大内存)

11.3maven常用插件介绍和本地Flink项目打包* 添加打包插件

<build><finalName>xdclass-flink</finalName><plugins>​<!--默认编译版本比较低,所以用compiler插件,指定项目源码的jdk版本,编译后的jdk版本和编码,--><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.6.1</version><configuration><source>${java.version}</source><target>${java.version}</target><encoding>${file.encoding}</encoding></configuration></plugin>​<!-- 添加依赖到jar包 --><!--<plugin>--><!--<artifactId>maven-assembly-plugin</artifactId>--><!--<configuration>--><!--<descriptorRefs>--><!--<descriptorRef>jar-with-dependencies</descriptorRef>--><!--</descriptorRefs>--><!--</configuration>--><!--<executions>--><!--<execution>--><!--<id>make-assembly</id>--><!--<phase>package</phase>--><!--<goals>--><!--<goal>single</goal>--><!--</goals>--><!--</execution>--><!--</executions>--><!--</plugin>-->​<plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals></execution></executions></plugin></plugins></build>

打包插件 maven-jar-plugin,默认的打包插件,用来打普通的jar包,需建立lib目录里来存放需要的依赖包maven-shade-plugin (推荐) 将依赖的jar包打包到当前jar包,成为fat JAR包,也可以防止类冲突 隔离maven-assembly-plugin,大数据项目用的比较多,支持自定义的打包结构,比如sql/shell等 测试插件 maven-surefire-plugin, 用于mvn 生命周期的测试阶段的插件,通过参数设置在junit下控制测试

运行

通过WebUI部署Flink项目到阿里云Linux运行

* 访问WebUI

* 上传jar包

* 选择main入口类APP

* 提交任务查看情况

* Task Solt 是指taskmanager的并发执行能力,parallelism是指taskmanager实际使用的并发能力

taskmanager.numberOfTaskSlots:4​假如每一个taskmanager中的分配4个TaskSlot,那有3个taskmanager一共有12个TaskSlot

测试数据

AA,-11-11 12:01:01,-1BB,-11-11 12:01:02,1AA,-11-11 12:01:04,-1AA,-11-11 12:01:05,-1

并行度和solt的疑惑 Task Slots 是具备的并发能力,大于 Parallelism并行度(实际用的)数据流里面算子的最大并行度就是Parallelism, 2-2-2-3-1 这样的并行度,最大就是3(同个任务job里面)

十二、docker 安装

* 每个 Flink 集群的作业里,都是有客户端在运行,主要是获取 Flink 应用程序的代码,将其转换为 JobGraph 并提交给 JobManager JobManager 将工作分配到 TaskManagers 上,在那里运行实际的操作符(例如源、转换和接收器)客户端是啥? 我们前面测试的 bin/flink run xxx 这个就是客户端的一种,提交任务给flink集群运行或者WebUI界面那边提交任务 Local 本地部署,直接启动进程,适合调试使用 直接部署启动服务 Standalone Cluster集群部署,flink自带集群模式Hadoop YARN 计算资源统一由Hadoop YARN管理资源进行调度,按需使用提高集群的资源利用率 Yarn集群三种模式介绍,Session模式JM 负载瓶颈,main 方法在客户端执行保留了Standalone的优势,需要事先申请资源,启动固定数量的JobManager和TaskManger (JobManager只有一个)常驻在内存,提交到这个集群的作业可以直接运行Session的资源总量有限,多个job之间不是隔离的,故可能会造成资源的争用或者宕机影响适合场景:对延迟非常敏感但运行时长较短的作业Per-Job模式(Docker-Compose不支持)JM 负载瓶颈,main 方法在客户端执行各自形成单独的Flink集群,拥有专属的JobManager和TaskManager一个作业的TaskManager失败不会影响其他作业的运行, 作业完成后相关资源会被清除当机器上有多个 client 时,有较高的网络负载:传输 jar 、消耗大量的 CPU 来执行 main方法适合场景:规模大长时间运行的作业Application模式(Flink 1.11版本中)和Per-Job模式类似主要是为了解决 Per-Job Mode 的不足,避免 带宽、CPU 的热点问题 注意: Docker 上的 Flink 不支持Per-Job 模式。 HA模式 standalone cluster HAYARN cluster HA需要JDK、zookeeper HA、flink 等程序进行构建,至少需要三个物理机。K8S云厂商:阿里云、华为云、亚马逊云等 官方文档 常规文档 /projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/docker/ docker-compose文档 /projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/docker/#flink-with-docker-compose 机器和配置准备 关闭local模式部署的flink进程安装docker和docker-compose 创建docker-compose.yml 文件 _Session Cluster_模式

version: "3.7"services:jobmanager:image: flink:scala_2.12-java8ports:- "8081:8081"command: jobmanagerenvironment:- |FLINK_PROPERTIES=jobmanager.rpc.address: jobmanager​taskmanager:image: flink:scala_2.12-java8depends_on:- jobmanagercommand: taskmanagerscale: 3environment:- |FLINK_PROPERTIES=jobmanager.rpc.address: jobmanagertaskmanager.numberOfTaskSlots: 2

每个 manage 有 2 个 slot 所以最大并行度为 3(个manage)* 2(个slot) =6

version: "3.7"services:flink-jobmanager-01:image: flink:scala_2.12-java8container_name: flink-jobmanager-01hostname: flink-jobmanager-01expose:- "6123"ports:- "8081:8081"command: jobmanagerenvironment:- JOB_MANAGER_RPC_ADDRESS=flink-jobmanager-01flink-taskmanager-01:image: flink:scala_2.12-java8container_name: flink-taskmanager-01hostname: flink-taskmanager-01expose:- "6121"- "6122"depends_on:- flink-jobmanager-01command: taskmanagerlinks:- "flink-jobmanager-01:jobmanager"environment:- JOB_MANAGER_RPC_ADDRESS=flink-jobmanager-01flink-taskmanager-02:image: flink:scala_2.12-java8container_name: flink-taskmanager-02hostname: flink-taskmanager-02expose:- "6121"- "6122"depends_on:- flink-jobmanager-01command: taskmanagerlinks:- "flink-jobmanager-01:jobmanager"environment:- JOB_MANAGER_RPC_ADDRESS=flink-jobmanager-01

端口说明

The Web Client is on port 8081JobManager RPC port 6123TaskManagers RPC port 6122TaskManagers Data port 6121​注意:expose暴露容器给link到当前容器的容器ports是暴露容器端口到宿主机端口进行映

问题 内存不足 (其他程序不运行,最少也需要1核2g,建议是4或者8g)网络安全组没开放端口 8081

十三、总结

课程总结 SourceTransformationSink 特性 WindowWatermark窗口函数CEPState和Checkpoint Flink进阶 Flink内存管理和优化、Blink、SQL、Table API 、容错、HA、新特性多流连接Join、触发器、定时器、通信组件Akka/RPC原理、JobGraph流程

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。