跳转至

2023-2024-2-编程作业1

2021级,MapReduce

【2024.4.20】全部完成

MapReduce Warm Up(送分)

待完成:

  • 请在DSPPCode.mapreduce.warm_up.impl中创建TokenizerMapperImpl, 继承TokenizerMapper, 实现抽象方法.

  • 请在DSPPCode.mapreduce.warm_up.impl中创建IntSumReducerImpl, 继承IntSumReducer, 实现抽象方法.

题目描述:

  • 现存在一份英文文本文件,要求统计文本中每个单次出现的次数,并将计数结果输出到文本中。其中,输出文本的每行由单词和频数组成,单次和频数之间用制表符分隔。

这个只需要根据视频的指南生成一个jar包提交即可,确实是送分。

TokenizerMapperImpl

package DSPPCode.mapreduce.warm_up.impl;

import DSPPCode.mapreduce.warm_up.question.TokenizerMapper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.util.StringTokenizer;
import java.util.regex.Pattern;

/**
 * 答案示例
 */
public class TokenizerMapperImpl extends TokenizerMapper {

  private static final IntWritable ONE = new IntWritable(1);

  private final Text word = new Text();

  private final Pattern pattern = Pattern.compile("\\W+");

  @Override
  public void map(Object key, Text value, Context context)
      throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      String str = itr.nextToken();
      str = pattern.matcher(str).replaceAll("");
      word.set(str);
      context.write(word, ONE);
    }
  }

}

IntSumReducerImpl

package DSPPCode.mapreduce.warm_up.impl;

import DSPPCode.mapreduce.warm_up.question.IntSumReducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
 * 答案示例
 */
public class IntSumReducerImpl extends IntSumReducer {

  private IntWritable result = new IntWritable();

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
      sum += val.get();
    }
    result.set(sum);
    context.write(key, result);
  }

}

Difference

难易程度: * 易

待完成

  • 请在DSPPCode.mapreduce.difference.impl中创建DifferenceMapperImpl, 继承DifferenceMapper, 实现抽象方法。
  • 请在DSPPCode.mapreduce.difference.impl中创建DifferenceReducerImpl, 继承DifferenceReducer, 实现抽象方法。

题目描述

  • 学校图书馆的自助借书系统因为网络原因崩溃了,无法同步更新每天借书的学生信息,两位图书管理员只能各自记录了一份借阅图书(R)和归还图书(S)的学生名单。现在,他们希望你能够帮助他们合并两份名单,整理为一份还未归还图书的学生名单。 请你通过MapReduce程序来实现以上功能。

  • 输入格式:输入只有一个文件,文本的第一列为学生姓名,第二列为图书ID,列与列之间用Tab分隔。

  • R: Alice 1 Bob 2 Sam 3 Era 4
  • S:
    Alice 1 Bob 2

  • 输出格式:与输入文件的格式相同 Era 4 Sam 3

不用很严谨,就是特别简单的一个题,我的思路用到了文件名。

需要修改的代码皆在impl目录下面

DifferenceMapperImpl

package DSPPCode.mapreduce.difference.impl;

import DSPPCode.mapreduce.difference.question.DifferenceMapper;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class DifferenceMapperImpl extends DifferenceMapper {
  @Override
  public void map(Object key, Text value, Context context)
      throws IOException, InterruptedException {
    String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
    context.write(value,new Text(fileName));
  }
}

DifferenceReducerImpl

package DSPPCode.mapreduce.difference.impl;

import DSPPCode.mapreduce.difference.question.DifferenceReducer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;
import java.util.*;
public class DifferenceReducerImpl extends DifferenceReducer {
  @Override
  public void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException{
    Iterator<Text> itr = values.iterator();
    boolean flagR = false;
    boolean flagS = false;
    while (itr.hasNext()) {
      String s = itr.next().toString();
      if (s.equals("R")) {
        flagR = true;
      }
      if (s.equals("S")) {
        flagS = true;
      }
    }
    if (flagR && !flagS) {
      context.write(key, NullWritable.get());
    }
  }
}

