
[Flink Streaming Framework] Common sink operations


Covered here: print() / printToErr(), writeAsText(), the sinks Flink provides, and custom sinks.

Ways to obtain a source (built in)

File-based: readTextFile()

Socket-based: socketTextStream()

Collection-based: fromCollection(Collection)

Custom data source: addSource()

Implement the SourceFunction<> interface and override run() and cancel() for a single-parallelism source (see the sketch below)

Implement the ParallelSourceFunction<> interface for a parallel source; with Kafka, set the parallelism to the number of partitions
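The writeAsText() example later in this article uses a custom source called MyNoParalleSource. Here is a minimal sketch of what such a single-parallelism source could look like, assuming it simply emits an increasing counter once per second (the emission logic and interval are assumptions, not from the original):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// A minimal single-parallelism source: emits 1, 2, 3, ... once per second.
// Overriding run() and cancel() is all that SourceFunction requires.
public class MyNoParalleSource implements SourceFunction<Long> {
    private volatile boolean isRunning = true;
    private long count = 1L;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count++;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}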

Flink's built-in connectors can also serve as sources

Kafka (a consumer sketch follows)
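A minimal sketch of using the Kafka connector as a source, assuming the universal connector (flink-connector-kafka); the broker address, group id, and topic name are placeholders:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

// Read a stream of strings from a Kafka topic.
public class KafkaSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "hadoop100:9092"); // assumed broker address
        props.setProperty("group.id", "flink-demo");              // assumed consumer group

        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer<String>("demo-topic", new SimpleStringSchema(), props));
        // Parallelism can match the topic's partition count for fully parallel reads.
        stream.print();
        env.execute("KafkaSourceDemo");
    }
}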

Common transformation operations

map and filter

flatMap, keyBy, sum (see the word-count sketch after this list)

Use flatMap when one record becomes many; use map for one-to-one transformations

keyBy groups the stream by key

union: merges two data streams of the same type into one

connect, CoMapFunction and CoFlatMapFunction (connect combines two streams whose element types may differ)

split(new OutputSelector<>() {…})

Splits a stream by named rules; a sub-stream can then be retrieved by rule name with select("XXX", …)
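To make flatMap, keyBy, and sum concrete, here is a minimal word-count sketch; the host and port are assumptions carried over from the socket example later in this article:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

// Word count over a socket stream: flatMap splits each line into (word, 1) pairs,
// keyBy groups by the word, and sum keeps a running count per word.
public class TransformationDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> text = env.socketTextStream("hadoop100", 9000, "\n");
        DataStream<Tuple2<String, Integer>> counts = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                        for (String word : line.split("\\s+")) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                .keyBy(0)   // group by the word field of the tuple
                .sum(1);    // running sum of the count field
        counts.print();
        env.execute("TransformationDemo");
    }
}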

Common sink operations

print() / printToErr()

writeAsText()

Custom sink: writing to Redis

new FlinkJedisPoolConfig.Builder().setHost()…

Implement the RedisMapper interface and override getKeyFromData, getValueFromData, and getCommandDescription

The RedisCommand (here RedisCommand.LPUSH) determines which Redis data structure is written

Flink's built-in connectors also provide sinks → Kafka, Elasticsearch

Connecting with the Redis client:

redis-cli -h 192.168.167.254 -p 6379
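Once connected, the data written by the Redis sink job below can be inspected with standard commands (l_words is the key used in that job):

llen l_words
lrange l_words 0 9

llen reports how many elements have been pushed; lrange prints the first ten.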

print() / printToErr()

Prints the value of each element's toString() to the standard output or standard error stream.
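A minimal, runnable illustration (the element values are arbitrary):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Demonstrates print() and printToErr() on two small in-memory streams.
public class PrintDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).print();      // each element's toString() to stdout
        env.fromElements(4, 5, 6).printToErr(); // each element's toString() to stderr
        env.execute("PrintDemo");
    }
}

With parallelism greater than 1, each printed line is prefixed with the index of the subtask that produced it.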

writeAsText()

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Source: 1 2 3 4 5 ... keeps arriving.
 * map prints each element as it is received;
 * filter keeps only the even numbers.
 */
public class WriteTextDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> numberStream = env.addSource(new MyNoParalleSource()).setParallelism(1);
        SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("Received: " + value);
                return value;
            }
        });
        SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long number) throws Exception {
                return number % 2 == 0;
            }
        });
        filterDataStream.writeAsText("D:\\flink\\src\\output\\test").setParallelism(1);
        env.execute("StreamingDemoWithMyNoPralalleSource");
    }
}

Sinks provided by Flink

Apache Kafka (source/sink)
Apache Cassandra (sink)
Amazon Kinesis Streams (source/sink)
Elasticsearch (sink)
Hadoop FileSystem (sink)
RabbitMQ (source/sink)
Apache NiFi (source/sink)
Twitter Streaming API (source)
Google PubSub (source/sink)
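As one example of a built-in sink, here is a minimal Kafka producer sketch, again assuming the universal connector; the broker address and topic name are placeholders:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

// Write a stream of strings to a Kafka topic with the built-in connector.
public class KafkaSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> text = env.socketTextStream("hadoop100", 9000, "\n");
        // Arguments: broker list, target topic, serialization schema.
        text.addSink(new FlinkKafkaProducer<String>("hadoop100:9092", "demo-topic", new SimpleStringSchema()));
        env.execute("KafkaSinkDemo");
    }
}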

Custom sink (the Redis sink below comes from the Apache Bahir connector):

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>

Custom Redis sink

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * Write data into Redis.
 */
public class SinkForRedisDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("hadoop100", 9000, "\n");
        // lpush l_words word
        // Wrap each String into a Tuple2<String, String> of (key, value)
        DataStream<Tuple2<String, String>> l_wordsData = text.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String value) throws Exception {
                return new Tuple2<>("l_words", value);
            }
        });
        // Redis connection configuration
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop110").setPort(6379).build();
        // Create the Redis sink
        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
        l_wordsData.addSink(redisSink);
        env.execute("StreamingDemoToRedis");
    }

    public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
        // The Redis key to operate on, extracted from the record
        @Override
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }

        // The Redis value to operate on, extracted from the record
        @Override
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.LPUSH);
        }
    }
}
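To try the job end to end: start a socket server on hadoop100 with nc -lk 9000 (the job connects to it as a client), type a few words, then inspect the list from redis-cli with lrange l_words 0 -1.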
