2023-2024-2-编程作业2¶
2021级,Spark
又要开始做无聊的分布式编程作业了。【5月7日晚上开始】【截止时间:6月3日23:59】
结论:这次真的是大换血了,和之前的几乎完全不一样,网上几乎什么都没有。
but LLM is god. 【5月8日晚上23点出头完成】
Word Count¶
待完成:¶
- 请在 DSPPCode.spark.warm_up.impl 中创建 WordCountImpl, 继承 WordCount, 实现抽象方法
题目描述:¶
-
现存在一份英文文本文件,要求统计文本中每个单次出现的次数,并将计数结果输出到文本中。
-
输入格式:
apple pen applepen pineapple pineapplepen boy cat dog boy cat
dog cat dog dog
- 输出格式:
(pen,1)
(pineapple,1)
(dog,4)
(pineapplepen,1)
(apple,1)
(applepen,1)
(boy,2)
(cat,3)
Word Count 你要相信这就是个送分题。
WordCountImpl¶
package DSPPCode.spark.warm_up.impl;
import DSPPCode.spark.warm_up.question.WordCount;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;
import java.util.Arrays;
/**
* Java 答案示例
*/
public class WordCountImpl extends WordCount {
@Override
public JavaPairRDD<String, Integer> wordCount(JavaRDD<String> lines) {
// 将每行内容按分隔符拆分成单个单词
JavaRDD<String> words = lines.flatMap((String line)
-> Arrays.asList(line.split(" ")).iterator());
// 将每个单词的频数设置为1,即将每个单词映射为[单词, 1]
JavaPairRDD<String, Integer> wordPairs = words
.mapToPair((String word) -> new Tuple2<>(word, 1));
// 返回累加计数结果
return wordPairs.reduceByKey(Integer::sum);
}
}
EulerCircuit¶
待完成:¶
- 请在DSSPCode.spark.eulercircuit.impl中创建EulerCircuitImpl,继承EulerCircuit,实现抽象方法。
题目描述:¶
-
辅导员让小明去各个学院搬书,问小明能否从本学院出发经过连接各个学院的路仅一次又再次回到本学院。
-
输入格式:
输入包含若干行,每行两个数字a b,代表一条学院a到学院b的路,以空格分隔。
1 2
2 3
3 4
4 1
- 输出格式:
问小明能否按照要求从本学院出发经过连接各个学院的路仅一次又再次回到本学院。可以则输出Yes,否则输出No。
Yes
EulerCircuitImpl¶
package DSPPCode.spark.eulercircuit.impl;
import DSPPCode.spark.eulercircuit.question.EulerCircuit;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
public class EulerCircuitImpl extends EulerCircuit {
@Override
public boolean isEulerCircuit(JavaRDD<String> lines, JavaSparkContext jsc) {
// 将输入的每一行转换为(a, b)的形式
JavaPairRDD<Integer, Integer> edges = lines.mapToPair(line -> {
String[] nodes = line.split(" ");
return new Tuple2<>(Integer.parseInt(nodes[0]), Integer.parseInt(nodes[1]));
});
// 计算每个节点的度数
JavaPairRDD<Integer, Integer> degrees = edges.flatMapToPair(edge -> {
List<Tuple2<Integer, Integer>> list = new ArrayList<>();
list.add(new Tuple2<>(edge._1, 1));
list.add(new Tuple2<>(edge._2, 1));
return list.iterator();
}).reduceByKey(Integer::sum);
// 检查所有节点的度数是否都是偶数
long oddDegreeNodes = degrees.filter(degree -> degree._2 % 2 != 0).count();
// 如果所有节点的度数都是偶数,那么存在欧拉回路
return oddDegreeNodes == 0;
}
}
KNN¶
难易程度:** 中¶
待完成¶
- 请在 DSPPCode.spark.knn.impl 中创建 KNNImpl,继承 KNN,实现抽象方法。
题目描述¶
-
要求:给定训练数据集和查询数据集,使用 KNN 算法完成对查询数据集的分类。(注意:当一条数据对应多个类别的权重相等时,请选择序号较小的类别。)
-
输入格式
训练数据集:每行 n+2 个数字,分别是数据 id、类别 id、n 维特征,中间使用空格分割。
1 0 1.0 0.9
2 1 0.2 0.3
3 0 0.8 0.7
4 1 0.1 0.2
5 0 0.8 0.8
6 1 0.2 0.2
查询数据集:每行 n+1 个数字,分别是数据 id、n 维特征,中间使用空格分割。
7 0.8 0.9
8 0.2 0.1
- 输出格式
查询数据集类别:每行 2 个数字,分别是数据 id、类别 id,中间使用逗号分割。
7,0
8,1
KNNImpl¶
最开始的代码(只有6分)【看报错似乎是未考虑shuffle性能优化】:
package DSPPCode.spark.knn.impl;
import DSPPCode.spark.knn.question.Data;
import DSPPCode.spark.knn.question.KNN;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
public class KNNImpl extends KNN {
public KNNImpl(int k) {
super(k);
}
@Override
public JavaPairRDD<Data, Data> kNNJoin(JavaRDD<Data> trainData, JavaRDD<Data> queryData) {
return queryData.cartesian(trainData);
}
@Override
public JavaPairRDD<Integer, Tuple2<Integer, Double>> calculateDistance(JavaPairRDD<Data, Data> data) {
return data.mapToPair(pair -> {
double distance = 0;
for (int i = 0; i < pair._1.x.length; i++) {
distance += Math.pow(pair._1.x[i] - pair._2.x[i], 2);
}
distance = Math.sqrt(distance);
return new Tuple2<>(pair._1.id, new Tuple2<>(pair._2.y, distance));
});
}
@Override
public JavaPairRDD<Integer, Integer> classify(JavaPairRDD<Integer, Tuple2<Integer, Double>> data) {
return data.groupByKey().mapToPair(pair -> {
List<Tuple2<Integer, Double>> list = new ArrayList<>();
pair._2.forEach(list::add);
list.sort((o1, o2) -> {
int compare = o1._2.compareTo(o2._2);
if (compare != 0) {
return compare;
} else {
return o1._1.compareTo(o2._1);
}
});
return new Tuple2<>(pair._1, list.get(0)._1);
});
}
}
以下是修正后的版本:
package DSPPCode.spark.knn.impl;
import DSPPCode.spark.knn.question.Data;
import DSPPCode.spark.knn.question.KNN;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import java.util.ArrayList;
import java.util.List;
import java.util.*;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import org.apache.spark.api.java.function.PairFlatMapFunction;
public class KNNImpl extends KNN {
public KNNImpl(int k) {
super(k);
}
@Override
public JavaPairRDD<Data, Data> kNNJoin(JavaRDD<Data> trainData, JavaRDD<Data> queryData) {
// 将训练数据广播到所有节点
Broadcast<List<Data>> broadcastTrainData = JavaSparkContext.fromSparkContext(trainData.context()).broadcast(trainData.collect());
return queryData.flatMapToPair(query -> {
List<Tuple2<Data, Data>> result = new ArrayList<>();
for (Data train : broadcastTrainData.value()) {
result.add(new Tuple2<>(query, train));
}
return result.iterator();
});
}
/*
@Override
public JavaPairRDD<Data, Data> kNNJoin(JavaRDD<Data> trainData, JavaRDD<Data> queryData) {
return queryData.cartesian(trainData);
}
*/
@Override
public JavaPairRDD<Integer, Tuple2<Integer, Double>> calculateDistance(JavaPairRDD<Data, Data> data) {
//data.cache();
return data.flatMapToPair(pair -> {
List<Tuple2<Integer, Tuple2<Integer, Double>>> distances = new ArrayList<>();
for (int i = 0; i < pair._1.x.length; i++) {
double distance = Math.pow(pair._1.x[i] - pair._2.x[i], 2);
distance = Math.sqrt(distance);
distances.add(new Tuple2<>(pair._1.id, new Tuple2<>(pair._2.y, distance)));
}
return distances.iterator();
});
}
/*
@Override
public JavaPairRDD<Integer, Integer> classify(JavaPairRDD<Integer, Tuple2<Integer, Double>> data) {
return data.groupByKey().mapToPair(pair -> {
List<Tuple2<Integer, Double>> list = new ArrayList<>();
pair._2.forEach(list::add);
list.sort((o1, o2) -> {
int compare = o1._2.compareTo(o2._2);
if (compare != 0) {
return compare;
} else {
return o1._1.compareTo(o2._1);
}
});
return new Tuple2<>(pair._1, list.get(0)._1);
});
}
*/
@Override
public JavaPairRDD<Integer, Integer> classify(JavaPairRDD<Integer, Tuple2<Integer, Double>> data) {
final ClassTag<Integer> classTagInteger = ClassTag$.MODULE$.apply(Integer.class);
final Broadcast<Integer> broadcastK = data.context().broadcast(k, classTagInteger);
return data.mapToPair(pair -> new Tuple2<>(new Tuple2<>(pair._1, pair._2._1), pair._2._2))
.combineByKey((Double value) -> new Tuple2<>(1, value),
(Tuple2<Integer, Double> tuple, Double value) -> new Tuple2<>(tuple._1 + 1, Math.min(tuple._2, value)),
(Tuple2<Integer, Double> tuple1, Tuple2<Integer, Double> tuple2) -> new Tuple2<>(tuple1._1 + tuple2._1, Math.min(tuple1._2, tuple2._2))
)
.mapPartitionsToPair((PairFlatMapFunction<Iterator<Tuple2<Tuple2<Integer, Integer>, Tuple2<Integer, Double>>>, Integer, Tuple2<Integer, Double>>) iterator -> {
Map<Integer, PriorityQueue<Tuple2<Integer, Double>>> map = new HashMap<>();
while (iterator.hasNext()) {
Tuple2<Tuple2<Integer, Integer>, Tuple2<Integer, Double>> tuple = iterator.next();
PriorityQueue<Tuple2<Integer, Double>> queue = map.getOrDefault(tuple._1._1, new PriorityQueue<>(Comparator.comparing(Tuple2::_2)));
if (queue.size() < broadcastK.value()) {
queue.add(new Tuple2<>(tuple._1._2, tuple._2._2));
} else if (queue.peek()._2 < tuple._2._2) {
queue.poll();
queue.add(new Tuple2<>(tuple._1._2, tuple._2._2));
}
map.put(tuple._1._1, queue);
}
List<Tuple2<Integer, Tuple2<Integer, Double>>> result = new ArrayList<>();
for (Map.Entry<Integer, PriorityQueue<Tuple2<Integer, Double>>> entry : map.entrySet()) {
PriorityQueue<Tuple2<Integer, Double>> queue = entry.getValue();
while (!queue.isEmpty()) {
result.add(new Tuple2<>(entry.getKey(), queue.poll()));
}
}
return result.iterator();
}).reduceByKey((tuple1, tuple2) -> tuple1._2 < tuple2._2 ? tuple1 : tuple2)
.mapToPair(pair -> {
Map<Integer, Double> map = new HashMap<>();
map.put(pair._2._1, map.getOrDefault(pair._2._1, 0.0) + 1 / pair._2._2);
int maxLabel = Collections.max(map.entrySet(), Comparator.comparing(Map.Entry::getValue)).getKey();
return new Tuple2<>(pair._1, maxLabel);
});
}
}
Connected Components¶
难易程度: **中¶
待完成¶
- 请在 DSPPCode.spark.connected_components.impl 中创建 ConnectedComponentsImpl, 继承 ConnectedComponents, 实现抽象方法
题目描述¶
-
根据输入文本中的顶点关系,找出无向图中每个连通分量的最小顶点 ID。
-
输入格式:第一列表示顶点 ID,第二列起为与第一列顶点相连的顶点 ID (分隔符为tab)。例如,第一行表示顶点1与顶点2,顶点3直接连接。
1 2 3 2 1 3 1 4 6 5 6 6 4 5 -
输出格式:第一列输出顶点 ID,第二列表示该连通分量中最小的顶点 ID。例如,第一行表示在顶点1的连通分量中最小的顶点ID是1。
(1,1) (2,1) (3,1) (4,4) (5,4) (6,4)
ConnectedComponentsImpl¶
最开始的代码(只有6分):
package DSPPCode.spark.connected_components.impl;
import DSPPCode.spark.connected_components.question.ConnectedComponents;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
public class ConnectedComponentsImpl extends ConnectedComponents {
@Override
public JavaPairRDD<String, Integer> getcc(JavaRDD<String> text) {
// 将输入的文本转换为边的形式
JavaPairRDD<String, Integer> edges = text.flatMapToPair(line -> {
String[] vertices = line.split("\t");
List<Tuple2<String, Integer>> pairs = new ArrayList<>();
for (int i = 1; i < vertices.length; i++) {
pairs.add(new Tuple2<>(vertices[0], Integer.parseInt(vertices[i])));
pairs.add(new Tuple2<>(vertices[i], Integer.parseInt(vertices[0])));
}
return pairs.iterator();
}).distinct();
// 初始化每个顶点的最小顶点ID为其自身
JavaPairRDD<String, Integer> minVertex = text.flatMapToPair(line -> {
String[] vertices = line.split("\t");
List<Tuple2<String, Integer>> pairs = new ArrayList<>();
for (String vertex : vertices) {
pairs.add(new Tuple2<>(vertex, Integer.parseInt(vertex)));
}
return pairs.iterator();
}).reduceByKey(Math::min);
while (true) {
// 计算新的最小顶点ID
JavaPairRDD<String, Integer> newMinVertex = edges.join(minVertex)
.values()
.mapToPair(tuple -> new Tuple2<>(tuple._1.toString(), Math.min(tuple._1, tuple._2)))
.reduceByKey(Math::min);
// 如果最小顶点ID没有发生改变,那么结束迭代
if (!isChange(minVertex, newMinVertex)) {
break;
}
minVertex = newMinVertex;
}
return minVertex;
}
}
报错信息如下所示:
1) test3(DSPPTest.online_judge.spark.connected_components.ConnectedComponentsTest)
org.junit.ComparisonFailure: expected:<[1])> but was:<[2])>
at org.junit.Assert.assertEquals(Assert.java:115)
at org.junit.Assert.assertEquals(Assert.java:144)
at DSPPTest.util.Verifier.verifyKV(Verifier.java:60)
at DSPPTest.util.Verifier.verifyKV(Verifier.java:28)
at DSPPTest.online_judge.spark.connected_components.ConnectedComponentsTest.executeJob(ConnectedComponentsTest.java:104)
at DSPPTest.online_judge.spark.connected_components.ConnectedComponentsTest.test3(ConnectedComponentsTest.java:79)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
FAILURES!!!
后来我尝试自己改数据集,然后发现孤立的点存在时更新紊乱,然后尝试在构建图的时候就加入自环,然后就通过了。正式的代码:
package DSPPCode.spark.connected_components.impl;
import DSPPCode.spark.connected_components.question.ConnectedComponents;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
public class ConnectedComponentsImpl extends ConnectedComponents {
@Override
public JavaPairRDD<String, Integer> getcc(JavaRDD<String> text) {
// 将输入的文本转换为边的形式
JavaPairRDD<String, Integer> edges = text.flatMapToPair(line -> {
String[] vertices = line.split("\t");
List<Tuple2<String, Integer>> pairs = new ArrayList<>();
for (int i = 1; i < vertices.length; i++) {
pairs.add(new Tuple2<>(vertices[0], Integer.parseInt(vertices[i])));
pairs.add(new Tuple2<>(vertices[i], Integer.parseInt(vertices[0])));
}
//我让每个点到自己都有自环
pairs.add(new Tuple2<>(vertices[0], Integer.parseInt(vertices[0])));
return pairs.iterator();
}).distinct();
// 初始化每个顶点的最小顶点ID为其自身
JavaPairRDD<String, Integer> minVertex = text.flatMapToPair(line -> {
String[] vertices = line.split("\t");
List<Tuple2<String, Integer>> pairs = new ArrayList<>();
for (String vertex : vertices) {
pairs.add(new Tuple2<>(vertex, Integer.parseInt(vertex)));
}
return pairs.iterator();
}).reduceByKey(Math::min);
while (true) {
// 计算新的最小顶点ID
JavaPairRDD<String, Integer> newMinVertex = edges.join(minVertex)
.values()
.mapToPair(tuple -> new Tuple2<>(tuple._1.toString(), Math.min(tuple._1, tuple._2)))
.reduceByKey(Math::min);
// 如果最小顶点ID没有发生改变,那么结束迭代
if (!isChange(minVertex, newMinVertex)) {
break;
}
minVertex = newMinVertex;
}
return minVertex;
}
}
perceptron¶
难易程度: **难¶
待完成¶
- 请在DSPPCode.spark.perceptron.impl中创建IterationStepImpl,继承IterationStep,实现抽象方法
问题描述¶
单层感知机模型通过拟合一个线性函数,再通过阶跃函数输出 {-1, +1} 的类别标签,直接通过标签来判断因变量的类别,其中\(x \in R^n\)为输入,\(w \in R ^ n\)为权重,\(b \in R^n\)为偏置: $$ f(x) = \begin{cases} -1, & \text{if } w \cdot x + b < 0 \ +1, & \text{if } w \cdot x + b \geq 0 \end{cases} $$
我们使用经验损失函数作为代价函数,其中\(M\)为错分类样本集合: $$ L(w,b) = - \sum_{x_i \in M} {y_i(w\cdot x_i + b)} $$
随后我们使用梯度下降法求解新的权重\(w\)和偏置\(b\),其中\(\eta\)为学习率: $\(w^{t+1} \leftarrow w^{t} + \eta (y - f({x_i})) x_i\)$ $\(b^{t+1} \leftarrow b^{t} + \eta (y - f({x_i}))\)$
要求:使用梯度下降算法求解单层感知机模型参数。
- 输入格式
-1 1 0
-1 2 0
-1 2 1
+1 0 1
+1 0 2
+1 0 9
+1 1 2
+1 1 3
第一列表示数据的类别 +1 或 -1,第二列到最后一列表示数据不同维度上的值,数据类型均为 double。即
- 输出格式
w0,-0.07
w1,0.14
b,0.00
第一列为模型参数下标,第二列为对应下标的模型参数值。上面展示维度为 2 的数据经计算后的参数结果。
IterationStepImpl¶
其实就是个大模拟。
package DSPPCode.spark.perceptron.impl;
import DSPPCode.spark.perceptron.question.IterationStep;
import DSPPCode.spark.perceptron.question.DataPoint;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.Function;
public class IterationStepImpl extends IterationStep {
@Override
public Broadcast<double[]> createBroadcastVariable(JavaSparkContext sc, double[] localVariable) {
return sc.broadcast(localVariable);
}
@Override
public boolean termination(double[] old, double[] newWeightsAndBias) {
double sum = 0;
for (int i = 0; i < old.length; i++) {
sum += Math.pow(old[i] - newWeightsAndBias[i], 2);
}
return sum < THRESHOLD;
}
@Override
public double[] runStep(JavaRDD<DataPoint> points, Broadcast<double[]> broadcastWeightsAndBias) {
double[] weightsAndBias = points.map(new ComputeGradient(broadcastWeightsAndBias.value()))
.reduce(new VectorSum());
for (int i = 0; i < weightsAndBias.length; i++) {
weightsAndBias[i] = broadcastWeightsAndBias.value()[i] + STEP * weightsAndBias[i];
}
return weightsAndBias;
}
public static class VectorSum implements Function2<double[], double[], double[]> {
@Override
public double[] call(double[] a, double[] b) throws Exception {
for (int i = 0; i < a.length; i++) {
a[i] += b[i];
}
return a;
}
}
public static class ComputeGradient implements Function<DataPoint, double[]> {
public final double[] weightsAndBias;
public ComputeGradient(double[] weightsAndBias) {
this.weightsAndBias = weightsAndBias;
}
@Override
public double[] call(DataPoint dataPoint) throws Exception {
double[] gradient = new double[weightsAndBias.length];
double dotProduct = 0;
for (int i = 0; i < dataPoint.x.length; i++) {
dotProduct += dataPoint.x[i] * weightsAndBias[i];
}
dotProduct += weightsAndBias[weightsAndBias.length - 1];
if (dataPoint.y * dotProduct <= 0) {
for (int i = 0; i < dataPoint.x.length; i++) {
gradient[i] = dataPoint.y * dataPoint.x[i];
}
gradient[weightsAndBias.length - 1] = dataPoint.y;
}
return gradient;
}
}
}