跳转至

2024-2025-2-编程作业1

2022级,MapReduce

Word Count

待完成:

  • 请在DSPPCode.mapreduce.warm_up.impl中创建TokenizerMapperImpl, 继承TokenizerMapper, 实现抽象方法.

  • 请在DSPPCode.mapreduce.warm_up.impl中创建IntSumReducerImpl, 继承IntSumReducer, 实现抽象方法.

题目描述:

  • 现存在一份英文文本文件,要求统计文本中每个单次出现的次数,并将计数结果输出到文本中。其中,输出文本的每行由单词和频数组成,单次和频数之间用制表符分隔。

代码略

Average Score

难易程度: * 易

待完成:

  • 请在 DSPPCode.mapreduce.average_score.impl 中创建 ScoreMapperImpl, 继承 ScoreMapper, 实现抽象方法

  • 请在 DSPPCode.mapreduce.average_score.impl 中创建 ScoreReducerImpl, 继承 ScoreReducer, 实现抽象方法

题目描述:

  • 某班级有若干个学生,每个学生共修读三门课程。现给定该班级中每个学生每门课程的分数,请计算每门课程的班级平均分(级平均分向下取整)。

  • 输入格式:学号,数学分析分数,概率论分数,实变函数分数。例如,10160001,98,80,75 代表学号为10160001的学生, 数学分析98分, 概率论80分, 实变函数75分。

10160001,98,80,75 10160002,53,94,77 10160003,61,86,91

  • 输出格式:课程名 班级平均分

Function of Real Variable 81 Mathematical analysis 70 Probability Theory 86

(课程名通过 Util.getCourseName 获得)

ScoreMapperImpl

package DSPPCode.mapreduce.average_score.impl;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.Tool;
import java.io.IOException;
import DSPPCode.mapreduce.average_score.question.ScoreMapper;
import DSPPCode.mapreduce.average_score.question.Util;

import static DSPPCode.mapreduce.average_score.question.Util.getCourseName;

public class ScoreMapperImpl extends ScoreMapper {
  @Override
  public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
    String[] s = value.toString().split(","); // 将输入行按逗号分隔

    for (int i = 1; i<s.length; i++){
      Text course = new Text(getCourseName(i-1)); // 获取课程名
      IntWritable score = new IntWritable(Integer.parseInt(s[i])); // 转换分数为 IntWritable
      context.write(course,score);  // 输出键值对
    }
  }
}

ScoreReducerImpl

package DSPPCode.mapreduce.average_score.impl;

import DSPPCode.mapreduce.average_score.question.ScoreReducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

public class ScoreReducerImpl extends ScoreReducer{

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int sum1 = 0;
    int count = 0;
    for (IntWritable val:values){
      sum1 += val.get(); // 累加分数
      count++; // 统计分数个数
    }
    context.write(key, new IntWritable(sum1 / count)); // 计算平均分
  }
}

Semi Join

难易程度: **中

待完成

  • 请在DSPPCode.mapreduce.semi_join.impl中创建SemiJoinMapperImpl,继承SemiJoinMapper,实现抽象方法
  • 请在DSPPCode.mapreduce.semi_join.impl中创建SemiJoinRunnerImpl,继承SemiJoinRunner,实现抽象方法

问题描述

学院为了衡量精品课程的建设情况,决定使用如下SQL语句 select S#,C#,Score from SC where C# in (select C# from Course); 统计同学们的精品课程的成绩情况。然而由于学生选课记录太多了,学院的单机数据库并不能支撑这么大的计算量,现在只能请求你使用MapReduce实现上述SQL语句。

输入格式:

输入一共两个文件Course和SC,分别表示精品课程信息表Course,学生选课信息表SC。

  • Course

输入格式:精品课程信息表,第一行为表字段名:C#,CName,分别表示课程号和课程名,使用制表符隔开。剩余每一行表示一条课程记录。如:C1 Principles of Computer Construction ,表示课程号为C1的课程名称为Principles of Computer Construction。

C# CName C1 Principles of Computer Construction C2 Computer Network

  • SC

输入格式:C学生选课信息表,第一行为表字段名:S#,C#,Score,分别表示学生学号,课程号,成绩,使用制表符隔开。剩余每一行表示一个选课记录,各个字段使用制表符隔开,如:S1 C1 99 表示学号为S1的学生选修了C1课程,该课程成绩为99

S# C# Score S1 C1 43 S1 C8 46 S1 C6 51 S2 C2 66 S2 C4 53 S2 C6 58 S3 C3 97

