100字范文,内容丰富有趣,生活中的好帮手!
100字范文 > flink读取不到文件_flink批处理从0到1

flink读取不到文件_flink批处理从0到1

时间:2020-01-07 03:13:40

相关推荐

flink读取不到文件_flink批处理从0到1

一、DataSet API之Data Sources(消费者之数据源)

介绍:

flink提供了大量的已经实现好的source方法,你也可以自定义source 通过实现sourceFunction接口来自定义无并行度的source, 或者你也可以通过实现ParallelSourceFunction 接口 or 继承RichParallelSourceFunction 来自定义有并行度的source。

类型:
基于文件

readTextFile(path) 读取文本文件,文件遵循TextInputFormat 读取规则,逐行读取并返回。

基于集合

fromCollection(Collection) 通过java 的collection集合创建一个数据流,集合中的所有元素必须是相同类型的。

代码实现:
1、fromCollection

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object StreamingFromCollectionScala {

def main(args: Array[String]): Unit = {

val env = StreamExecutionEnvironment.getExecutionEnvironment//隐式转换import org.apache.flink.api.scala._

val data = List(10,15,20)

val text = env.fromCollection(data)//针对map接收到的数据执行加1的操作

val num = text.map(_+1)

num.print().setParallelism(1)

env.execute("StreamingFromCollectionScala")

}

}package xuwei.tech.batch;

import org.apache.mon.functions.FlatMapFunction;

import org.apache.flink.api.java.DataSet;

import org.apache.flink.api.java.ExecutionEnvironment;

import org.apache.flink.api.java.operators.DataSource;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.util.Collector;/**

*/public class BatchWordCountJava {

public static void main(String[] args) throws Exception{

val data = List(10,15,20)

String outPath = "D:\\data\\result";//获取运行环境

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

//获取文件中的内容

val text = env.fromCollection(data)

DataSet> counts = text.flatMap(new Tokenizer()).groupBy(0).sum(1);

counts.writeAsCsv(outPath,"\n"," ").setParallelism(1);

env.execute("batch word count");

}public static class Tokenizer implements FlatMapFunction<String,Tuple2<String,Integer>>{

public void flatMap(String value, Collector> out) throws Exception {

String[] tokens = value.toLowerCase().split("\\W+");for (String token: tokens) {

if(token.length()>0){

out.collect(new Tuple2(token,1));

}

}

}

}

}

2、readTextFile

package xuwei.tech.batch;

import org.apache.mon.functions.FlatMapFunction;

import org.apache.flink.api.java.DataSet;

import org.apache.flink.api.java.ExecutionEnvironment;

import org.apache.flink.api.java.operators.DataSource;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.util.Collector;/**

* Created by xuwei.tech on /10/8.

*/public class BatchWordCountJava {

public static void main(String[] args) throws Exception{

String inputPath = "D:\\data\\file";String outPath = "D:\\data\\result";//获取运行环境

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

//获取文件中的内容

DataSource<String> text = env.readTextFile(inputPath);

DataSet> counts = text.flatMap(new Tokenizer()).groupBy(0).sum(1);

counts.writeAsCsv(outPath,"\n"," ").setParallelism(1);

env.execute("batch word count");

}public static class Tokenizer implements FlatMapFunction<String,Tuple2<String,Integer>>{

public void flatMap(String value, Collector> out) throws Exception {

String[] tokens = value.toLowerCase().split("\\W+");for (String token: tokens) {

if(token.length()>0){

out.collect(new Tuple2(token,1));

}

}

}

}

}

二、DataSet API之Transformations

介绍:

Map:输入一个元素,然后返回一个元素,中间可以做一些清洗转换等操作

FlatMap:输入一个元素,可以返回零个,一个或者多个元素

MapPartition:类似map,一次处理一个分区的数据【如果在进行map处理的时候需要获取第三方资源链接,建议使用MapPartition】

Filter:过滤函数,对传入的数据进行判断,符合条件的数据会被留下

Reduce:对数据进行聚合操作,结合当前元素和上一次reduce返回的值进行聚合操作,然后返回一个新的值

Aggregate:sum、max、min等

Distinct:返回一个数据集中去重之后的元素,data.distinct()

Join:内连接

OuterJoin:外链接

Cross:获取两个数据集的笛卡尔积

Union:返回两个数据集的总和,数据类型需要一致

First-n:获取集合中的前N个元素

