跳转至

2020-2021-2-编程作业2

2018级,Spark

Friendship

难易程度: ** 中

待完成

  • 请在 DSPPCode.spark.friendship.impl 中创建 FriendshipImpl, 继承 Friendship, 实现抽象方法

题目描述:

  • 根据输入文本中的用户关系,找出任意两个用户之间的共同好友。

  • 输入格式:输入文本中每行表示一个用户关系,一个用户关系由多个列组成。其中,第一列为用户名,其余列为该用户的好友的用户名,用户名之间用空格分隔。例如,下面第一行表示用户A有B、C、D、E、F五位好友。

A B C D E F B A C D E C A B E D A B E E A B C D F A

  • 输出格式: 输出文本的每行表示两个用户的用户名及其所有共同好友的用户名,其中两个用户的用户名按升序字典序进行拼接,所有共同好友的用户名之间以逗号和空格分隔。 例如, 下面第一行表示用户A和C的两个用户的所有共同好友为B、E。

(AC,[B, E]) (BC,[A, E]) (CE,[A, B]) (AB,[C, D, E]) (AE,[B, C, D]) (CD,[A, B, E]) (DF,[A]) (CF,[A]) (AD,[B, E]) (DE,[A, B]) (BE,[A, C, D]) (BF,[A]) (EF,[A]) (BD,[A, E])

FriendshipImpl

package DSPPCode.spark.friendship.impl;

import DSPPCode.spark.friendship.question.Friendship;
import com.clearspring.analytics.util.Lists;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class FriendshipImpl extends Friendship {

  @Override
  public JavaPairRDD<String, Iterable<String>> findFriendShip(JavaRDD<String> lines) {
    // JavaPairRDD<String, Iterable<String>> input = lines.mapToPair(
    //     new PairFunction<String, String, Iterable<String>>() {
    //       @Override
    //       public Tuple2<String, Iterable<String>> call(String s) throws Exception {
    //         String[] strs = s.split(" ");
    //         String s0 = strs[0];
    //         List<String> list1=Arrays.asList(strs);
    //         List<String> arrList = new ArrayList<String>(list1);
    //         arrList.remove(s0);
    //         Iterable<String> itr = arrList;
    //         return new Tuple2<String, Iterable<String>>(s0, itr);
    //       }
    //     }
    // );
    JavaPairRDD<String, String> input = lines.flatMapToPair(
        new PairFlatMapFunction<String, String, String>() {
          @Override
          public Iterator<Tuple2<String, String>> call(String s) throws Exception {
            String[] strs = s.split(" ");
            List<Tuple2<String, String>> list = new ArrayList<>();
            int l = strs.length;
            for(int i=1;i<l;i++)
              list.add(new Tuple2<String, String>(strs[i], strs[0]));
            return list.iterator();
          }
        }
    ).groupByKey().flatMapToPair(
        new PairFlatMapFunction<Tuple2<String, Iterable<String>>, String, String>() {
          @Override
          public Iterator<Tuple2<String, String>> call(
              Tuple2<String, Iterable<String>> stringIterableTuple2) throws Exception {
            ArrayList<String> strl = (ArrayList<String>) Lists.newArrayList(stringIterableTuple2._2);
            String[] strs = strl.toArray(new String[0]);
            List<String> jo = orderedPairs(strs);
            List<Tuple2<String, String>> list = new ArrayList<>();
            for(int i=0;i<jo.size();i++) {
              System.err.print(jo.get(i));
              System.err.print(" ");
              list.add(new Tuple2<String, String>(jo.get(i), stringIterableTuple2._1()));
            }
            return list.iterator();
          }
        }
    );
    JavaPairRDD<String, Iterable<String>> ans = input.groupByKey();
    // JavaPairRDD<String, String> joinput = input.cogroup(input).flatMapToPair(
    //     new PairFlatMapFunction<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>, String, String>() {
    //       @Override
    //       public Iterator<Tuple2<String, String>> call(
    //           Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>> stringTuple2Tuple2)
    //           throws Exception {
    //
    //         return null;
    //       }
    //     }
    // )
    // //     .mapToPair(
    // //     (tup) -> new Tuple2<String, String>(tup._1, tup._2()._1() + tup._2()._1())
    // // );
    // input.foreach(
    //     new VoidFunction<Tuple2<String, String>>() {
    //       @Override
    //       public void call(Tuple2<String, String> stringStringTuple2) throws Exception {
    //         System.err.print(stringStringTuple2._1());
    //         System.err.print(" ");
    //         System.err.println(stringStringTuple2._2);
    //       }
    //     }
    // );
    // joinput.foreach(
    //     new VoidFunction<Tuple2<String, String>>() {
    //       @Override
    //       public void call(Tuple2<String, String> stringStringTuple2) throws Exception {
    //         System.err.print(stringStringTuple2._1());
    //         System.err.print(" ");
    //         System.err.println(stringStringTuple2._2);
    //       }
    //     }
    // );
    return ans;
  }
  @Override
  public List<String> orderedPairs(String[] strs) {
    Arrays.sort(strs);
    List<String> ans = new ArrayList<>();
    int l = strs.length;
    for(int i=0;i<l;i++){
      for(int j=i+1;j<l;j++)
        ans.add(strs[i] + strs[j]);
    }
    return ans;
  }
}

