package spark;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class sparkwordcount {
    public static void main(String[] args) {
        /*
         * conf
         * 1. sets the Spark run mode
         * 2. sets the application name shown in the web UI
         * 3. sets the resources (memory + cores) this Spark application runs with
         *
         * Spark run modes:
         * 1. local      - for testing
         * 2. standalone - Spark's own resource scheduling framework; Spark jobs can rely on it for resources
         * 3. yarn       - the Hadoop ecosystem's resource scheduling framework; Spark can also schedule resources on YARN
         */
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("JobName");
        conf.set("spark.executor.memory", "10g"); // resource setting, e.g. executor memory

        /*
         * The SparkContext is the sole gateway to the cluster; it is used to create RDDs.
         */
        JavaSparkContext sc = new JavaSparkContext(conf);

        // One element per line of the input file
        JavaRDD<String> lines = sc.textFile("words");

        // Split each line into words
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            // <in: String (one line), out: String (one word)>
            // the first type parameter is the input String, the second is the output String
            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        // In the Java API, use the xxxToPair variants (e.g. mapToPair) to turn an RDD into K,V format
        // Count: String -> word, String -> k, Integer -> v
        JavaPairRDD<String, Integer> pairwords = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override // <k, v> = (word, 1)
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        /*
         * Group the values that share a key, then combine each group's values
         * with the user-defined function below.
         */
        // Aggregation (lazily executed)
        JavaPairRDD<String, Integer> reduce = pairwords.reduceByKey(new Function2<Integer, Integer, Integer>() {
            /*
             * For values [1, 1, 1] the reduction runs as:
             * 1 + 1 = 2, then 2 + 1 = 3
             */
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // Sorting (lazily executed)
        // (hello,5) -------> (5,hello)
        JavaPairRDD<Integer, String> mapToPair = reduce.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception {
                return tuple.swap(); // swap key and value
            }
        });
        JavaPairRDD<Integer, String> sortByKey = mapToPair.sortByKey(false);

        // Swap back to (word, count) (lazily executed)
        JavaPairRDD<String, Integer> reducesort = sortByKey.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple) throws Exception {
                return tuple.swap(); // swap back
            }
        });

        // foreach is an action; it triggers the lazy transformations above
        reducesort.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> tuple) throws Exception {
                System.out.println(tuple);
            }
        });

        sc.stop();
    }
}
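For comparison, the same pipeline is considerably shorter against the Spark 2.x Java API with Java 8 lambdas; the main API change is that the flatMap function must return an Iterator rather than an Iterable. A minimal sketch under those assumptions (the class name SparkWordCountLambda and the input path are illustrative):

package spark;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class SparkWordCountLambda {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountLambda");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaPairRDD<String, Integer> counts = sc.textFile("words")
                // Spark 2.x: the lambda must return an Iterator, not an Iterable
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((v1, v2) -> v1 + v2);

        // Sort by count descending: swap to (count, word), sort, swap back
        counts.mapToPair(Tuple2::swap)
              .sortByKey(false)
              .mapToPair(Tuple2::swap)
              .foreach(tuple -> System.out.println(tuple));

        sc.stop();
    }
}

Both versions build the same DAG of transformations; Spark's Java function interfaces are functional interfaces, so lambdas and method references such as Tuple2::swap serialize and ship to the executors just like the anonymous classes above.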
Operators
package spark;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class suanzi {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("JobName");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("./words");

        // take: fetch the first 5 elements to the driver (take is an action)
        List<String> list = lines.take(5);
        for (String string : list) {
            System.out.println(string);
        }

        // filter: keep only the elements for which the function returns true
        // (a lazy transformation; nothing runs until an action is called on it)
        JavaRDD<String> data = lines.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String line) throws Exception {
                return line.equals("hello zhazhalong");
            }
        });

        sc.stop();
    }
}
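Note that in the example above nothing ever triggers the filter: filter is a lazy transformation, so the data RDD is defined but never computed. A minimal sketch of forcing execution with actions; the class name SuanziAction and the startsWith predicate are illustrative assumptions:

package spark;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SuanziAction {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("FilterDemo");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("./words");

        // filter is a lazy transformation: defining the RDD does no work yet
        JavaRDD<String> hellos = lines.filter(line -> line.startsWith("hello"));

        // count is an action: it triggers the actual scan and filter
        long n = hellos.count();
        System.out.println("matching lines: " + n);

        // collect is also an action: it pulls all matching elements to the driver
        List<String> result = hellos.collect();
        for (String s : result) {
            System.out.println(s);
        }

        sc.stop();
    }
}

take itself is an action too, which is why the first five lines in the earlier example print without any extra call; the filtered RDD needs its own action because no downstream operator consumes it.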
Looking forward to exchanging more ideas in the future.