Sort Partition:在本地对数据集的所有分区进行排序,通过sortPartition()的链接调用来完成对多个字段的排序

代码实现:
1、broadcast(广播变量)

package xuwei.tech.batch.batchAPI;

import org.apache.mon.functions.MapFunction;

import org.apache.mon.functions.RichMapFunction;

import org.apache.flink.api.java.DataSet;

import org.apache.flink.api.java.ExecutionEnvironment;

import org.apache.flink.api.java.operators.DataSource;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.configuration.Configuration;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;/**

* broadcast广播变量

*

*

*

* 需求:

* flink会从数据源中获取到用户的姓名

*

* 最终需要把用户的姓名和年龄信息打印出来

*

* 分析:

* 所以就需要在中间的map处理的时候获取用户的年龄信息

*

* 建议吧用户的关系数据集使用广播变量进行处理

*

*

*

*

* 注意:如果多个算子需要使用同一份数据集,那么需要在对应的多个算子后面分别注册广播变量

*/public class BatchDemoBroadcast {

public static void main(String[] args) throws Exception{//获取运行环境

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

//1:准备需要广播的数据

ArrayListString, Integer>> broadData = new ArrayList<>();

broadData.add(new Tuple2<>("zs",18));

broadData.add(new Tuple2<>("ls",20));

broadData.add(new Tuple2<>("ww",17));

DataSetString, Integer>> tupleData = env.fromCollection(broadData);//1.1:处理需要广播的数据,把数据集转换成map类型,map中的key就是用户姓名,value就是用户年龄

DataSet<HashMap> toBroadcast = tupleData.map(new MapFunction, HashMap>() {@Override

public HashMap map(Tuple2 value) throws Exception {

HashMap res = new HashMap<>();

res.put(value.f0, value.f1);

return res;

}

});//源数据

DataSource<String> data = env.fromElements("zs", "ls", "ww");//注意:在这里需要使用到RichMapFunction获取广播变量

DataSet<String> result = data.map(new RichMapFunction() {

List> broadCastMap = new ArrayList>();

HashMap allMap = new HashMap();/**

* 这个方法只会执行一次

* 可以在这里实现一些初始化的功能

*

* 所以,就可以在open方法中获取广播变量数据

*

*/@Overridepublic void open(Configuration parameters) throws Exception {

super.open(parameters);//3:获取广播数据this.broadCastMap = getRuntimeContext().getBroadcastVariable("broadCastMapName");for (HashMap map : broadCastMap) {

allMap.putAll(map);

}

}@Override

public String map(String value) throws Exception {

Integer age = allMap.get(value);

return value + "," + age;

}

}).withBroadcastSet(toBroadcast, "broadCastMapName");//2:执行广播数据的操作

result.print();

}

}

2、IntCounter(累加器)

package xuwei.tech.batch.batchAPI;

import org.apache.mon.JobExecutionResult;

import org.apache.mon.accumulators.IntCounter;

import org.apache.mon.functions.MapFunction;

import org.apache.mon.functions.RichMapFunction;

import org.apache.flink.api.java.DataSet;

import org.apache.flink.api.java.ExecutionEnvironment;

import org.apache.flink.api.java.operators.DataSource;

import org.apache.flink.api.java.operators.MapOperator;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.configuration.Configuration;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;/**

* 全局累加器

*

* counter 计数器

*

* 需求:

* 计算map函数中处理了多少数据

*

*

* 注意:只有在任务执行结束后,才能获取到累加器的值

*

*

*

* Created by xuwei.tech on /10/8.

*/public class BatchDemoCounter {

public static void main(String[] args) throws Exception{//获取运行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSource<String> data = env.fromElements("a", "b", "c", "d");

DataSet<String> result = data.map(new RichMapFunction<String, String>() {//1:创建累加器private IntCounter numLines = new IntCounter();

@Override

public void open(Configuration parameters) throws Exception {

super.open(parameters);//2:注册累加器

getRuntimeContext().addAccumulator("num-lines",this.numLines);

}//int sum = 0;

@Overridepublic String map(String value) throws Exception {//如果并行度为1,使用普通的累加求和即可,但是设置多个并行度,则普通的累加求和结果就不准了

//sum++;

//System.out.println("sum:"+sum);this.numLines.add(1);

return value;

}

}).setParallelism(8);//result.print();

result.writeAsText("d:\\data\\count10");

JobExecutionResult jobResult = env.execute("counter");//3:获取累加器int num = jobResult.getAccumulatorResult("num-lines");

System.out.println("num:"+num);

}

}

