
Apache Flink: Read a Local File, Process the Data, and Import It into ES

Date: 2021-06-16 17:48:25

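Requirements

A file is stored on the local machine. Use Flink to read this local data source, process the data, and import the result into Elasticsearch (ES). The whole pipeline is submitted as a Flink job.

Environment

Flink: 1.8.2
Elasticsearch: 6.2.3
JDK: 1.8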

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.vincent</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.8.2</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <hadoop.version>2.8.5</hadoop.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Elasticsearch 6.x -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Hadoop -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_2.11</artifactId>
            <version>1.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.41</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.6</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.vincent.Test</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.0.0,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <!-- This profile helps to make things run out of the box in IntelliJ -->
    <!-- It adds Flink's core classes to the runtime class path. -->
    <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
    <profiles>
        <profile>
            <id>add-dependencies-for-IDEA</id>
            <activation>
                <property>
                    <name>idea.version</name>
                </property>
            </activation>
            <dependencies>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-java</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
            </dependencies>
        </profile>
    </profiles>
</project>

Custom utility class ElasticSearchSinkUtil.java

package com.vincent;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.util.ExceptionUtils;
import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;

public class ElasticSearchSinkUtil {

    // Parses a comma-separated "host:port" list into HttpHost objects.
    // Every entry must contain a port; an entry without one throws ArrayIndexOutOfBoundsException.
    public static List<HttpHost> getEsAddresses(String hosts) {
        String[] hostList = hosts.split(",");
        List<HttpHost> addresses = new ArrayList<>();
        for (String host : hostList) {
            String[] ip_port = host.split(":");
            String ip = ip_port[0];
            String port = ip_port[1];
            addresses.add(new HttpHost(ip, Integer.parseInt(port)));
        }
        return addresses;
    }

    // Builds an Elasticsearch sink with the given bulk size and attaches it to the stream.
    public static <T> void addSink(List<HttpHost> hosts, int bulkFlushMaxActions, int parallelism,
                                   SingleOutputStreamOperator<T> data, ElasticsearchSinkFunction<T> func) {
        ElasticsearchSink.Builder<T> esSinkBuilder = new ElasticsearchSink.Builder<>(hosts, func);
        esSinkBuilder.setBulkFlushMaxActions(bulkFlushMaxActions);
        esSinkBuilder.setFailureHandler(new ActionRequestFailureHandler() {
            @Override
            public void onFailure(ActionRequest actionRequest, Throwable throwable, int restStatusCode,
                                  RequestIndexer requestIndexer) throws Throwable {
                String description = actionRequest.getDescription();
                System.out.println("----------");
                System.out.println(description);
                System.out.println("===========");
                if (ExceptionUtils.findThrowable(throwable, SocketTimeoutException.class).isPresent()) {
                    // Timeout: only logged, the request is dropped
                    System.out.println("Timeout exception");
                } else if (ExceptionUtils.findThrowable(throwable, EsRejectedExecutionException.class).isPresent()) {
                    // Case 1: the ES queue is full (reject exception), so put the request back in the queue
                    System.out.println("ES queue is full");
                    requestIndexer.add(actionRequest);
                } else if (ExceptionUtils.findThrowable(throwable, ElasticsearchParseException.class).isPresent()) {
                    // Malformed document: only logged, the request is dropped
                    System.out.println("Parse exception: " + description);
                } else if (ExceptionUtils.findThrowable(throwable, ElasticsearchException.class).isPresent()) {
                    // Any other ES error: only logged, the request is dropped
                    System.out.println("An exception occurred");
                }
            }
        });
        data.addSink(esSinkBuilder.build()).setParallelism(parallelism);
    }
}
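
getEsAddresses assumes every entry has the exact form host:port, so a stray space, a trailing comma, or a missing port makes the job fail at startup with an unhelpful exception. The following is a minimal sketch of more tolerant parsing that uses only the JDK. The class name HostListParser and the default port 9200 are assumptions of this sketch and are not part of the original project.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of the original project. */
final class HostListParser {

    /** Port used when an entry has no ":port" suffix (an assumption of this sketch). */
    static final int DEFAULT_PORT = 9200;

    /** Parses "host1:9200, host2, ..." into unresolved socket addresses. */
    static List<InetSocketAddress> parse(String hosts) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : hosts.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate trailing commas and blank entries
            }
            int colon = trimmed.lastIndexOf(':');
            String host = colon < 0 ? trimmed : trimmed.substring(0, colon);
            if (host.isEmpty()) {
                throw new IllegalArgumentException("Missing host name in entry: " + trimmed);
            }
            int port = DEFAULT_PORT;
            if (colon >= 0) {
                try {
                    port = Integer.parseInt(trimmed.substring(colon + 1));
                } catch (NumberFormatException e) {
                    throw new IllegalArgumentException("Invalid port in entry: " + trimmed, e);
                }
            }
            // createUnresolved avoids a DNS lookup while parsing
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        if (result.isEmpty()) {
            throw new IllegalArgumentException("Host list contains no usable entries");
        }
        return result;
    }

    public static void main(String[] args) {
        for (InetSocketAddress address : parse("swarm-manager:9200, swarm-worker1 ,")) {
            System.out.println(address.getHostString() + " -> " + address.getPort());
        }
    }
}
```

Each parsed address could then be turned into the type the sink expects with new HttpHost(address.getHostString(), address.getPort()).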

Main method

package com.vincent;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.IOException;
import java.util.List;

public class Test {

    public static void main(String[] args) throws Exception {
        // The first program argument is the path to the properties file
        String propertiesPath = args[0];
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(propertiesPath);

        List<HttpHost> esAddresses = ElasticSearchSinkUtil.getEsAddresses(parameterTool.get("es.hosts"));
        int bulk_size = parameterTool.getInt("es.bulk.flushMaxAction");
        int sinkParallelism = parameterTool.getInt("es.sink.parallelism");
        String rawPath = parameterTool.get("rawPath");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.readTextFile(rawPath);

        // Split each tab-separated line into its seven fields
        SingleOutputStreamOperator<Tuple7<String, String, String, String, String, String, String>> map =
                dataStreamSource.map(new MapFunction<String, Tuple7<String, String, String, String, String, String, String>>() {
                    @Override
                    public Tuple7<String, String, String, String, String, String, String> map(String s) throws Exception {
                        String[] splits = s.split("\t");
                        String field1 = splits[0];
                        String field2 = splits[1];
                        String field3 = splits[2];
                        String field4 = splits[3];
                        String field5 = splits[4];
                        String field6 = splits[5];
                        String field7 = splits[6];
                        return new Tuple7<>(field1, field2, field3, field4, field5, field6, field7);
                    }
                });

        ElasticSearchSinkUtil.addSink(esAddresses, bulk_size, sinkParallelism, map,
                new ElasticsearchSinkFunction<Tuple7<String, String, String, String, String, String, String>>() {
                    @Override
                    public void process(Tuple7<String, String, String, String, String, String, String> data,
                                        RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                        try {
                            requestIndexer.add(createIndexRequest(data));
                        } catch (IOException e) {
                            // Skip the record instead of handing a null request to the indexer
                            e.printStackTrace();
                        }
                    }

                    // Builds the JSON document; field3 and field4 are expected to hold JSON strings
                    public IndexRequest createIndexRequest(Tuple7<String, String, String, String, String, String, String> data) throws IOException {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("field1", data.f0);
                        jsonObject.put("field2", data.f1);
                        jsonObject.put("field3", JSONObject.parseObject(data.f2));
                        jsonObject.put("field4", JSONObject.parseObject(data.f3));
                        jsonObject.put("field5", data.f4);
                        jsonObject.put("field6", data.f5);
                        jsonObject.put("field7", data.f6);
                        return Requests.indexRequest()
                                .index("my_index")
                                .type("type")
                                .source(jsonObject.toString(), XContentType.JSON);
                    }
                });

        // map.setParallelism(1).print();
        env.execute("Test");
    }
}
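
The map function indexes splits[0] through splits[6] directly, so a line with fewer than seven tab-separated fields throws ArrayIndexOutOfBoundsException. Java's split("\t") also drops trailing empty fields, which means a line whose last column is empty triggers the same failure. A minimal sketch of a stricter line parser, using only the JDK, could look like the following. RecordLineParser is a hypothetical name, and treating malformed lines as "skip" rather than "fail" is an assumption about the desired behavior.

```java
import java.util.Optional;

/** Hypothetical helper, not part of the original project. */
final class RecordLineParser {

    /** Number of tab-separated columns the job expects per line. */
    static final int FIELD_COUNT = 7;

    /** Returns the seven fields of a line, or empty if the line is malformed. */
    static Optional<String[]> parse(String line) {
        if (line == null || line.isEmpty()) {
            return Optional.empty();
        }
        // A limit of -1 keeps trailing empty fields, which split("\t") would drop
        String[] fields = line.split("\t", -1);
        return fields.length == FIELD_COUNT ? Optional.of(fields) : Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(parse("a\tb\tc\td\te\tf\tg").isPresent());
        System.out.println(parse("a\tb").isPresent());
    }
}
```

In the job itself, one option would be to replace map with flatMap and emit a tuple only when parse returns a value, so that bad lines are skipped instead of failing the whole job.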

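Custom configuration file

All job settings live in a properties file (flink-es.properties in the run command), so they can be changed without rebuilding the job: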

es.hosts=swarm-manager:9200,swarm-worker1:9200,swarm-worker2:9200
es.bulk.flushMaxAction=200
es.sink.parallelism=1
# hdfs: hdfs://swarm-manager:9001/text/000000_0, windows: E:/test/hello.txt
# rawPath=hdfs://swarm-manager:9001/text/000000_0
rawPath=E:/test/000000_0
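
The job reads four keys from this file: es.hosts, es.bulk.flushMaxAction, es.sink.parallelism, and rawPath. As a rough illustration of how these keys could be validated before the job is submitted, here is a minimal sketch built on java.util.Properties. The JobConfig class and its checks are assumptions of this sketch. The job itself uses Flink's ParameterTool, which this code does not replace.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical holder for the four settings; not part of the original project. */
final class JobConfig {
    final String esHosts;
    final int bulkFlushMaxActions;
    final int sinkParallelism;
    final String rawPath;

    private JobConfig(String esHosts, int bulkFlushMaxActions, int sinkParallelism, String rawPath) {
        this.esHosts = esHosts;
        this.bulkFlushMaxActions = bulkFlushMaxActions;
        this.sinkParallelism = sinkParallelism;
        this.rawPath = rawPath;
    }

    /** Loads and validates the settings; fails fast on a missing or invalid key. */
    static JobConfig load(Reader reader) {
        Properties props = new Properties();
        try {
            props.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new JobConfig(
                required(props, "es.hosts"),
                positiveInt(props, "es.bulk.flushMaxAction"),
                positiveInt(props, "es.sink.parallelism"),
                required(props, "rawPath"));
    }

    private static String required(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing required property: " + key);
        }
        return value.trim();
    }

    private static int positiveInt(Properties props, String key) {
        String raw = required(props, key);
        try {
            int value = Integer.parseInt(raw);
            if (value <= 0) {
                throw new IllegalArgumentException(key + " must be positive, got " + value);
            }
            return value;
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(key + " is not an integer: " + raw, e);
        }
    }

    public static void main(String[] args) {
        String text = "es.hosts=swarm-manager:9200\nes.bulk.flushMaxAction=200\n"
                + "es.sink.parallelism=1\nrawPath=E:/test/000000_0\n";
        JobConfig config = load(new StringReader(text));
        System.out.println(config.esHosts + " " + config.rawPath);
    }
}
```

One detail worth knowing: java.util.Properties treats the backslash as an escape character, so a Windows path is safest written with forward slashes, as in the file above.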

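Packaging and deployment

Run mvn package to build the application, then copy the generated hadoop-hdfs-1.0-SNAPSHOT-shaded.jar to the server.

Starting the Flink cluster

Start the cluster with the command ./flink-1.8.2/bin/start-cluster.bat

Running the job

Submit the job with the command: flink run ./hadoop-hdfs-1.0-SNAPSHOT-shaded.jar ./flink-es.properties

Open http://<server IP>:8081 in a browser to check how the job is running.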