Transitive Closure

难易程度: **中

待完成

  • 请在 DSPPCode.mapreduce.transitive_closure.impl 中创建 TransitiveClosureMapperImpl, 继承TransitiveClosureMapper, 实现抽象方法。
  • 请在 DSPPCode.mapreduce.transitive_closure.impl 中创建 TransitiveClosureReducerImpl, 继承TransitiveClosureReducer, 实现抽象方法。

题目描述:

child parent Jack Philip Jack Jesse Philip Terry Philip Alma

输入保存在文本中,文本的第一行为关系标识,其余行为子女和父母的对应关系。以上述示例为例,Jack Philip表示Jack与Philip之间存在着子女-父母的关系。

  • 输出格式:

Jack Terry Jack Alma

输出保存在文本中,文本的每一行为孙子女和祖父母的对应关系。以上述示例为例,Jack,Terry表示Jack与Terry之间存在着孙子女-祖父母关系。

需要修改的代码皆在impl目录下面

TransitiveClosureMapperImpl

package DSPPCode.mapreduce.transitive_closure.impl;

import DSPPCode.mapreduce.transitive_closure.question.TransitiveClosureMapper;
import org.apache.hadoop.io.Text;
import java.io.IOException;
import java.util.StringTokenizer;
public class TransitiveClosureMapperImpl extends TransitiveClosureMapper{
  private Text child = new Text();
  private Text parent = new Text();
  @Override
  public void map(Object key, Text value, Context context)
      throws IOException, InterruptedException{
    String[] v1 =value.toString().split("\\s+");
    String child = v1[0];
    String parent = v1[1];
    if (!child.equals("child")){
      context.write(new Text(child), new Text("-" + parent));
      context.write(new Text(parent), new Text("+" + child));
    }

  }

}

TransitiveClosureReducerImpl

package DSPPCode.mapreduce.transitive_closure.impl;

import DSPPCode.mapreduce.transitive_closure.question.TransitiveClosureReducer;
import org.apache.hadoop.io.Text;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.NullWritable;
public class TransitiveClosureReducerImpl extends TransitiveClosureReducer{
  List<Text> grand = new ArrayList<Text>();
  List<Text> child = new ArrayList<Text>();
  @Override
  public void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException{
    for (Text v : values) {
      String s = v.toString();
      if (v.toString().startsWith("-")) {
        child.add(new Text(s.substring(1)));
      } else {
        grand.add(new Text(s.substring(1)));
      }
    }
    for (int i = 0; i < child.size(); i++) {
      for (int j = 0; j < grand.size(); j++) {
        context.write(grand.get(j),child.get(i) );
      }
    }
    grand.clear();
    child.clear();
  }

}

Frequent item analysis

难易程度: **中

待完成

  • 请在DSPPCode.mapreduce.frequent_item_analysis.impl中创建FrequentItemAnalysisMapperImpl,继承FrequentItemAnalysisMapper,实现抽象方法
  • 请在DSPPCode.mapreduce.frequent_item_analysis.impl中创建FrequentItemAnalysisRunnerImpl,继承FrequentItemAnalysisRunner,实现抽象方法
  • 请在DSPPCode.mapreduce.frequent_item_analysis.impl中创建FrequentItemAnalysisReducerImpl,继承FrequentItemAnalysisReducer,实现抽象方法
  • 请在DSPPCode.mapreduce.frequent_item_analysis.SortHelperImpl,SortHelper,实现抽象方法

问题描述

频繁项挖掘是一种数据挖掘技术,旨在找出数据集中经常一起出现的项集合。这些项集合被称为频繁项集,可以用于发现数据中的模式、关联和规律。

