Hadoop之倒排索引

本体系小说是读书《Hadoop权威指南 第三版》的笔记和小结。

本博客拔取创作共用版权协议, 要求签署、非商业用途和维持一致.
转载本博客著作必须也依照署名-非商业用途-保持一致的写作共用协议.

本文是第一篇,记录了何等在 Ubuntu14.04搭建Hadoop 2.6
伪分布环境,以及运行自带的wordcount实例的详细步骤。

私家博客地址:
http://andrewliu.tk

本文假诺读者对Hadoop并无太多了然,对Ubuntu的宽泛操作有早晚明白,会对一些操作予以了较为详细的解释或予以有助于更深刻精通的链接。

1. 体系参数配置


因而Hadoop的API对各个零部件的参数进行配备

  • org.apache.hadoop.conf 系统参数的部署文件处理API
  • org.apache.hadoop.fs 抽象文件系统API
  • org.apache.hadoop.dfs HDFS模块的实现
  • org.apache.hadoop.mapred MapReduce模块实现
  • org.apache.hadoop.ipc 封装了网络异步I/O的根基模块
  • org.apache.hadoop.io 定义了通用的I/O模块

要害分为以下几步:

2. 配置开发条件


Hadoop分为二种运行格局: 单机形式, 伪分布形式, 完全分步格局

  • 单机情势安装配置方便, 便于调试, 大数据下运行慢
  • 伪分布式形式在地头文件系统运行, 运行HDFS文件系统
  • 全然分布情势在多台机械的HDFS上运行
  1. 安装Java 8 环境

3. MapReduce框架


  • Map接口需要派生自Mapper<k1, v1, k2, v2>
  • Reduce接口需要派生自Reducer<k2, v2, k3, v3>

输入输出的数据类型要与集成时设置的数据类型一致,
Map的输出类型要和Reduce的输入类型对应.

  • Hadoop集群上的用户作业采纳先入先出调度策略
  • Map输出会经过shuffle过程交给Reduce处理
    shuffle对Map结果划分(partition), 排序(sort), 分割(spilt),
    遵照不同的划分将结果送给对应的Reduce

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class X {
    public static class Map 
    extends Mapper<LongWritable, Text, Text, IntWritable> {
        public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
            // Map逻辑
            }
        }
    }

    public static class Reduce
    extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, 
            Context context) throws IOException, InterruptedException {
            // Reduce逻辑
        }
    }

    public int run(String[] args) throws Exception {
        // 运行配置
        Job job = new Job(getConf());
        job.setJarByClass(Score_Process.class);
        job.steJobName("Score_Process");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new Score_Process(), args);
        System.exit(ret);
    }
}
    }

    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        job.setJarByClass(Score_Process.class);
        job.steJobName("Score_Process");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int ret = X.run(new Score_Process(), args);
        System.exit(ret);
    }
}

Mapper和Reducer基类中的其他函数:

  1. setup函数(在task启动之后数据处理前调用一遍, 对task的全局处理)
  2. cleanup函数(task销毁在此之前实施)
  3. run函数

    protected void setup(Context context) 
    throws IOException, InterruptedException {
    }
    protected void cleanup(Context context)
    throws IOException, InterruptedException {
    }
    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        while(context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
        cleanup(context);
    }
  • 新建Hadoop专用账户
  • 配备本地ssh免登陆
  • 设置并安排Hadoop伪分布情势
  • 测试运行wordcount实例

4. MapReduce实战之倒排索引


倒排索引是一种索引方法,
被用来囤积在全文检索下某个单词在一个文档或者一组文档中的存储地方的映射.
它是文档检索系统中最常用的多寡结构. 通过倒排索引,
可以遵照单词连忙得到包含这些单词的文档列表。倒排索引紧要由七个部分构成:单词词典倒排文件

问题: 使用Hadoop集群测试编写的带词频属性的文档倒排算法,
在总结词语的倒排索引时,
除了要出口带词频属性的倒排索引,请再总结出各个词语的平均出现次数(平均出现次数
= 词语在全方位文档中冒出的频数总和 / 包含该词语的文档数)并输出.

出口格式:
用语1 平均出现次数,文档名1:词频;文档名2:词频;文档名3:词频;…
用语2 平均出现次数,文档名1:词频;…
.

import java.io.IOException;
import java.util.StringTokenizer;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;


