经验之谈:让老采购商告知哪里是栾树一站购苗的好去处

小说:经验之谈:让老采购商告知哪里是栾树一站购苗的好去处作者:通密更新时间:2019-05-25字数:19922

朱标面色一沉,朱允?刹幻魉?裕?胩┝?τ檬忠焕??蜕?溃骸靶⊥跻???褪橇址纭!

小叶女贞常规高度是多少?

韩?瑶还没搞明白怎么回事呢,紧接着就听到“砰”的一声闷响,窗口的玻璃忽然被打裂,而他们之前所在的位置,也被打出一个枪眼来。
“既然打完了那么就听我说一下,我有一件比较重要的事情要告诉大家。”娜洁希坦说道。

“你可以这么理解,不过现在我也算是客人一个,老板娘你用这样的语气我可以理解为你是在赶客人,或者说我是众多客人里面让你看我不爽的一个。”刘皓暗赞一声,不愧是马叮当,冷静从容,自信,骄傲,这种骄傲并不是形容人骄傲的贬义词,用来形容马叮当则是变成了褒义。

本文基于Windows平台Eclipse,以使用MapReduce编程模型统计文本文件中相同单词的个数来详述了整个编程流程及需要注意的地方。不当之处还请留言指出。

前期准备

hadoop集群的搭建

编程环境搭建

1、将官网下载的hadoop安装包解压,并记住下图所示的目录

2、创建java project,右键工程--->build path--->Configure build path

3、进行如下图操作

4、新建MapReduce编程要使用的环境包,如下图操作

5、将下图所示的commom包以及lib文件夹下所有的包导入

6、将下图所示的hdfs包和lib文件夹下所有的包导入

7、将下图所示的包以及lib文件夹下所有的包导入

8、将下图所示的包以及lib文件夹下的所有包导入

9、将新建的好的hadoop_mr库导入

编写map阶段的map函数

package com.cnblogs._52mm;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * 第一个参数:默认情况下是mapreduce框架所读文件的起始偏移量,类型为Long,在mr框架中类型为LongWritable
 * 第二个参数:默认情况下是框架所读到的内容,类型为String,在mr框架中为Text
 * 第三个参数:框架输出数据的key,在该单词统计的编程模型中输出的是单词,类型为String,在mr框架中为Text
 * 第四个参数:框架输出数据的value,在此是每个所对应单词的个数,类型为Integer,在mr框架中为IntWritable
 * @author Administrator
 *
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
//  map阶段的逻辑
//  对每一行输入数据调用一次我们自定义的map()方法
    @Override
    protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
//      将传入的每一行数据转为String
        String line = value.toString();
//      根据空格将单词划分
        String[] words = line.split(" ");
        
        for(String word: words){
            //将word作为输出的key,1作为输出的value    <word,1>
            context.write(new Text(word), new IntWritable(1));
        }
//      mr框架不会在map处理完一行数据就发给reduce,会先将结果收集
    }
}

编写reduce阶段的reduce函数

package com.cnblogs._52mm;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * reduce的输入是map的输出
 * 第一个和第二个参数分别是map的输出类型
 * 第三个参数是reduce程序处理完后的输出值key的类型,单词,为Text类型
 * 第四个参数是输出的value的类型,每个单词所对应的总数,为IntWritable类型
 * @author Administrator
 *
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    /**
     * map输出的内容相当于:
     *          <i,1><i,1><i,1><i,1><i,1><i,1>...
     *          <am,1><am,1><am,1><am,1><am,1><am,1>...
     *          <you,1><you,1><you,1><you,1><you,1><you,1>...   
     */
    
    
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        
//      Iterator<IntWritable> iterator = values.iterator();
//      while(iterator.hasNext()){
//          count += iterator.next().get();
//      }
        
        for(IntWritable value: values){
            count += value.get();
        }
        
        context.write(key, new IntWritable(count));
    }
}

编写驱动类

package com.cnblogs._52mm;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



/**
 * 相当于yarn集群的客户端,封装mapreduce的相关运行参数,指定jar包,提交给yarn
 * @author Administrator
 *
 */