输出格式:请输出满足SQL语句的SC记录,按照S#,C#,Score形式输出,其中第一行为字段名,剩余行为结果,每列之间使用制表符分割

S#  C#  Score
S1  C1  43
S2  C2  66

SemiJoinMapperImpl

package DSPPCode.mapreduce.semi_join.impl;

import DSPPCode.mapreduce.semi_join.question.SemiJoinMapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;


public class SemiJoinMapperImpl extends SemiJoinMapper {
  private Map<String,String>courseTable = new HashMap<>();
  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // 加载Course文件到内存
    if(courseTable.isEmpty())
    {
      URI uri = context.getCacheFiles()[0]; // 获取文件的 URI
      FileSystem fs = FileSystem.get(uri,new Configuration());

      FSDataInputStream fis = fs.open(new Path(uri));
      BufferedReader reader = new BufferedReader( new InputStreamReader(fis) );
      String content;
      while ((content=reader.readLine())!=null) {
        String[] Data = content.split("\t");  // 分割课程记录
        courseTable.put( Data[0], Data[1] ); // 将课程号和课程名存入 HashMap
      }
    }
    // 对SC文件而言
    String[] stu_Data = value.toString().split("\t");
    String CourseNumber = stu_Data[1]; // 获取课程号
    if(courseTable.containsKey(CourseNumber)) {
      // 检查课程号是否存在于 courseTable 中
      context.write(new Text(value.toString()), NullWritable.get());
    }
  }
}

SemiJoinRunnerImpl

package DSPPCode.mapreduce.semi_join.impl;
import DSPPCode.mapreduce.semi_join.question.SemiJoinRunner;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.io.Text;
import java.io.File;

public class SemiJoinRunnerImpl extends SemiJoinRunner {

  @Override
  public void configureMapReduceTask(String[] strings, Job job)
      throws IOException, URISyntaxException {
    // 负责配置 MapReduce 作业的输入、输出路径,以及 Mapper 的实现和分布式缓存
    String inputPath = strings[0]+ "/SC";
    FileInputFormat.addInputPath(job, new Path(inputPath));
    FileOutputFormat.setOutputPath(job, new Path(strings[1]));

    // Set the Mapper implementation class
    job.setMapperClass(SemiJoinMapperImpl.class);
    // Set output key and value classes
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(NullWritable.class);
    // No reducer needed as all processing happens in the Mapper
    job.setNumReduceTasks(0);

    // Course表广播 供Mapper使用
    String broadcasturl = strings[0]+"/Course";
    // 为了防止Windows的URI错误:参考https://segmentfault.com/q/1010000042890453
    File f = new File(broadcasturl);
    URI u = f.toURI();
    job.addCacheFile(u);
  }
}

Revised PageRank

难易程度: ** 中

待完成

  • 请在DSPPCode.mapreduce.revised_pagerank.impl中创建PageRankCountMapperImpl,继承PageRankCountMapper,实现抽象方法
  • 请在DSPPCode.mapreduce.revised_pagerank.impl中创建PageRankCountReducerImpl,继承PageRankCountReducer,实现抽象方法
  • 请在DSPPCode.mapreduce.revised_pagerank.impl中创建PageRankCountRunnerImpl,继承PageRankCountRunner,实现抽象方法
  • 请在DSPPCode.mapreduce.revised_pagerank.impl中创建PageRankReducerImpl,继承PageRankReducer,实现抽象方法
  • 请在DSPPCode.mapreduce.revised_pagerank.impl中根据需要添加新的类

题目描述:

  • 基于一个输入文本(网页链接关系、初始的网页排名)实现网页链接排名算法(阻尼系数以0.85计算)。
  • 本题中,网页总数N需要你根据输入文件通过一个MapReduce过程计算获得,并将网页总数N输出到文件中。
  • 本题对网页排名值的收敛条件做了简化,如果当某一网页当前排名值与上一轮迭代排名值之间差值的绝对值小于1e-6,那么认为该网页的排名值已经收敛。 迭代停止的条件为达到最大迭代次数或某次迭代中所有网页均收敛。
  • 输入格式:输入文本中的每一行为一项网页信息,其组织形式为(网页名 网页排名值(出站链接...))

如(A 1.0 B D)表示一个名称为A的网页,当前排名值为1.0,该网页链接至名称为B和D的网页(所有网页权重默认为1.0) A 1.0 B D B 1.0 C C 1.0 A B D 1.0 B C

  • 输出格式: 要求分两步完成。第一步输出网页总数N: 4 第二步输出网页的链接关系和最终的排名值: A 0.21436248817266176 B D B 0.3633209225962085 C C 0.40833002013844744 A B D 0.1302651623462253 B C