在频繁项挖掘中,有两个概念分别是阶,支持度。

  • 支持度:指在给定的数据集中,包含某个特定项集的事务数量与总事务数量的比例。计算支持度的公式如下: $\(\text{Support}(A) = \frac{\text{Number of transactions containing }A}{\text{Total number of transactions}}\)$ 其中\(A\)是一个频繁项集,\(\text{Number of transactions containing }A\)是包含项集\(A\)的事务数量,\(\text{Total number of transactions}\)是数据库中的总事务数量。

  • 阶:指的是项集的规模,即项集包含的项的数量。例如,一阶频繁项集是指包含单个项的集合,二阶频繁项集是指包含两个项的集合,依此类推。\(n\)阶频繁项集就是指那些包含\(n\)个项的频繁项集。

例如,在数据库中,我们有如下几行交易数据:

crackers,bread,banana 
crackers,coke,butter,coffee 
crackers,bread 
crackers,bread 
crackers,bread,coffee 
butter,coke 
butter,coke,bread,crackers 

同时设频繁项的支持度\(\text{Support = 0.5}\)则我们可以挖掘到下述频繁项集: - 一阶频繁项集:{crackers}, {bread} - 二阶频繁项集:{crackers,bread} - 三阶频繁项集:无

现给予一个交易记录文件,请使用MapReduce挖掘其\(n\)阶频繁项

输入格式:

输入一共一个文件Transactions,每一行记录一次交易中所购买的商品,不同商品使用英文逗号","隔开。

Runner的run方法输入String数组,长度为5,分别是输入路径,输出路径,阶数,支持度,行数

crackers,bread,banana 
crackers,coke,butter,coffee 
crackers,bread 
crackers,bread 
crackers,bread,coffee 
butter,coke 
butter,coke,bread,crackers 

输出格式:输出满足支持度\(n\)阶频繁项集合,其中频繁项内部使用字典序升序排序,不同商品使用英文逗号","隔开。一行输出一个频繁项。在\(n = 2,\text{support}=0.5\)情况下输出为

bread,crackers

需要修改的代码皆在impl目录下面

FrequentItemAnalysisMapperImpl

package DSPPCode.mapreduce.frequent_item_analysis.impl;

import DSPPCode.mapreduce.frequent_item_analysis.question.FrequentItemAnalysisMapper;
import DSPPCode.mapreduce.frequent_item_analysis.question.SortHelper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FrequentItemAnalysisMapperImpl extends FrequentItemAnalysisMapper {

  private void subSet(List<String> strs, List<String> path, int i, Mapper<LongWritable, Text, Text, IntWritable>.Context context, int m)
      throws IOException, InterruptedException {
    if(i == strs.size() || path.size() == m) {
      if(path.size() != 0) {
        StringBuilder builder = new StringBuilder();
        for (String s : path) {
          builder.append(s);
          builder.append(",");
        }
        builder.deleteCharAt(builder.length() - 1);
        context.write(new Text(builder.toString()), new IntWritable(1));
      }
      return;
    }
    path.add(strs.get(i));
    subSet(strs, path, i + 1, context, m);
    path.remove(path.size() - 1);
    subSet(strs, path, i + 1, context, m);
  }

  @Override
  public void map(LongWritable key, Text value,
      Mapper<LongWritable, Text, Text, IntWritable>.Context context)
      throws IOException, InterruptedException {
    String s = value.toString();
    String[] split = s.split(",");
    List<String> list = new ArrayList<>();
    for (String s1 : split) {
      list.add(s1);
    }
    list = new SortHelperImpl().sortSeq(list);
    int m = context.getConfiguration().getInt("number.of.pairs", 0);
    subSet(list, new ArrayList<String>(), 0, context, m);
  }
}

Combiner

package DSPPCode.mapreduce.frequent_item_analysis.impl;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import java.io.IOException;

public class Combiner extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values,
      Reducer<Text, IntWritable, Text, IntWritable>.Context context)
      throws IOException, InterruptedException {
    int count = 0;
    for (IntWritable value : values) {
      count += value.get();
    }
    context.write(key, new IntWritable(count));
  }
}

FrequentItemAnalysisReducerImpl

package DSPPCode.mapreduce.frequent_item_analysis.impl;