K Means

待完成:

  • 请在DSPPCode.spark.broadcast_k_means.imlp中创建BroadcastKMeansImpl,继承BroadcastKMeans,实现抽象方法

题目描述:

  • 使用广播机制实现KMeans算法,求聚类中心点。

  • 输入:

输入文件有两个,allPoints包含了所有点的坐标,initPoints包含了K个初始聚类中心点的坐标。 输入文件每行按逗号分割成两个整数,表示一个二维点。

4,400 96,826 606,776 474,866 400,768 2,920 356,766

  • 输出:

输出所有点的聚类结果,格式为:坐标1,坐标2 所属类别(类别编号从0开始计)。

4,400 1 96,826 0 606,776 2 474,866 2 400,768 2 2,920 0 356,766 2

BroadcastKMeansImpl

package DSPPCode.spark.broadcast_k_means.impl;

import DSPPCode.spark.broadcast_k_means.question.BroadcastKMeans;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import java.util.List;

public class BroadcastKMeansImpl extends BroadcastKMeans {

  @Override
  public Integer closestPoint(List<Integer> p, Broadcast<List<List<Double>>> kPoints) {
    List<List<Double>> kPointsValue = kPoints.value();
    Double mindis = Double.MAX_VALUE;
    Integer lj = 0;
    int lk = kPointsValue.size();
    for (int j = 0; j < lk; j++){
      List<Double> kPoint = kPointsValue.get(j);
      int l = kPoint.size();
      Double dis = 0.0;
      for(int i = 0; i < l; i++){
        dis += Math.pow(kPoint.get(i)-p.get(i),2);
      }
      if(dis < mindis){
        mindis = dis;
        lj = j;
      }
    }
    return lj;
  }

  @Override
  public Broadcast<List<List<Double>>> createBroadcastVariable(JavaSparkContext sc,
      List<List<Double>> localVariable) {
    Broadcast<List<List<Double>>> broadcast = sc.broadcast(localVariable);
    return broadcast;
  }
}

Moving Averages

待完成:

  • 请在 DSPPCode.spark.moving_averages.impl 中创建 MovingAveragesImpl, 继承 movingAverages, 实现抽象方法

题目描述:

  • 移动平均(moving averages)通过对时间序列进行转换来估计时间序列的趋势。移动平均有多种实现方式,其中最常见的一种是平滑移动平均(moving average smoothing)。平滑移动平均在计算某一时间序列中 t 时间的值时,将 t 时间点本身以及前后各 k 个时间点的值的平均值当作 t 时间点的值,假设该时间序列中最大的时间点为 n,则平滑移动平均的计算公式如下图所示。我们使用多个键值对 [时间, 值]来表示时间序列,当时间序列为 [1, 1000] [2, 2000] [3, 3000]且 k = 1 时,经过平滑移动平均转换后的时间序列为[1, 1500] [2, 2000] [3, 2500]。本题要求在给定时间序列的情况下,计算该时间序列在 k = 2 时经过平滑移动平均转换后得到的时间序列。

  • 输入格式: 时间序列键值对 [时间 t , 值 v ],假设该时间序列中最大的时间点为 n ,保证所给的时间序列涵盖 1~n 区间范围内的所有时间点且不重复。 数据范围:3 <= n <100,1 <= v <1000。

