
Flink Stream Processing API: Source


This article introduces Flink's stream-processing Source API in four parts:

1. Reading data from a collection

2. Reading data from a file

3. Reading data from Kafka

4. Custom Source

A data-processing job can generally be broken into three stages: where the data comes from, what business logic is applied to it, and where the results are written.

In Flink, these three stages are called Source, Transform, and Sink.
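As a minimal sketch of the three stages in a single job (the fromElements source, map transform, and print sink here are illustrative, not taken from the examples below):

import org.apache.flink.streaming.api.scala._

object SourceTransformSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromElements(1, 2, 3)   // Source: where the data comes from
      .map(_ * 2)               // Transform: the business logic
      .print()                  // Sink: where the results land
    env.execute("source-transform-sink")
  }
}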

Versions:

Scala: 2.11.12

Kafka: 0.8.2.2

Flink: 1.7.2

pom.xml dependencies (the logging dependencies must be included; otherwise, when Flink reads from Kafka 0.8 it fails with "Failed to instantiate SLF4J LoggerFactory Reported exception"):

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.7.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.22</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.22</version>
    </dependency>
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
</dependencies>
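Since slf4j-log4j12 is declared above, log4j also needs a configuration file at runtime. A minimal sketch, assuming a log4j.properties under src/main/resources (this file is an assumption; the original post does not show one):

# Log everything at INFO level to the console (assumed minimal setup)
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %c - %m%n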

1. Reading data from a collection

package xxx

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

// Case class: sensor ID, timestamp, temperature (used as the data type in all later examples)
case class SensorReading(id: String, timestamp: Long, temperature: Double) {
  override def toString: String = id + ":" + timestamp.toString + "," + temperature
}

/**
 * Reading data from a collection
 */
object Sensor {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val stream1: DataStream[SensorReading] = environment.fromCollection(List(
      SensorReading("sensor_1", 1547718199, 35.80018327300259),
      SensorReading("sensor_6", 1547718201, 15.402984393403084),
      SensorReading("sensor_7", 1547718202, 6.72094571228),
      SensorReading("sensor_10", 1547718205, 38.101067604893444)
    ))

    stream1.print("Stream1:").setParallelism(1)
    environment.execute()
  }
}
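Besides fromCollection, the Scala API also provides fromElements for quick experiments; a one-line sketch, reusing the environment and imports from the example above:

val stream: DataStream[SensorReading] = environment.fromElements(
  SensorReading("sensor_1", 1547718199, 35.8))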

2. Reading data from a file

package xxx

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

// Case class: sensor ID, timestamp, temperature
case class SensorReading(id: String, timestamp: Long, temperature: Double) {
  override def toString: String = id + ":" + timestamp.toString + "," + temperature
}

/**
 * Reading data from a file
 */
object Sensor {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val stream2: DataStream[String] = environment.readTextFile(
      "D:\\ScalaCode\\FlinkTest\\src\\main\\resources\\sensor.txt")

    stream2.print("Stream2:").setParallelism(1)
    environment.execute()
  }
}
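readTextFile produces a DataStream[String]; to get SensorReading values the lines still have to be parsed. A minimal sketch that could follow the readTextFile call above, assuming each line of sensor.txt holds a comma-separated id, timestamp, and temperature (the file layout is an assumption):

val sensorStream: DataStream[SensorReading] = stream2.map(line => {
  // Split "id,timestamp,temperature" into fields (assumed layout)
  val fields = line.split(",")
  SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
})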

3. Reading data from Kafka

Kafka broker list: slave1:9092,slave2:9092,slave3:9092

ZooKeeper cluster: slave2:2181,slave3:2181,slave4:2181

package xxx

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08

/**
 * Reading data from Kafka
 */
object ReadDataFromKafka {
  def main(args: Array[String]): Unit = {
    // Kafka consumer configuration
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "slave1:9092,slave2:9092,slave3:9092")
    properties.setProperty("group.id", "flink_group1")
    properties.setProperty("zookeeper.connect", "slave2:2181,slave3:2181,slave4:2181")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")   // key deserializer
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") // value deserializer
    properties.setProperty("auto.offset.reset", "latest") // offset reset strategy

    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Connect to Kafka and read the "sensor" topic
    val kafkaStream: DataStream[String] = environment.addSource(
      new FlinkKafkaConsumer08[String]("sensor", new SimpleStringSchema(), properties))

    kafkaStream.print().setParallelism(1)
    environment.execute("readDataFromKafka")
  }
}
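To feed test data into the job, messages can be written to the sensor topic with the console producer that ships with Kafka 0.8, for example:

bin/kafka-console-producer.sh --broker-list slave1:9092 --topic sensor

Each line typed into the producer arrives in the job as one String element.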

4. Custom Source

package xxx

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

import scala.util.Random

/**
 * Custom Source
 */
object ReadDataFromMySource {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[String] = environment.addSource(new MySource())
    dataStream.print().setParallelism(1)
    environment.execute("MySource")
  }
}

class MySource extends SourceFunction[String] {
  // Flag indicating whether the source is still running
  var running: Boolean = true

  // Emit data while the source is running
  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    val random = new Random()
    // Initialize ten sensors with a random baseline temperature
    var temp = 1.to(10).map(i => (i, 100 + random.nextGaussian() * 100))
    while (running) {
      // Apply a small Gaussian fluctuation to each reading
      temp = temp.map(t => (t._1, t._2 + random.nextGaussian()))
      // Current timestamp
      val curTime = System.currentTimeMillis()
      temp.foreach(t => sourceContext.collect(curTime + ": " + t._1 + "--> " + t._2))
      Thread.sleep(500)
    }
  }

  // Stop emitting data
  override def cancel(): Unit = {
    running = false
  }
}
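The source above emits plain strings. The same pattern can emit SensorReading values directly by parameterizing SourceFunction with the case class; a minimal sketch of that variant (hypothetical, not from the original post; reuses the imports and SensorReading definition from the earlier examples):

class MySensorSource extends SourceFunction[SensorReading] {
  var running: Boolean = true

  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    val random = new scala.util.Random()
    while (running) {
      // Emit one reading per tick; the id and baseline temperature are illustrative
      ctx.collect(SensorReading("sensor_1", System.currentTimeMillis(), 60 + random.nextGaussian() * 20))
      Thread.sleep(500)
    }
  }

  override def cancel(): Unit = running = false
}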
