大數據平臺複習二. MapReduce

萌噠老司機 2022-01-07 18:29:39 阅读数:384

mapreduce

簡述

什麼是MapReduce
Hadoop下的一個負責分布式計算的組件
一個軟件系統,運行於HDFS之上
定義了一種實現分布式計算的框架
負責計算任務在集群中的分配調度、負載均衡、容錯處理、網絡通信等一系列問題
方便編程人員在不熟悉分布式並行編程的情况下,能够編寫程序對分布式環境下的大數據進行處理
借鑒了函數式編程
函數:集合之間的一種映射關系
不同於命令式編程,函數式編程關注集合之間的映射關系
函數可以作為另一個函數的輸入和輸出
MapReduce框架
將分布式環境下的並行大數據處理過程抽象為兩個函數:map和reduce
Map: <key1,value1> -> <key2, value2>
Reduce: <key2,value-list> -> <key3, value3>

在這裏插入圖片描述
在這裏插入圖片描述
MapReduce適合的場景:待處理的大數據集可以分成不同的數據塊,而且每個小數據塊可以獨立的並行的進行處理
比如求最大值最小值,計數等
但是中比特數不能計算,因為求中比特數每個小數據塊並不能進行獨立的計算。

在這裏插入圖片描述
在這裏插入圖片描述
Split是數據塊的描述,並不是具體的數據。

在這裏插入圖片描述
在這裏插入圖片描述
Partition分區操作
將map產生的不同<key, value>鍵值對分配給不同的reduce
默認是根據對map輸出的<key, value>鍵值對中的key值進行Hash運算(hash(key)%num_reduceTasks)的結果來將數據進行分區
實際中也可以通過重載分區接口來改寫分區的方式

Combine合並操作
將一個分區內相同key的value進行合並運算處理,以减少需要在map和reduce之間傳輸的數據量
常見的合並運算包括求和、取最大、取最小
在實際中是可選的,它要求編程人員重載合並接口並進行明確的設置
合並前: 合並後:
第一個分區:<”China”, 1> 第一個分區:<”China”, 1>
第二個分區:<”Hello”, 1> 第二個分區:<”Hello”, 2>
<”Hello”, 1> <”Hadoop”, 1>
<”Hadoop”, 1>

Merge文件合並操作
MapReduce將map產生的<key, value>數據寫入磁盤,每次寫入磁盤都會形成一個文件
在map過程結束之後,merge操作將map過程產生的多個文件合並成一個大文件
將不同文件中相同分區的<key, value>數據劃分到同一個分區並重新進行排序
將一個分區內來自不同文件的相同的key的數據進行合並形成一個新的<key, value-list>形式的數據
在這裏插入圖片描述
在這裏插入圖片描述
Reduce階段
根據用戶對reduce函數的定義,對每個新的<key, value-list>形式的鍵值對數據通過執行reduce函數進行處理
將最終的結果輸出到文件系統

Hadoop資源管理器:yarn
Yet Another Resource Negotiator
為集群提供通用的資源管理和調度服務
不僅能够運行MapReduce任務,而且能運行Spark等任務
主要包括兩個進程:ResourceManager、NodeManager

ResourceManager :
控制整個集群的資源
並負責向具體的計算任務分配基礎計算資源

NodeManager :
管理集群中單個節點的計算資源
跟踪管理節點上的每個容器以及節點的健康狀况

WordCount的MapReduce程序(idea+maven)

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- 如下的groupID、artifactID、version標簽都是建立maven項目時所要填寫的信息。這些信息需要針對自己所建立的maven項目進行修改 -->
<groupId>com.liu</groupId>
<artifactId>WrodCount</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- 示例所依賴的jar包都通過如下的標簽給出 -->
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>2.10.0</version>
</dependency>
</dependencies>
</project>

程序

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.StringTokenizer;
public class WordCount {

public static class MyMapper extends Mapper<Object, Text, Text, IntWritable>{

private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {

StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {

word.set(itr.nextToken());
//將結果寫入context
context.write(word, one);
}
}
}
public static class MyReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {

private IntWritable result = new IntWritable();
//從這可以看出reduce處理的輸入數據是<key, value-list>類型的鍵值對
public void reduce(Text key, Iterable<IntWritable> values,Context context)
throws IOException, InterruptedException {

int sum = 0;
//reduce函數就是對列錶values中的數值進行相加
for (IntWritable val : values) {

sum += val.get();
}
result.set(sum);
//將結果寫入context
context.write(key, result);
}
}
/* 1.WordCount的main函數。 2.main函數主要創建一個job對象,然後對WordCount任務所需的map函數、reduce函數、輸入文件路徑、輸出文件路徑等信息進行配置。 */
public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");//獲取一個任務實例
job.setJarByClass(WordCount.class);//設置工作類
job.setMapperClass(MyMapper.class);//設置Mapper類
job.setReducerClass(MyReducer.class);//設置Reducer類
job.setOutputKeyClass(Text.class);//設置輸出鍵值對中key的類型
job.setOutputValueClass(IntWritable.class);//設置輸出鍵值對中value的類型
FileInputFormat.addInputPath(job, new Path(args[0]));//設置輸入文件的路徑
FileOutputFormat.setOutputPath(job, new Path(args[1]));//設置輸出文件的路徑
FileSystem fs=FileSystem.get(conf);//獲取HDFS文件系統
fs.delete(new Path(args[1]),true);//删除輸出路徑下可能已經存在的文件
boolean result=job.waitForCompletion(true);//提交運行任務
System.exit(result? 0: 1);//如result為 false則等待任務結束
}
}

在這裏插入圖片描述

版权声明:本文为[萌噠老司機]所创,转载请带上原文链接,感谢。 https://gsmany.com/2022/01/202201071829394134.html