[1,1] [2,2] [3,3]

  • 输出格式: 时间序列键值对 [时间 t , 值 v ],平均值向下取整。

[1,2] [2,2] [3,2]

MovingAveragesImpl

package DSPPCode.spark.moving_averages.impl;

import DSPPCode.spark.moving_averages.question.MovingAverages;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.codehaus.janino.Java;
import scala.Tuple2;
import shapeless.Tuple;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class MovingAveragesImpl extends MovingAverages{
  @Override
  public JavaRDD<String> movingAverages(JavaRDD<String> lines) {
    long n = lines.count();
    JavaPairRDD<Integer,Integer> posAndValue =
        lines.flatMapToPair(
            (PairFlatMapFunction<String,Integer,Integer>) line -> {
              ArrayList<Tuple2<Integer,Integer>> posValue = new ArrayList<>();
              String[] posAndValue1 =line.substring(1,line.length()-1).split(",");
              int pos = Integer.parseInt(posAndValue1[0]);
              int value = Integer.parseInt(posAndValue1[1]);
              posValue.add(new Tuple2<>(pos,value));
              for (int k=1;k<=2;k++){
                int left = pos-k;
                int right = pos+k;
                if (left>=1){
                  posValue.add(new Tuple2<>(left,value));
                }
                if (right<=n){ // 切勿使用right<=lines.count()
                  posValue.add(new Tuple2<>(right,value));
                }
              }
              return posValue.iterator();
            });

    JavaPairRDD<Integer,Iterable<Integer>> posIntermediateResult = posAndValue.groupByKey();

    JavaRDD<String> timeSeries =
        posIntermediateResult.map((Function<Tuple2<Integer, Iterable<Integer>>, String>) posTuple->{
          int sum = 0;
          int num = 0;
          for (Integer value:posTuple._2){
            sum+=value;
            num++;
          }
          int avg = sum/num;
          return "["+posTuple._1+","+avg+"]";
        });

    return timeSeries;
  }
}

PageRank

待完成

  • 请在 DSPPCode.spark.pagerank_top3 中创建 PageRankTop3Impl, 继承 PageRankTop3, 实现抽象方法

题目描述

  • 请根据如下 PageRank 公式,对给定的图数据计算节点权重,并输出排名值为前3的网页排名结果。其中,PR(B) 为 网页 B 的排名值,L(B) 为网页 B 的出链数,q 为阻尼系数( damping factor),这里我们指定 q = 0.85
PR(A) = (PR(B)/L(B) + PR(C)/L(C) + PR(D)/L(D) + ...)*q + (1 - q)/N

每行表示一个网页的信息。每行按空格分成若干个字段,第一个字段表示网页名称,第二个表示该网页初始rank值 后面每两个字段表示一条出站链接及其权重。例如,第一行A 1.0 B 1.0 C 1.0 表示网页A的初始rank值为1.0,网页A有两条出站链接, 分别指向网页B和网页C,权重都是1.0。

```

A 1.0 B 1.0 C 1.0 B 1.0 C 1.0 C 1.0 D 1.0 D 1.0 A 1.0 C 1.0 E 1.0 E 1.0 B 1.0 C 1.0 ```

  • 输出格式:

只输出前三名节点的 (网页名称, 该网页 rank 值) 键值对

```

(C,0.5850899486896407) (D,0.5739767782048728) (B,0.215620367429474) ```

PageRankTop3Impl

package DSPPCode.spark.pagerank_top3.impl;