public class WordCountDriver {
    
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        
        Configuration conf = new Configuration();
//      将默认配置文件传给job
        Job job = Job.getInstance(conf);
        
//      告诉yarn  jar包在哪
        job.setJarByClass(WordCountDriver.class);
        
        //指定job要使用的map和reduce
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        
//      指定map的输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        
//      指定最终输出的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
//      job的输入数据所在的目录
//      第一个参数:给哪个job设置
//      第二个参数:输入数据的目录,多个目录用逗号分隔
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        
//      job的数据输出在哪个目录
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        //将jar包和配置文件提交给yarn
//      submit方法提交作业就退出该程序
//      job.submit();
        
//      waitForCompletion方法提交作业并等待作业执行
//      true表示将作业信息打印出来,该方法会返回一个boolean值,表示是否成功运行
        boolean result = job.waitForCompletion(true);
//      mr运行成功返回true,输出0表示运行成功,1表示失败
        System.exit(result?0:1);
    }
    
}

运行MapReduce程序

1、打jar包(鼠标右键工程-->Export)

2、上传到hadoop集群上(集群中的任何一台都行),运行

#wordcounrt.jar是刚刚从eclipse打包上传到linux的jar包
#com.cnblogs._52mm.WordCountDriver是驱动类的全名
#hdfs的/wordcount/input目录下是需要统计单词的文本
#程序输出结果保存在hdfs的/wordcount/output目录下(该目录必须不存在,由hadoop程序自己创建)
hadoop jar wordcount.jar com.cnblogs._52mm.WordCountDriver /wordcount/input /wordcount/output

3、也可用yarn的web界面查看作业信息

ps:在这里可以看到作业的详细信息,失败还是成功一目了然

4、查看输出结果

hadoop fs -cat /wordcount/output/part-r-00000

也可查看hdfs的web界面

报错解决

Error: java.io.IOException: Unable to initialize any output collector
    at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:412)
    at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:81)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:695)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:767)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

该错误是由于编写代码时impor了错误的包导致的(我错在Text包导错了),仔细检查一下,改正后重新打jar包上传。

 Output directory hdfs://mini1:9000/wordcount/output already exists

显然,该错误是由于reduce的输出目录必须是不存在才行,不能自己在hdfs上手动创建输出目录。

总结

  • map函数和reduce函数的输入输出类型要用hadoop提供的基本类型(可优化网络序列化传输)
  • LongWritable类型相当于java的Long类型,IntWritable类型相当于java的Integer类型,Text类型相当于java的String类型
  • reduce函数的输入类型等于map函数的输出类型
  • Job对象控制整个作业的执行。
  • job对象的setJarByClass()方法传递一个类,hadoop利用这个类来找到相应的jar文件
  • 运行作业前,输出目录不应该存在,否则hadoop会报错(为了防止覆盖了之前该目录下已有的数据)
  • setOutputKeyClass()和setOutputValueClass()控制map和reduce函数的输出类型,这两个函数的输出类型一般相同,如果不同,则通过setMapOutputKeyClass()和setMapOutputValueClass()来设置map函数的输出类型。
  • 输入数据的类型默认是TextInputFormat(文本),可通过InputFormat类来改变。
  • Job中的waitForCompletion()方法提交作业并等待执行完成,传入true作为参数则会将作业的详细信息打印出来。作业执行成功返回true,执行失败返回false。

作者:py小杰

博客地址:http://www.cnblogs.com/52mm/

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显开头给出原文链接。

编辑:辛开密王

发布:2019-05-25 00:26:27

当前文章:http://adsl66.com/news/20190524493.html

地径6公分北美海棠哪里有卖的? 福建有种植榉树的基地吗? 陕西榆叶梅价格便宜吗? 紫穗槐苗市场价格 全国睡莲批发价最便宜的地方,就在华东花卉苗木集中产区 2公分竹子多少钱一棵? 显赫一时的榉树,它的过去、现在、未来是什么样的一个状态呢? 买优质低价的木瓜树去哪里?

35059 84926 10307 98739 84243 51209 66862 66797 35048 12804 22246 74790 30063 75939 51704 75534 26710 95807 77165 59457

我要说两句: (0人参与)

发布