跳转至

2020-2021-2-编程作业1

2018级,MapReduce

KMeans

难易程度: * 易

待完成:

  • 请在 DSPPCode.mapreduce.kmeans_runner.impl 中创建 KMeansRunnerImpl, 继承 KMeansRunner, 实现抽象方法

题目描述:

  • 要求:作为一种迭代计算算法,K均值聚类的迭代终止条件可以是聚类中心集不再变化或者迭代的次数到达一个指定的值。小王正在编写一个K均值聚类的MapReduce程序,但在编写迭代的终止条件时,他陷入了瓶颈,不知该如何编写。现在,小王求助于你,希望你能帮他完成迭代终止条件的编写。小王希望K均值聚类算法能够在聚类中心集不再变化时终止。此外,要达到聚类中心集不再变化这一条件,K均值聚类算法或许会执行很多次迭代,这将花费很多时间。小王并不想程序花费过多时间来执行,因此,小王还希望为K均值聚类算法设定一个迭代次数上限(大于等于0),哪怕此时聚类中心集还在变化也希望迭代能够终止。

  • 输入格式

  • 数据集

    每行由一个n维的数据点和数据点所属类别标签构成。其中,数据点的各维度值之间用逗号分隔,而数据点与类别标签之间用制表符分隔。以下列数据集为例,第一行0,0 -1表示一个二维数据点(0, 0)的类别标签为-1。其中,-1表示数据点尚未知道其所属类别的标签。

    0,0 -1 1,2 -1 3,1 -1 8,8 -1 9,10 -1 10,7 -1

  • 聚类中心集

    每行由一个n维的聚类中心组成,各维度值之间用逗号分隔。每个聚类中心所对应的类别便签等于其行号。例如,聚类中心1,2所对应的类别标签为1。

    1,2 3,1

  • 输出格式:

输出格式与数据集的输入格式一致

0,0 1 1,2 1 3,1 1 8,8 2 9,10 2 10,7 2

KMeansRunnerImpl

package DSPPCode.mapreduce.kmeans_runner.impl;

import DSPPCode.mapreduce.kmeans_runner.question.KMeansRunner;

public class KMeansRunnerImpl extends KMeansRunner {

  @Override
  protected boolean isLastIteration(String[] args, int currentIteration) {
    // String oldcenter =
    if(currentIteration == true)
      return true;
    return false;
  }
}

Project

难易程度: * 易

待完成

  • 请在DSPPCode.mapreduce.project.impl中创建ProjectMapperImpl, 继承ProjectMapper, 实现抽象方法

题目描述

  • 要求:给定一张关系表,该关系表中存在若干条记录,每条记录包含"序号"、"姓名"和"城市"三个字段。其中,假设"序号"字段的值无重复,且均为整数,最小为0;关系表中记录很多,超过30亿。现要求将该关系表中每一条记录的"序号"字段的值加1后,再投影到"序号"和"姓名"两个字段上。即对于每一条记录,将其"序号"字段的值加1后,再输出其"序号"和"姓名"两个字段。

  • 输入格式:

每行包括三个字段:"序号"、"姓名"和"城市",各字段之间按逗号分隔。例如,下面第一行表示一条"序号"为0、"姓名"为Alice、"城市"为Shanghai的记录。

0,Alice,Shanghai 1,Bob,Nanjing 2,Mark,Beijing 3,Tom,Shanghai

  • 输出格式:

每行包括两个字段:"序号"和"姓名",字段之间按逗号分隔。

1,Alice 2,Bob 3,Mark 4,Tom

ProjectMapperImpl

package DSPPCode.mapreduce.project.impl;

import DSPPCode.mapreduce.project.question.ProjectMapper;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;
import java.util.StringTokenizer;

public class ProjectMapperImpl extends ProjectMapper {

  @Override
  public void map(Object key, Text value, Context context)
      throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      String str = itr.nextToken();
      String[] strs = str.split(",");
      for(int i=0; i<strs.length; i++)
        System.err.print(strs[i] + " + ");
      System.err.println();
      Long n = Long.parseLong(strs[0]);
      n += 1;
      String ans = n.toString() + "," + strs[1];
      Text word = new Text();
      word.set(ans);
      System.err.println(str);
      System.err.println(ans);
      context.write(word, null);
    }
  }
}