3、cross(获取笛卡尔积)

package xuwei.tech.batch.batchAPI;

import org.apache.mon.functions.JoinFunction;

import org.apache.flink.api.java.ExecutionEnvironment;

import org.apache.flink.api.java.operators.CrossOperator;

import org.apache.flink.api.java.operators.DataSource;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.java.tuple.Tuple3;

import java.util.ArrayList;/**

* 获取笛卡尔积

*

* Created by xuwei.tech on /10/8.

*/public class BatchDemoCross {

public static void main(String[] args) throws Exception{//获取运行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

//tuple2ArrayList<String> data1 = new ArrayList<>();

data1.add("zs");

data1.add("ww");//tuple2ArrayList<Integer> data2 = new ArrayList<>();

data2.add(1);

data2.add(2);DataSource<String> text1 = env.fromCollection(data1);DataSource<Integer> text2 = env.fromCollection(data2);CrossOperator.DefaultCross<String, Integer> cross = text1.cross(text2);

cross.print();

}

}

4、registerCachedFile(Distributed Cache)

package xuwei.tech.batch.batchAPI;

import mons.io.FileUtils;

import org.apache.mon.functions.MapFunction;

import org.apache.mon.functions.RichMapFunction;

import org.apache.flink.api.java.DataSet;

import org.apache.flink.api.java.ExecutionEnvironment;

import org.apache.flink.api.java.operators.DataSource;

import org.apache.flink.api.java.operators.MapOperator;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.configuration.Configuration;

import java.io.File;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;/**

* Distributed Cache

*/public class BatchDemoDisCache {

public static void main(String[] args) throws Exception{//获取运行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

//1:注册一个文件,可以使用hdfs或者s3上的文件

env.registerCachedFile("d:\\data\\file\\a.txt","a.txt");

DataSource<String> data = env.fromElements("a", "b", "c", "d");

DataSet<String> result = data.map(new RichMapFunction<String, String>() {

private ArrayList<String> dataList = new ArrayList<String>();

@Override

public void open(Configuration parameters) throws Exception {

super.open(parameters);//2:使用文件File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt");

List<String> lines = FileUtils.readLines(myFile);

for (String line : lines) {

this.dataList.add(line);System.out.println("line:" + line);

}

}

@Override

public String map(String value) throws Exception {//在这里就可以使用dataListreturn value;

}

});

result.print();

}

}

5、distinct

package xuwei.tech.batch.batchAPI;

import org.apache.mon.functions.FlatMapFunction;

import org.apache.mon.functions.MapFunction;

import org.apache.mon.functions.MapPartitionFunction;

import org.apache.flink.api.java.DataSet;

import org.apache.flink.api.java.ExecutionEnvironment;

import org.apache.flink.api.java.operators.DataSource;

import org.apache.flink.api.java.operators.FlatMapOperator;

import org.apache.flink.util.Collector;

import java.util.ArrayList;

import java.util.Iterator;

public class BatchDemoDistinct {

public static void main(String[] args) throws Exception{//获取运行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();ArrayList<String> data = new ArrayList<>();

data.add("hello you");

data.add("hello me");

DataSource<String> text = env.fromCollection(data);FlatMapOperator<String, String> flatMapData = text.flatMap(new FlatMapFunction<String, String>() {

@Override

public void flatMap(String value, Collector<String> out) throws Exception {String[] split = value.toLowerCase().split("\\W+");for (String word : split) {System.out.println("单词:"+word);

out.collect(word);

}

}

});

flatMapData.distinct()// 对数据进行整体去重

.print();

}

}

6、排序(first)

package xuwei.tech.batch.batchAPI;

import org.apache.mon.functions.JoinFunction;

import org.apache.mon.operators.Order;

import org.apache.flink.api.java.ExecutionEnvironment;

import org.apache.flink.api.java.operators.DataSource;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.java.tuple.Tuple3;

