跳转至

2022-2023-2-编程作业1

2020级,MapReduce

Word Count

待完成:

  • 请在DSPPCode.mapreduce.warm_up.impl中创建TokenizerMapperImpl, 继承TokenizerMapper, 实现抽象方法.

  • 请在DSPPCode.mapreduce.warm_up.impl中创建IntSumReducerImpl, 继承IntSumReducer, 实现抽象方法.

题目描述:

  • 现存在一份英文文本文件,要求统计文本中每个单次出现的次数,并将计数结果输出到文本中。其中,输出文本的每行由单词和频数组成,单次和频数之间用制表符分隔。

Deduplication

难易程度:* 易

待完成

  • 请在DSPPCode.mapreduce.deduplication中创建DeduplicationMapperImpl, 继承DeduplicationMapper, 实现抽象方法。
  • 请在DSPPCode.mapreduce.deduplication中创建DeduplicationReducerImpl, 继承DeduplicationReducer, 实现抽象方法。

题目描述

  • 学校图书馆的自助借书系统因为网络原因崩溃了,无法同步更新每天借书的学生信息,两位图书管理员只能各自记录了一份借阅图书的学生名单。现在,他们希望你能够帮助他们合并两份名单,整理为一份完整且没有重复的借阅图书的学生名单。 请你通过MapReduce程序来实现以上功能。

  • 输入格式:输入一共有两个文件,文本中的第一列都为日期,第二列为学生姓名,列与列之间用空格分隔。
    file1: 2019-3-1 alice 2019-3-2 bob 2019-3-3 carter 2019-3-4 david 2019-3-5 alice 2019-3-6 bob 2019-3-7 carter file2: 2019-3-1 bob 2019-3-2 alice 2019-3-3 bob 2019-3-4 david 2019-3-5 alice 2019-3-6 carter 2019-3-7 david 2019-3-3 carter

  • 输出格式:与输入文件的格式相同 ``` 2019-3-1 alice 2019-3-1 bob 2019-3-2 alice 2019-3-2 bob 2019-3-3 bob 2019-3-3 carter 2019-3-4 david 2019-3-5 alice 2019-3-6 bob 2019-3-6 carter 2019-3-7 carter 2019-3-7 david

```

DeduplicationMapperImpl

package DSPPCode.mapreduce.deduplication.impl;

import DSPPCode.mapreduce.deduplication.question.DeduplicationMapper;
import DSPPCode.mapreduce.deduplication.question.Borrows;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;

public class DeduplicationMapperImpl extends DeduplicationMapper{
  @Override
  public void map(Object key, Text value, Context context)
      throws IOException, InterruptedException {
    String[] lines = value.toString().split("\n");
    for (String line : lines) {
      context.write(new Text(line), NullWritable.get());
    }

  }

}

DeduplicationReducerImpl

package DSPPCode.mapreduce.deduplication.impl;

import DSPPCode.mapreduce.deduplication.question.DeduplicationReducer;
import DSPPCode.mapreduce.deduplication.question.Borrows;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

public class DeduplicationReducerImpl extends DeduplicationReducer{
  @Override
  public void reduce(Text key, Iterable<NullWritable> values, Context context)
      throws IOException, InterruptedException {
    HashSet<String> borrowSet = new HashSet<String>();
    for (NullWritable value : values) {
      borrowSet.add(key.toString());
    }
    for (String borrow : borrowSet) {
      context.write(new Text(borrow), NullWritable.get());
    }

  }

}

Semi Join

难易程度: **中

待完成

  • 请在DSPPCode.mapreduce.semi_join.impl中创建SemiJoinMapperImpl,继承SemiJoinMapper,实现抽象方法
  • 请在DSPPCode.mapreduce.semi_join.impl中创建SemiJoinRunnerImpl,继承SemiJoinRunner,实现抽象方法

问题描述