Average Score

待完成:

  • 请在 DSPPCode.mapreduce.average_score.impl 中创建 ScoreMapperImpl, 继承 ScoreMapper, 实现抽象方法

  • 请在 DSPPCode.mapreduce.average_score.impl 中创建 ScoreReducerImpl, 继承 ScoreReducer, 实现抽象方法

题目描述:

  • 某班级有若干名同学,共修读三门必修课,现需计算这三门课程的班级平均成绩(向下取整)

  • 输入格式: 学号,数学分析分数,概率论分数,实变函数分数

10160001,98,80,75 10160002,53,94,77 10160003,61,86,91

  • 输出格式: 课程名,平均分

Function of Real Variable 81 Mathematical analysis 70 Probability Theory 86

(课程名通过 Util.getCourseName 获得)

ScoreMapperImpl

package DSPPCode.mapreduce.average_score.impl;

import DSPPCode.mapreduce.average_score.question.ScoreMapper;
import DSPPCode.mapreduce.average_score.question.Util;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.util.StringTokenizer;
import java.util.regex.Pattern;

public class ScoreMapperImpl extends ScoreMapper {

    private final Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            String str = itr.nextToken();
            String[] strs = str.split(",");
            int l = strs.length;
            for (int i = 1; i < l; i++){
                String course = Util.getCourseName(i-1);
                word.set(course);
                try {
                    int n = Integer.parseInt(strs[i]);
                    IntWritable num = new IntWritable(n);
                    context.write(word, num);
                } catch (NumberFormatException e) {
                    e.printStackTrace();
                }

            }
        }
    }
}

ScoreReducerImpl

package DSPPCode.mapreduce.average_score.impl;

import DSPPCode.mapreduce.average_score.question.ScoreReducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

public class ScoreReducerImpl extends ScoreReducer{

    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
        int sum = 0, num = 0;
        for (IntWritable val : values) {
            sum += val.get();
            num += 1;
        }
        int avg = sum/num;
        result.set(avg);
        context.write(key, result);
    }
}

Consumer Statistics

待完成:

  • 请在 DSPPCode.mapreduce.consumer_statistics.impl 中创建 ConsumerMapperImpl, 继承 ConsumerMapper, 实现抽象方法

  • 请在 DSPPCode.mapreduce.consumer_statistics.impl 中创建 ConsumerReducerImpl, 继承 ConsumerReducer, 实现抽象方法

题目描述:

  • 某超市有一份消费数据,该数据包括消费者id、消费时间、消费金额以及消费者的会员属性字段。每条数据表示某一顾客的一笔交易信息。其中,假设消费金额均为整数,最小为1;超市交易量很大,超过30亿。现需使用MapReduce对该数据进行处理以统计有多少会员和非会员在超市内进行了消费。此外,超市还要求分别计算出会员和非会员在超市内的消费金额总数。

  • 输入格式:

3819 2021-03-27 21:30 357 vip 3231 2021-03-27 21:30 77 non-vip

输入包括四个字段:消费者id、消费时间、消费金额以及消费者的会员属性。以上述示例为例,3819 2021-03-27 21:30 357 vip表示id为3819的vip消费者在2021-03-27 21:30时刻消费了357元

  • 输出格式:

non-vip 1 77 vip 1 357

输出包括三个字段:会员属性、人数、消费金额总数,各字段间用制表符分隔。以上述示例为例,non-vip 1 77表示超市内仅有1个非会员用户进行了消费,并且消费了77元。

ConsumerMapperImpl

package DSPPCode.mapreduce.consumer_statistics.impl;

import DSPPCode.mapreduce.consumer_statistics.question.ConsumerMapper;
import DSPPCode.mapreduce.consumer_statistics.question.Consumer;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ConsumerMapperImpl extends ConsumerMapper{
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
        String[] cons = value.toString().split("\\s+");
        int l = cons.length;
        boolean isv = false;
        if (cons[l-1] == VIP)
            isv = true;
        int m = 0;
        try {
            m = Integer.parseInt(cons[l-2]);
        } catch (NumberFormatException e) {
            e.printStackTrace();
        }
        Consumer consumer = new Consumer(cons[0], m, isv);
        context.write(new Text(cons[l-1]), consumer);
    }
}