import java.util.ArrayList;/**

* 获取集合中的前N个元素

* Created by xuwei.tech on /10/8.

*/public class BatchDemoFirstN {

public static void main(String[] args) throws Exception{//获取运行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();

data.add(new Tuple2<>(2,"zs"));

data.add(new Tuple2<>(4,"ls"));

data.add(new Tuple2<>(3,"ww"));

data.add(new Tuple2<>(1,"xw"));

data.add(new Tuple2<>(1,"aw"));

data.add(new Tuple2<>(1,"mw"));

DataSourceString>> text = env.fromCollection(data);//获取前3条数据,按照数据插入的顺序text.first(3).print();System.out.println("==============================");//根据数据中的第一列进行分组,获取每组的前2个元素text.groupBy(0).first(2).print();System.out.println("==============================");//根据数据中的第一列分组,再根据第二列进行组内排序[升序],获取每组的前2个元素text.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print();System.out.println("==============================");//不分组,全局排序获取集合中的前3个元素,针对第一个元素升序,第二个元素倒序text.sortPartition(0,Order.ASCENDING).sortPartition(1,Order.DESCENDING).first(3).print();

}

}

7、join

package xuwei.tech.batch.batchAPI;

import org.apache.mon.functions.FlatMapFunction;

import org.apache.mon.functions.JoinFunction;

import org.apache.mon.functions.MapFunction;

import org.apache.flink.api.java.ExecutionEnvironment;

import org.apache.flink.api.java.operators.DataSource;

import org.apache.flink.api.java.operators.FlatMapOperator;

import org.apache.flink.api.java.tuple.Tuple1;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.java.tuple.Tuple3;

import org.apache.flink.util.Collector;

import java.util.ArrayList;

public class BatchDemoJoin {

public static void main(String[] args) throws Exception{//获取运行环境

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

//tuple2

ArrayListString>> data1 = new ArrayList<>();

data1.add(new Tuple2<>(1,"zs"));

data1.add(new Tuple2<>(2,"ls"));

data1.add(new Tuple2<>(3,"ww"));//tuple2

ArrayListString>> data2 = new ArrayList<>();

data2.add(new Tuple2<>(1,"beijing"));

data2.add(new Tuple2<>(2,"shanghai"));

data2.add(new Tuple2<>(3,"guangzhou"));

DataSourceString>> text1 = env.fromCollection(data1);

DataSource> text2 = env.fromCollection(data2);

text1.join(text2).where(0)//指定第一个数据集中需要进行比较的元素角标

.equalTo(0)//指定第二个数据集中需要进行比较的元素角标

.with(new JoinFunction, Tuple2, Tuple3>() {@Override

public Tuple3 join(Tuple2 first, Tuple2 second)

throws Exception {

return new Tuple3<>(first.f0,first.f1,second.f1);

}

}).print();

System.out.println("==================================");//注意,这里用map和上面使用的with最终效果是一致的。

/*text1.join(text2).where(0)//指定第一个数据集中需要进行比较的元素角标

.equalTo(0)//指定第二个数据集中需要进行比较的元素角标

.map(new MapFunction,Tuple2>, Tuple3>() {

@Override

public Tuple3 map(Tuple2, Tuple2> value) throws Exception {

return new Tuple3<>(value.f0.f0,value.f0.f1,value.f1.f1);

}

}).print();*/

}

}

8、outerJoin

package xuwei.tech.batch.batchAPI;

import org.apache.mon.functions.JoinFunction;

import org.apache.flink.api.java.ExecutionEnvironment;

import org.apache.flink.api.java.operators.DataSource;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.java.tuple.Tuple3;