import DSPPCode.mapreduce.frequent_item_analysis.question.FrequentItemAnalysisReducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import java.io.IOException;

public class FrequentItemAnalysisReducerImpl extends FrequentItemAnalysisReducer {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values,
      Reducer<Text, IntWritable, Text, NullWritable>.Context context)
      throws IOException, InterruptedException {
    int count = 0;
    for (IntWritable value : values) {
      count += value.get();
    }
    String nStr = context.getConfiguration().get("number.of.pairs");
    Integer n = Integer.valueOf(nStr);
    int length = key.toString().split(",").length;
    if(n != length)
      return;
    String countStr = context.getConfiguration().get("count.of.transactions");
    Integer total = Integer.valueOf(countStr);
    String supportStr = context.getConfiguration().get("support");
    Double support = Double.valueOf(supportStr);
    double curSupport = (double)count / total;
    if(curSupport >= support) {
      context.write(key, NullWritable.get());
    }
  }
}

FrequentItemAnalysisRunnerImpl

package DSPPCode.mapreduce.frequent_item_analysis.impl;

import DSPPCode.mapreduce.frequent_item_analysis.question.FrequentItemAnalysisRunner;
import org.apache.hadoop.mapreduce.Job;
import java.io.IOException;
import java.net.URISyntaxException;

public class FrequentItemAnalysisRunnerImpl extends FrequentItemAnalysisRunner {

  @Override
  public void configureMapReduceTask(Job job) throws IOException, URISyntaxException {
    job.setCombinerClass(Combiner.class);
  }
}

SortHelperImpl

package DSPPCode.mapreduce.frequent_item_analysis.impl;

import DSPPCode.mapreduce.frequent_item_analysis.question.SortHelper;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SortHelperImpl extends SortHelper {

  @Override
  public List<String> sortSeq(List<String> input) {
    input.sort(new Comparator<String>() {
      @Override
      public int compare(String o1, String o2) {
        return o1.compareTo(o2);
      }
    });
    return input;
  }
}

Common PageRank

难易程度: *** 难

待完成

  • 请在DSPPCode.mapreduce.common_pagerank.impl中创建PageRankJoinMapperImpl,继承PageRankJoinMapper,实现抽象方法
  • 请在DSPPCode.mapreduce.common_pagerank.impl中创建PageRankJoinReducerImpl,继承PageRankJoinReducer,实现抽象方法
  • 请在DSPPCode.mapreduce.common_pagerank.impl中创建PageRankMapperImpl,继承PageRankMapper,实现抽象方法
  • 请在DSPPCode.mapreduce.common_pagerank.impl中创建PageRankReducerImpl,继承PageRankReducer,实现抽象方法

题目描述:

  • 基于两个输入文本(网页链接关系、初始的网页排名)实现网页链接排名算法(阻尼系数以0.85计算)。 本题对网页排名值的收敛条件做了简化,如果当某一网页当前排名值与上一轮迭代排名值之间差值的绝对值小于1e-6,那么认为该网页的排名值已经收敛。 迭代停止的条件为达到最大迭代次数或某次迭代中所有网页均收敛。 网页总数N在测试阶段由后台自动给出。
  • 输入格式:文本中的第一列都为网页名,列与列之间用空格分隔。其中,

网页链接关系文本中的其他列为出站链接,如A B D表示网页A链向网页B和D(所有网页权重按1.0计算) A B D B C C A B D B C 初始的网页排名文本第二列为该网页的排名值,如 A 1 表示网页A的排名为1 A 1 B 1 C 1 D 1

  • 输出格式: 要求分两步完成。第一步连接网页链接关系和初始的网页排名两个文件,输出连接结果: A 1 B D B 1 C C 1 A B D 1 B C 第二步输出网页的链接关系和最终的排名值: A 0.21436248817266176 B D B 0.3633209225962085 C C 0.40833002013844744 A B D 0.1302651623462253 B C

注意:连接阶段无需将排名值解析为数值类型, 计算网页排名的阶段请将排名值解析为 double 类型变量进行计算。 输出结果的小数位数无需处理

需要修改的代码皆在impl目录下

PageRankJoinMapperImpl

package DSPPCode.mapreduce.common_pagerank.impl;

import DSPPCode.mapreduce.common_pagerank.question.PageRankJoinMapper;
import DSPPCode.mapreduce.common_pagerank.question.utils.ReduceJoinWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PageRankJoinMapperImpl extends PageRankJoinMapper {

  @Override
  public void map(LongWritable key, Text value,
      Mapper<LongWritable, Text, Text, ReduceJoinWritable>.Context context)
      throws IOException, InterruptedException {
    String line = value.toString();
    String[] segments = line.split(" ", 2);
    ReduceJoinWritable val = new ReduceJoinWritable();
    val.setData(segments[1]);
    Pattern pattern = Pattern.compile("[0-9]*\\.?[0-9]+");
    Matcher isNum = pattern.matcher(segments[1]);
    if(isNum.matches()) val.setTag("2");
    else val.setTag("1");
    context.write(new Text(segments[0]), val);
  }
}

PageRankJoinReducerImpl

package DSPPCode.mapreduce.common_pagerank.impl;

import DSPPCode.mapreduce.common_pagerank.question.PageRankJoinReducer;
import DSPPCode.mapreduce.common_pagerank.question.utils.ReduceJoinWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import java.io.IOException;

public class PageRankJoinReducerImpl extends PageRankJoinReducer {

  @Override
  public void reduce(Text key, Iterable<ReduceJoinWritable> values,
      Reducer<Text, ReduceJoinWritable, Text, NullWritable>.Context context)
      throws IOException, InterruptedException {
    String[] row = new String[2];
    for(ReduceJoinWritable value : values){
      if(value.getTag().equals("2")){
        row[0] = value.getData();
      }
      else if(value.getTag().equals("1")){
        row[1] = value.getData();
      }
    }
    String line = key.toString() + " " + row[0] + " " + row[1];
    context.write(new Text(line), NullWritable.get());
  }
}

PageRankMapperImpl

package DSPPCode.mapreduce.common_pagerank.impl;

import DSPPCode.mapreduce.common_pagerank.question.PageRankMapper;

public class PageRankMapperImpl extends PageRankMapper {

}

PageRankReducerImpl

package DSPPCode.mapreduce.common_pagerank.impl;

import DSPPCode.mapreduce.common_pagerank.question.PageRankReducer;
import DSPPCode.mapreduce.common_pagerank.question.PageRankRunner;
import DSPPCode.mapreduce.common_pagerank.question.utils.ReducePageRankWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import java.io.IOException;

public class PageRankReducerImpl extends PageRankReducer {

  @Override
  public void reduce(Text key, Iterable<ReducePageRankWritable> values,
      Reducer<Text, ReducePageRankWritable, Text, NullWritable>.Context context)
      throws IOException, InterruptedException {
    double pr = 0.0;
    String outPage = "";
    double lastPr = 0.0;
    for(ReducePageRankWritable value : values){
      if(value.getTag().equals(ReducePageRankWritable.PR_L)){
        try {
          pr += Double.parseDouble(value.getData());
        } catch (NumberFormatException e){
          e.printStackTrace();
        }
      }
      else{
        String[] segment = value.getData().split(" ", 3);
        outPage += segment[2];
        lastPr = Double.parseDouble(segment[1]);
      }
    }
    int totalPage = context.getConfiguration().getInt("1", 0);
    pr = 0.85 * pr + (1 - 0.85) / totalPage;
    if(Math.abs(pr - lastPr) < PageRankRunner.DELTA){
      context.getCounter(PageRankRunner.GROUP_NAME, PageRankRunner.COUNTER_NAME).increment(1L);
    }
    String out = key.toString() + " " + String.valueOf(pr) + " " + outPage;
    context.write(new Text(out), NullWritable.get());
  }
}