import DSPPCode.spark.pagerank_top3.question.PageRankTop3;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.*;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class PageRankTop3Impl extends PageRankTop3{

  /**
   * TODO 请完成该方法
   * <p>
   * 请在此方法中计算出按rank值排名前三名的(id,rank)键值对
   *
   * @param text       包含了输入文本文件数据的RDD
   * @param iterateNum 迭代轮数
   * @return 前三名节点的  (网页名称, 该网页 rank 值)  键值对
   */
  public JavaPairRDD<String, Double> getTop3(JavaRDD<String> text, int iterateNum){
    /* 步骤2:按应用逻辑使用操作算子编写DAG,其中包括RDD的创建、转换和行动等 */
    double factor = 0.85; // 指定系数

    // 将文本数据转换成[网页, {链接列表}]键值对
    JavaPairRDD<String, List<String>> links =
        text.mapToPair(
            new PairFunction<String, String, List<String>>() {
              @Override
              public Tuple2<String, List<String>> call(String line) throws Exception {
                String[] tokens = line.split(" ");
                List<String> list = new ArrayList<>();
                for (int i = 2; i < tokens.length; i+=2) {
                  list.add(tokens[i]);
                }
                return new Tuple2<>(tokens[0], list);
              }
            })
            .cache(); // 持久化到内存

    long N = text.count(); // 获取网页总数N // 該步使用算法要求行數嚴格等於節點數

    // 初始化每个页面的排名值,得到[网页, 排名值]键值对
    JavaPairRDD<String, Double> ranks =
        text.mapToPair(
            new PairFunction<String, String, Double>() {
              @Override
              public Tuple2<String, Double> call(String line) throws Exception {
                String[] tokens = line.split(" ");
                return new Tuple2<>(tokens[0], Double.valueOf(tokens[1]));
              }
            });

    // 执行iterateNum次迭代计算
    for (int iter = 1; iter <= iterateNum; iter++) {
      JavaPairRDD<String, Double> contributions =
          links
              // 将links和ranks做join,得到[网页, {{链接列表}, 排名值}]
              .join(ranks)
              // 计算出每个网页对其每个链接网页的贡献值
              .flatMapToPair(
                  new PairFlatMapFunction<
                      Tuple2<String, Tuple2<List<String>, Double>>, String, Double>() {
                    @Override
                    public Iterator<Tuple2<String, Double>> call(
                        Tuple2<String, Tuple2<List<String>, Double>> t) throws Exception {
                      List<Tuple2<String, Double>> list = new ArrayList<>();
                      for (int i = 0; i < t._2._1.size(); i++) {
                        // 网页排名值除以链接总数
                        list.add(new Tuple2<>(t._2._1.get(i), t._2._2 / t._2._1.size()));
                      }
                      return list.iterator();
                    }
                  });

      ranks =
          contributions
              // 聚合对相同网页的贡献值,求和得到对每个网页的总贡献值
              .reduceByKey(
                  new Function2<Double, Double, Double>() {
                    @Override
                    public Double call(Double r1, Double r2) throws Exception {
                      return r1 + r2;
                    }
                  })
              // 根据公式计算得到每个网页的新排名值
              .mapValues(
                  new Function<Double, Double>() {
                    @Override
                    public Double call(Double v) throws Exception {
                      return (1 - factor) * 1.0 / N + factor * v;
                    }
                  }).sortByKey();
    }

    JavaPairRDD<String, Double> sranks = ranks
        .mapToPair((row)->new Tuple2<>(row._2,row._1))
        .sortByKey(false)
        .mapToPair((row)->new Tuple2<>(row._2,row._1));

    Double lim = sranks.take(3).get(2)._2;

    JavaPairRDD<String, Double> ans = sranks
        .filter(new Function<Tuple2<String, Double>, Boolean>() {
          @Override
          public Boolean call(Tuple2<String, Double> stringDoubleTuple2) throws Exception {
            return stringDoubleTuple2._2 >= lim;
          }
        }); // 該步使用算法要求不能有重複的PageRank值

    // 对排名值保留5位小数,并打印最终网页排名结果
    ans.foreach(new VoidFunction<Tuple2<String, Double>>() {
      @Override
      public void call(Tuple2<String, Double> t) throws Exception {
        System.out.println(t._1 + " " + String.format("%f", t._2));
      }
    });

    return ans;
  }
}