import java.util.ArrayList;/**

* 外连接

*

* 左外连接

* 右外连接

* 全外连接

*/public class BatchDemoOuterJoin {

public static void main(String[] args) throws Exception{//获取运行环境

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

//tuple2

ArrayListString>> data1 = new ArrayList<>();

data1.add(new Tuple2<>(1,"zs"));

data1.add(new Tuple2<>(2,"ls"));

data1.add(new Tuple2<>(3,"ww"));//tuple2

ArrayListString>> data2 = new ArrayList<>();

data2.add(new Tuple2<>(1,"beijing"));

data2.add(new Tuple2<>(2,"shanghai"));

data2.add(new Tuple2<>(4,"guangzhou"));

DataSourceString>> text1 = env.fromCollection(data1);

DataSource> text2 = env.fromCollection(data2);/**

* 左外连接

*

* 注意:second这个tuple中的元素可能为null

*

*/

text1.leftOuterJoin(text2)

.where(0)

.equalTo(0)

.with(new JoinFunction, Tuple2, Tuple3>() {@Override

public Tuple3 join(Tuple2 first, Tuple2 second) throws Exception {

if(second==null){

return new Tuple3<>(first.f0,first.f1,"null");

}else{

return new Tuple3<>(first.f0,first.f1,second.f1);

}

}

}).print();

System.out.println("=============================================================================");/**

* 右外连接

*

* 注意:first这个tuple中的数据可能为null

*

*/

text1.rightOuterJoin(text2)

.where(0)

.equalTo(0)

.with(new JoinFunction, Tuple2, Tuple3>() {@Override

public Tuple3 join(Tuple2 first, Tuple2 second) throws Exception {

if(first==null){

return new Tuple3<>(second.f0,"null",second.f1);

}

return new Tuple3<>(first.f0,first.f1,second.f1);

}

}).print();

System.out.println("=============================================================================");/**

* 全外连接

*

* 注意:first和second这两个tuple都有可能为null

*

*/

text1.fullOuterJoin(text2)

.where(0)

.equalTo(0)

.with(new JoinFunction, Tuple2, Tuple3>() {@Override

public Tuple3 join(Tuple2 first, Tuple2 second) throws Exception {

if(first==null){

return new Tuple3<>(second.f0,"null",second.f1);

}else if(second == null){

return new Tuple3<>(first.f0,first.f1,"null");

}else{

return new Tuple3<>(first.f0,first.f1,second.f1);

}

}

}).print();

}

}

9、union

package xuwei.tech.batch.batchAPI;

import org.apache.mon.functions.JoinFunction;

import org.apache.flink.api.java.ExecutionEnvironment;

import org.apache.flink.api.java.operators.DataSource;

import org.apache.flink.api.java.operators.UnionOperator;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.java.tuple.Tuple3;

import java.util.ArrayList;/**

* Created by xuwei.tech on /10/8.

*/public class BatchDemoUnion {public static void main(String[] args) throws Exception{//获取运行环境

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

ArrayListString>> data1 = new ArrayList<>();

data1.add(new Tuple2<>(1,"zs"));

data1.add(new Tuple2<>(2,"ls"));

data1.add(new Tuple2<>(3,"ww"));

ArrayListString>> data2 = new ArrayList<>();

data2.add(new Tuple2<>(1,"lili"));

data2.add(new Tuple2<>(2,"jack"));

data2.add(new Tuple2<>(3,"jessic"));

DataSourceString>> text1 = env.fromCollection(data1);

DataSource> text2 = env.fromCollection(data2);

UnionOperator> union = text1.union(text2);

union.print();

}

}

三、DataStream API之partition

介绍:

Rebalance:对数据集进行再平衡,重分区,消除数据倾斜

Hash-Partition:根据指定key的哈希值对数据集进行分区

partitionByHash()

Range-Partition:根据指定的key对数据集进行范围分区

.partitionByRange()

Custom Partitioning:自定义分区规则

自定义分区需要实现Partitioner接口

partitionCustom(partitioner, "someKey")

或者partitionCustom(partitioner, 0)

代码实现:
1、partitionByRange或partitionByHash

package xuwei.tech.batch.batchAPI;

import org.apache.mon.functions.MapPartitionFunction;

import org.apache.mon.operators.Order;

import org.apache.flink.api.java.ExecutionEnvironment;

import org.apache.flink.api.java.operators.DataSource;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.util.Collector;

import java.util.ArrayList;

import java.util.Iterator;/**

* Hash-Partition

*

* Range-Partition

*

*

* Created by xuwei.tech on /10/8.

*/public class BatchDemoHashRangePartition {

public static void main(String[] args) throws Exception{//获取运行环境

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

ArrayListInteger, String>> data = new ArrayList<>();

data.add(new Tuple2<>(1,"hello1"));

data.add(new Tuple2<>(2,"hello2"));

data.add(new Tuple2<>(2,"hello3"));

data.add(new Tuple2<>(3,"hello4"));

data.add(new Tuple2<>(3,"hello5"));

data.add(new Tuple2<>(3,"hello6"));

data.add(new Tuple2<>(4,"hello7"));

data.add(new Tuple2<>(4,"hello8"));

data.add(new Tuple2<>(4,"hello9"));

data.add(new Tuple2<>(4,"hello10"));

data.add(new Tuple2<>(5,"hello11"));

data.add(new Tuple2<>(5,"hello12"));

data.add(new Tuple2<>(5,"hello13"));

data.add(new Tuple2<>(5,"hello14"));

data.add(new Tuple2<>(5,"hello15"));

data.add(new Tuple2<>(6,"hello16"));

data.add(new Tuple2<>(6,"hello17"));

data.add(new Tuple2<>(6,"hello18"));

data.add(new Tuple2<>(6,"hello19"));

data.add(new Tuple2<>(6,"hello20"));

data.add(new Tuple2<>(6,"hello21"));

DataSourceInteger, String>> text = env.fromCollection(data);/*text.partitionByHash(0).mapPartition(new MapPartitionFunction, Tuple2>() {

@Override

public void mapPartition(Iterable> values, Collector> out) throws Exception {

Iterator> it = values.iterator();

while (it.hasNext()){

Tuple2 next = it.next();

System.out.println("当前线程id:"+Thread.currentThread().getId()+","+next);

}

}

}).print();*/text.partitionByRange(0).mapPartition(new MapPartitionFunctionInteger,String>, Tuple2>() {

@Overridepublic void mapPartition(Iterable> values, CollectorInteger, String>> out) throws Exception {

IteratorInteger, String>> it = values.iterator();

while (it.hasNext()){

Tuple2<Integer, String> next = it.next();System.out.println("当前线程id:"+Thread.currentThread().getId()+","+next);

}

}

}).print();

}

}