注意: 计算网页排名的阶段请将排名值解析为 double 类型变量进行计算。 输出结果的小数位数无需处理

PageRankCountMapperImpl

package DSPPCode.mapreduce.revised_pagerank.impl;

import DSPPCode.mapreduce.revised_pagerank.question.PageRankCountMapper;
import DSPPCode.mapreduce.revised_pagerank.question.utils.ReducePageRankWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;

public class PageRankCountMapperImpl extends PageRankCountMapper{
  private static final IntWritable ONE = new IntWritable(1);
  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // 将 URL 输出到 reducer,并统计数量
    // 每次读取一行输入,输出键值对 ("value", 1) 所有网页都被映射到相同的键 "value"
    context.write(new Text("value"), ONE);
  }
}

PageRankCountReducerImpl

package DSPPCode.mapreduce.revised_pagerank.impl;

import DSPPCode.mapreduce.revised_pagerank.question.PageRankCountReducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;
import java.util.Iterator;
import java.util.Timer;
import java.util.TimerTask;

public class PageRankCountReducerImpl extends PageRankCountReducer{
  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    // 计数器,统计网页总数
    int count = 0;
    // 遍历 values,将每个值累加到计数器中
    for (IntWritable value : values) {
      count += value.get();
    }
    // 输出最终的计数结果
    context.write(new Text(String.valueOf(count)), NullWritable.get());
  }
}

PageRankCountRunnerImpl

package DSPPCode.mapreduce.revised_pagerank.impl;

import DSPPCode.mapreduce.revised_pagerank.question.PageRankCountRunner;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.nio.file.Path;
import java.nio.file.Paths;

public class PageRankCountRunnerImpl extends PageRankCountRunner {

  @Override
  public void configureMapReduceTask(Job job,String []strings) throws FileNotFoundException {
    // 配置 MapReduce 作业,运行页面计数任务
    job.setMapperClass(PageRankCountMapperImpl.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setCombinerClass(PageRankCountCombinerImpl.class);

    job.setReducerClass(PageRankCountReducerImpl.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);

  }

}

PageRankReducerImpl

package DSPPCode.mapreduce.revised_pagerank.impl;

import DSPPCode.mapreduce.revised_pagerank.question.PageRankReducer;
import DSPPCode.mapreduce.revised_pagerank.question.PageRankRunner;
import DSPPCode.mapreduce.revised_pagerank.question.utils.ReducePageRankWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;
import java.lang.Math;

import static DSPPCode.mapreduce.revised_pagerank.question.PageRankRunner.COUNTER_NAME;
import static DSPPCode.mapreduce.revised_pagerank.question.PageRankRunner.GROUP_NAME;

public class PageRankReducerImpl extends PageRankReducer{
  private static final double D = 0.85;
  public static double stop_value = 1;
  @Override
  public void reduce(Text key, Iterable<ReducePageRankWritable> values, Context context)
      throws IOException, InterruptedException {
    String[] pageInfo = null;
    // 从配置项中读取网页的总数
    int totalPage = context.getConfiguration().getInt(PageRankRunner.TOTAL_PAGE, 0);
    // 从配置项中读取当前迭代步数
    int iteration = context.getConfiguration().getInt(PageRankRunner.ITERATION, 0);
    double sum = 0;
    for (ReducePageRankWritable value : values) {
      String tag = value.getTag();
      // 如果是贡献值则进行求和,否则以空格为分隔符切分后保存到pageInfo
      if (tag.equals(ReducePageRankWritable.PR_L)) {
        sum += Double.parseDouble(value.getData());
      } else if (tag.equals(ReducePageRankWritable.PAGE_INFO)) {
        pageInfo = value.getData().split(" ");
      }
    }
    double old_value = Double.parseDouble(pageInfo[1]);
    // 根据公式计算排名值
    double pageRank = (1 - D) / totalPage + D * sum;
    // 更新网页信息中的排名值
    pageInfo[1] = String.valueOf(pageRank);
    stop_value = Math.abs(old_value - pageRank);
    if (stop_value < PageRankRunner.DELTA) {
      context.getCounter(GROUP_NAME, COUNTER_NAME).increment(1);
    }
    // 最后一次迭代输出网页名及排名值,而其余迭代输出网页信息
    StringBuilder result = new StringBuilder();

    for (String data : pageInfo) {
      result.append(data).append(" ");
    }


    context.write(new Text(result.toString()), NullWritable.get());

  }

}

PageRankCountCombinerImpl

package DSPPCode.mapreduce.revised_pagerank.impl;

import DSPPCode.mapreduce.revised_pagerank.question.PageRankCountCombiner;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import java.io.IOException;

public class PageRankCountCombinerImpl extends PageRankCountCombiner {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values,
      Reducer<Text, IntWritable, Text, IntWritable>.Context context)
      throws IOException, InterruptedException {
        // 在 Mapper 输出后、Reducer 之前对中间结果进行局部合并,减少数据传输量。
    int sum = 0;
    for (IntWritable value : values) {
      sum += value.get();
    }
    context.write(key, new IntWritable((sum)));
  }
}