ConsumerReducerImpl

package DSPPCode.mapreduce.consumer_statistics.impl;

import DSPPCode.mapreduce.consumer_statistics.question.Consumer;
import DSPPCode.mapreduce.consumer_statistics.question.ConsumerReducer;
import java.io.IOException;
import java.math.BigInteger;
import java.util.HashMap;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ConsumerReducerImpl extends ConsumerReducer{
    @Override
    protected void reduce(Text key, Iterable<Consumer> values, Context context) throws IOException, InterruptedException{
        HashMap<String, Boolean> Customer = new HashMap<>();
        BigInteger people = BigInteger.ZERO;
        BigInteger money = BigInteger.ZERO;
        for (Consumer value:values){
            money = money.add(BigInteger.valueOf(value.getMoney()));
            if (!Customer.containsKey(value.getId()))
                people = people.add(BigInteger.ONE);
            Customer.put(value.getId(), true);
        }
        String ans = key.toString() + "\t" + people.toString() + "\t" + money.toString();
        System.out.println(ans);
        context.write(new Text(ans), NullWritable.get());
    }
}

KMeans

待完成:

  • 请在 DSPPCode.mapreduce.kmeans.impl 中创建 KMeansMapperImpl, 继承 KMeansMapper, 实现抽象方法

  • 请在 DSPPCode.mapreduce.kmeans.impl 中创建 KMeansReducerImpl, 继承 KMeansReducer, 实现抽象方法

题目描述:

  • 实现KMeans算法,在相邻两次迭代的聚类中心之间的距离小于某一阈值后终止迭代。输入包含数据集和聚类中心集,输出为聚类结果。

注:

(1)相邻两次迭代的聚类中心之间的距离指欧式距离,所有聚类中心点迭代前后的欧式距离都小于设定阈值时,即可停止迭代。

(2)测试阶段,后台会将所有测试用例的阈值设定为0.05进行测试)

  • 数据集输入格式: 坐标1,坐标2,...,坐标n 所属类别

0,0 -1 1,2 -1 3,1 -1 8,8 -1 9,10 -1 10,7 -1

  • 聚类中心输入格式: 坐标1,坐标2,...,坐标n

1,2 3,1

  • 聚类结果输出格式: 坐标1,坐标2,...,坐标n 所属类别(类别编号从0开始计)

``` 0,0 0 1,2 0 3,1 0 8,8 1 9,10 1 10,7 1

```

KMeansMapperImpl

package DSPPCode.mapreduce.kmeans.impl;

import DSPPCode.mapreduce.kmeans.question.KMeansMapper;
import DSPPCode.mapreduce.kmeans.question.KMeansRunner;
import DSPPCode.mapreduce.kmeans.question.utils.CentersOperation;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/* 步骤1:确定输入键值对[K1,V1]的数据类型为[LongWritable,Text],确定输出键值对[K2,V2]的数据类型为[Text,Text] */
public class KMeansMapperImpl extends KMeansMapper {

    private List<List<Double>> centers = new ArrayList<>();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        /* 步骤2:编写处理逻辑将[K1,V1]转换为[K2,V2]并输出 */
        String[] dimensions;
        List<Double> point = new ArrayList<>();
        int centerIndex = 1;
        double minDistance = Double.MAX_VALUE;
//        int iteration = context.getConfiguration().getInt(KMeansRunner.ITERATION, 0);

        if (centers.size() == 0) {
            // 获取广播的聚类中心集路径
            String centersPath = context.getCacheFiles()[0].toString();
            // 将聚类中心加载到集合centers
            centers = CentersOperation.getCenters(centersPath, true);
        }

        // 解析数据点
        dimensions = value.toString().split("[,\\t]");
        for (int i = 0; i < dimensions.length - 1; i++) {
            point.add(Double.parseDouble(dimensions[i]));
        }

