007 Flink
Sinks covered below: print() / printToErr(), writeAsText(), the sinks Flink provides, and custom sinks.
Ways to obtain a source (built-in):
From a file: readTextFile()
From a socket: socketTextStream()
From a collection: fromCollection(Collection)
Custom source: addSource()
Implement the SourceFunction<> interface and override run() and cancel(); this yields a source with parallelism 1.
Implement the ParallelSourceFunction<> interface and set the parallelism to the number of Kafka partitions.
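The single-parallelism case can be sketched as below. This is a minimal sketch, not code from the notes: the class name MyNoParalleSource is chosen to match the demo code later in these notes, and the "emit 1, 2, 3, ... once per second" behavior is an assumption inferred from the comments in that demo.

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// A single-parallelism source that emits 1, 2, 3, ... roughly once per second.
// The class name matches the MyNoParalleSource used in the demos below.
public class MyNoParalleSource implements SourceFunction<Long> {
    private long count = 1L;
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count); // hand the next number to the stream
            count++;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        // Called by Flink when the job is cancelled; stops the emit loop.
        isRunning = false;
    }
}
```

For a parallel variant, implement ParallelSourceFunction<Long> instead; the body stays the same, and Flink runs one instance per parallel subtask.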
Connectors shipped with Flink:
Kafka
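A sketch of reading from Kafka with Flink's connector. The broker address, consumer group, topic name, and parallelism of 4 are all assumptions for illustration; the exact consumer class name varies by Flink/Kafka version (older releases use versioned names such as FlinkKafkaConsumer011), and it lives in the separate flink-connector-kafka dependency.

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "hadoop100:9092"); // assumed broker address
        props.setProperty("group.id", "flink-demo");              // assumed consumer group

        // Read String records from the (assumed) topic "demo-topic".
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props);

        // As noted above: match the source parallelism to the topic's partition count.
        env.addSource(consumer).setParallelism(4).print();
        env.execute("KafkaSourceDemo");
    }
}
```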
Common transformation operations
map and filter
flatMap, keyBy, sum
Use flatMap when one input line yields multiple output records; use map for one-to-one transformations.
keyBy groups the stream by key.
union: merges two data streams into one.
connect, plus CoMapFunction and CoFlatMapFunction to process the two connected streams
split(new OutputSelector{…})
Splits a stream by rules; use select("XXX", …) to fetch a sub-stream by rule name.
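The core transformations above (union, flatMap, keyBy, sum) can be combined into a small word-count sketch. The input strings are made up for illustration; keyBy(0)/sum(1) use the index-based addressing of older Flink versions, matching the API style of these notes.

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class TransformationDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> a = env.fromElements("hello flink", "hello world");
        DataStream<String> b = env.fromElements("hello kafka");

        // union: merge the two streams into one.
        DataStream<String> lines = a.union(b);

        lines
            // flatMap: one line becomes several (word, 1) records.
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                    for (String word : line.split(" ")) {
                        out.collect(new Tuple2<>(word, 1));
                    }
                }
            })
            // keyBy: group by the word (field 0), then sum the counts (field 1).
            .keyBy(0)
            .sum(1)
            .print();

        env.execute("TransformationDemo");
    }
}
```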
Common sink operations
print() / printToErr()
writeAsText()
Custom sink: writing to Redis
new FlinkJedisPoolConfig.Builder().setHost()…
Implement the RedisMapper interface and override getKeyFromData, getValueFromData, and getCommandDescription.
RedisCommand.LPUSH chooses the Redis data structure to write to.
Connectors shipped with Flink -> Kafka, Elasticsearch
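The Kafka sink side can be sketched the same way as the source. The broker address, the socket source host/port (reused from the Redis demo below), and the topic name "demo-out" are assumptions; the producer class name varies by Flink/Kafka version and comes from the flink-connector-kafka dependency.

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class KafkaSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> text = env.socketTextStream("hadoop100", 9000, "\n");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "hadoop100:9092"); // assumed broker address

        // Write every incoming line to the (assumed) Kafka topic "demo-out".
        text.addSink(new FlinkKafkaProducer<>("demo-out", new SimpleStringSchema(), props));
        env.execute("KafkaSinkDemo");
    }
}
```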
Connect with the Redis command-line client:
redis-cli -h 192.168.167.254 -p 6379
print() / printToErr()
Prints each element's toString() value to standard output or to the standard error stream.
writeAsText()
/**
 * Source: 1 2 3 4 5 ... arriving continuously.
 * map prints each element as it is received;
 * filter keeps only the even numbers.
 */
public class WriteTextDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> numberStream = env.addSource(new MyNoParalleSource()).setParallelism(1);
        SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("Received: " + value);
                return value;
            }
        });
        SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long number) throws Exception {
                return number % 2 == 0;
            }
        });
        filterDataStream.writeAsText("D:\\flink\\src\\output\\test").setParallelism(1);
        env.execute("StreamingDemoWithMyNoPralalleSource");
    }
}
Sinks provided by Flink
Apache Kafka (source/sink)
Apache Cassandra (sink)
Amazon Kinesis Streams (source/sink)
Elasticsearch (sink)
Hadoop FileSystem (sink)
RabbitMQ (source/sink)
Apache NiFi (source/sink)
Twitter Streaming API (source)
Google PubSub (source/sink)
Custom sinks
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
Custom Redis sink
/**
 * Writes incoming socket data into Redis.
 */
public class SinkForRedisDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("hadoop100", 9000, "\n");
        // lpush l_words word
        // Wrap each line: turn the String into a Tuple2<String, String>.
        DataStream<Tuple2<String, String>> l_wordsData = text.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String value) throws Exception {
                return new Tuple2<>("l_words", value);
            }
        });
        // Create the Redis connection configuration.
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop110").setPort(6379).build();
        // Create the Redis sink.
        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
        l_wordsData.addSink(redisSink);
        env.execute("StreamingDemoToRedis");
    }

    public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
        // Extracts the Redis key to operate on from the incoming record.
        @Override
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }

        // Extracts the Redis value to operate on from the incoming record.
        @Override
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.LPUSH);
        }
    }
}