学院为了衡量精品课程的建设情况,决定使用如下SQL语句 select S#,C#,Score from SC where C# in (select C# from Course); 统计同学们的精品课程的成绩情况。然而由于学生选课记录太多了,学院的单机数据库并不能支撑这么大的计算量,现在只能请求你使用MapReduce实现上述SQL语句。

输入格式:

输入一共两个文件Course和SC,分别表示精品课程信息表Course,学生选课信息表SC。

  • Course

输入格式:精品课程信息表,第一行为表字段名:C#,CName,分别表示课程号和课程名,使用制表符隔开。剩余每一行表示一条课程记录。如:C1 Principles of Computer Construction ,表示课程号为C1的课程名称为Principles of Computer Construction。

C# CName C1 Principles of Computer Construction C2 Computer Network

  • SC

输入格式:C学生选课信息表,第一行为表字段名:S#,C#,Score,分别表示学生学号,课程号,成绩,使用制表符隔开。剩余每一行表示一个选课记录,各个字段使用制表符隔开,如:S1 C1 99 表示学号为S1的学生选修了C1课程,该课程成绩为99

S# C# Score S1 C1 43 S1 C8 46 S1 C6 51 S2 C2 66 S2 C4 53 S2 C6 58 S3 C3 97

输出格式:请输出满足SQL语句的SC记录,按照S#,C#,Score形式输出,其中第一行为字段名,剩余行为结果,每列之间使用制表符分割

S#  C#  Score
S1  C1  43
S2  C2  66

SemiJoinMapperImpl

package DSPPCode.mapreduce.semi_join.question;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

abstract public class SemiJoinMapper extends Mapper<LongWritable, Text,Text, NullWritable> {

  /**
   * TODO 请完成该抽象方法
   * -
   * 输入:
   * 1.SC学生选课信息表,第一行为表字段名:S#,C#,Score(学生学号,课程ID,成绩)使用制表符隔开,剩余每一行表示一个选课记录,各个字段使用制表符隔开
   * 如:S1 C1  99 表示学号为S1的学生选修了C1课程,该课程成绩为99
   * 2.Course精品课程表,第一行为表字段名:C#,CName(课程ID,课程名)使用制表符隔开,剩余每一行表示一个课程记录,各个字段使用制表符隔开
   * 如:C1 Data Mining 表示课程ID为C1的课程名字为Data Mining
   */
  abstract public void map(LongWritable key,Text value,Context context)
      throws IOException, InterruptedException;
}

SemiJoinRunnerImpl

package DSPPCode.mapreduce.semi_join.question;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import java.io.IOException;
import java.net.URISyntaxException;


public abstract class SemiJoinRunner extends Configured implements Tool {

  @Override
  public int run(String[] strings) throws Exception
  {
    Job job=Job.getInstance(getConf(),getClass().getSimpleName());
    job.setJarByClass(getClass());
    configureMapReduceTask(strings,job);
    return job.waitForCompletion(true)?0:1;
  }

  /**
   * TODO 请完成该抽象方法
   * -
   * 输入:
   * 1.命令行输入,输入为长度为2的字符串数组,分别表示输入路径和输出路径,
   * 例如[/tmp/mapreduce/semi_join/,/tmp/mapreduce/output/],
   * 表明:
   * 输入路径为/tmp/mapreduce/semi_join/,在该目录下有两个文本文件SC和Course分别为学生选课信息表SC和精品课程表Course
   * 输出路径为/tmp/mapreduce/output/
   * 2.Job类,用于配置MapReduce相关具体配置
   * 功能:
   * 配置输入输出路径和Map和Reduce的相关运行信息。
   */
 abstract public void configureMapReduceTask(String []strings, Job job)
     throws IOException, URISyntaxException;
}

Revised PageRank

难易程度: ** 中