        // 遍历聚类中心集并计算与数据点的距离
        for (int i = 0; i < centers.size(); i++) {
            double distance = 0;
            List<Double> center = centers.get(i);
            // 计算数据点与当前聚类中心之间的距离
            for (int j = 0; j < center.size(); j++) {
                distance += Math.pow((point.get(j) - center.get(j)), 2);
            }
            distance = Math.sqrt(distance);
            // 如果距离小于当前记录的最小距离则将数据点分配给当前聚类中心(类别号标识)
            if (distance < minDistance) {
                minDistance = distance;
                centerIndex = i;
            }
        }

        // 从输入值中截取数据点
        String pointData = value.toString().split("\t")[0];
        if (KMeansRunner.compareResult == true) {
            context.write(new Text(pointData), new Text(String.valueOf(centerIndex)));
        } else {
            // 输出以类别号为键,数据点为值的键值对
            context.write(new Text(String.valueOf(centerIndex)), new Text(pointData));
        }
    }
}

KMeansReducerImpl

package DSPPCode.mapreduce.kmeans.impl;

import DSPPCode.mapreduce.kmeans.question.KMeansReducer;
import DSPPCode.mapreduce.kmeans.question.KMeansRunner;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/* 步骤1:确定输出键值对[K2,V2]的数据类型为[Text,Text] ,确定输出键值对[K3,V3]的数据类型为[Text,NullWritable] */
public class KMeansReducerImpl extends KMeansReducer {

  @Override
  public void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    /* 步骤2:编写处理逻辑将[K2,V2]转换为[K3,V3]并输出 */
    List<List<Double>> points = new ArrayList<>();
    // 解析数据点并保存到集合points
    for (Text text : values) {
      String value = text.toString();
      List<Double> point = new ArrayList<>();
      for (String s : value.split(",")) {
        point.add(Double.parseDouble(s));
      }
      points.add(point);
    }

    StringBuilder newCenter = new StringBuilder();
    // 计算每个维度的平均值从而得到新的聚类中心
    for (int i = 0; i < points.get(0).size(); i++) {
      double sum = 0;
      // 计算第i个维度值的和
      for (List<Double> data : points) {
        sum += data.get(i);
      }
      // 计算平均值得到新的聚类中心的第i个维度值并生成需要输出的数据
      newCenter.append(sum / points.size());
      newCenter.append(",");
    }

    context.write(new Text(newCenter.toString()), NullWritable.get());
  }
}

Max Temperature

待完成:

  • 请在 DSPPCode.mapreduce.max_temperature.impl 中创建 MaxTemperatureMapperImpl, 继承 MaxTemperatureMapper, 实现抽象方法

  • 请在 DSPPCode.mapreduce.max_temperature.impl 中创建 MaxTemperatureReducerImpl, 继承 MaxTemperatureReducer, 实现抽象方法

题目描述:

  • 给定一份年份与温度的统计表,现需计算统计表中每个年份的最大温度。

  • 输入格式: 年份 温度

1990 21 1990 18 1991 35 1992 30 1990 25 1992 21 1992 40

  • 输出格式: 年份 最高温度

1990 25 1991 35 1992 40

MaxTemperatureMapperImpl

package DSPPCode.mapreduce.max_temperature.impl;

import DSPPCode.mapreduce.max_temperature.question.MaxTemperatureMapper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.util.StringTokenizer;
import java.util.regex.Pattern;

public class MaxTemperatureMapperImpl extends MaxTemperatureMapper {

    private static final IntWritable ONE = new IntWritable(1);

    private final Text word = new Text();
    private final Text word2 = new Text();

    private final Pattern pattern = Pattern.compile("\\W+");

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            String str = itr.nextToken();
            str = pattern.matcher(str).replaceAll("");
            word.set(str);
            str = itr.nextToken();
            str = pattern.matcher(str).replaceAll("");
            int n = 0;
            try {
                n = Integer.parseInt(str);
            } catch (NumberFormatException e) {
                e.printStackTrace();
            }
            IntWritable tmp = new IntWritable(n);
            context.write(word, tmp);
        }
    }

}

MaxTemperatureReducerImpl

package DSPPCode.mapreduce.max_temperature.impl;

import DSPPCode.mapreduce.max_temperature.question.MaxTemperatureReducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

