Spark WordCount 探索RDD编程

通过编写WordCount来探索RDD编程,还是没能完全改成函数式编程,所以这里的代码主要使用了Java API,即使是函数式编程也是在Java1.8下的。没有在scala下。稍微有些不习惯scala下var和val自动定义变量类型。

SparkConf

每一个Spark任务都需要一个driver执行所有的操作,那么每一个spark程序的开头都是定义这么一个driver。在wordcount应用中,只需要定义driver的名字和master就可以了。

SparkContext

SparkContext是Spark应用程序中用到的第一个类。SparkContext为Spark的主要入口点,简明扼要,如把Spark集群当作服务端那Spark Driver就是客户端,SparkContext则是客户端的核心。
SparkContext用于连接Spark集群、创建RDD、累加器(accumlator)、广播变量(broadcast variables)。

wordcount程序基本只需要创建RDD,那么创建RDD有两种基本方式:

  • 从文件中读取 textFile()
  • parallelize()

例如:
通过textFile()建立

1
2
3
4
5
6
7
SparkConf conf = new SparkConf();
conf.setAppName("SecondarySort");
conf.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);

//导入文本
JavaRDD<String> input = sc.textFile(src_file);

注意src_file的路径有两种方式:

  • 位于hdfs上的路径采用hdfs://前缀
  • 位于主机本地上的路径采用file://前缀

通过parallelize()建立

1
2
3
4
5
SparkConf conf = new SparkConf().setAppName("text");
conf.setMaster("local");
List<Integer> list = Arrays.asList(1,2,3,4);
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<Integer> numberRDD = sc.parallelize(list, 4);

RDD操作

我们想要实现统计一段文本中出现频率最高的单词。也就是说结果是单词按照词频降序排列。那么应该想到有以下步骤:

  1. 分词,String->String.iterator 从长文本变为单词迭代器。这就设计到了转换,使用flatMap

map与flatMap的区别:

  • map()是将函数用于RDD中的每个元素,将返回值构成新的RDD。
  • flatmap()是将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD
  1. 赋值词频1,String-> <String, 1> 用到map
  2. 合并词频,<String, 1> -> <String, v> 用到reduce
  3. 按照词频排序,首先要将key变为词频计数。<String, v> -> <v, String>, 用到map
  4. 按key排序,用到sortByKey(false)。false表示降序
  5. 输出结果并保存。启用设定格式,去掉括号,用到map

整体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
/**
*
*/

package wordcountsort;

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;


import scala.Tuple2;

/**
* @author shaoguoliang
*
*/
public class WordCountSort {

public static void main(String[] args) {

String src_file = "file:///Users/shaoguoliang/Workspaces/MyEclipse 2017 CI/spark-rdd/data/wordtext.txt";
String dest_file = "file:///Users/shaoguoliang/Workspaces/MyEclipse 2017 CI/spark-rdd/data/wordtext_result.txt";

SparkConf conf = new SparkConf();
conf.setAppName("SecondarySort");
conf.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);

//导入文本
JavaRDD<String> input = sc.textFile(src_file);

//分词
// JavaRDD<String> words = input.flatMap(x-> Arrays.asList(x.split(" ")).iterator() );
JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() {

@Override
public Iterator<String> call(String t) throws Exception {
// TODO Auto-generated method stub
return Arrays.asList(t.split(" ")).iterator();
}

});

//构建<word, count>, 并且reduce
JavaPairRDD<String, Integer> wordPair1 = words.mapToPair(new PairFunction<String, String, Integer>() {

@Override
public Tuple2<String, Integer> call(String t) throws Exception {
// TODO Auto-generated method stub
return new Tuple2<String, Integer>(t, 1);
}
});

JavaPairRDD<String, Integer> wordPair = wordPair1.reduceByKey(new Function2<Integer, Integer, Integer>() {

@Override
public Integer call(Integer v1, Integer v2) throws Exception {
// TODO Auto-generated method stub
return v1+v2;
}
});

//因为只能对key进行排序,我们要统计词频出现最多的单词,将key, value倒置
// JavaPairRDD<Integer, String> wordCount = wordPair.mapToPair(x-> new Tuple2(x._2, x._1));
JavaPairRDD<Integer, String> wordCount = wordPair.mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {

@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception {
// TODO Auto-generated method stub
return new Tuple2<Integer, String>(t._2, t._1);
}

});

//降序排列
JavaPairRDD<Integer, String> wordCountSort = wordCount.sortByKey(false);

//去掉括号
// JavaRDD<String> wordCountOutput = wordCountSort.map(x-> new String(x._1 + "," + x._2));
JavaRDD<String> wordCountOutput = wordCountSort.map(new Function<Tuple2<Integer,String>, String>() {

@Override
public String call(Tuple2<Integer, String> v1) throws Exception {
// TODO Auto-generated method stub
return v1._1 + "," + v1._2;
}
});

//输出
wordCountOutput.foreach(x-> System.out.println(x));

wordCountOutput.saveAsTextFile(dest_file);
}

}

编译提交

maven编译参考Spark环境搭建与RDD编程基础

提交:
spark-submit --master "localhost:7077" ./spark-rdd-0.0.1-SNAPSHOT.jar

RDD Tips

在RDD编程的过程中一定要先想好你想要的结果是什么,然后再去选择合适的RDD编程架构。

在上述代码中,结果会出现如下:

统计错误

所以在分词的过程中,不仅要添加空格还要添加可能的标点符号。

所以需要使用正则表达式进行字符串分隔。

return Arrays.asList(t.split("[\\p{Punct}\\s]+")).iterator();