Two ways to write Spark WordCount.
Language: Java
IDE: IntelliJ IDEA
Project: Java Maven
The relevant parts of pom.xml:

<properties>
    <spark.version>1.2.0</spark.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>
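The contents of the input file are not shown here; any plain text file of space-separated words will do. A hypothetical D:\tmp\words.txt consistent with the outputs below:

jack rose jack
jack rose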
The first way is the conventional, step-by-step approach:
package pairs;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;

public class WordCount1 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("wordcount1");
        JavaSparkContext sc = new JavaSparkContext(conf);

        String filename = "D:\\tmp\\words.txt";
        JavaRDD<String> input = sc.textFile(filename);

        // split each line into words
        JavaRDD<String> lines = input.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" "));
            }
        });

        // pairs: map each word to a (word, 1) tuple
        JavaPairRDD<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // reduce: sum the counts for each word
        JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer x, Integer y) throws Exception {
                return x + y;
            }
        });

        // output: foreach is the action that triggers the computation
        counts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> tuple2) throws Exception {
                System.out.println(tuple2);
            }
        });

        sc.stop();
    }
}
Output:
(rose,2)
(jack,3)
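For comparison, on Java 8 the same pipeline collapses to a few lambdas, since Spark's Java function interfaces each have a single method. A minimal sketch, assuming the same Spark 1.x API (where flatMap's function returns an Iterable) and the same input path; the class name WordCountLambda is made up for illustration:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class WordCountLambda {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("wordcount-lambda");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> input = sc.textFile("D:\\tmp\\words.txt");

        // same flatMap -> mapToPair -> reduceByKey chain as WordCount1, written as lambdas
        JavaPairRDD<String, Integer> counts = input
                .flatMap(s -> Arrays.asList(s.split(" ")))
                .mapToPair(s -> new Tuple2<>(s, 1))
                .reduceByKey((x, y) -> x + y);

        // collect() is an action; fine here because the demo data is tiny
        for (Tuple2<String, Integer> t : counts.collect()) {
            System.out.println(t);
        }

        sc.stop();
    }
}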
The second way is more concise:
package pairs;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

import java.util.Arrays;
import java.util.Map;

public class WordCount2 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("wordcount2");
        JavaSparkContext sc = new JavaSparkContext(conf);

        String filename = "D:\\tmp\\words.txt";
        JavaRDD<String> input = sc.textFile(filename);

        // split each line into words
        JavaRDD<String> lines = input.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" "));
            }
        });

        // countByValue is an action: it returns the counts to the driver as a Map
        Map<String, Long> result = lines.countByValue();
        System.out.println(result);

        sc.stop();
    }
}
Output:
{rose=2, jack=3}
Comparing the two: the first version is a chain of transformations (flatMap, mapToPair, reduceByKey) that only runs when the foreach action is called, printing each Tuple2; the second replaces the pair-and-reduce steps with a single action, countByValue, which returns a Map<String, Long> directly to the driver. What else differs, and how the two compare in efficiency, is left for later study; one caveat worth noting now is that countByValue pulls the entire result map back to the driver, so it only suits result sets small enough to fit in driver memory, whereas reduceByKey keeps the counts distributed as an RDD.
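To make the comparison concrete, countByValue over an RDD of words behaves much like the pair-and-reduce pipeline from the first version followed by collecting the result to the driver. A conceptual sketch, not Spark's exact internals (note the counts come back as Long, matching countByValue's return type):

// roughly what countByValue amounts to for this job
Map<String, Long> result = lines
        .mapToPair(s -> new Tuple2<>(s, 1L))
        .reduceByKey((x, y) -> x + y)
        .collectAsMap();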
References:
Learning Spark (Chinese edition: 《Spark快速大数据分析》)