MapReduce编程实践

    科技2024-03-18  96

    MapReduce编程实践

    重要知识点:

    MapReduce是一种分布式并行编程模型,是Hadoop核心子项目之一,如果已经安装了Hadoop,就不需要另外安装MapReduce。主要的理论知识点包括:MapReduce概述、MapReduce的工作流程,WordCount实例分析,MapReduce的具体应用。掌握MapReduce的核心思想、编程模型、工作原理和实例分析。MapReduce的程序可以用Eclipse编译运行或使用命令行编译打包运行,本实验使用Eclipse编译运行MapReduce程序。
    实验内容与步骤:

    一、Eclipse的安装与配置 该部分实验前面已经做过,省略 https://blog.csdn.net/weixin_43640161/article/details/108691921

    二、Hadoop-Eclipse-Plugin的安装与配置

    安装 Hadoop-Eclipse-Plugin

    要在 Eclipse 上编译和运行 MapReduce 程序,需要安装 hadoop-eclipse-plugin 下载地址:http://pan.baidu.com/s/1i4ikIoP

    下载后,将 release 中的 hadoop-eclipse-kepler-plugin-2.6.0.jar (还提供了 2.2.0 和 2.4.1 版本)复制到 Eclipse 安装目录的 plugins 文件夹中

    终端命令:sudo mv hadoop-eclipse-plugin-2.6.0.jar /opt/eclipse/plugins/

    运行 eclipse -clean 重启 Eclipse 即可(添加插件后只需要运行一次该命令,以后按照正常方式启动就行了)。

    终端命令:eclipse -clean

    提示:如果对命令不是很熟悉,也可以手动找到jar包直接拷贝过去,然后找到安装文件目录双击重新启动eclipse完成第一步操作。

    配置 Hadoop-Eclipse-Plugin

    在继续配置前请确保已经开启了 Hadoop。 终端命令: start-all.sh

    启动 Eclipse 后就可以在左侧的Project Explorer中看到 DFS Locations(若看到的是 welcome 界面,点击左上角的 x 关闭就可以看到了。

    没用显示的话,可以查看 看到以上画面也算成功

    插件需要进一步的配置。

    第一步:选择 Window 菜单下的 Preference。

    此时会弹出一个窗体,窗体的左侧会多出 Hadoop Map/Reduce 选项,点击此选项,选择 Hadoop 的安装目录(如/bigdata/hadoop,如果不好选择目录,直接输入就行)。

    第二步:切换 Map/Reduce 开发视图,选择 Window 菜单下选择 Window -> Perspective -> Open Perspective -> Other,弹出一个窗体,从中选择 Map/Reduce 选项即可进行切换。

    第三步:建立与 Hadoop 集群的连接,点击 Eclipse软件右下角的 Map/Reduce Locations 面板,在面板中单击右键,选择 New Hadoop Location。

    在弹出来的 General 选项面板中,General 的设置要与 Hadoop 的配置一致。一般两个 Host 值是一样的,如果是伪分布式,填写 localhost 即可,另外我使用的Hadoop伪分布式配置,设置 fs.defaultFS 为 hdfs://localhost:9000,则 DFS Master 的 Port 要改为 9000。Map/Reduce(V2) Master 的 Port 用默认的即可,Location Name 随意填写。

    最后的设置如下图所示:

    Advanced parameters 选项面板是对 Hadoop 参数进行配置,实际上就是填写 Hadoop 的配置项(/bigdata/hadoop3.1.1/etc/hadoop中的配置文件),如我配置了 hadoop.tmp.dir ,就要进行相应的修改。但修改起来会比较繁琐,我们可以通过复制配置文件的方式解决(下面会说到)。

    总之,我们只要配置 General 就行了,点击 finish,Map/Reduce Location 就创建好了。

    三、在 Eclipse 中操作 HDFS 中的文件

    配置好后,点击左侧 Project Explorer 中的 MapReduce Location (点击三角形展开)就能直接查看 HDFS 中的文件列表了(HDFS 中要有文件,如下图是 WordCount 的输出结果),双击可以查看内容,右键点击可以上传、下载、删除 HDFS 中的文件,无需再通过繁琐的 hdfs dfs -ls 等命令进行操作了。 以下input/myLocalFile.txt文件记录了文件结果。

    如果无法查看,可右键点击 Location 尝试 Reconnect 或重启 Eclipse。

    Tips: HDFS 中的内容变动后,Eclipse 不会同步刷新,需要右键点击 Project Explorer中的 MapReduce Location,选择 Refresh,才能看到变动后的文件。

    四、在 Eclipse 中创建 MapReduce 项目 点击 File 菜单,选择 New -> Project…:选择 Map/Reduce Project,点击 Next。

    填写 Project name 为 WordCount 即可,点击 Finish 就创建好了项目。

    此时在左侧的 Project Explorer 就能看到刚才建立的项目了。

    接着右键点击刚创建的 WordCount 项目src,选择 New -> Class,需要填写两个地方:在 Package 处填写 org.apache.hadoop.examples;在 Name 处填写 WordCount。

    创建 Class 完成后,在 Project 的 src 中就能看到 WordCount.java 这个文件。将如下 WordCount 的代码复制到该文件中。

    package org.apache.hadoop.examples; import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public WordCount() { } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs(); if(otherArgs.length < 2) { System.err.println("Usage: wordcount <in> [<in>...] <out>"); System.exit(2); } Job job = Job.getInstance(conf, "wordcount"); job.setJarByClass(WordCount.class); job.setMapperClass(WordCount.TokenizerMapper.class); job.setCombinerClass(WordCount.IntSumReducer.class); job.setReducerClass(WordCount.IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); for(int i = 0; i < otherArgs.length - 1; ++i) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1])); System.exit(job.waitForCompletion(true)?0:1); } public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public IntSumReducer() { } public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int sum = 0; IntWritable val; for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) { val = (IntWritable)i$.next(); } this.result.set(sum); context.write(key, this.result); } } public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private static final IntWritable one = new IntWritable(1); private Text word = new Text(); public TokenizerMapper() { } public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while(itr.hasMoreTokens()) { this.word.set(itr.nextToken()); context.write(this.word, one); } } } }

    五、通过 Eclipse 运行 MapReduce

    在运行 MapReduce 程序前,还需要执行一项重要操作(也就是上面提到的通过复制配置文件解决参数设置问题):将 /bigadata/hadoop3.1.1/etc/hadoop 中将有修改过的配置文件(如伪分布式需要 core-site.xml 和 hdfs-site.xml),以及 log4j.properties 复制到 WordCount 项目下的 src 文件夹(~/workspace/WordCount/src)中:

    终端命令: cp core-site.xml ~/workspace/WordCount/src cp hdfs-site.xml ~/workspace/WordCount/src cp log4j.properties ~/workspace/WordCount/src

    提示:上述操作也可以手动找到相关文件,直接拷贝粘贴过去,然后刷新即可。 没有复制这些文件的话程序将无法正确运行,本实验最后再解释为什么需要复制这些文件。

    复制完成后,务必右键点击 WordCount 选择 refresh 进行刷新(不会自动刷新,需要手动刷新),可以看到文件结构如下所示:

    点击工具栏中的 Run 图标,或者右键点击 Project Explorer 中的 WordCount.java,选择 Run As -> Run on Hadoop,就可以运行 MapReduce 程序了。不过由于没有指定参数,运行时会提示 “Usage: wordcount “,需要通过Eclipse设定一下运行参数。

    右键点击刚创建的 WordCount.java,选择 Run As -> Run Configurations,在此处可以设置运行时的相关参数(如果 Java Application 下面没有 WordCount,那么需要先双击 Java Application)。切换到 “Arguments” 栏,在 Program arguments 处填写 “input output” 就可以了。

    或者也可以直接在代码中设置好输入参数。可将代码 main() 函数的 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 改为: // String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); String[] otherArgs=new String[]{“input”,“output”}; /* 直接设置输入参数 */ 至此,你就可以使用 Eclipse 方便的进行 MapReduce程序的开发了。

    六、单词统计测试

    新建一个文件input,并写入4行单词,如下: 首先,使用vim编辑器,在本地Linux文件系统的“/home/hadoop/”目录下创建一个文件input,里面可以随意输入一些单词,比如,输入如下四行:

    hello hadoop hello hbase hello mapreduce hello hdfs hbase

    然后,可以使用如下命令把本地文件系统的“/home/hadoop/input”上传到HDFS中的当前用户目录的根目录下,也就是上传到HDFS的“/user/hadoop/”目录下。然后查看是否上传成功。

    终端命令: hdfs dfs -put ./input /user/hadoop(这里的hadoop是我的用户名) hdfs dfs -ls -R hdfs dfs -text input 注意:如果之前实验遗留的input文件夹,会使上传出错。要提前删除。 终端命令:hdfs dfs -rm -r /user/hadoop/input 重新运行WordCount.java文件,如果提示成功,刷新 DFS Location 后就能看到输出的 output 文件夹。(建议重启Eclipse)

    七、在 Eclipse 中运行 MapReduce 程序会遇到的问题

    在使用 Eclipse 运行 MapReduce 程序时,会读取 Hadoop-Eclipse-Plugin 的 Advanced parameters 作为 Hadoop 运行参数,如果我们未进行修改,则默认的参数其实就是单机(非分布式)参数,因此程序运行时是读取本地目录而不是 HDFS 目录,就会提示 Input 路径不存在。

    Exception in thread “main” org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: file:/home/hadoop/workspace/WordCountProject/input 所以我们需要将配置文件复制到项目中的 src 目录,来覆盖这些参数。让程序能够正确运行。

    log4j 用于记录程序的输出日记,需要 log4j.properties 这个配置文件,如果没有复制该文件到项目中,运行程序后在 Console 面板中会出现警告提示:

    log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 虽然不影响程序的正确运行的,但程序运行时无法看到任何提示消息(只能看到出错信息)。

    到了这一步,本次实验就完成了,你今天学会了吗?

    Processed: 0.009, SQL: 8