Single Table Association

待完成:

  • 请在 DSPPCode.spark.single_table_association.impl 中创建 SingleTableAssociationImpl, 继承SingleTableAssociation, 实现抽象方法

题目描述:

  • 张三拿到了一份子女和父母对应的关系表,他发现关系表当中的一些人还存在着祖父母的亲属关系。关系表很大,手工分析可能需要几个小时,所以张三希望你能帮他编写程序自动地从中分析出哪些人存在着祖父母的亲属关系。

  • 输入格式:

child parent Jack Philip Jack Jesse Philip Terry Philip Alma

输入保存在文本中,文本的第一行为表头,其余行为孩子和父母的对应关系。以上述示例为例,Jack Philip表示Philip是Jack的父母

  • 输出格式:

(Jack,Terry) (Jack,Alma)

输出保存在文本中,每行为一个元组:(孙子女姓名,祖父母姓名)。以上述示例为例,(Jack,Terry)表示Terry是Jack的祖父母。

SingleTableAssociationImpl

package DSPPCode.spark.single_table_association.impl;
import DSPPCode.spark.single_table_association.question.SingleTableAssociation;

// load java utils

import java.util.ArrayList;
import java.util.Iterator; // 引入Iterator类
import java.util.List;

// load spark related modules
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2; // load tuple2
import shapeless.Tuple;

public class SingleTableAssociationImpl extends SingleTableAssociation {
  private static final String FLAG = "child";

  @Override
  public JavaRDD<Tuple2<String, String>> singleTableAssociation(JavaRDD<String> lines) {

    JavaRDD<String> association = lines.filter(new Function<String, Boolean>() {
      @Override
      public Boolean call(String v1) throws Exception {
        if (v1.contains(FLAG)){
          return false;
        } // 这段不加也是可以的,其实就是用来过滤掉表头
        else{
          return true ;
        }
      }
    });

    JavaPairRDD<String,String> childParentRdd = association.mapToPair(
        new PairFunction<String, String, String>() {
          @Override
          public Tuple2<String, String> call(String s) throws Exception {
            String[] tokens = s.split(" ");
            String chd = tokens[0];
            String prt = tokens[1];
            return new Tuple2<>(chd,prt);
          }
        });

    JavaPairRDD<String,String> parentChildRdd = association.mapToPair(
        new PairFunction<String, String, String>() {
          @Override
          public Tuple2<String, String> call(String s) throws Exception {
            String[] tokens = s.split(" ");
            String chd = tokens[0];
            String prt = tokens[1];
            return new Tuple2<>(prt,chd);
          }
        });
    // remark : 注意join操作 (a1,a2)join(a1,b2)=(a1,(a2,b2)),比较的是value1是否相等


    JavaPairRDD<String, Tuple2<String,String>> result = parentChildRdd.join(childParentRdd);

    return result.values();
  }
}

另一种写法

package DSPPCode.spark.single_table_association.impl;

import DSPPCode.spark.single_table_association.question.SingleTableAssociation;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class SingleTableAssociationImpl extends SingleTableAssociation {

  @Override
  public JavaRDD<Tuple2<String, String>> singleTableAssociation(JavaRDD<String> lines) {

    //读入子女表 父母-孩子
    JavaPairRDD<String, String> childparentRDD1 =
        lines.mapToPair(
            new PairFunction<String, String, String>() {
              @Override
              public Tuple2<String, String> call(String s) throws Exception {
                String[] tokens = s.split(" ");
                return new Tuple2<String, String>(tokens[1], tokens[0]);
              }
            });

    //读入子女表 孩子-父母
    JavaPairRDD<String, String> childparentRDD2 =
        lines.mapToPair(
            new PairFunction<String, String, String>() {
              @Override
              public Tuple2<String, String> call(String s) throws Exception {
                String[] tokens = s.split(" ");
                return new Tuple2<String, String>(tokens[0], tokens[1]);
              }
            });

    JavaRDD<Tuple2<String,String>> tmp =
    childparentRDD1
        .cogroup(childparentRDD2)
        .flatMap(
            new FlatMapFunction<
                Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>, Tuple2<String, String>>() {
              @Override
              public Iterator<Tuple2<String, String>> call(
                  Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>> stringTuple2Tuple2)
                  throws Exception {
                    List<Tuple2<String, String>> list = new ArrayList<>();
                    for (String t1 : stringTuple2Tuple2._2._1) {
                      for (String t2 : stringTuple2Tuple2._2._2) {
                        if (!t1.equals(t2)) {
                          Tuple2<String, String> tmp = new Tuple2<String, String>(t1, t2);
                          list.add(tmp);
                        }
                      }
                    }
                return list.iterator();
              }
            });
    return tmp;
  }
}