待完成

  • 请在DSPPCode.mapreduce.revised_pagerank.impl中创建PageRankCountMapperImpl,继承PageRankCountMapper,实现抽象方法
  • 请在DSPPCode.mapreduce.revised_pagerank.impl中创建PageRankCountReducerImpl,继承PageRankCountReducer,实现抽象方法
  • 请在DSPPCode.mapreduce.revised_pagerank.impl中创建PageRankCountRunnerImpl,继承PageRankCountRunner,实现抽象方法
  • 请在DSPPCode.mapreduce.revised_pagerank.impl中创建PageRankReducerImpl,继承PageRankReducer,实现抽象方法
  • 请在DSPPCode.mapreduce.revised_pagerank.impl中根据需要添加新的类

题目描述:

  • 基于一个输入文本(网页链接关系、初始的网页排名)实现网页链接排名算法(阻尼系数以0.85计算)。
  • 本题中,网页总数N需要你根据输入文件通过一个MapReduce过程计算获得,并将网页总数N输出到文件中。
  • 本题对网页排名值的收敛条件做了简化,如果当某一网页当前排名值与上一轮迭代排名值之间差值的绝对值小于1e-6,那么认为该网页的排名值已经收敛。 迭代停止的条件为达到最大迭代次数或某次迭代中所有网页均收敛。
  • 输入格式:输入文本中的每一行为一项网页信息,其组织形式为(网页名 网页排名值(出站链接...))

如(A 1.0 B D)表示一个名称为A的网页,当前排名值为1.0,该网页链接至名称为B和D的网页(所有网页权重默认为1.0) A 1.0 B D B 1.0 C C 1.0 A B D 1.0 B C

  • 输出格式: 要求分两步完成。第一步输出网页总数N: 4 第二步输出网页的链接关系和最终的排名值: A 0.21436248817266176 B D B 0.3633209225962085 C C 0.40833002013844744 A B D 0.1302651623462253 B C

注意: 计算网页排名的阶段请将排名值解析为 double 类型变量进行计算。 输出结果的小数位数无需处理

PageRankCountMapperImpl

package DSPPCode.mapreduce.revised_pagerank.impl;

import DSPPCode.mapreduce.revised_pagerank.question.PageRankCountMapper;
import DSPPCode.mapreduce.revised_pagerank.question.utils.ReducePageRankWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;
import java.util.Iterator;

public class PageRankCountMapperImpl extends PageRankCountMapper{
  private static final IntWritable ONE = new IntWritable(1);
  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

    // 将每一行文本按空格切分,并取出第一个元素,即网页的 URL
    // String[] parts = value.toString().split("\\s+");
    // String url = parts[0];

    // 将 URL 输出到 reducer,并统计数量
    context.write(new Text("value"), ONE);
    // String[] pages = value.toString().split("\n");
    // for (String page : pages) {
    //   context.write(new Text(page), ONE);
    // }

    // context.write(value, ONE);

  }

}

PageRankCountReducerImpl

package DSPPCode.mapreduce.revised_pagerank.impl;

import DSPPCode.mapreduce.revised_pagerank.question.PageRankCountReducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;
import java.util.Iterator;
import java.util.Timer;
import java.util.TimerTask;

public class PageRankCountReducerImpl extends PageRankCountReducer{
  // static int sum = 0;
  // private Timer timer = new Timer();
  // private final int expiredTime = 10000;
  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    // 定义一个计数器变量
    int count = 0;

    // 遍历 values,将每个值累加到计数器中
    for (IntWritable value : values) {
      count += value.get();
    }

    // 输出最终的计数结果
    context.write(new Text(String.valueOf(count)), NullWritable.get());

    // context.write(new Text(String.valueOf(sum)), NullWritable.get());
  }

  // public void start(Iterable<IntWritable> values, Context context) {
  //   timer.schedule(new TimerTask() {
  //     @Override
  //     public void run() {
  //       timer.cancel();
  //     }
  //   }, expiredTime);
  //
  //   // 不断接收 value
  //
  //   for(IntWritable value : values) {
  //     sum++;
  //     timer.cancel();
  //     timer = new Timer();
  //     timer.schedule(new TimerTask() {
  //       @Override
  //       public void run() {
  //         try {
  //           context.write(new Text(String.valueOf(sum)), NullWritable.get());
  //         } catch (IOException e) {
  //           e.printStackTrace();
  //         } catch (InterruptedException e) {
  //           e.printStackTrace();
  //         }
  //         timer.cancel();
  //       }
  //     }, expiredTime);
  //   }
  //
  // }


}

