1 Common Transformation Operations
1.1 map and filter
public class MapDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // custom non-parallel source that keeps emitting Long values
        DataStreamSource<Long> numberStream = env.addSource(new MyNoParalleSource()).setParallelism(1);
        SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("Received: " + value);
                return value;
            }
        });
        SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long number) throws Exception {
                return number % 2 == 0;
            }
        });
        filterDataStream.print().setParallelism(1);
        env.execute("StreamingDemoWithMyNoPralalleSource");
    }
}
1.2 flatMap, keyBy and sum
public class WindowWordCountJava {
    public static void main(String[] args) throws Exception {
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.err.println("No port set, using default port 9988");
            port = 9988;
        }
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String hostname = "10.126.88.226";
        String delimiter = "\n";
        DataStreamSource<String> textStream = env.socketTextStream(hostname, port, delimiter);
        SingleOutputStreamOperator<WordCount> wordCountStream = textStream
                .flatMap(new FlatMapFunction<String, WordCount>() {
                    public void flatMap(String line, Collector<WordCount> out) throws Exception {
                        String[] fields = line.split("\t");
                        for (String word : fields) {
                            out.collect(new WordCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                // sliding window: 2-second window that slides every 1 second
                .timeWindow(Time.seconds(2), Time.seconds(1))
                .sum("count");
        wordCountStream.print().setParallelism(1);
        env.execute("socket word count");
    }

    public static class WordCount {
        public String word;
        public long count;

        public WordCount() {
        }

        public WordCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}
1.3 union
public class unionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);
        DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);
        // union merges two streams of the same type into one stream
        DataStream<Long> text = text1.union(text2);
        DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("Received raw value: " + value);
                return value;
            }
        });
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);
        sum.print().setParallelism(1);
        String jobName = unionDemo.class.getSimpleName();
        env.execute(jobName);
    }
}
1.4 connect, CoMap and CoFlatMap
public class ConnectionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);
        DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);
        SingleOutputStreamOperator<String> text2_str = text2.map(new MapFunction<Long, String>() {
            @Override
            public String map(Long value) throws Exception {
                return "str_" + value;
            }
        });
        // connect pairs two streams of (possibly) different types; each keeps its own map function
        ConnectedStreams<Long, String> connectStream = text1.connect(text2_str);
        SingleOutputStreamOperator<Object> result = connectStream.map(new CoMapFunction<Long, String, Object>() {
            @Override
            public Object map1(Long value) throws Exception {
                return value;
            }

            @Override
            public Object map2(String value) throws Exception {
                return value;
            }
        });
        result.print().setParallelism(1);
        String jobName = ConnectionDemo.class.getSimpleName();
        env.execute(jobName);
    }
}
1.5 Split and Select
public class SplitDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);
        // split tags each element with one or more output names
        SplitStream<Long> splitStream = text.split(new OutputSelector<Long>() {
            @Override
            public Iterable<String> select(Long value) {
                ArrayList<String> outPut = new ArrayList<>();
                if (value % 2 == 0) {
                    outPut.add("even");
                } else {
                    outPut.add("odd");
                }
                return outPut;
            }
        });
        // select retrieves one or more of the tagged sub-streams
        DataStream<Long> evenStream = splitStream.select("even");
        DataStream<Long> oddStream = splitStream.select("odd");
        DataStream<Long> moreStream = splitStream.select("odd", "even");
        evenStream.print().setParallelism(1);
        String jobName = SplitDemo.class.getSimpleName();
        env.execute(jobName);
    }
}
2 Common Sink Operations
2.1 print() / printToErr()
Prints the value of each element's toString() to the standard output or the standard error stream.
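A minimal sketch of both calls (this PrintSinkDemo class is illustrative and reuses the MyNoParalleSource from the examples above):
public class PrintSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> numbers = env.addSource(new MyNoParalleSource()).setParallelism(1);
        numbers.print();       // each element's toString() goes to standard output
        numbers.printToErr();  // same content, written to the standard error stream
        env.execute("PrintSinkDemo");
    }
}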
2.2 writeAsText()
public class WriteTextDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> numberStream = env.addSource(new MyNoParalleSource()).setParallelism(1);
        SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("Received: " + value);
                return value;
            }
        });
        SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long number) throws Exception {
                return number % 2 == 0;
            }
        });
        // write each element's toString() to a text file, one line per element
        filterDataStream.writeAsText("src\\output\\test").setParallelism(1);
        env.execute("StreamingDemoWithMyNoPralalleSource");
    }
}
2.3 Sinks provided by Flink
Apache Kafka (source/sink)
Apache Cassandra (sink)
Amazon Kinesis Streams (source/sink)
Elasticsearch (sink)
Hadoop FileSystem (sink)
RabbitMQ (source/sink)
Apache NiFi (source/sink)
Twitter Streaming API (source)
Google PubSub (source/sink)
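As one illustration, a Kafka producer sink can be attached roughly as sketched below; the broker address and topic name are placeholders, and the exact producer class and constructor depend on the flink-connector-kafka version in use:
public class KafkaSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("localhost", 9000, "\n");
        // illustrative only: "localhost:9092" and "demo-topic" are placeholder values
        lines.addSink(new FlinkKafkaProducer<>("localhost:9092", "demo-topic", new SimpleStringSchema()));
        env.execute("KafkaSinkDemo");
    }
}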
2.4 Custom sink
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.12</artifactId>
    <version>1.0</version>
</dependency>
Custom Redis sink
public class SinkForRedisDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("hadoop100", 9000, "\n");
        // wrap every line into a (key, value) tuple; the fixed key "l_words" becomes the Redis list name
        DataStream<Tuple2<String, String>> l_wordsData = text.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String value) throws Exception {
                return new Tuple2<>("l_words", value);
            }
        });
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop110").setPort(6379).build();
        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
        l_wordsData.addSink(redisSink);
        env.execute("StreamingDemoToRedis");
    }

    public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
        @Override
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }

        @Override
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.LPUSH);
        }
    }
}
3 DataSet Operators (cf. Spark Core)
3.1 source
File-based: readTextFile(path). Collection-based: fromCollection(Collection).
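Both source types in one minimal sketch (the file path is a placeholder):
public class DataSetSourceDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // file-based source: one String element per line (placeholder path)
        DataSource<String> fromFile = env.readTextFile("data/words.txt");
        // collection-based source: elements taken from an in-memory Java collection
        DataSource<String> fromCollection = env.fromCollection(Arrays.asList("hello you", "hello me"));
        fromCollection.print();
    }
}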
3.2 transform
Operator overview
Map: takes one element and returns one element; typically used for cleansing and conversion.
FlatMap: takes one element and returns zero, one, or more elements.
MapPartition: like Map, but processes one partition of data per call [recommended over Map when the function needs to obtain third-party resources such as connections].
Filter: evaluates a predicate on each element and keeps those for which it returns true.
Reduce: aggregates the current element with the previously reduced value and returns a new value (a minimal sketch follows this list).
Aggregate: built-in aggregations such as sum, max, and min.
Distinct: returns the distinct elements of a dataset, e.g. data.distinct().
Join: inner join.
OuterJoin: outer join.
Cross: the Cartesian product of two datasets.
Union: the concatenation of two datasets; the element types must match.
First-n: returns the first N elements of a dataset.
Sort Partition: sorts all partitions of the dataset locally; chain sortPartition() calls to sort on multiple fields.
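Since Reduce has no standalone demo below, here is a minimal sketch (the input numbers are illustrative):
public class ReduceDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);
        // reduce combines the current element with the previously reduced value
        DataSet<Integer> total = numbers.reduce(new ReduceFunction<Integer>() {
            @Override
            public Integer reduce(Integer value1, Integer value2) throws Exception {
                return value1 + value2;
            }
        });
        total.print(); // prints 15
    }
}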
MapPartition
public class MapPartitionDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ArrayList<String> data = new ArrayList<>();
        data.add("hello you");
        data.add("hello me");
        DataSource<String> text = env.fromCollection(data);
        DataSet<String> mapPartitionData = text.mapPartition(new MapPartitionFunction<String, String>() {
            @Override
            public void mapPartition(Iterable<String> values, Collector<String> out) throws Exception {
                // one call per partition: iterate over all records of the partition
                Iterator<String> it = values.iterator();
                while (it.hasNext()) {
                    String next = it.next();
                    String[] split = next.split("\\W+");
                    for (String word : split) {
                        out.collect(word);
                    }
                }
            }
        });
        mapPartitionData.print();
    }
}
distinct
public class DistinctDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ArrayList<String> data = new ArrayList<>();
        data.add("you jump");
        data.add("i jump");
        DataSource<String> text = env.fromCollection(data);
        FlatMapOperator<String, String> flatMapData = text.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] split = value.toLowerCase().split("\\W+");
                for (String word : split) {
                    System.out.println("word: " + word);
                    out.collect(word);
                }
            }
        });
        // distinct removes duplicates: "jump" is emitted twice but printed only once
        flatMapData.distinct().print();
    }
}
join
public class JoinDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>();
        data1.add(new Tuple2<>(1, "zs"));
        data1.add(new Tuple2<>(2, "ls"));
        data1.add(new Tuple2<>(3, "ww"));
        ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>();
        data2.add(new Tuple2<>(1, "beijing"));
        data2.add(new Tuple2<>(2, "shanghai"));
        data2.add(new Tuple2<>(3, "guangzhou"));
        DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1);
        DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2);
        text1.join(text2)
                .where(0)   // join key of the left dataset
                .equalTo(0) // join key of the right dataset
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second)
                            throws Exception {
                        return new Tuple3<>(first.f0, first.f1, second.f1);
                    }
                }).print();
        System.out.println("==================================");
    }
}
OuterJoin
public class OuterJoinDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>();
        data1.add(new Tuple2<>(1, "zs"));
        data1.add(new Tuple2<>(2, "ls"));
        data1.add(new Tuple2<>(3, "ww"));
        ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>();
        data2.add(new Tuple2<>(1, "beijing"));
        data2.add(new Tuple2<>(2, "shanghai"));
        data2.add(new Tuple2<>(4, "guangzhou"));
        DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1);
        DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2);
        // left outer join: unmatched elements of the right side arrive as null
        text1.leftOuterJoin(text2)
                .where(0)
                .equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                        if (second == null) {
                            return new Tuple3<>(first.f0, first.f1, "null");
                        } else {
                            return new Tuple3<>(first.f0, first.f1, second.f1);
                        }
                    }
                }).print();
        System.out.println("=============================================================================");
        // right outer join: unmatched elements of the left side arrive as null
        text1.rightOuterJoin(text2)
                .where(0)
                .equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                        if (first == null) {
                            return new Tuple3<>(second.f0, "null", second.f1);
                        }
                        return new Tuple3<>(first.f0, first.f1, second.f1);
                    }
                }).print();
        System.out.println("=============================================================================");
        // full outer join: either side may be null
        text1.fullOuterJoin(text2)
                .where(0)
                .equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                        if (first == null) {
                            return new Tuple3<>(second.f0, "null", second.f1);
                        } else if (second == null) {
                            return new Tuple3<>(first.f0, first.f1, "null");
                        } else {
                            return new Tuple3<>(first.f0, first.f1, second.f1);
                        }
                    }
                }).print();
    }
}
Cross
public class CrossDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ArrayList<String> data1 = new ArrayList<>();
        data1.add("zs");
        data1.add("ww");
        ArrayList<Integer> data2 = new ArrayList<>();
        data2.add(1);
        data2.add(2);
        DataSource<String> text1 = env.fromCollection(data1);
        DataSource<Integer> text2 = env.fromCollection(data2);
        // cross produces the Cartesian product of the two datasets
        CrossOperator.DefaultCross<String, Integer> cross = text1.cross(text2);
        cross.print();
    }
}
First-n and Sort Partition
import java.util.ArrayList;

public class FirstNDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
        data.add(new Tuple2<>(2, "zs"));
        data.add(new Tuple2<>(4, "ls"));
        data.add(new Tuple2<>(3, "ww"));
        data.add(new Tuple2<>(1, "xw"));
        data.add(new Tuple2<>(1, "aw"));
        data.add(new Tuple2<>(1, "mw"));
        DataSource<Tuple2<Integer, String>> text = env.fromCollection(data);
        // first 3 elements in the original order
        text.first(3).print();
        System.out.println("==============================");
        // first 2 elements of each group (grouped by the first field)
        text.groupBy(0).first(2).print();
        System.out.println("==============================");
        // first 2 elements of each group after sorting the group by the second field
        text.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print();
        System.out.println("==============================");
        // sort each partition by field 0 ascending, then field 1 descending, and take the first 3
        text.sortPartition(0, Order.ASCENDING).sortPartition(1, Order.DESCENDING).first(3).print();
    }
}
partition
public class HashRangePartitionDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
        data.add(new Tuple2<>(1, "hello1"));
        data.add(new Tuple2<>(2, "hello2"));
        data.add(new Tuple2<>(2, "hello3"));
        data.add(new Tuple2<>(3, "hello4"));
        data.add(new Tuple2<>(3, "hello5"));
        data.add(new Tuple2<>(3, "hello6"));
        data.add(new Tuple2<>(4, "hello7"));
        data.add(new Tuple2<>(4, "hello8"));
        data.add(new Tuple2<>(4, "hello9"));
        data.add(new Tuple2<>(4, "hello10"));
        data.add(new Tuple2<>(5, "hello11"));
        data.add(new Tuple2<>(5, "hello12"));
        data.add(new Tuple2<>(5, "hello13"));
        data.add(new Tuple2<>(5, "hello14"));
        data.add(new Tuple2<>(5, "hello15"));
        data.add(new Tuple2<>(6, "hello16"));
        data.add(new Tuple2<>(6, "hello17"));
        data.add(new Tuple2<>(6, "hello18"));
        data.add(new Tuple2<>(6, "hello19"));
        data.add(new Tuple2<>(6, "hello20"));
        data.add(new Tuple2<>(6, "hello21"));
        DataSource<Tuple2<Integer, String>> text = env.fromCollection(data);
        // partitionByRange distributes records into key ranges; mapPartition shows which thread handles them
        text.partitionByRange(0).mapPartition(new MapPartitionFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
            @Override
            public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
                Iterator<Tuple2<Integer, String>> it = values.iterator();
                while (it.hasNext()) {
                    Tuple2<Integer, String> next = it.next();
                    System.out.println("current thread id: " + Thread.currentThread().getId() + "," + next);
                }
            }
        }).print();
    }
}
Custom partition
See the custom partitioner in the DataStream section.
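For reference, a minimal custom-partitioner sketch on the DataSet API (the modulo routing is purely illustrative):
public class CustomPartitionDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<Integer, String>> data = env.fromElements(
                new Tuple2<>(1, "a"), new Tuple2<>(2, "b"), new Tuple2<>(3, "c"));
        // partitionCustom routes each record to the partition chosen by the Partitioner for its key (field 0)
        data.partitionCustom(new Partitioner<Integer>() {
            @Override
            public int partition(Integer key, int numPartitions) {
                return key % numPartitions; // illustrative: spread keys by modulo
            }
        }, 0).print();
    }
}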
3.3 sink
writeAsText(): writes elements line by line as strings, obtained by calling each element's toString().
writeAsCsv(): writes tuples as comma-separated values files; the row and field delimiters are configurable, and each field value comes from the object's toString().
print(): prints the value of each element's toString() to standard output or standard error.
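A minimal sketch of the file sinks on the DataSet API (the output paths are placeholders):
public class DataSetSinkDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> counts = env.fromElements(
                new Tuple2<>("hello", 2), new Tuple2<>("world", 1));
        counts.writeAsText("output/counts-text");          // one toString() per line
        counts.writeAsCsv("output/counts-csv", "\n", ","); // row delimiter, field delimiter
        env.execute("DataSetSinkDemo");                     // write* sinks are lazy; execute() triggers them
    }
}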
3.4 Broadcast Variables in Flink
A broadcast variable lets you keep one read-only cached copy of a dataset on every machine instead of shipping a copy to each task. Once created, it can be used by any function running in the cluster without being transferred to the nodes repeatedly. Keep in mind that a broadcast variable should not be modified, so that every node sees the same value.
In short, it is a shared read-only variable: broadcast a DataSet once and every task on every node can read it, with only one copy per node. Without broadcast, every task on a node would need its own copy of the dataset, which wastes memory (a single node may then hold several copies of the same dataset).
Usage:
1. Prepare the data: DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);
2. Broadcast it: .withBroadcastSet(toBroadcast, "broadcastSetName");
3. Access it: Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
Notes:
1. The broadcast data lives in memory on every node until the job finishes, so the dataset must not be too large.
2. A broadcast variable must not be modified after it has been broadcast; this is what keeps the data consistent across nodes.
public class BroadCastDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ArrayList<Tuple2<String, Integer>> broadData = new ArrayList<>();
        broadData.add(new Tuple2<>("zs", 18));
        broadData.add(new Tuple2<>("ls", 20));
        broadData.add(new Tuple2<>("ww", 17));
        DataSet<Tuple2<String, Integer>> tupleData = env.fromCollection(broadData);
        // turn each (name, age) tuple into a single-entry map so the broadcast side is easy to merge
        DataSet<HashMap<String, Integer>> toBroadcast = tupleData.map(new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
            @Override
            public HashMap<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
                HashMap<String, Integer> res = new HashMap<>();
                res.put(value.f0, value.f1);
                return res;
            }
        });
        DataSource<String> data = env.fromElements("zs", "ls", "ww");
        DataSet<String> result = data.map(new RichMapFunction<String, String>() {
            List<HashMap<String, Integer>> broadCastMap = new ArrayList<HashMap<String, Integer>>();
            HashMap<String, Integer> allMap = new HashMap<String, Integer>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // read the broadcast variable once per task, before any map() call
                this.broadCastMap = getRuntimeContext().getBroadcastVariable("broadCastMapName");
                for (HashMap map : broadCastMap) {
                    allMap.putAll(map);
                }
            }

            @Override
            public String map(String value) throws Exception {
                Integer age = allMap.get(value);
                return value + "," + age;
            }
        }).withBroadcastSet(toBroadcast, "broadCastMapName");
        result.print();
    }
}
Flink Counters (Accumulators)
An Accumulator works much like a MapReduce counter: it is a convenient way to observe how data changes while a task is running. You can update an accumulator inside an operator function of a Flink job, but its final result only becomes available after the job has finished.
Counter is a concrete Accumulator implementation; IntCounter, LongCounter, and DoubleCounter are provided.
Usage:
1. Create the accumulator: private IntCounter numLines = new IntCounter();
2. Register the accumulator: getRuntimeContext().addAccumulator("num-lines", this.numLines);
3. Use the accumulator: this.numLines.add(1);
4. Read the result after the job finishes: myJobExecutionResult.getAccumulatorResult("num-lines")
public class CounterDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> data = env.fromElements("a", "b", "c", "d");
        DataSet<String> result = data.map(new RichMapFunction<String, String>() {
            private IntCounter numLines = new IntCounter();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // register the accumulator before the task starts processing records
                getRuntimeContext().addAccumulator("num-lines", this.numLines);
            }

            @Override
            public String map(String value) throws Exception {
                this.numLines.add(1);
                return value;
            }
        }).setParallelism(8);
        result.writeAsText("d:\\data\\mycounter");
        JobExecutionResult jobResult = env.execute("counter");
        // the accumulator result is only available after the job has finished
        int num = jobResult.getAccumulatorResult("num-lines");
        System.out.println("num:" + num);
    }
}
4 Stateful Streams
The word count example, revisited
public class WordCount {
    public static void main(String[] args) throws Exception {
        // createLocalEnvironmentWithWebUI starts the Flink web UI alongside the local job
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> dataStream = env.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<Tuple2<String, Long>> wordOneStream = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
                String[] fields = line.split(",");
                for (String word : fields) {
                    out.collect(Tuple2.of(word, 1L));
                }
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Long>> result = wordOneStream
                .keyBy(0)
                .sum(1);
        result.print();
        env.execute(WordCount.class.getSimpleName());
    }
}
The web UI is reachable on port 8081. It requires the following dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.12</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
Sample input stream:
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.
Now change the parallelism to 4.
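A minimal sketch of that change, applied to the WordCount job above:
// In the WordCount main method above, set the job-wide default parallelism:
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(4); // every operator without an explicit setParallelism() now runs with 4 subtasks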