Station Statistics

待完成:

  • 请在 DSPPCode.spark.station_statistics.impl 中创建 StationStatisticsImpl, 继承 StationStatistics, 实现抽象方法

题目描述:

  • 假定一群人早晨徒步从A地区去往B地区踏青。其中,一部分人早晨出发得早,傍晚就徒步从B地区返回A地区,并于当天18:00:00之前抵达A地区; 而另一部分人由于早晨出发得晚,当天未从B地区返回A地区,直接在B地区留宿一晚。 在来回途中,每个人都会经过多个基站。 现统计了当天每个人往返途中进入每个基站的时刻和从每个基站出来的时刻,要求计算出每个人在每个基站停留的总时间(以秒为单位)。

  • 输入格式:

统计数据保存在一个文本文件中,文件的每行由姓名、基站名、进入基站的时刻或从基站出来的时刻信息组成。信息之间用空格分隔。

Mark station1 12:00:01 Mark station1 12:01:01 Bill station1 11:04:05 Bill station1 11:06:08 Bill station2 14:04:04 Bill station2 14:07:12

  • 输出格式:

(Bill,station1,123) (Bill,station2,188) (Mark,station1,60)

StationStatisticsImpl

package DSPPCode.spark.station_statistics.impl;

import DSPPCode.spark.station_statistics.question.StationStatistics;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import scala.Tuple3;

import java.util.Iterator;
import scala.collection.JavaConversions.*;
import shapeless.Tuple;

public class StationStatisticsImpl extends StationStatistics {

  // spark程序需要做的第一件事情,就是创建一个SparkContext对象,它将告诉spark如何访问一个集群,
  // 而要创建一个SparkContext对象,你首先要创建一个SparkConf对象,该对象访问了你的应用程序的信息

  public  JavaRDD<Tuple3<String, String, Integer>> stationStatistics(JavaRDD<String> lines){
    JavaPairRDD<String,Integer> ps = lines.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) throws Exception {
            String[] str = null;
            str = s.split(" ");
            String[] str_time = str[2].split(":");
            Integer time = Integer.parseInt(str_time[0])*3600+Integer.parseInt(str_time[1])*60+Integer.parseInt(str_time[2]);
            return new Tuple2<String,Integer>(str[0]+" "+str[1],time);
          }
        }
    );

    JavaPairRDD<String,Integer> result = ps.reduceByKey(new Function2<Integer, Integer, Integer>() {
      @Override
      public Integer call(Integer v1, Integer v2) throws Exception {
        return Math.abs(v1-v2);
      }
    });

    JavaRDD<Tuple3<String,String,Integer>> j = result.map(
        new Function<Tuple2<String, Integer>, Tuple3<String, String, Integer>>() {
          @Override
          public Tuple3<String, String, Integer> call(Tuple2<String, Integer> v1) throws Exception {
            String[] str = null;
            str = v1._1.split(" ");
            return new Tuple3<>(str[0],str[1], v1._2);
          }
        }
    );

    return j;
  }

}

另一种写法

package DSPPCode.spark.station_statistics.impl;

import DSPPCode.spark.station_statistics.question.StationStatistics;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import scala.Tuple3;
import java.util.Iterator;
import scala.collection.JavaConversions.*;
public class StationStatisticsImpl extends StationStatistics {