public class MaxTemperatureReducerImpl extends MaxTemperatureReducer {

    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int max = 0;
        for (IntWritable val : values) {
            max = max > val.get() ? max : val.get();
        }
        result.set(max);
        context.write(key, result);
    }

}

PageRank

待完成

  • 请在DSPPCode.mapreduce.pagerank.impl中创建PageRankMapperImpl,继承PageRankMapper,实现抽象方法
  • 请在DSPPCode.mapreduce.pagerank.impl中创建PageRankReducerImpl,继承PageRankReducer,实现抽象方法

题目描述:

  • 基于两个输入文本(网页链接关系、网页排名)实现网页链接排名算法(阻尼系数以0.85计算),网页总数N在测试阶段由后台自动给出。

  • 输入格式:文本中的第一列都为网页名,列与列之间用空格分隔。其中,

网页链接关系文本中的其他列为出站链接,如A B D 表示网页A链向网页B和D(所有网页权重按1.0计算)

A B D B C C A B D B C

网页排名文本第二列为该网页的排名值,如 A 1 表示网页A的排名为1

 ```

A 1 B 1 C 1 D 1 ```

  • 输出格式: 输出文本仅包含网页名以及排名值,排名值四舍五入保留五位小数

A 0.21436 B 0.36332 C 0.40833 D 0.13027

PageRankMapperImpl

package DSPPCode.mapreduce.pagerank.impl;

import DSPPCode.mapreduce.pagerank.question.PageRankMapper;
import DSPPCode.mapreduce.pagerank.question.ReducePageRankWritable;
import DSPPCode.mapreduce.pagerank.question.PageRankRunner;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class PageRankMapperImpl extends PageRankMapper {

    private Map<String, String> rankTable = new HashMap<>();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        /* 步骤2:编写处理逻辑将[K1,V1]转换为[K2,V2]并输出 */
        if (PageRankRunner.iteration == 0 && rankTable.isEmpty()) {
            URI uri = context.getCacheFiles()[0];
            FileSystem fs = FileSystem.get(uri, new Configuration());
            BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(new Path(uri))));
            String content;
            while ((content = reader.readLine()) != null) {
                String[] datas = content.split(" ");
                rankTable.put(datas[0], datas[1]);
            }
        }

        if (PageRankRunner.iteration == 0) {
            // 以空格为分隔符切分
            String[] pageInfo = value.toString().split(" ");
            // 网页的排名值
            double pageRank = Double.parseDouble(rankTable.get(pageInfo[0]));
            // 网页的出站链接数
            int outLink = pageInfo.length - 1;
            ReducePageRankWritable writable;
            writable = new ReducePageRankWritable();
            // 计算贡献值并保存
            writable.setData(String.valueOf(pageRank / outLink));
            // 设置对应标识
            writable.setTag(ReducePageRankWritable.PR_L);
            // 对于每一个出站链接,输出贡献值
            for (int i = 1; i < pageInfo.length; i += 1) {
                context.write(new Text(pageInfo[i]), writable);
            }
            writable = new ReducePageRankWritable();
            // 保存网页信息并标识
            writable.setData(value.toString());
            writable.setTag(ReducePageRankWritable.PAGE_INFO);
            // 以输入的网页信息的网页名称为key进行输出
            context.write(new Text(pageInfo[0]), writable);
        }

        if(PageRankRunner.iteration != 0) {
            String[] pageInfo = value.toString().split(" ");
            // 网页的出站链接数
            int outLink = pageInfo.length - 2;
            // 网页的排名值
            double pageRank = Double.parseDouble(pageInfo[outLink + 1]);
            ReducePageRankWritable writable;
            writable = new ReducePageRankWritable();
            // 计算贡献值并保存
            writable.setData(String.valueOf(pageRank / outLink));
            // 设置对应标识
            writable.setTag(ReducePageRankWritable.PR_L);
            // 对于每一个出站链接,输出贡献值
            for (int i = 1; i < pageInfo.length - 1; i += 1) {
                context.write(new Text(pageInfo[i]), writable);
            }
            writable = new ReducePageRankWritable();
            // 保存网页信息并标识
            writable.setData(value.toString());
            writable.setTag(ReducePageRankWritable.PAGE_INFO);
            // 以输入的网页信息的网页名称为key进行输出
            context.write(new Text(pageInfo[0]), writable);
        }
    }
}

