1.读取 socket 数据(先在终端执行 nc -l 9090 监听端口)
/**
 * Builds and runs a Flink streaming job that reads newline-delimited text from
 * a socket on {@code localhost:9090} and passes every line through
 * {@code LineMapper}.
 *
 * <p>Something must already be listening on port 9090 (for example
 * {@code nc -l 9090}) before the job starts.
 *
 * @param args command-line arguments; not used
 * @throws Exception if setting up or executing the job fails; the failure is
 *     propagated instead of being printed and swallowed, so the caller sees the
 *     full stack trace and a non-zero exit status
 */
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Very important: checkpointing must be enabled (interval argument: 5000).
    env.enableCheckpointing(5000);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setGlobalJobParameters(loadConfig("config.properties"));

    // Source: one record per "\n"-terminated line received on the socket.
    DataStream<String> text = env.socketTextStream("localhost", 9090, "\n");

    // The element type produced by LineMapper is not visible here; replace the
    // wildcard with the concrete type before attaching a typed sink.
    DataStream<?> stream = text.flatMap(new LineMapper());

    // Optional sink, disabled in the original:
    // stream.addSink(new SinkTest());

    // Optional: force single-threaded execution so console output is printed
    // by one thread rather than several.
    // env.setParallelism(1);

    // NOTE(review): the job name mentions Kafka although the source is a socket.
    env.execute("WordCount from Kafka data");
}
2.读取文本文件
/**
 * Builds and runs a Flink streaming job that reads the text file
 * {@code /Users/duanxiaoqiu/test.txt} line by line and passes every line
 * through {@code LineMapper}.
 *
 * @param args command-line arguments; not used
 * @throws Exception if setting up or executing the job fails; the failure is
 *     propagated instead of being printed and swallowed, so the caller sees the
 *     full stack trace and a non-zero exit status
 */
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Very important: checkpointing must be enabled (interval argument: 5000).
    env.enableCheckpointing(5000);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setGlobalJobParameters(loadConfig("config.properties"));

    // Source: the lines of a local text file.
    DataStream<String> text = env.readTextFile("/Users/duanxiaoqiu/test.txt");
    // Alternative source, disabled in the original: read from a socket instead.
    // DataStream<String> text = env.socketTextStream("localhost", 9090, "\n");

    // The element type produced by LineMapper is not visible here; replace the
    // wildcard with the concrete type before attaching a typed sink.
    DataStream<?> stream = text.flatMap(new LineMapper());

    // Optional sink, disabled in the original:
    // stream.addSink(new SinkTest());

    // Optional: force single-threaded execution so console output is printed
    // by one thread rather than several.
    // env.setParallelism(1);

    // NOTE(review): the job name mentions Kafka although the source is a file.
    env.execute("WordCount from Kafka data");
}
3.读取其他类型文件