基于物品的协同过滤(ItemCF)

难易程度:*** 难

待完成:

  • 请在 DSPPCode.mapreduce.item_cf.impl 中创建 CoOccurrenceMapperImpl, 继承 CoOccurrenceMapper, 实现抽象方法
  • 请在 DSPPCode.mapreduce.item_cf.impl 中创建 CoOccurrenceReducerImpl, 继承 CoOccurrenceReducer, 实现抽象方法
  • 请在 DSPPCode.mapreduce.item_cf.impl 中创建 RecommendMapperImpl, 继承 RecommendMapper, 实现抽象方法
  • 请在 DSPPCode.mapreduce.item_cf.impl 中创建 RecommendReducerImpl, 继承 RecommendReducer, 实现抽象方法

题目描述:

  • 背景:协同过滤是推荐系统中的常用算法,可以分为基于物品的协同过滤(ItemCF)和基于用户的协同过滤(UserCF)两类,本题只涉及 ItemCF。ItemCF 的核心思想就是“物以类聚”,即为用户推荐与用户喜欢的物品相似的物品。物品相似度的计算取决于大量用户的行为数据。如用户 A 对物品 1 进行了点赞,而对物品 1 点赞的所有用户中有很大一部分也对物品 2 进行了点赞,那么我们就可以说物品 1 与 物品 2 具有较高的相似度,从而把物品 2 推荐给用户 A。下面给出 ItemCF 算法中的一些关键定义:

  • 用户评分矩阵:矩阵的行表示用户,列表示物品,元素值表示用户对物品的“评分”,这里的评分往往是由用户的隐式反馈累计而来,如点赞计 1 分,收藏计 2 分等。当评分大于零时,表示用户喜欢对应物品。如果用户未对某个物品产生行为,则值为 0。

  • 物品同现矩阵:矩阵的行和列均表示物品,元素值表示同时喜欢两个物品的用户数。
  • 用户推荐矩阵:矩阵的行表示用户,列表示物品,元素值表示推荐度,推荐度越高,表示用户喜欢该物品的可能性越大。给某个用户进行推荐时,返回矩阵对应行推荐度 topk 的物品(需要去除用户已经产生行为的物品)。

下图给出了用户推荐矩阵的计算方式:用户推荐矩阵 = 用户评分矩阵 * 物品同现矩阵

item_cf

  • 要求:给定用户评分矩阵,先计算物品同现矩阵,然后计算用户推荐矩阵。为了更加符合真实场景,所有矩阵均采用稀疏矩阵的存储形式

  • 输入格式

用户评分矩阵,每行三个数字,分别是行号(用户编号)、列号(物品编号)、评分,中间使用 Tab 分割。为了简化,这里的分数均为大于零的整数

1 1 1 1 2 1 2 1 1 2 3 1 3 2 1 3 3 2

  • 输出格式

物品同现矩阵,每行三个数字,分别是行号(物品编号)、列号(物品编号)、值,中间使用 Tab 分割。

1 1 2 1 2 1 1 3 1 2 1 1 2 2 2 2 3 1 3 1 1 3 2 1 3 3 2

用户推荐矩阵,每行三个数字,分别是行号(用户编号)、列号(物品编号)、推荐度,中间使用 Tab 分割。

1 1 3 1 2 3 1 3 2 2 1 3 2 2 2 2 3 3 3 1 3 3 2 4 3 3 5

CoOccurrenceMapperImpl

package DSPPCode.mapreduce.item_cf.impl;