public class InvertedIndex {
    public static class Map 
    extends Mapper<Object, Text, Text, Text> {
        private Text keyWord = new Text();
        private Text valueDocCount = new Text();

        public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
            //获取文档
            FileSplit fileSplit = (FileSplit)context.getInputSplit();
            String fileName = fileSplit.getPath().getName();
            StringTokenizer itr = new StringTokenizer(value.toString());
            while(itr.hasMoreTokens()) {
                keyWord.set(itr.nextToken() + ":" + fileName);  // key为key#doc
                valueDocCount.set("1"); // value为词频
                context.write(keyWord, valueDocCount);
            }
        }
    }

    public static class InvertedIndexCombiner
        extends Reducer<Text, Text, Text, Text> {
        private Text wordCount = new Text();
        private Text wordDoc = new Text();
        //将key-value转换为word-doc:词频
        public void reduce(Text key, Iterable<Text> values, 
            Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (Text value : values) {
                sum += Integer.parseInt(value.toString());
            }
            int splitIndex = key.toString().indexOf(":");  // 找到:的位置
            int splitFileName = key.toString().indexOf(".txt.segmented");
            wordDoc.set(key.toString().substring(0, splitIndex));  //key变为单词
            wordCount.set(key.toString().substring(splitIndex + 1, splitFileName) + ":" + sum);  //value变为doc:词频
            context.write(wordDoc, wordCount);
        }
    }

    public static class Reduce
        extends Reducer<Text, Text, Text, Text> {
        private Text temp = new Text();

        public void reduce(Text key, Iterable<Text> values, 
            Context context) throws IOException, InterruptedException {
            int sum = 0;
            int count = 0;
            Iterator<Text> it = values.iterator();
            StringBuilder all = new StringBuilder();
            //形成最终value
            for(;it.hasNext();) { 
                count++;
                temp.set(it.next());
                all.append(temp.toString());  //将一个文档:1添加到总的string value串中
                int splitIndex = temp.toString().indexOf(":");  // 找到:的位置
                temp.set(temp.toString().substring(splitIndex + 1));  //取出词频字符串
                sum += Integer.parseInt(temp.toString());
                if (it.hasNext()) {
                    all.append(";");
                }
            }
            float averageCount = (float)sum / (float)count;
            FloatWritable average = new FloatWritable(averageCount);
            all.insert(0, average.toString() + ",");
            context.write(key, new Text(all.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(); 
        Job job = Job.getInstance(conf, "Inverted Index");  //配置作业名
        //配置作业的各个类
        job.setJarByClass(InvertedIndex.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(InvertedIndexCombiner.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
}

在单机上运行流程可以查看另一篇博文MapReduce之WordCount

1. 安装Java 8

Java 8 正式版 于 2014 年 3 月发表,该版本是一个有根本改观的本子,对 JAVA
带来了诸多新特征。详细信息能够参见 Java 8
新特性概述

4.1. 集群运行流程

#第一步对文件打包的过程就不详细解释了, 可以参考单机运行流程
#本机文件复制到集群
$ scp -r InvertedIndexer.jar 集群用户名@集群IP地址:集群目的文件夹  #范例: scp -r InvertedIndexer.jar 2015st27@114.212.190.91:WorkSpace

#ssh远程登录集群
$ ssh 集群用户名@集群IP #范例:ssh 2015st27@114.212.190.91

#如果密码正确会登录集群服务器, 集群上运行Jar包
$ hadoop jar WorkSpace/InvertedIndex.jar InvertedIndex /user/input output

1.1设置基础开发套件

安装Ubuntu下的底子开发套件,其中囊括接下去 1.2 中要用到的
add-apt-repository 命令:

sudo apt-get install software-properties-common
sudo apt-get install python-software-properties

5. 参阅链接


1.2 通过ppa安装Java 8

有关ppa(Personal Package Archives)的演讲能够参考如何是Ubuntu
PPA以及为啥要用它

sudo add-apt-repository ppa:webupd8team/java
sudo apt-get update
sudo apt-get install oracle-java8-installer
# 便捷: sudo apt-get install software-properties-common && sudo apt-get install python-software-properties && sudo apt-get update && sudo apt-get install oracle-java8-installer

1.3 验证安装的Java版本

在遵照上述的步调安装了Java之后,可以经过以下命令检测是否安装成功,及安装的本子:

java -version

1.4 通过PPA配置Java的环境

1.2中充分的Webupd8 ppa
仓库提供了一个包来设置Java的环境变量,通过以下命令安装:

sudo apt-get install oracle-java8-set-default

完了上述步骤后即现已打响的布置了安转Hadoop所需的Java开发环境。

2. 新建Hadoop专用账户

为了营造一个更为独立的Hadoop运行条件,我们得以为系统新建一个Hadoop账户,将来推行Hadoop相关操作时皆以该用户的地点登陆。

2.1 新建hadoop用户

sudo useradd -m  hadoop     #-m参数表示同时创建用户的家目录
sudo usermod -s  /bin/bash hadoop    # 指定默认登陆shell
sudo passwd hadoop    #修改用户默认的密码
# 便捷: export user=heamon7 && sudo useradd -m $user && sudo usermod -s /bin/bash $user && sudo passwd $user

2.2 提高hadoop用户的权力

sudo命令能够让你切换身份来执行命令,其履行进程是:

  1. 当用户执行sudo时,系统于/etc/sudoers文件中找寻该用户是否富有sudo的权力;
  • 当用户所有可进行sudo的权能后,便让用户输入用户自己的密码来认同;
  • 若密码输入成功,便开始实行sudo后续接得命令(但root执行sudo时不需要输入密码,
    若欲切换的身价与实施者的身价一样,则也不需要输入密码);

因此我们需要编辑/etc/sudoers文件,将我们的hadoop用户增长进去:

sudo chmod u+w /etc/sudoers    #为当前用户添加对/etc/sudoers文件的写权限,该文件默认root只读
sudo vim /etc/sudoers

在文书中找到这两行:

# User privilege specification
root ALL=(ALL:ALL) ALL

在其下部添加一行

hadoop ALL=(ALL) NOPASSWD:ALL

此间解释一下这一行三个参数的意趣,第一个是用户的账号,表示系统中得哪个账户可以应用sudo命令,默认是root,第二个是登录者的起点主机名,默认值root可以来自其他一台网络主机,第多少个参数是可切换的地位,默认root可以切换成任何用户,若添加冒号,则意味着的是用户组;第六个参数是可进行的授命,需接纳绝对路径,默认root可以实施其它命令。其中ALL是例外的基本点字,表示其它主机、用户,或指令。
由此添加下边一行之后,大家的hadoop用户今后也具有和root一样的权杖。
最后
末段咱们撤消文件的写权限:

sudo chmod u-w /etc/sudoers
#合并操作 sudo chmod u+w /etc/sudoers && sudo vim /etc/sudoers && sudo chmod u-w /etc/sudoers

3. 布置本地ssh免登陆

因为Hadoop是由此ssh管理各样零部件,并实现通信的,为了避免每一回需要输入密码,大家得以安排本地ssh免登陆。关于ssh的登陆形式及原理可以参见这里
SSH原理与运用

su  hadoop
ssh-keygen -t rsa    #用RSA算法生成公钥和密钥
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys    #授权本机ssh免登陆

可以经过以下命令测试是否配备成功:

ssh localhost    

若配置成功,则应该不需要输入密码。接着退出ssh登陆:

exit

地点是只要已经设置了SSH,倘使没有安装,可以经过以下命令安装并启动:

sudo apt-get install openssh-server
sudo /etc/init.d/ssh start

4. 安装并安排Hadoop伪分布形式

4.1 下载Hadoop 2.6

那里间接从官网下载Hadoop并内置hadoop用户主目录:

cd ~
wget http://apache.claz.org/hadoop/common/hadoop-2.6.0/hadoop-2.6.0.tar.gz

解压并简化路径:

tar xzf hadoop-2.6.0.tar.gz
mv hadoop-2.6.0 hadoop

4.2. 布局环境变量

率先设置Hadoop运行所急需的环境变量,我们编辑 ~/.bashrc
文件,添加我们的环境变量

 #注意此处的路径和你的hadoop文件最后解压存放的位置是一致的
export HADOOP_HOME=/home/hadoop/hadoop   
export HADOOP_INSTALL=$HADOOP_HOME
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin

下一场大家再一次导入~/.bashrc文件中的内容到shell中:

$ source ~/.bashrc

关于地点那条命令的施行原理,请查看这里这里

跟着大家编辑文件$HADOOP_HOME/etc/hadoop/hadoop-env.sh ,为Hadoop设置
Java环境变量:

vim $HADOOP_HOME/etc/hadoop/hadoop-env.sh

修改其中JAVA_HOME的值为:

export JAVA_HOME=/usr/lib/jvm/java-8-oracle    #如果按照前面安装Java 8 的方法,则Java应该在此路径,也可以通过 echo $JAVA_HOME 命令来查看

4.3 修改Hadoop的配置文件

Hadoop在目录 $HADOOP_HOME/etc/hadoop
下有很多部署文件,我们在此布局伪分布式,需要修改其中六个布局文件。首先进入配置文件目录:

$ cd $HADOOP_HOME/etc/hadoop

下一场编辑 core-site.xml 文件:

vim core-site.xml

在 configuration
标签里添加属性,这里需要充足的是HDFS的NameNode的地址,修改成以下值:

<configuration>
<property>
  <name>fs.default.name</name>
    <value>hdfs://localhost:9000</value>
</property>
</configuration>

随着修改 hdfs-site.xml 文件:

vim hdfs-site.xml

修改为以下值:

<configuration>
<property>
 <name>dfs.replication</name>
 <value>1</value>
</property>
<property>
 <name>dfs.name.dir</name>
 <value>file:///home/hadoop/hadoopdata/hdfs/namenode</value>
</property>
<property>
 <name>dfs.data.dir</name>
 <value>file:///home/hadoop/hadoopdata/hdfs/datanode</value>
</property>
</configuration>

上边的 dfs.replication
属性是指定副本数量,这里数据只保留一份,默认是三份;而 dfs.name.dir
属性指定的是NameNode在地头的保留地址,dfs.data.dir
属性指定的是DataNode在本地的保存地址。

下一场修改 mapred-site.xml
文件,可是默认Hadoop只提供了该配置的模版文件,我们需要复制模板编辑:

cp mapred-site.xml.template mapred-site.xml
vim mapred-site.xml

将该文件修改为以下值:

<configuration>
 <property>
  <name>mapreduce.framework.name</name>
  <value>yarn</value>
 </property>
</configuration>

从字面意义也可以看出这里指定了MapReduce运行在Hadoop的Yarn框架上。

终极修改 yarn-site.xml 文件:

vim yarn-site.xml

修改为以下值:

<configuration>
 <property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle</value>
 </property>
</configuration>

那里 yarn.nodemanager.aux-services
属性指定了Yarn的NodeManager获取数据的方法是shuffle,关于shuffle的牵线能够看那里MapReduce:详解shuffle过程

4.4 启动Hadoop

在应用HDFS文件系统前,大家需要先将其格式化:

hdfs namenode -format

这会儿应有有大量日志输出……
接下来启动HDFS:

start-dfs.sh

此时理应有雅量日记输出……
最后启动Yarn:

start-yarn.sh

这会儿应当有大气日记输出……

这时候因此jps命令,应该可以寓目除了jps本身外,还有5个Java进程,如下:

hadoop@ubuntu:~$ jps
14049 Jps
13811 ResourceManager
13459 DataNode
13642 SecondaryNameNode
13931 NodeManager
13342 NameNode

5. 测试运行wordcount实例

Hadoop自带有很多MapReduce的例子,其中一个比较和编程语言领域的 Hello
World
齐名的是wordcount,能够统计出文件中每个单词出现的次数,我们可以用wordcount示例测试Hadoop环境。

5.1 上传输入文件到HDFS

先是大家在地面文件系统新建几个公文文件:

cd ~
mkdir input
echo "hello hadoop" > input/f1.txt
echo "hello world" > input/f2.txt 
echo "hello wordcount" > input/f3.txt

下一场经过以下命令上传这四个文本到HDFS:

hdfs dfs -mkdir -p /home/hadoop/input
hdfs dfs -put input/* /home/hadoop/input

可以通过以下命令查看上传的文件:

hdfs dfs -ls /home/hadoop/input/

5.2 执行wordcount示例

Hadoop 通过把MapReduce代码捆绑到 jar
文件上,然后利用以下命令运行Streaming作业:

hadoop jar <jar> [mainClass] args..

在 目录 $HADOOP_HOME/share/hadoop/mapreduce/
下有一些周转MapReduce任务所需的已编译好的jar包,其中
hadoop-mapreduce-examples-2.6.0.jar
包含了一些演示程序,wordcount便在里边,我们可以透过下边的吩咐来运作wordcount:

hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount /home/hadoop/input  /home/hadoop/output

留意此处的output文件夹不应当事先存在,应该由hadoop自己创办,否则会实施破产。

5.3 查看运行结果

稍后片刻,待程序运行截止,我们可以通过以下命令来查看暴发的结果文件:

hdfs dfs -ls /home/hadoop/output

此间的出口是:

hadoop@ubuntu:~$ hdfs dfs -ls /home/hadoop/output
15/06/18 02:47:10 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r--   1 hadoop supergroup          0 2015-06-18 02:13 /home/hadoop/output/_SUCCESS
-rw-r--r--   1 hadoop supergroup         32 2015-06-18 02:13 /home/hadoop/output/part-r-00000

能够经过以下命令查看最后的出口结果:

hdfs dfs -cat /home/hadoop/output/part-r-00000

这边看看的结果是:

hadoop@ubuntu:~$ hdfs dfs -cat /home/hadoop/output/part-r-00000
15/06/18 02:49:38 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
hadoop  1
hello   3
java    1
world   1
hadoop@ubuntu:~$ 

至此,Hadoop的单机伪分布环境搭建成功。
一度遵照此方法搭建了三遍,写完本文后,又在一个簇新的 Ubuntu 14.04
上一步步搭建了一回,一切正常。


终极,学习Hadoop到明天,虽然谈不上早已入门,但感受是:Hadoop入门过程辛苦的局部不是环境的搭建,而在于出现谬误时对日记的辨析,和问题的一定,然后还有就是出现性能问题时解决方案的追寻。

扩充阅读:

发表评论

电子邮件地址不会被公开。 必填项已用*标注