之前详解了Flink,下面看看Flink的使用示例,主要分为三类Flink使用示例。
1.Flink流式处理示例
下面是一个简单的 Flink 流式处理示例,它在5秒的窗口中计算来自web套接字的字数。
示例如下:
public class WindowWordCount { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple2<String, Integer>> dataStream = env .socketTextStream("localhost", 9999) .flatMap(new Splitter()) .keyBy(value -> value.f0) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .sum(1); dataStream.print(); env.execute("Window WordCount"); } public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception { for (String word: sentence.split(" ")) { out.collect(new Tuple2<String, Integer>(word, 1)); } } } }
2.批处理示例
Flink 同样支持批处理,例如对静态数据集进行处理,可以使用 DataSet API。
下面是一个简单的 Flink 批处理示例:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<String> text = env.readTextFile("input.txt"); DataSet<Tuple2<String, Integer>> wordCounts = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { for (String word : value.split("\\s")) { out.collect(new Tuple2<String, Integer>(word, 1)); } } }) .groupBy(0) .sum(1); wordCounts.print();
3.Flink CEP 示例
link 支持复杂事件处理(CEP),例如通过定义模式来识别事件序列中的有趣事件。
CEP 的意思是复杂事件处理,例如:起床–>洗漱–>吃饭–>上班等一系列串联起来的事件流,形成的模式称为 CEP。
下面是一个简单的 Flink CEP 示例:
DataStream<Event> inputStream = ...; Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { public boolean filter(Event value) { return value.getName().equals("start"); } }).followedBy("middle").where(new SimpleCondition<Event>() { public boolean filter(Event value) { return value.getName().equals("middle"); } }).followedBy("end").where(new SimpleCondition<Event>() { public boolean filter(Event value) { return value.getName().equals("end"); } }); DataStream<Event> resultStream = CEP.pattern(inputStream, pattern).select(new PatternSelectFunction<Event, Event>() { public Event select(Map<String, List<Event>> pattern) { return pattern.get("end").get(0); } });
以上就是Flink常见的三种Flin使用示例,详细的Flink教程,请查看:Flink教程(万字图文全面详解)
陈睿mikechen
10年+大厂架构经验,资深技术专家,就职于阿里巴巴、淘宝、百度等一线互联网大厂。
关注「mikechen」公众号,获取更多技术干货!
后台回复【面试】即可获取《史上最全阿里Java面试题总结》,后台回复【架构】,即可获取《阿里架构师进阶专题全部合集》