import DSPPCode.mapreduce.item_cf.question.CoOccurrenceMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class CoOccurrenceMapperImpl extends CoOccurrenceMapper {

  private Map<String, Set<String>> map = new HashMap<String, Set<String>>();

  @Override
  public void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context)
      throws IOException, InterruptedException {

    String[] values = value.toString().split("\t");
    String userId = values[0];
    String itemId = values[1];

    if (!map.containsKey(userId)) {
      // 检查用户是否已经存在于 map 中
      map.put(userId, new HashSet<String>()); //如果 map 中还没有当前用户的记录,则为该用户创建一个新的集合 HashSet,用于存储该用户喜欢的物品。
      map.get(userId).add(itemId);//将当前物品 itemId 添加到该用户的集合中。
    } else {
      //遍历用户评分矩阵
      for (String itemId_intra : map.get(userId)){
        context.write(new Text(String.join("\t", new String[]{itemId_intra, itemId})), new Text(String.valueOf(1)));//表示物品 itemId_intra 和 itemId 的同现。
        context.write(new Text(String.join("\t", new String[]{itemId, itemId_intra})), new Text(String.valueOf(1)));//表示物品 itemId 和 itemId_intra 的同现(对称关系)。
      }
      map.get(userId).add(itemId);
    }

    context.write(new Text(String.join("\t", new String[]{itemId, itemId})), new Text(String.valueOf(1)));//输出键值对格式:(物品i,物品j,1)

  }
}

CoOccurrenceReducerImpl

package DSPPCode.mapreduce.item_cf.impl;

import DSPPCode.mapreduce.item_cf.question.CoOccurrenceReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import java.io.IOException;

public class CoOccurrenceReducerImpl extends CoOccurrenceReducer {

  @Override
  //汇总 Mapper 的输出,计算物品同现矩阵的元素值。
  public void reduce(Text key, Iterable<Text> values,
      Reducer<Text, Text, Text, LongWritable>.Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (Text value : values) {
      //对于每对物品,统计出现的次数
      sum += Integer.parseInt(value.toString());
    }

    context.write(key, new LongWritable(sum));
  }
}

RecommendMapperImpl

package DSPPCode.mapreduce.item_cf.impl;

import DSPPCode.mapreduce.item_cf.question.RecommendMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import java.io.IOException;

import static DSPPCode.mapreduce.item_cf.question.utils.Constants.L;
import static DSPPCode.mapreduce.item_cf.question.utils.Constants.R;

public class RecommendMapperImpl extends RecommendMapper {

  @Override
  //将用户评分矩阵和物品同现矩阵进行分发,为计算推荐矩阵做准备。
  protected void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context)
      throws IOException, InterruptedException {
    String[] info = value.toString().split("\t");
    if(fileName.equals(userScoreMatrixFileName)){
      //如果是用户评分矩阵,输出用户对物品的评分。
      context.write(new Text(info[0]), new Text(String.join("\t", new String[]{info[1], info[2], L})));
    } else if(fileName.equals(coOccurrenceMatrixFileName)){
      //如果是物品同现矩阵,输出物品之间的同现值,并将物品同现值广播到所有用户。
      for (int i = 0; i < users; i++) {
        context.write(new Text(String.valueOf(i+1)),
            new Text(String.join("\t", new String[]{info[0], info[1], info[2], R})));
      }
    }
  }
}

RecommendReducerImpl

package DSPPCode.mapreduce.item_cf.impl;

import DSPPCode.mapreduce.item_cf.question.RecommendReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import java.io.IOException;
import java.util.ArrayList;

import static DSPPCode.mapreduce.item_cf.question.utils.Constants.L;
import static DSPPCode.mapreduce.item_cf.question.utils.Constants.R;

public class RecommendReducerImpl extends RecommendReducer {

//计算推荐矩阵的推荐度。
  @Override
  public void reduce(Text key, Iterable<Text> values,
      Reducer<Text, Text, Text, LongWritable>.Context context)
      throws IOException, InterruptedException {
    ArrayList<String[]> userLList = new ArrayList<String[]>();
    ArrayList<String[]> coCoRList = new ArrayList<String[]>();
    int[] sum = new int[items + 1]; // 0 initialization, for edge cases
    for (Text value : values) {
      String[] info = value.toString().split("\t");
      if (info[info.length-1].equals(L)) {
        userLList.add(info);
      } else if (info[info.length-1].equals(R)) {
        coCoRList.add(info);
      }
    }
    for (String[] user_s : userLList) {
      //遍历用户评分矩阵和物品同现矩阵,计算推荐度。
      for (String[] coCo_s : coCoRList) {
        if (user_s[0].equals(coCo_s[0])) {
          sum[Integer.parseInt(coCo_s[1])] +=
              Integer.parseInt(user_s[user_s.length-2]) * Integer.parseInt(coCo_s[coCo_s.length-2]);
        }
      }
    }
    for (int i = 1; i <= items; i++) {
      if (sum[i] > 0) {
        context.write(new Text(String.join("\t", key.toString(), String.valueOf(i))),
            new LongWritable(sum[i]));
      }
    }
  }
}