PageRankReducerImpl

package DSPPCode.mapreduce.pagerank.impl;

import DSPPCode.mapreduce.pagerank.question.PageRankReducer;
import DSPPCode.mapreduce.pagerank.question.ReducePageRankWritable;
import DSPPCode.mapreduce.pagerank.question.PageRankRunner;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class PageRankReducerImpl extends PageRankReducer {

    // 阻尼系数
    private static final double D = 0.85;

    @Override
    public void reduce(Text key, Iterable<ReducePageRankWritable> values, Context context)
            throws IOException, InterruptedException {
        /* 步骤2:编写处理逻辑将[K2,V2]转换为[K3,V3]并输出 */
        String[] pageInfo = null;
        // 从配置项中读取网页的总数
        int totalPage = context.getConfiguration().getInt(PageRankRunner.TOTAL_PAGE, 0);
        // 从配置项中读取当前迭代步数
        int iteration = context.getConfiguration().getInt(PageRankRunner.ITERATION, 0);
        double sum = 0;
        for (ReducePageRankWritable value : values) {
            String tag = value.getTag();
            // 如果是贡献值则进行求和,否则以空格为分隔符切分后保存到pageInfo
            if (tag.equals(ReducePageRankWritable.PR_L)) {
                sum += Double.parseDouble(value.getData());
            } else if (tag.equals(ReducePageRankWritable.PAGE_INFO)) {
                pageInfo = value.getData().split(" ");
            }
        }
        // 根据公式计算排名值
        double pageRank = (1 - D) / totalPage + D * sum;
        // 更新网页信息中的排名值
        if (PageRankRunner.iteration != 0)
            pageInfo[pageInfo.length - 1] = String.valueOf(pageRank);
        // 最后一次迭代输出网页名及排名值,而其余迭代输出网页信息
        StringBuilder result = new StringBuilder();
        if (iteration == (PageRankRunner.MAX_ITERATION - 1)) {
            result.append(pageInfo[0]).append(" ").append(String.format("%.5f", pageRank));
        } else {
            for (String data : pageInfo) {
                result.append(data).append(" ");
            }
            if (PageRankRunner.iteration == 0)
                result.append(String.valueOf(pageRank));
        }
//        StringBuilder result = new StringBuilder();
//        result.append(pageInfo[0]).append(" ").append(String.format("%.5f", pageRank));
        context.write(new Text(result.toString()), NullWritable.get());
    }
}

Word Count

待完成:

  • 请在DSPPCode.mapreduce.warm_up中创建TokenizerMapperImpl, 继承TokenizerMapper, 实现抽象方法.

  • 请在DSPPCode.mapreduce.warm_up中创建IntSumReducerImpl, 继承IntSumReducer, 实现抽象方法.

题目描述:

  • 现存在一份英文文本文件,要求统计文本中每个单次出现的次数,并将计数结果输出到文本中。其中,输出文本的每行由单词和频数组成,单次和频数之间用制表符分隔。

IntSumReducerImpl

package DSPPCode.mapreduce.warm_up.impl;

import DSPPCode.mapreduce.warm_up.question.IntSumReducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
 * 答案示例
 */
public class IntSumReducerImpl extends IntSumReducer {

  private IntWritable result = new IntWritable();

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
      sum += val.get();
    }
    result.set(sum);
    context.write(key, result);
  }

}

TokenizerMapperImpl

package DSPPCode.mapreduce.warm_up.impl;

import DSPPCode.mapreduce.warm_up.question.TokenizerMapper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.util.StringTokenizer;
import java.util.regex.Pattern;

/**
 * 答案示例
 */
public class TokenizerMapperImpl extends TokenizerMapper {

  private static final IntWritable ONE = new IntWritable(1);

  private final Text word = new Text();

  private final Pattern pattern = Pattern.compile("\\W+");

  @Override
  public void map(Object key, Text value, Context context)
      throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      String str = itr.nextToken();
      str = pattern.matcher(str).replaceAll("");
      word.set(str);
      context.write(word, ONE);
    }
  }

}