  public  JavaRDD<Tuple3<String, String, Integer>> stationStatistics(JavaRDD<String> lines){
    JavaPairRDD<String, Integer> ps = lines.mapToPair(
        new PairFunction<String, String, Integer>()  {
          @Override
          public Tuple2<String, Integer> call(String s) throws Exception {
            String[] strArray = null;
            strArray = s.split(" ");
            String[] times = null;
            times = strArray[2].split(":");
            String key = strArray[0] + " "+ strArray[1];
            Integer time = Integer.parseInt(times[0]) * 3600 + Integer.parseInt(times[1]) *60 + Integer.parseInt(times[2]);
            return new Tuple2<String,Integer>(strArray[0]+ " " + strArray[1], time);
          }
        });
    JavaPairRDD<String,Integer> result = ps.reduceByKey(new Function2<Integer, Integer, Integer>() {
      @Override
      public Integer call(Integer integer, Integer integer2) throws Exception {
        return Math.abs(integer - integer2);
      }
    });
    JavaRDD<Tuple3<String, String, Integer>> j = result.map(
        new Function<Tuple2<String, Integer>, Tuple3<String, String, Integer>>() {
          @Override
          public Tuple3<String, String, Integer> call(Tuple2<String, Integer> stringIntegerTuple2)
              throws Exception {
              String[] str = null;
              str = stringIntegerTuple2._1.split(" ");
              Tuple3 t = new Tuple3(str[0], str[1], stringIntegerTuple2._2);
              return t;
            }
        });
    return j;
  }

}

TopK power

待完成:

  • 请在 DSPPCode.spark.topk_power.impl 中创建 TopKPowerImpl, 继承 TopKPower, 实现抽象方法

题目描述:

  • 想当上海贼王的男人阿D根据藏宝图发现了一处洞穴。洞穴中有许多宝箱,洞穴中的宝箱可能会多次闪光。阿D知道宝箱的价值为宝箱闪光次数的平方,所以阿D看到箱子闪光就会在小本本上记录一下。出于上海贼王与众不同的行事风格,阿D最多只会搬走5个箱子。请计算阿D能获得的最大的总价值。

  • 输入格式: 发光的箱子的ID(每个箱子的ID唯一,有的箱子可能不会闪光)

2 2 2 3 4 5 6 6 7 7 8 8 8 9 9 10 2 2

  • 输出格式: 最大的总价值

plaintext 46

样例解释:根据样例,阿D应该选择ID为2、8、6、7、9的箱子。

TopKPowerImpl

package DSPPCode.spark.topk_power.impl;


import DSPPCode.spark.topk_power.question.TopKPower;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.*;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class TopKPowerImpl extends TopKPower {

  @Override
  public int topKPower(JavaRDD<String> lines) {
    JavaPairRDD<Integer, Integer> mps = lines.flatMap(
        new FlatMapFunction<String, String>() {
          @Override
          public Iterator<String> call(String s) throws Exception {
            return Arrays.asList(s.split(" ")).iterator();
          }
        }
    ).mapToPair(
        new PairFunction<String, Integer, Integer>() {
          @Override
          public Tuple2<Integer, Integer> call(String s) throws Exception {
            return new Tuple2<Integer, Integer>(Integer.parseInt(s), 1);
          }
        }
    );
    JavaPairRDD<Integer, Integer> ans = mps.groupByKey().mapToPair(
        new PairFunction<Tuple2<Integer, Iterable<Integer>>, Integer, Integer>() {
          @Override
          public Tuple2<Integer, Integer> call(Tuple2<Integer, Iterable<Integer>> t) throws Exception {
            Integer s = 0;
            for(Integer i : t._2()){
              s += i;
            }
            return new Tuple2<Integer, Integer>(s * s, t._1());
          }
        }
    ).sortByKey(false);
    ans.foreach(t -> System.out.println(t._1 + " " + t._2));
    List<Tuple2<Integer, Integer>> a = ans.take(5);
    Integer ret = 0;
    for (Tuple2<Integer, Integer> t:a){
      ret += t._1();
    }
    return ret;
  }
}