2、mapPartition

package xuwei.tech.batch.batchAPI;

import org.apache.mon.functions.FlatMapFunction;

import org.apache.mon.functions.MapFunction;

import org.apache.mon.functions.MapPartitionFunction;

import org.apache.flink.api.java.DataSet;

import org.apache.flink.api.java.ExecutionEnvironment;

import org.apache.flink.api.java.operators.DataSource;

import org.apache.flink.api.java.operators.MapPartitionOperator;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.util.Collector;

import java.util.ArrayList;

import java.util.Iterator;/**

* Created by xuwei.tech on /10/8.

*/public class BatchDemoMapPartition {

public static void main(String[] args) throws Exception{//获取运行环境

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

ArrayList<String> data = new ArrayList<>();

data.add("hello you");

data.add("hello me");

DataSource<String> text = env.fromCollection(data);/*text.map(new MapFunction() {

@Override

public String map(String value) throws Exception {

//获取数据库连接--注意,此时是每过来一条数据就获取一次链接

//处理数据

//关闭连接

return value;

}

});*/

DataSet<String> mapPartitionData = text.mapPartition(new MapPartitionFunction() {@Override

public void mapPartition(Iterable values, Collector out) throws Exception {//获取数据库连接--注意,此时是一个分区的数据获取一次连接【优点,每个分区获取一次链接】

//values中保存了一个分区的数据

//处理数据Iterator<String> it = values.iterator();

while (it.hasNext()) {

String next = it.next();

String[] split = next.split("\\W+");for (String word : split) {

out.collect(word);

}

}//关闭链接

}

});

mapPartitionData.print();

}

}

四、DataSet API之Data Sink(数据落地)

介绍:

writeAsText():将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取

writeAsCsv():将元组以逗号分隔写入文件中,行及字段之间的分隔是可配置的。每个字段的值来自对象的toString()方法

print():打印每个元素的toString()方法的值到标准输出或者标准错误输出流中

代码:
1、writeAsCsv

package xuwei.tech.batch;

import org.apache.mon.functions.FlatMapFunction;

import org.apache.flink.api.java.DataSet;

import org.apache.flink.api.java.ExecutionEnvironment;

import org.apache.flink.api.java.operators.DataSource;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.util.Collector;/**

* Created by xuwei.tech on /10/8.

*/public class BatchWordCountJava {

public static void main(String[] args) throws Exception{

String inputPath = "D:\\data\\file";String outPath = "D:\\data\\result";//获取运行环境

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

//获取文件中的内容

DataSource<String> text = env.readTextFile(inputPath);

DataSet> counts = text.flatMap(new Tokenizer()).groupBy(0).sum(1);

counts.writeAsCsv(outPath,"\n"," ").setParallelism(1);

env.execute("batch word count");

}public static class Tokenizer implements FlatMapFunction<String,Tuple2<String,Integer>>{

public void flatMap(String value, Collector> out) throws Exception {

String[] tokens = value.toLowerCase().split("\\W+");for (String token: tokens) {

if(token.length()>0){

out.collect(new Tuple2(token,1));

}

}

}

}

}

致力于大数据,机器算法,人工智能学习,共享于有需要人士,希望共享内容对大家有用,欢迎大家转发关注。

flink流处理从0到1

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。