PageRankCountRunnerImpl

package DSPPCode.mapreduce.revised_pagerank.impl;

import DSPPCode.mapreduce.revised_pagerank.question.PageRankCountRunner;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.nio.file.Path;
import java.nio.file.Paths;

public class PageRankCountRunnerImpl extends PageRankCountRunner {

  @Override
  public void configureMapReduceTask(Job job,String []strings) throws FileNotFoundException {



    job.setMapperClass(PageRankCountMapperImpl.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setCombinerClass(PageRankCountCombinerImpl.class);

    job.setReducerClass(PageRankCountReducerImpl.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);



  }



}

PageRankReducerImpl

package DSPPCode.mapreduce.revised_pagerank.impl;

import DSPPCode.mapreduce.revised_pagerank.question.PageRankReducer;
import DSPPCode.mapreduce.revised_pagerank.question.PageRankRunner;
import DSPPCode.mapreduce.revised_pagerank.question.utils.ReducePageRankWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;
import java.lang.Math;

import static DSPPCode.mapreduce.revised_pagerank.question.PageRankRunner.COUNTER_NAME;
import static DSPPCode.mapreduce.revised_pagerank.question.PageRankRunner.GROUP_NAME;

public class PageRankReducerImpl extends PageRankReducer{
  private static final double D = 0.85;
  public static double stop_value = 1;
  @Override
  public void reduce(Text key, Iterable<ReducePageRankWritable> values, Context context)
      throws IOException, InterruptedException {
    String[] pageInfo = null;
    // 从配置项中读取网页的总数
    int totalPage = context.getConfiguration().getInt(PageRankRunner.TOTAL_PAGE, 0);
    // 从配置项中读取当前迭代步数
    int iteration = context.getConfiguration().getInt(PageRankRunner.ITERATION, 0);
    double sum = 0;
    for (ReducePageRankWritable value : values) {
      String tag = value.getTag();
      // 如果是贡献值则进行求和,否则以空格为分隔符切分后保存到pageInfo
      if (tag.equals(ReducePageRankWritable.PR_L)) {
        sum += Double.parseDouble(value.getData());
      } else if (tag.equals(ReducePageRankWritable.PAGE_INFO)) {
        pageInfo = value.getData().split(" ");
      }
    }
    double old_value = Double.parseDouble(pageInfo[1]);
    // 根据公式计算排名值
    double pageRank = (1 - D) / totalPage + D * sum;
    // 更新网页信息中的排名值

    pageInfo[1] = String.valueOf(pageRank);
    stop_value = Math.abs(old_value - pageRank);
    if (stop_value < PageRankRunner.DELTA) {
      context.getCounter(GROUP_NAME, COUNTER_NAME).increment(1);
    }
    // 最后一次迭代输出网页名及排名值,而其余迭代输出网页信息
    StringBuilder result = new StringBuilder();

    for (String data : pageInfo) {
      result.append(data).append(" ");
    }


    context.write(new Text(result.toString()), NullWritable.get());

  }

}

PageRankCountCombinerImpl

package DSPPCode.mapreduce.revised_pagerank.impl;

import DSPPCode.mapreduce.revised_pagerank.question.PageRankCountCombiner;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import java.io.IOException;

public class PageRankCountCombinerImpl extends PageRankCountCombiner {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values,
      Reducer<Text, IntWritable, Text, IntWritable>.Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable value : values) {
      sum += value.get();
    }
    context.write(key, new IntWritable((sum)));
  }
}

基于物品的协同过滤(ItemCF)

难易程度:*** 难

待完成:

  • 请在 DSPPCode.mapreduce.item_cf.impl 中创建 CoOccurrenceMapperImpl, 继承 CoOccurrenceMapper, 实现抽象方法
  • 请在 DSPPCode.mapreduce.item_cf.impl 中创建 CoOccurrenceReducerImpl, 继承 CoOccurrenceReducer, 实现抽象方法
  • 请在 DSPPCode.mapreduce.item_cf.impl 中创建 RecommendMapperImpl, 继承 RecommendMapper, 实现抽象方法
  • 请在 DSPPCode.mapreduce.item_cf.impl 中创建 RecommendReducerImpl, 继承 RecommendReducer, 实现抽象方法

题目描述:

  • 背景:协同过滤是推荐系统中的常用算法,可以分为基于物品的协同过滤(ItemCF)和基于用户的协同过滤(UserCF)两类,本题只涉及 ItemCF。ItemCF 的核心思想就是“物以类聚”,即为用户推荐与用户喜欢的物品相似的物品。物品相似度的计算取决于大量用户的行为数据。如用户 A 对物品 1 进行了点赞,而对物品 1 点赞的所有用户中有很大一部分也对物品 2 进行了点赞,那么我们就可以说物品 1 与 物品 2 具有较高的相似度,从而把物品 2 推荐给用户 A。下面给出 ItemCF 算法中的一些关键定义:

  • 用户评分矩阵:矩阵的行表示用户,列表示物品,元素值表示用户对物品的“评分”,这里的评分往往是由用户的隐式反馈累计而来,如点赞计 1 分,收藏计 2 分等。当评分大于零时,表示用户喜欢对应物品。如果用户未对某个物品产生行为,则值为 0。

  • 物品同现矩阵:矩阵的行和列均表示物品,元素值表示同时喜欢两个物品的用户数。
  • 用户推荐矩阵:矩阵的行表示用户,列表示物品,元素值表示推荐度,推荐度越高,表示用户喜欢该物品的可能性越大。给某个用户进行推荐时,返回矩阵对应行推荐度 topk 的物品(需要去除用户已经产生行为的物品)。

下图给出了用户推荐矩阵的计算方式:用户推荐矩阵 = 用户评分矩阵 * 物品同现矩阵

item_cf

  • 要求:给定用户评分矩阵,先计算物品同现矩阵,然后计算用户推荐矩阵。为了更加符合真实场景,所有矩阵均采用稀疏矩阵的存储形式

  • 输入格式

用户评分矩阵,每行三个数字,分别是行号(用户编号)、列号(物品编号)、评分,中间使用 Tab 分割。为了简化,这里的分数均为大于零的整数

1 1 1 1 2 1 2 1 1 2 3 1 3 2 1 3 3 2

  • 输出格式

物品同现矩阵,每行三个数字,分别是行号(物品编号)、列号(物品编号)、值,中间使用 Tab 分割。

1 1 2 1 2 1 1 3 1 2 1 1 2 2 2 2 3 1 3 1 1 3 2 1 3 3 2

用户推荐矩阵,每行三个数字,分别是行号(用户编号)、列号(物品编号)、推荐度,中间使用 Tab 分割。

1 1 3 1 2 3 1 3 2 2 1 3 2 2 2 2 3 3 3 1 3 3 2 4 3 3 5

## CoOccurrenceMapperImpl

``` package DSPPCode.mapreduce.item_cf.impl;

import DSPPCode.mapreduce.item_cf.question.CoOccurrenceMapper; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Mapper.Context; import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set;

public class CoOccurrenceMapperImpl extends CoOccurrenceMapper {

private Map<String, Set<String>> map = new HashMap<String, Set<String>>();

@Override
public void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context)
    throws IOException, InterruptedException {

  String[] values = value.toString().split("\t");
  String userId = values[0];
  String itemId = values[1];

  if (!map.containsKey(userId)) {
    map.put(userId, new HashSet<String>());
    map.get(userId).add(itemId);
  } else {
    for (String itemId_intra : map.get(userId)){
      context.write(new Text(String.join("\t", new String[]{itemId_intra, itemId})), new Text(String.valueOf(1)));
      context.write(new Text(String.join("\t", new String[]{itemId, itemId_intra})), new Text(String.valueOf(1)));
    }
    map.get(userId).add(itemId);
  }

  context.write(new Text(String.join("\t", new String[]{itemId, itemId})), new Text(String.valueOf(1)));

}

}

```

## CoOccurrenceReducerImpl

``` package DSPPCode.mapreduce.item_cf.impl;

import DSPPCode.mapreduce.item_cf.question.CoOccurrenceReducer; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Reducer.Context; import java.io.IOException;

public class CoOccurrenceReducerImpl extends CoOccurrenceReducer {

@Override
public void reduce(Text key, Iterable<Text> values,
    Reducer<Text, Text, Text, LongWritable>.Context context)
    throws IOException, InterruptedException {
  int sum = 0;
  for (Text value : values) {
    sum += Integer.parseInt(value.toString());
  }
  // System.out.println(key.toString() + ": " + sum);

  context.write(key, new LongWritable(sum));
}

}

```

## RecommendMapperImpl

``` package DSPPCode.mapreduce.item_cf.impl;

import DSPPCode.mapreduce.item_cf.question.RecommendMapper; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Mapper.Context; import java.io.IOException;

import static DSPPCode.mapreduce.item_cf.question.utils.Constants.L; import static DSPPCode.mapreduce.item_cf.question.utils.Constants.R;

public class RecommendMapperImpl extends RecommendMapper {

@Override
protected void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context)
    throws IOException, InterruptedException {
  String[] info = value.toString().split("\t");
  // user1(row) * item1(col)
  if(fileName.equals(userScoreMatrixFileName)){
    context.write(new Text(info[0]), new Text(String.join("\t", new String[]{info[1], info[2], L})));

  } else if(fileName.equals(coOccurrenceMatrixFileName)){
    for (int i = 0; i < users; i++) {
      context.write(new Text(String.valueOf(i+1)),
          new Text(String.join("\t", new String[]{info[0], info[1], info[2], R})));
    }
  }
}

}

```

## RecommendReducerImpl

``` package DSPPCode.mapreduce.item_cf.impl;

import DSPPCode.mapreduce.item_cf.question.RecommendReducer; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Reducer.Context; import java.io.IOException; import java.util.ArrayList;

import static DSPPCode.mapreduce.item_cf.question.utils.Constants.L; import static DSPPCode.mapreduce.item_cf.question.utils.Constants.R;

public class RecommendReducerImpl extends RecommendReducer {

@Override
public void reduce(Text key, Iterable<Text> values,
    Reducer<Text, Text, Text, LongWritable>.Context context)
    throws IOException, InterruptedException {
  ArrayList<String[]> userLList = new ArrayList<String[]>();
  ArrayList<String[]> coCoRList = new ArrayList<String[]>();
  int[] sum = new int[items + 1]; // 0 initialization, for edge cases
  // String[] info = values.toString().split("\t");
  // if (info[-1].equals(L)){
  //
  // }
  for (Text value : values) {
    String[] info = value.toString().split("\t");
    if (info[info.length-1].equals(L)) {
      userLList.add(info);
    } else if (info[info.length-1].equals(R)) {
      coCoRList.add(info);
    }
  }
  for (String[] user_s : userLList) {
    for (String[] coCo_s : coCoRList) {
      if (user_s[0].equals(coCo_s[0])) {
        sum[Integer.parseInt(coCo_s[1])] +=
            Integer.parseInt(user_s[user_s.length-2]) * Integer.parseInt(coCo_s[coCo_s.length-2]);
      }
    }
  }
  for (int i = 1; i <= items; i++) {
    if (sum[i] > 0) {
      context.write(new Text(String.join("\t", key.toString(), String.valueOf(i))),
          new LongWritable(sum[i]));
    }
  }
}

}

```