1. pom.xml
<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <scala.version>2.12.10</scala.version>
    <spark.version>3.0.0</spark.version>
    <hadoop.version>3.2.1</hadoop.version>
    <encoding>UTF-8</encoding>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
</dependencies>

<build>
    <pluginManagement>
        <plugins>
            <!-- plugin that compiles Scala sources -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
            </plugin>
            <!-- plugin that compiles Java sources -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
        </plugins>
    </pluginManagement>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <!-- standard exclusion of signature files so the shaded jar is not rejected -->
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
2. A Spark 3.0 Java API program
Use Spark to read a text file from HDFS, count the words, and write the result back to HDFS.
2.1 Implementation with Java anonymous inner classes
// imports needed by the snippets below
import java.util.Arrays;
import java.util.Iterator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

SparkConf conf = new SparkConf().setAppName("JavaWordCount");
JavaSparkContext jsc = new JavaSparkContext(conf);
// read the input text file from HDFS (path passed as the first argument)
JavaRDD<String> lines = jsc.textFile(args[0]);
// split each line into words
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterator<String> call(String line) throws Exception {
        return Arrays.asList(line.split(" ")).iterator();
    }
});
// map each word to a (word, 1) pair
JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String word) throws Exception {
        return new Tuple2<>(word, 1);
    }
});
// sum the counts per word
JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
    }
});
// swap to (count, word) so the pairs can be sorted by count
JavaPairRDD<Integer, String> swapped = reduced.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
    @Override
    public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
        return tp.swap();
    }
});
// sort by count in descending order
JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);
// swap back to (word, count)
JavaPairRDD<String, Integer> result = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
        return tp.swap();
    }
});
// write the result to HDFS (path passed as the second argument)
result.saveAsTextFile(args[1]);
jsc.stop();
2.2 Implementation with lambda expressions
SparkConf conf = new SparkConf().setAppName("LambdaJavaWordCount");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> lines = jsc.textFile(args[0]);
// split each line into words
JavaRDD<String> words = lines.flatMap(line -> Arrays.stream(line.split(" ")).iterator());
// map each word to a (word, 1) pair
JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(w -> new Tuple2<>(w, 1));
// sum the counts per word
JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey((i, j) -> i + j);
// sort by count (descending): swap to (count, word), sort by key, swap back
JavaPairRDD<String, Integer> sorted = reduced.mapToPair(tp -> tp.swap())
        .sortByKey(false)
        .mapToPair(tp -> tp.swap());
sorted.saveAsTextFile(args[1]);
jsc.stop();
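For reference, a minimal sketch of what the complete class might look like once the lambda version is wrapped in a main method. The package and class name com.wang.spark.LambdaJavaWordCount are taken from the spark-submit command in section 2.6; everything else mirrors the snippet above.

package com.wang.spark;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class LambdaJavaWordCount {
    public static void main(String[] args) {
        // args[0] = input path, args[1] = output path (local or HDFS URIs)
        SparkConf conf = new SparkConf().setAppName("LambdaJavaWordCount");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaRDD<String> lines = jsc.textFile(args[0]);
        JavaRDD<String> words = lines.flatMap(line -> Arrays.stream(line.split(" ")).iterator());
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(w -> new Tuple2<>(w, 1));
        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey((i, j) -> i + j);
        // sort by count descending via swap / sortByKey / swap
        JavaPairRDD<String, Integer> sorted = reduced.mapToPair(tp -> tp.swap())
                .sortByKey(false)
                .mapToPair(tp -> tp.swap());

        sorted.saveAsTextFile(args[1]);
        jsc.stop();
    }
}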
2.3 Package the program
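The packaging command is not listed here; assuming a standard Maven build with the shade plugin configured in section 1, it would typically be:

mvn clean package

The shaded jar then ends up in the project's target/ directory.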
2.4 Upload the jar to the Linux server
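One possible way to copy the jar to the server, assuming the jar sits in the standard Maven target/ directory and you log in as root (host and target directory are placeholders; the jar name matches the one used in section 2.6):

scp target/spark-in-active-1.0-SNAPSHOT.jar root@hadoop1:/root/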
2.5 Start HDFS
hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode
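Before submitting the job, the HDFS input directory used below (/wc) has to exist and contain at least one text file; a sketch, with words.txt as a placeholder file name:

hdfs dfs -mkdir -p /wc
hdfs dfs -put words.txt /wc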
2.6 Submit the jar with Spark
bin/spark-3.0.0-bin-hadoop3.2/bin/spark-submit \
  --master spark://hadoop1:7077,hadoop2:7077,hadoop3:7077 \
  --executor-memory 1g \
  --total-executor-cores 5 \
  --class com.wang.spark.LambdaJavaWordCount \
  /root/spark-in-active-1.0-SNAPSHOT.jar \
  hdfs://hadoop1:9000/wc hdfs://hadoop1:9000/out
2.7 View the result
hdfs dfs -cat /out/*
3. Running locally
For local testing the job does not connect to the cluster; it runs in a single process on the local machine.
SparkConf conf = new SparkConf().setAppName("LambdaJavaWordCount").setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> lines = jsc.textFile(args[0]);
JavaRDD<String> words = lines.flatMap(line -> Arrays.stream(line.split(" ")).iterator());
JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(w -> new Tuple2<>(w, 1));
JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey((i, j) -> i + j);
// sort by count descending via swap / sortByKey / swap
JavaPairRDD<String, Integer> sorted = reduced.mapToPair(tp -> tp.swap())
        .sortByKey(false)
        .mapToPair(tp -> tp.swap());
sorted.saveAsTextFile(args[1]);
jsc.stop();
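The input and output paths still come from args[0] and args[1], so for a purely local test they can point at plain local files. Hypothetical examples of the two program arguments (the HDFS pair matches section 2.6; the file:// paths are placeholders):

file:///root/wc/words.txt  file:///root/wc/out
hdfs://hadoop1:9000/wc     hdfs://hadoop1:9000/out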
When running, pass either local data or HDFS data as the program arguments. If an error occurs here, enable the Scala-related entry in the pom, or read all input from local files instead. The execution result is as follows: