跳转至

2021-2022-2-编程作业1

2019级,MapReduce

Student Info

难易程度:* 易

待完成:

  • 请在 DSPPCode.mapreduce.student_info.impl 中创建 StudentInfoMapperImpl 和 StudentInfoReducerImpl, 分别继承 StudentInfoMapper 和 StudentInfoReudcer, 实现抽象方法

题目描述:

  • 某校统计了在校学生的性别和身高数据,现要求对这些数据进行处理以分别计算出男生身高的最大值与男生平均身高的差,女生身高的最小值与女生平均身高的差。

  • 输入格式:

数据保存在文件中,文件的每行由学生的序号、性别以及身高信息(单位为cm)组成。信息之间用逗号分隔。

1,F,170 2,M,178 3,M,174 4,F,165

  • 输出: 请输出性别和对应差值,中间用制表符分隔。 F -2.5 M 2.0

StudentInfoMapperImpl

package DSPPCode.mapreduce.student_info.impl;

import DSPPCode.mapreduce.student_info.question.StudentInfoMapper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;

public class StudentInfoMapperImpl extends StudentInfoMapper {

  @Override
  public void map(Object key, Text value, Context context)
      throws IOException, InterruptedException {

    String[] info = value.toString().split(",");
    String gender = info[1];
    int height = Integer.parseInt(info[2]);
    // System.out.println(gender);
    // System.out.println(height);
    context.write(new Text(gender), new IntWritable(height));
  }
}

StudentInfoReducerImpl

package DSPPCode.mapreduce.student_info.impl;

import DSPPCode.mapreduce.student_info.question.StudentInfoReducer;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;

public class StudentInfoReducerImpl extends StudentInfoReducer {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int num = 0;
    int sum = 0;
    double avg = 0;
    int max = 0;
    int min = 1000;
    for(IntWritable value : values)
    {
      int tmp = value.get();
      num++;
      sum += tmp;
      if(tmp > max)
      {
        max = tmp;
      }
      if(tmp < min)
      {
        min = tmp;
      }
    }
    avg = (double) sum / (double) num;

    String gender = key.toString();
    if(gender.equals("M"))
    {
      context.write(new Text(gender), new DoubleWritable(max - avg));
    }else{
      context.write(new Text(gender), new DoubleWritable(min - avg));
    }

  }
}

Inverted Index

难易程度: ** 中

待完成

  • 新建DSPPCode.mapreduce.inverted_index.impl文件夹
  • 请在DSPPCode.mapreduce.inverted_index.impl中创建InvertedIndexMapperImpl, 继承InvertedIndexMapper, 实现抽象方法。
  • 请在DSPPCode.mapreduce.inverted_index.impl中创建InvertedIndexReducerImpl, 继承InvertedIndexReducer, 实现抽象方法。

题目描述

倒排索引是 Elasticsearch 中非常重要的索引结构,是从文档单词到文档 ID 的过程。 倒排索引源于实际应用中需要根据属性的值来查找记录。这种索引表中的每一项都包括一个属性值和具有该属性值的各记录的地址。 由于不是由记录来确定属性值,而是由属性值来确定记录的位置,因而称为倒排索引(inverted index) 现实中,倒排索引主要应用于搜索引擎中,用于构建单词到文档的索引,从而能够快速的通过用户的输入查找相关的网页。

本题目需要实现构建倒排索引的过程。具体来说,给定一组英文文档,使用空格进行分词(文档中不包含标点符号),将所有单词转换为小写,并排除停用词(stop word)后, 建立单词的倒排索引(输出key为单词,value为以文件名和单词出现次数组成的字符串,不同文件之间用";"分割 ,详见样例)。

样例

输入

//输入由多个文件的文本内容构成,下面列举了两个文件的文本内容 
//www.bbc.comnewsworld-asia-china-60615280
Ukraine invasion Can China do more to stop Russia's war in Ukraine
//www.bbc.comnewsworld-europe-60506682
Ukraine maps Ukraine says Russian ceasefire offer immoral
// stopwords.txt
can
and
to
in

输出

//输出格式为 单词 文件名1:次数1;文件名2:次数2;
Ukraine www.bbc.comnewsworld-asia-china-60615280:2;www.bbc.comnewsworld-europe-60506682:2  
invasion    www.bbc.comnewsworld-asia-china-60615280::1;
China   www.bbc.comnewsworld-asia-china-60615280::1;
do  www.bbc.comnewsworld-asia-china-60615280::1;
more    www.bbc.comnewsworld-asia-china-60615280::1;
stop    www.bbc.comnewsworld-asia-china-60615280::1;
Russia's    www.bbc.comnewsworld-asia-china-60615280::1;
war www.bbc.comnewsworld-asia-china-60615280::1;
maps    www.bbc.comnewsworld-europe-60506682:1;  
says    www.bbc.comnewsworld-europe-60506682:1;  
Russian www.bbc.comnewsworld-europe-60506682:1;  
ceasefire   www.bbc.comnewsworld-europe-60506682:1;
offer   www.bbc.comnewsworld-europe-60506682:1;
immoral www.bbc.comnewsworld-europe-60506682:1;

InvertedIndexMapperImpl

package DSPPCode.mapreduce.inverted_index.impl;

import DSPPCode.mapreduce.inverted_index.question.InvertedIndexMapper;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;



public class InvertedIndexMapperImpl extends InvertedIndexMapper {

  @Override
  public void map(Object key, Text value, Context context)
      throws IOException, InterruptedException {

    // 读取停用词
    URI uri = context.getCacheFiles()[0];
    FileSystem fs = FileSystem.get(uri, new Configuration());
    BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(new Path(uri))));

    List<String> stopwords = new ArrayList<>();
    String content;
    while ((content = reader.readLine()) != null) {
      stopwords.add(content);
    }

    // 读取单词
    String[] words = value.toString().toLowerCase().split(" ");

    // 读取文件名
    //FileSplit split = (FileSplit) context.getInputSplit();
    split = (FileSplit) context.getInputSplit();
    String path_str = split.getPath().toString();
    String[] paths = path_str.split("/");
    String path = paths[paths.length - 1];
    //System.out.println(path_str);

    // 去除停用词
    for (String word : words) {
      int flag = 0;
      for (String str : stopwords) {
        if (word.equals(str)) {
          flag = 1;
          break;
        }
      }

      if (flag == 1) {
        //System.out.println(word);
        continue;
      }
      //System.out.println(path);
      context.write(new Text(word), new Text(path));
    }
  }
}

InvertedIndexReducerImpl

package DSPPCode.mapreduce.inverted_index.impl;

import DSPPCode.mapreduce.inverted_index.question.InvertedIndexReducer;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

public class InvertedIndexReducerImpl extends InvertedIndexReducer {


  @Override
  public void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {

    // 统计
    Map<String, Integer> map = new HashMap<>();
    for (Text value : values) {
      //System.out.println(value.toString());
      String address = value.toString();
      if (map.containsKey(address)) {
        map.put(address, map.get(address) + 1);
      } else {
        map.put(address, 1);
      }
    }

    Set<String> set = map.keySet();
    Iterator<String> it = set.iterator();
    // 输出
    StringBuilder output_value = new StringBuilder("");
    while (it.hasNext()) {
      String key_it = it.next();
      Integer value_it = map.get(key_it);
      //System.out.println(value_it);
      String tmp = key_it + DELIMITER_COLON + value_it.toString() + DELIMITER_SEMI;
      output_value.append(tmp);
    }
    //System.out.println(output_value.toString());
    //System.out.println(key);
    if(key.toString().length()>0)
    {
      context.write(key, new Text(output_value.toString()));
    }
  }
}

Common PageRank

难易程度: *** 难

待完成

  • 请在DSPPCode.mapreduce.common_pagerank.impl中创建PageRankJoinMapperImpl,继承PageRankJoinMapper,实现抽象方法
  • 请在DSPPCode.mapreduce.common_pagerank.impl中创建PageRankJoinReducerImpl,继承PageRankJoinReducer,实现抽象方法
  • 请在DSPPCode.mapreduce.common_pagerank.impl中创建PageRankMapperImpl,继承PageRankMapper,实现抽象方法
  • 请在DSPPCode.mapreduce.common_pagerank.impl中创建PageRankReducerImpl,继承PageRankReducer,实现抽象方法

题目描述:

  • 基于两个输入文本(网页链接关系、初始的网页排名)实现网页链接排名算法(阻尼系数以0.85计算)。 本题对网页排名值的收敛条件做了简化,如果当某一网页当前排名值与上一轮迭代排名值之间差值的绝对值小于1e-6,那么认为该网页的排名值已经收敛。 迭代停止的条件为达到最大迭代次数或某次迭代中所有网页均收敛。 网页总数N在测试阶段由后台自动给出。
  • 输入格式:文本中的第一列都为网页名,列与列之间用空格分隔。其中,

网页链接关系文本中的其他列为出站链接,如A B D表示网页A链向网页B和D(所有网页权重按1.0计算) A B D B C C A B D B C 初始的网页排名文本第二列为该网页的排名值,如 A 1 表示网页A的排名为1 A 1 B 1 C 1 D 1

  • 输出格式: 要求分两步完成。第一步连接网页链接关系和初始的网页排名两个文件,输出连接结果: A 1 B D B 1 C C 1 A B D 1 B C 第二步输出网页的链接关系和最终的排名值: A 0.21436248817266176 B D B 0.3633209225962085 C C 0.40833002013844744 A B D 0.1302651623462253 B C

注意:连接阶段无需将排名值解析为数值类型, 计算网页排名的阶段请将排名值解析为 double 类型变量进行计算。

PageRankJoinMapperImpl

package DSPPCode.mapreduce.common_pagerank.impl;

import DSPPCode.mapreduce.common_pagerank.question.PageRankJoinMapper;
import DSPPCode.mapreduce.common_pagerank.question.utils.ReduceJoinWritable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;

public class PageRankJoinMapperImpl extends PageRankJoinMapper {

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

    // 读取文件
    FileSplit split = (FileSplit) context.getInputSplit();
    String path_str = split.getPath().toString();
    //String[] paths = path_str.split("/");
    //String path = paths[paths.length - 1];

    String[] data = value.toString().split(" ");

    ReduceJoinWritable output_value = new ReduceJoinWritable();
    output_value.setData(value.toString());

    if(path_str.contains("ranks"))
    {
      output_value.setTag(ReduceJoinWritable.PAGERNAK);
    }
    if(path_str.contains("pages"))
    {
      output_value.setTag(ReduceJoinWritable.PAGEINFO);
    }
    context.write(new Text(data[0]), output_value);

  }
}

PageRankJoinReducerImpl

package DSPPCode.mapreduce.common_pagerank.impl;

import DSPPCode.mapreduce.common_pagerank.question.PageRankJoinReducer;
import DSPPCode.mapreduce.common_pagerank.question.utils.ReduceJoinWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;

public class PageRankJoinReducerImpl extends PageRankJoinReducer {

  @Override
  public void reduce(Text key, Iterable<ReduceJoinWritable> values, Context context)
      throws IOException, InterruptedException {
    String data_ranks = null;
    String data_pages = null;

    for (ReduceJoinWritable value : values)
    {
      //System.out.println(value.getTag());
      if(value.getTag().equals(ReduceJoinWritable.PAGERNAK))
      {
        // System.out.println(value.getTag());
        data_ranks = value.getData();
      }
      if(value.getTag().equals(ReduceJoinWritable.PAGEINFO))
      {
        // System.out.println(value.getTag());
        data_pages = value.getData();
        //System.out.println(data_pages);
      }
    }

    assert data_pages != null;
    String[] pages = data_pages.split(" ");
    String tmp = "";
    for (int i=1; i < pages.length; i++)
    {
      tmp = tmp  + pages[i];
      if(i < pages.length - 1)
      {
        tmp = tmp + " ";
      }
    }


    String output_key;
    if(tmp.equals(""))
    {
      output_key = data_ranks;
    }else {
      output_key = data_ranks + " " + tmp;
    }

    // System.out.println(output_key);

    context.write(new Text(output_key), NullWritable.get());
  }
}

PageRankMapperImpl

package DSPPCode.mapreduce.common_pagerank.impl;

import DSPPCode.mapreduce.common_pagerank.question.PageRankMapper;

public class PageRankMapperImpl extends PageRankMapper {

}

PageRankReducerImpl

package DSPPCode.mapreduce.common_pagerank.impl;

import DSPPCode.mapreduce.common_pagerank.question.PageRankReducer;
import DSPPCode.mapreduce.common_pagerank.question.PageRankRunner;
import DSPPCode.mapreduce.common_pagerank.question.utils.ReducePageRankWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import java.io.IOException;

public class PageRankReducerImpl extends PageRankReducer {
  private static final double D = 0.85;

  @Override
  public void reduce(Text key, Iterable<ReducePageRankWritable> values, Context context)
      throws IOException, InterruptedException {

    int totalPage = context.getConfiguration().getInt(PageRankRunner.TOTAL_PAGE, 0);
    int iteration = context.getConfiguration().getInt(PageRankRunner.ITERATION, 0);

    double data_ranks = 0;
    String data_pages = null;

    for (ReducePageRankWritable value : values)
    {
      if(value.getTag().equals(ReducePageRankWritable.PR_L))
      {
        data_ranks += Double.parseDouble(value.getData());
      }
      if(value.getTag().equals(ReducePageRankWritable.PAGE_INFO))
      {
        data_pages = value.getData();
      }
    }
    data_ranks = (1 - D) /totalPage + D * data_ranks;
    assert data_pages != null;
    String[] data = data_pages.split(" ");
    String output_key = key.toString() + " " + data_ranks;
    for(int i = 2; i < data.length; i++)
    {
      output_key += " ";
      output_key += data[i];
    }


    // 判断是否收敛
    double p1 = Double.parseDouble(data_pages.split(" ")[1]);
    double p2 = Double.parseDouble(output_key.split(" ")[1]);
    //System.out.println(p1);
    if(Math.abs(p2 - p1) < PageRankRunner.DELTA)
    {
      Counter counter = context.getCounter(PageRankRunner.GROUP_NAME,PageRankRunner.COUNTER_NAME);
      counter.increment(1);//增加计数
    }
    //System.out.println();
    context.write(new Text(output_key), NullWritable.get());
  }
}

崩溃的SQL

难易程度:** 中

待完成

  • 请在DSPPCode.mapreduce.crashed_sql.impl中创建SQLMapperImpl,继承SQLMapper,实现抽象方法
  • 请在DSPPCode.mapreduce.crashed_sql.impl中创建SQLReducerImpl,继承SQLReducer,实现抽象方法

题目描述:

  • DASE店铺在开业一周年之际,决定通过以下活动来回馈新老用户,即在过去一年内订单金额前5的订单可以享受八折优惠。Tom是一名SQL开发人员,他为了从历史订单表中找出总金额前五的订单,很轻松地就写出了相应的SQL语句 SELECT id,UserName,SUM(Price) total FROM orders ORDER BY total DESC LIMIT 5; 但海量的订单导致该SQL查询在单机数据库中执行一段时间后就崩溃而无法得到结果,Tom只好求助作为大数据开发工程师的你,希望你能帮他实现该SQL语句的功能。 请你通过MapReduce程序来实现上述SQL语句的功能。
  • 输入格式:每行表示一条订单记录,第一列到最后一列分别表示订单Id,用户名,购买产品,单价(int),购买数量(int),订单日期,每列之间用制表符分隔。

    Id UserName Product Price Number Date u00001 rookie 文化衫 45 20 2021-6-11 u00003 lucy 贺卡 15 35 2021-9-08 u00027 wang 钢笔 27 10 2021-3-22 u00102 anni 鼠标 108 1 2021-5-27 u00004 lucy 水杯 98 1 2021-11-09 u00005 rookie 教材 40 2 2021-9-01 u00019 lucy 文化衫 45 4 2021-10-03 u00110 song 教材 55 1 2021-9-01 u00111 shy 教材 35 1 2021-9-02 u00311 shy 本子 20 2 2021-9-02

  • 输出格式: 请输出订单Id,UserName,total,每列之间用制表符分隔 u00001 rookie 900 u00003 lucy 525 u00027 wang 270 u00019 lucy 180 u00102 anni 108

SQLMapperImpl

package DSPPCode.mapreduce.crashed_sql.impl;

import DSPPCode.mapreduce.crashed_sql.question.SQLMapper;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;


public class SQLMapperImpl extends SQLMapper {

  public List<Integer> pay = new ArrayList<>();
  public List<String> lst = new ArrayList<>();

  @Override
  public void map(Object key, Text value, Context context) {

    String[] data = value.toString().split("\t");
    String order = data[0];
    String UserName = data[1];
    Integer price = Integer.valueOf(data[3]);
    Integer Number = Integer.valueOf(data[4]);
    int total = price * Number;
    // 构造字符串
    String output = order + "\t" + UserName + "\t" + total;
    lst.add(output);
    pay.add(total);

  }

  @Override
  protected void cleanup(Context context)
      throws IOException, InterruptedException {
    // 排序
    Collections.sort(pay);
    int x = -1;
    if (pay.size() > 5) {
      x = pay.get(pay.size() - 5);
    }
    //System.out.println(pay.get(pay.size() - 5));

    for (String str : lst) {
      String[] data = str.split("\t");
      int total = Integer.parseInt(data[2]);
      if (total >= x) {
        context.write(new Text(str), new LongWritable(total));
      }
    }
  }

}

SQLReducerImpl

package DSPPCode.mapreduce.crashed_sql.impl;

import DSPPCode.mapreduce.crashed_sql.question.SQLReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class SQLReducerImpl extends SQLReducer {

  public List<Integer> pay = new ArrayList<>();
  public List<String> lst = new ArrayList<>();

  @Override
  public void reduce(Text key, Iterable<LongWritable> values, Context context) {
    String str = key.toString();
    String[] data = str.split("\t");
    int total = Integer.parseInt(data[2]);
    lst.add(str);
    pay.add(total);
  }


  @Override
  protected void cleanup(Context context)
      throws IOException, InterruptedException {
    // 排序
    Collections.sort(pay);
    int x = -1;
    if (pay.size() > 5) {
      x = pay.get(pay.size() - 5);
    }
    //System.out.println(pay.get(pay.size() - 5));

    // 排序
    lst.sort(new Comparator<String>() {
      public int compare(String entry1, String entry2) {
        int x = Integer.parseInt(entry1.split("\t")[2]);
        int y = Integer.parseInt(entry2.split("\t")[2]);
        return y - x;
      }
    });

    for (String str : lst) {
      String[] data = str.split("\t");
      int total = Integer.parseInt(data[2]);
      // System.out.println(total);
      if (total >= x) {
        // System.out.println(str);
        context
            .write(new Text(data[0] + "\t" + data[1]), new LongWritable(Integer.parseInt(data[2])));
      }
    }
  }
}