Hadoop 2.0:主流开源云架构(四)

Hadoop 2.0:主流开源云架构(四)

码农世界 2024-06-18 后端 110 次浏览 0个评论

目录

    • 五、Hadoop 2.0访问接口
      • (一)访问接口综述
      • (二)浏览器接口
      • (三)命令行接口
      • 六、Hadoop 2.0编程接口
        • (一)HDFS编程
        • (二)Yarn编程

          五、Hadoop 2.0访问接口

          (一)访问接口综述

            Hadoop 2.0分为相互独立的几个模块,访问各个模块的方式也是相互独立的,但每个模块访问方式可分为:浏览器接口、Shell接口和编程接口。

          (二)浏览器接口

          Web地址配置文件配置参数
          HDFShttp://NameNodeHostName:50070hdfs-site.xml{dfs.namenode.http-address}
          Yarnhttp://ResourceManagerHostName:8088yarn-site.xml{yarn.resourcemanager.webapp.address}
          MapReducehttp://JobHistoryHostName:19888mapred-site.xml{mapreduce.jobhistory.webapp.address}

            在Hadoop 2.0里,MapReduce是Yarn不可缺少的模块,这里的JobHistory是一个任务独立模块,用来查看历史任务,和MapReduce并行处理算法无关。

          (三)命令行接口

          1. HDFS

            以tar包方式部署时,其执行方式是HADOOP_HOME/bin/hdfs,当以完全模式部署时,使用HDFS用户执行hdfs即可。

          Hadoop 2.0:主流开源云架构(四)

          2. Yarn

            以tar包方式部署时,其执行方式是HADOOP_HOME/bin/yarn,当以完全模式部署时,使用Yarn用户执行yarn即可。

          Hadoop 2.0:主流开源云架构(四)

            每一条命令都包含若干条子命令,Yarn的Shell命令也主要分为用户命令和管理员命令。

          3. Hadoop

            以tar包方式部署时,其执行方式是HADOOP_HOME/bin/Hadoop,当以完全模式部署时,在终端直接执行hadoop。

          Hadoop 2.0:主流开源云架构(四)

            这个脚本既包含HDFS里最常用命令fs(即HDFS里的dfs),又包含Yarn里最常用命令jar,可以说是HDFS和Yarn的结合体。此外,distcp用mapreduce来实现两个Hadoop集群之间大规模数据复制。

          4. 其他常用命令

            sbin/目录下的脚本主要分为两种类型:启停服务脚本和管理服务脚本。其中,脚本hadoop-daemon.sh可单独用于启动本机服务,方便本机调试,start/stop类脚本适用于管理整个集群,读者只要在命令行下直接使用这些脚本,它会自动提示使用方法。

          Hadoop 2.0:主流开源云架构(四)

          六、Hadoop 2.0编程接口

          (一)HDFS编程

          Hadoop 2.0:主流开源云架构(四)

          1. HDFS编程实例

          【例1】 请编写一简单程序,要求实现在HDFS里新建文件myfile,并且写入内容“china cstor cstor cstor china”。

          代码如下:

          public class Write {
              public static void main(String[] args) throws IOException {
                  Configuration conf = new Configuration();       //实例化配置文件
                  Path inFile = new Path("/user/joe/myfile");      //命名一个文件
                  FileSystem hdfs = FileSystem.get(conf);         //获取文件系统
                  FSDataOutputStream OutputStream = hdfs.create(inFile);   //获取文件流
                  outputStream.writeUTF("china cstor cstor cstor china");   //使用流向文件里写内容
                  outputStream.flush();
                  outputStream.close();
              }
          }
          

          假定程序打包后称为hdfsOperate.jar,并假定以joe用户执行程序,主类为Write,主类前为包名,则命令执行如下:

          [joe@cMaster~]$ hadoop jar hdfsOperate.jar cn.cstor.data.hadoop.hdfs.write.Write
          

          成功执行上述命令后,可使用如下两种方式确认文件已经写入HDFS。

          第一种方式:使用Shell接口,以joe用户执行如下命令:

          [joe@cMaster~]$ hdfs dfs -cat ls            #类似于Linux的ls,列举HDFS文件
          [joe@cMaster~]$ hdfs dfs -cat myfile        #类似于Linux的cat,查看文件
          

          第二种方式:使用Web接口,浏览器地址栏打开http://namenodeHostName:50070,点击Browse the filesystem,进入文件系统,接着查看文件/user/jioe/myfile即可。

          【例2】 请编写一简单程序,要求输出HDFS里刚写入的文件myfile的内容。

          代码如下:

          public class Read {
              public static void main(String[] args) throws IOException {
                  Configuration conf = new Configuration();
                  Path inFile = new Path("/user/joe/myfile");      //HDFS里欲读取文件的绝对路径
                  FileSystem hdfs = FileSystem.get(conf);
                  FSDataIutputStream inputStream = hdfs.open(inFile);   //获取输出流
                  System.out.println("myfile:"+inputStream.readUTF());   //使用输出流读取文件
                  inputStream.close();
              }
          }
          

          下面是命令执行方式及其结果:

          [joe@cMaster~]# hadoop jar hdfsOperate.jar cn.cstor.data.hadoop.hdfs.read.Read
          myfile: china cstor cstor china
          

          【例3】 请编写一简单代码,要求输出HDFS里文件myfile相关属性(如文件大小、拥有者、集群副本数,最近修改时间等)。

          代码如下:

          public class Status {
              public static void main(String[] args)throws Exception {
                  Configuration conf = new Configuration();
                  Path file = new Path("/user/joe/myfile");
                  System.out.println("FileName:"+file.getName());
                  FileSystem hdfs = file.getFileSystem(conf);
                  FileStatus[] fileStatus = hdfs.listStatus(file);
                  for (FileStatus status: fileStatus) {
                      System.out.println("FileOwner:"+status.getOwner());
                      System.out.println("FileReplication:"+status.getReplication();
                      System.out.println("FileModificationTime:"+new Date(status.getModificationTime());
                      System.out.println("FileBlockSize:"+status.getBlockSize());
                  }
              }
          }
          

          程序执行方式及其结果如下:

          [joe@cMaster~] Hadoop jar hdfsOperate.jar cn.cstor.data.Hadoop.hdfs.file.Status
          FileName: myfile
          FileOwner: joe
          FileReplication: 3
          FileModification Time: Tue Nov 12 05:24:02 PST 2013 
          

          上面我们通过三个例题介绍了HDFS文件最常用操作,但这仅仅是三个小演示程序,在真正处理HDFS文件流时,可以使用缓冲流将底层文件流一层层包装,可大大提高读取效率。

          2. HDFS编程基础

          (1)Hadoop统一配置文件类Configuration

            Hadoop的每一个实体(Common,HDFS,Yarn)都有与其相对应的配置文件,Configuration类是联系几个配置文件的统一接口。

            Hadoop各模块间传递的一切值都必须通过Configuration类实现,其他方式均无法获取程序设置的参数,若想实现参数最好使用Configuration类的get和set方法。

          (2)取得HDFS文件系统接口

            在Hadoop源代码中,HDFS相关代码大都存放在org.apache.Hadoop.hdfs包里。但是,我们编写代码操作HDFS里的文件时,不可以调用这些代码,而是通过org.apache.hadoop.fs包里的FileSystem类实现。

          Hadoop 2.0:主流开源云架构(四)

            FileSystem类是Hadoop访问文件系统的抽象类,它不仅可以获取HDFS文件系统服务,也可以获取其他文件系统(比如本地文件系统)服务,为程序员访问各类文件系统提供统一接口。

          (3)HDFS常用流和文件状态类

            Common还提供了一些处理HDFS文件的常用流:fs包下的FSDataInputStream,io包下的缓冲流DataInputBuffer,util包下的LineReader等等。用户可以和Java流相互配合使用。

          (二)Yarn编程

            Yarn是一个资源管理框架,由ResourceManager(RM)和NodeManager(NM)。但RM和NM不参与计算逻辑。称由ApplicationMaster和Client组成的处理逻辑相同的一类任务为逻辑实体,可以定义Map型、MapReduce型、MapReduceMap型和CPU密集型任务。

          1. 概念和流程

            在资源管理框架中,RM负责资源分配,NodeManager负责管理本地资源。在计算框架中,Client负责提交任务,RM启动任务对应的ApplicationMaster。

          (1)编程时使用的协议

          ① ApplicationClientProtocol:Client<–>ResourceManager。

          Client通知RM启动任务(如要求RM启动ApplicationMaster),获取任务状态或终止任务时使用的协议。

          ② ApplicationMasterProtocol:ApplicationMaster<–>ResourceManager。

          ApplicationMaster向RM注册/注销申请资源时用到的协议。

          ③ ContainerManager:ApplicationMaster<–>NodeManager。

          ApplicationMaster启动/停止获取NM上的Container状态信息时所用的协议。

          (2)一个Yarn任务的执行流程简析

            Client提交任务时,通过调用ApplicationClientProtocol#getNewApplication从RM获取一个ApplicationId,然后再通过ApplicationClientProtocol#submitApplication提交任务。

            ApplicationMaster则负责此次任务的处理全过程,RM会选定一个Container来启动ApplicationMaster,ApplicationMaster会通过心跳包与RM保持通信,ApplicationMaster须向RM注销自己。

          (3)编程步骤小结

          ① Client端

          步骤1:获取ApplicationId

          步骤2:提交任务

          ② ApplicationMaster端

          步骤1:注册

          步骤2:申请资源

          步骤3:启动Container

          步骤4:重复步骤2、3,直至任务完成

          步骤5:注销

          Yarn提供了三个Application-Master实现:DistributedShell、unmanaged-am-launcher、MapReduce。

          2. 实例分析

            DistributedShell是Yarn自带的一个应用程序编程实例,相当于Yarn编程中的“Hello World”,它的功能是并行执行用户提交的Shell命令或Shell脚本。

            从Hadoop官方网站下载Hadoop-2.2.0-src.tar.gz(Hadoop源码包)并解压后,依次进入Hadoop-yarn-project\Hadoop-yarn\Hadoop-yarn-applications,下面就是Yarn自带的两个Yarn编程实例。

            Client主要向RM提交任务,ApplicationMaster向RM申请资源,并与NM协商启动Container完成任务。

          (1)Client类主要代码:

          YarnClient yarnClient = YarnClient.createYarnClient();    //新建Yarn客户端
          yarnClient.start();       //启动Yarn客户端
          YarnClientApplication app = yarnClient.createApplication();    //获取提交程序句柄
          ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();   //获取上下文句柄
          ApplicationId appId = appContext.getApplicationId();    //获取RM分配的appId 
          appContext.setResource(capability);     //设置任务其他信息举例
          appContext.setQueue(amQueue);
          appContext.setPriority(priority);
          //实例化ApplicationMaster对应的Container
          ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
          amContainer.setCommands(commands);       //参数commands为用户预执行的Shell命令
          appContext.setAMContainerSpec(amContainer);    //指定ApplicationMaster的Container 
          yarnClient.submitApplication(appContext);      //提交作业
          

            从代码中能看到,关于RPC的代码已经被上一层代码封装了,Client端编程简单地说就是获取YarmClientApplication,接着设置ApplicationSubmissionContext,最后提交任务。

          (2)ApplicationMaster类最主要代码:

          //新建RM代理
          AMRMClientAsync amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
          amRMClient.init(conf);
          amRMClient.start();
          //向RM注册
          amRMClient.registerApplicationMaster(appMasterHostname, appMasterRpcPort, appMasterTrackingUrl);
          containerListener = createNMCallbackHandler();
          //新建NM代理
          NMClientAsync nmClientAsync = new NMClientAsyncImpl(containerListener);
          nmClientAsync.init(conf);
          nmClientAsync.start();
          //向RM申请资源
          for(int i=0; i
              ContainerRequest containerAsk = setupContainerAskForRM();
              amRMClient.addContainerRequest(containerAsk);
          }
          numRequestedContainers.set(numTotalContainers);
          //设置Container上下文
          ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
          ctx.setCommands(commands);
          //要求NM启动Container 
          nmClientAsync.startContainerAsync(container, ctx);
          //containerListener汇报此NM完成任务后,关闭此NM
          nmClientAsync.stop();
          //向RM注销
          amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
          amRMClient.stop();
          

            源码中的ApplicationMaster有1000行,上述代码给出了源码里最重要的几个步骤。

          3. 代码执行方式

          默认情况下Yarn包里已经有分布式Shell的代码了,可以使用任何用户执行如下命令:

          $Hadoop jar /usr/lib/Hadoop-yarn/Hadoop-yarn-applications-distributedshell.jar
          > org.apache.Hadoop.yarn.applications.distributedshell.Client 
          > -jar /usr/lib/Hadoop-yarn/Hadoop-yarn-applications-distributedshell.jar
          > -shell_command  '/bin/date' -num_containers 100
          

          4. 实例分析-MapReduce

          Hadoop 2.0:主流开源云架构(四)

          Yarn框架处理MR程序时默认类
          InputFormatTextInputFormat
          RecordReaderLineRecordReader
          InputSplitFileSplit
          MapIdentityMapper
          Combine不使用
          PartitionerHashPartitioner
          GroupCompatator不使用
          ReduceIdentityReducer
          OutputFormatFileOutputFormat
          RecordWriterLineRecordWriter
          OutputCommitterFileOutputCommitter

          MapReduce编程示例——WordCount

          下面是MapReduce自带的最简单代码, MapReduce算法实现统计文章中单词出现次数,源代码如下:

          public class WordCount
              //定义map类,一般继承自Mapper类,里面实现读取单词,写出<单词,1>
              public static class TokenizerMapper extends Mapperc {
              private final static Int Writale one = new IntWritable(1);
              private Text word = new Text();
              //map方法,划分一行文本,读一单词写出一个<单词,1>
              public void map(Object key, Text value, Context context)throws IOException, InterruptedException {
                  StringTokenizer itr = new StringTokenizer(value.toString());
                  while(itr.hasMoreTokens()) {
                  word.set(itr.nextToken());
                  context.write(word, one);        //写出<单词,1>
              }}} 
          //定义reduce类,对相同的单词,把它们中的VList值全部相加
          public static class IntSumReducer extends Reducer {
              private IntWritable result = new IntWritable();
              public void reduce(Text key, Iterable values, Context context throws IOException, InterruptedException {
                  int sum = 0;
                  for(IntWritable val: values) {
                      sum += val.get();        //相当于,将两个1相加
                  }
                  result.set(sum);
                  context.write(key,result);      //写出这个单词,和这个单词出现次数<单词,单词出现次数>
              }}
              public static void main(String[] args) throws Exception {    //主方法,函数入口
                  Configuration conf = new Configuration();        //实例化配置文件类
                  Job job = new Job(conf, "WordCount");        //实例化Job类
                  job.setInputFormatClass(TextInputFormat.class);      //指定使用默认输入格式类
                  TextInputFormat.setInputPaths(job, inputPaths);       //设置待处理文件的位置
                  job.setJarByClass(WordCount.class);        //设置主类名
                  job.setMapperClass(TokenizerMapper.class);    //指定使用上述自定义Map类
                  job.setMapOutputKeyClass(Text.class);     //指定Map类输出的,K类型
                  job.setMapOutputValueClass(IntWritable.class);      //指定Map类输出的-K,V>,V类型
                  job.setPartitionerClass(HashPartitioner.class);      //指定使用默认的HashPartitioner类
                  job.setReducerClass(IntSumReducer.class);     //指定使用上述自定义Reduce类
                  job.setNumReduceTasks(Integer.parseInt(numOfReducer);    //指定Reduce个数
                  job.setOutputKeyClass(Text.class);        //指定Reduce类输出的K类型
                  job.setOutputValueClass(Text.class);        //指定Reduce类输出的,V类型
                  job.setOutputFormatClass(TextOutputFormat.class);      //指定使用默认输出格式类
                  TextOutputFormat.setOutputPath(job, outputDir);        //设置输出结果文件位置
                  System.exit(job.waitForCompletion(true)?0:1);        //提交任务并监控任务状态
              }
          }
          

转载请注明来自码农世界,本文标题:《Hadoop 2.0:主流开源云架构(四)》

百度分享代码,如果开启HTTPS请参考李洋个人博客
每一天,每一秒,你所做的决定都会改变你的人生!

发表评论

快捷回复:

评论列表 (暂无评论,110人围观)参与讨论

还没有评论,来说两句吧...

Top