All Spark Operators

Transformation operators

  • mapPartitionsWithIndex

Like mapPartitions, except that the function also receives the index of the partition being processed.

  • repartition

Increases or decreases the number of partitions, and triggers a shuffle. (Merging several partitions into fewer without a shuffle is what coalesce is for.)

  • coalesce

coalesce is usually used to reduce the number of partitions. Its second parameter controls whether the re-partitioning triggers a shuffle.

true triggers a shuffle, false does not. The default is false.

If coalesce is asked for more partitions than the source RDD has, setting the second parameter to false has no effect; setting it to true makes it behave exactly like repartition. In other words, repartition(numPartitions) = coalesce(numPartitions, true).

  • groupByKey

Operates on a (K, V)-format RDD and groups elements by key: applied to (K, V), it returns (K, Iterable<V>).

  • zip

Zips the elements of two RDDs (KV or non-KV format) into one KV-format RDD. The two RDDs must have the same number of elements in every partition.

  • zipWithIndex

Pairs each element of the RDD with that element's index in the RDD (starting from 0), producing (K, V) pairs. Minimal sketches of groupByKey, zip, and zipWithIndex follow; the full example after them walks through mapPartitionsWithIndex, repartition, and coalesce.
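
A minimal sketch of groupByKey, zip, and zipWithIndex, which the full examples in this post only exercise in commented-out form. It assumes the same Spark 1.x Java API and local master used throughout; the class name and data are made up for illustration.

package spark;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class GroupZipSketch {
 public static void main(String[] args) {
  SparkConf conf = new SparkConf().setMaster("local").setAppName("groupZipSketch");
  JavaSparkContext sc = new JavaSparkContext(conf);

  JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
     new Tuple2<String, Integer>("zha", 1),
     new Tuple2<String, Integer>("zha", 2),
     new Tuple2<String, Integer>("long", 3)
    ), 2);

  // groupByKey: (K, V) -> (K, Iterable<V>), e.g. (zha,[1, 2]), (long,[3])
  System.out.println(pairs.groupByKey().collect());

  // zip: pairs the two RDDs' elements position by position; both RDDs
  // must have the same number of partitions and the same number of
  // elements in each partition
  JavaRDD<String> left = sc.parallelize(Arrays.asList("a", "b", "c"), 2);
  JavaRDD<Integer> right = sc.parallelize(Arrays.asList(1, 2, 3), 2);
  System.out.println(left.zip(right).collect()); // [(a,1), (b,2), (c,3)]

  // zipWithIndex: pairs each element with its index, starting at 0
  System.out.println(left.zipWithIndex().collect()); // [(a,0), (b,1), (c,2)]

  sc.stop();
 }
}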

package spark;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

public class suanzi {
 public static void main(String[] args) {
  SparkConf conf = new SparkConf().setMaster("local").setAppName("othersuanzi");
  JavaSparkContext sc = new JavaSparkContext(conf);
  JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList(
     "zha1","zha2","zha3","zha4",
     "zha5","zha6","zha7","zha8",
     "zha9","zha10","zha11","zha12"
    ),3);
  
//		System.out.println("rdd1 partition length = "+rdd1.partitions().size());
  
  /**
   * mapPartitionsWithIndex:
   *  exposes each partition's index; index is the partition's position in the RDD
   */
  JavaRDD<String> mapPartitionsWithIndex = rdd1.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {

   private static final long serialVersionUID = 1L;

   @Override
   public Iterator<String> call(Integer index, Iterator<String> iter) throws Exception {
    List<String> list = new ArrayList<String>();
    
    while(iter.hasNext()) {
     String one = iter.next();
     list.add("rdd1 partition index = 【"+index+"】, value = 【"+one+"】");
    }
    return list.iterator();
   }
  }, true);
  /**
   * repartition
   *  a shuffle operator that re-partitions an RDD; it can either increase
   *  or decrease the number of partitions.
   *  repartition(numPartitions) = coalesce(numPartitions, true)
   */
//		JavaRDD<String> rdd2 = mapPartitionsWithIndex.repartition(2);
  
  /**
   * coalesce
   *  like repartition, re-partitions an RDD, increasing or decreasing the
   *  number of partitions.
   *  coalesce(numPartitions, shuffle [Boolean = false]); false produces no shuffle
   */
  JavaRDD<String> rdd2 = mapPartitionsWithIndex.coalesce(4, false);
  // going from 3 to 4 partitions with shuffle = false has no effect:
  // rdd2 keeps 3 partitions
  
  System.out.println("rdd2 partition length = "+rdd2.partitions().size());
  
  JavaRDD<String> mapPartitionsWithIndex2 = rdd2.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {

   private static final long serialVersionUID = 1L;

   @Override
   public Iterator<String> call(Integer index, Iterator<String> iter) throws Exception {
    List<String> list = new ArrayList<String>();
    
    while(iter.hasNext()) {
     String one = iter.next();
     list.add("rdd2 partition index = 【"+index+"】, value = 【"+one+"】");
    }
    return list.iterator();
   }
  }, true);
  List<String> collect = mapPartitionsWithIndex2.collect();
  for(String s :collect) {
   System.out.println(s);
  }
  sc.stop();
 }
}

Action operators

  • countByKey

Operates on a (K, V)-format RDD: for each key, counts how many elements of the dataset carry that key.

  • countByValue

Counts elements by identical content, returning the number of occurrences of each distinct element.

  • reduce

Aggregates the dataset's elements with the supplied aggregation function. Sketches of countByKey and reduce follow this list; the full example after them runs countByValue.
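
A minimal sketch of countByKey and reduce, which the example below only exercises in commented-out form. Same Spark 1.x Java API assumptions; the class name and data are made up, and countByKey's result is declared Map<String, Object> to match the commented code below.

package spark;

import java.util.Arrays;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

import scala.Tuple2;

public class ActionSketch {
 public static void main(String[] args) {
  SparkConf conf = new SparkConf().setMaster("local").setAppName("actionSketch");
  JavaSparkContext sc = new JavaSparkContext(conf);

  JavaPairRDD<String, String> pairs = sc.parallelizePairs(Arrays.asList(
     new Tuple2<String, String>("zha", "18"),
     new Tuple2<String, String>("zha", "180"),
     new Tuple2<String, String>("long", "19")
    ), 2);

  // countByKey: number of elements per key, e.g. {zha=2, long=1}
  Map<String, Object> countByKey = pairs.countByKey();
  System.out.println(countByKey);

  // reduce: fold all elements into a single value with the given function
  JavaRDD<Integer> nums = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
  Integer sum = nums.reduce(new Function2<Integer, Integer, Integer>() {
   private static final long serialVersionUID = 1L;

   @Override
   public Integer call(Integer v1, Integer v2) throws Exception {
    return v1 + v2;
   }
  });
  System.out.println(sum); // 15

  sc.stop();
 }
}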

package spark;

import java.util.Arrays;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class suanzi {
 public static void main(String[] args) {
  SparkConf conf = new SparkConf().setMaster("local").setAppName("othersuanzi");
  JavaSparkContext sc = new JavaSparkContext(conf);
  JavaPairRDD<String, String> rdd1 = sc.parallelizePairs(Arrays.asList(
    new Tuple2<String,String>("zha","18"),
    new Tuple2<String,String>("zha","180"),
    new Tuple2<String,String>("long","19"),
    new Tuple2<String,String>("long","190"),
    new Tuple2<String,String>("wang","100")
   ),2);
  /**
   * countByValue()
   *  action operator: counts how many times each distinct element occurs in the RDD
   */
  Map<Tuple2<String, String>, Long> countByValue = rdd1.countByValue();
  Set<Entry<Tuple2<String, String>, Long>> entrySet = countByValue.entrySet();
  for(Entry<Tuple2<String, String>, Long> entry :entrySet) {
   Tuple2<String, String> key = entry.getKey();
   Long value = entry.getValue();
   System.out.println("key = "+key+",value ="+value);
  }
  
  
  
  /**
   * countByKey()
   *  action operator: counts the elements that share the same key
   */
//		Map<String, Object> countByKey = rdd1.countByKey();
//		
//		Set<Entry<String, Object>> entrySet = countByKey.entrySet();
//		for(Entry<String, Object> entry :entrySet) {
//			String key = entry.getKey();
//			Object value = entry.getValue();
//			System.out.println("key = "+key+",value ="+value);
//		}
  
  
  /**
   * reduce: action operator that folds the RDD's elements together with the given function
   */
  
//		JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1,2,3,4,5));
//		Integer reduce = rdd2.reduce(new Function2<Integer, Integer, Integer>() {
//			
//			private static final long serialVersionUID = 1L;
//
//			@Override
//			public Integer call(Integer v1, Integer v2) throws Exception {
//				return v1+v2;
//			}
//		});
//		System.out.println(reduce);
  
//		JavaPairRDD<String, Integer> rdd2 = sc.parallelizePairs(Arrays.asList(
//				new Tuple2<String,Integer>("zhangsan",100),
//				new Tuple2<String,Integer>("zhangsan",200),
//				new Tuple2<String,Integer>("lisi",300),
//				new Tuple2<String,Integer>("lisi",400),
//				new Tuple2<String,Integer>("wangwu",500),
//				new Tuple2<String,Integer>("wangwu",600)
//				));
  /**
   * zipWithIndex
   *  zips each element of the RDD with its index into a K,V-format RDD
   */
//		JavaPairRDD<Tuple2<String, String>, Long> zipWithIndex = rdd1.zipWithIndex();
//		zipWithIndex.foreach(new VoidFunction<Tuple2<Tuple2<String,String>,Long>>() {
//
//			private static final long serialVersionUID = 1L;
//
//			@Override
//			public void call(Tuple2<Tuple2<String, String>, Long> arg0) throws Exception {
//				System.out.println(arg0);
//			}
//		});
  
  /**
   * zip: zips two RDDs into one K,V-format RDD.
   * Each partition of the two RDDs must hold the same number of elements.
   */
//		JavaPairRDD<Tuple2<String, String>, Tuple2<String, Integer>> zip = rdd1.zip(rdd2);
//		
//		zip.foreach(new VoidFunction<Tuple2<Tuple2<String,String>,Tuple2<String,Integer>>>() {
//
//			private static final long serialVersionUID = 1L;
//
//			@Override
//			public void call(Tuple2<Tuple2<String, String>, Tuple2<String, Integer>> arg0) throws Exception {
//				System.out.println(arg0);
//			}
//		});
  /**
   * groupByKey
   *  groups the RDD's elements that share the same key
   */
//		JavaPairRDD<String, Iterable<String>> groupByKey = rdd1.groupByKey();
//		groupByKey.foreach(new VoidFunction<Tuple2<String,Iterable<String>>>() {
//			
//			private static final long serialVersionUID = 1L;
//
//			@Override
//			public void call(Tuple2<String, Iterable<String>> arg0) throws Exception {
//				System.out.println(arg0);
//			}
//		});
  sc.stop();
 }
}


package 其他算子;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import com.google.common.base.Optional;

import scala.Tuple2;



public class OtherSuanzi {
public static void main(String[] args) {
 SparkConf conf = new SparkConf().setMaster("local").setAppName("suanzi");
 JavaSparkContext sc = new JavaSparkContext(conf);
 
 
 List<String> list = Arrays.asList("1","2","4","6");
 
 JavaRDD<String> rdd = sc.parallelize(list,4);
 // convert the collection into an RDD with 4 partitions: 4 partitions means 4 tasks processing the data in parallel
 System.out.println("number of partitions: " + rdd.partitions().size());
 
 List<String> coll = rdd.collect(); // collect the RDD into a List
 
 // (K, V) pairs
 List<Tuple2<String,Integer>>list2=Arrays.asList(
   new Tuple2<String,Integer>("zha",1),
   new Tuple2<String,Integer>("long",2),
   new Tuple2<String,Integer>("wang",3)
   );
 // convert to a K,V-format RDD
 JavaPairRDD<String, Integer> rdd2 = sc.parallelizePairs(list2, 2);
 List<Tuple2<String, Integer>> coll2 = rdd2.collect(); // collect the RDD into a List
 
 
 
 List<Tuple2<String,Integer>>list3=Arrays.asList(
   new Tuple2<String,Integer>("zha",1),
   new Tuple2<String,Integer>("long",2),
   new Tuple2<String,Integer>("wang",3)
   );
 JavaPairRDD<String, Integer> rdd3 = sc.parallelizePairs(list3, 2);
 List<Tuple2<String,String>>list4=Arrays.asList(
   new Tuple2<String,String>("zha","a"),
   new Tuple2<String,String>("xue","b"),
   new Tuple2<String,String>("wang","c")
   );
 JavaPairRDD<String, String> rdd4=sc.parallelizePairs(list4, 2);
 /*
  * join
  * only valid on K,V-format RDDs
  * joins the two RDDs by key
  * the joined RDD has as many partitions as the parent with the larger partition count
  */
 // the first String is the key type; Integer is rdd3's value type, String is rdd4's value type
 JavaPairRDD<String, Tuple2<Integer, String>> join = rdd3.join(rdd4);
 join.foreach(new VoidFunction<Tuple2<String, Tuple2<Integer, String>>>() {

  @Override
  public void call(Tuple2<String, Tuple2<Integer, String>> arg0) throws Exception {
   System.out.println(arg0);
  }
 });
 // left outer join: String is rdd3's key, Integer is rdd3's value,
 // Optional<String> holds rdd4's value for the same key, if any
 JavaPairRDD<String, Tuple2<Integer, Optional<String>>> join2 = rdd3.leftOuterJoin(rdd4);
 join2.foreach(new VoidFunction<Tuple2<String, Tuple2<Integer, Optional<String>>>>() {

  @Override
  public void call(Tuple2<String, Tuple2<Integer, Optional<String>>> arg0) throws Exception {
   String key = arg0._1;
   Integer value = arg0._2._1;
   Optional<String> option = arg0._2._2;
   if (option.isPresent()) {
    // rdd4 has a value for this key
    System.out.println("key="+key+",value="+value+",value2="+option.get());
   } else {
    System.out.println("key="+key+",value="+value+",value2="+"null");
   }
  }
 });
 
 
 /*
  * union: concatenates two RDDs of the same type.
  * The result's partition count is the sum of the two inputs' partition counts.
  */
 JavaPairRDD<String, Integer> union = rdd2.union(rdd3);
 union.foreach(new VoidFunction<Tuple2<String, Integer>>() {

  @Override
  public void call(Tuple2<String, Integer> arg0) throws Exception {
   System.out.println(arg0);
  }
 });
 
 /*
  * intersection: the intersection of two datasets
  * subtract: the difference of two datasets
  * cogroup: groups the two RDDs by key; each key maps to one collection of
  * values per RDD (subtract and cogroup are sketched after this class)
  */
 JavaPairRDD<String, Integer> intersection = rdd2.intersection(rdd3);
 System.out.println("intersection: " + intersection.collect());
 
 /*
  * map
  * one element in, one element out
  */
 JavaRDD<String> rdd6 = sc.parallelize(Arrays.asList("1","2","3","4","5","6","7"), 3);
 rdd6.map(new Function<String, String>() {

  @Override
  public String call(String arg0) throws Exception {
   // with map, per-record work such as opening a connection runs once
   // for every single element
   System.out.println("open connection");
   System.out.println("insert one record");
   System.out.println("close connection");
   return arg0 + "~";
  }
 }).collect();
 
 
 JavaRDD<String> rdd5 = sc.parallelize(Arrays.asList("1","2","3","4","5","6","7"), 3);
 /*
  * mapPartitions
  * returns an RDD
  */
 rdd5.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
  // a whole partition comes in and a whole partition goes out, so
  // per-partition setup (e.g. a connection) happens once per partition
  // instead of once per record; note that buffering a full partition in
  // a list can itself risk OOM on very large partitions
  @Override
  public Iterable<String> call(Iterator<String> iter) throws Exception {
   System.out.println("open connection");
   List<String> list = new ArrayList<String>();
   while (iter.hasNext()) {
    String s = iter.next();
    list.add(s);
    System.out.println("insert one record");
   }
   System.out.println("close connection");
   return list;
  }
 }).collect();
 
 
 /*
  * foreachPartition: a whole partition in, nothing out.
  * Unlike mapPartitions it returns nothing and is an action operator.
  */
 rdd5.foreachPartition(new VoidFunction<Iterator<String>>() {

  @Override
  public void call(Iterator<String> arg0) throws Exception {
   System.out.println("open connection");
   List<String> list = new ArrayList<String>();
   while (arg0.hasNext()) {
    String s = arg0.next();
    list.add(s);
    System.out.println("insert one record");
   }
   System.out.println("close connection");
  }
 });
 
 
 
 
 JavaRDD<String> rdd7 = sc.parallelize(Arrays.asList("1","2","3","4","1","2","3","4"), 3);
 /*
  * dedup, approach 1: distinct
  */
 JavaRDD<String> dis = rdd7.distinct();
 dis.foreach(new VoidFunction<String>() {

  @Override
  public void call(String arg0) throws Exception {
   System.out.println(arg0);
  }
 });
 
 /*
  * dedup, approach 2: map each element to (element, 1), reduceByKey, then keep only the keys
  */
 JavaPairRDD<String, Integer> group = rdd7.mapToPair(new PairFunction<String, String, Integer>() {
  // turn each element into a (K, V) pair so it can be grouped by key
  @Override
  public Tuple2<String, Integer> call(String arg0) throws Exception {
   return new Tuple2<String, Integer>(arg0, 1);
  }
 });
 JavaPairRDD<String, Integer> reduce = group.reduceByKey(new Function2<Integer, Integer, Integer>() {

  @Override
  public Integer call(Integer arg0, Integer arg1) throws Exception {
   return arg0; // the value is irrelevant; only the grouped key is kept
  }
 });
 JavaRDD<String> result = reduce.map(new Function<Tuple2<String, Integer>, String>() {

  @Override
  public String call(Tuple2<String, Integer> arg0) throws Exception {
   return arg0._1;
  }
 });
 result.foreach(new VoidFunction<String>() {

  @Override
  public void call(String arg0) throws Exception {
   System.out.println(arg0);
  }
 });
 
 
 sc.stop();
}
}
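
The comment block in the class above mentions subtract and cogroup without running them, so here is a minimal sketch of both under the same Spark 1.x Java API assumptions (class name and data are made up for illustration).

package spark;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class SubtractCogroupSketch {
 public static void main(String[] args) {
  SparkConf conf = new SparkConf().setMaster("local").setAppName("subtractCogroup");
  JavaSparkContext sc = new JavaSparkContext(conf);

  JavaPairRDD<String, Integer> a = sc.parallelizePairs(Arrays.asList(
     new Tuple2<String, Integer>("zha", 1),
     new Tuple2<String, Integer>("long", 2),
     new Tuple2<String, Integer>("wang", 3)
    ), 2);
  JavaPairRDD<String, Integer> b = sc.parallelizePairs(Arrays.asList(
     new Tuple2<String, Integer>("zha", 1),
     new Tuple2<String, Integer>("xue", 4)
    ), 2);

  // subtract: pairs of a that do not appear in b (whole (K,V) pairs are
  // compared), e.g. (long,2), (wang,3)
  System.out.println(a.subtract(b).collect());

  // cogroup: for every key in either RDD, pair a's value collection with
  // b's, e.g. (zha,([1],[1])), (long,([2],[])), (xue,([],[4]))
  System.out.println(a.cogroup(b).collect());

  sc.stop();
 }
}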

Scala implementation

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
object OtherSuanzi {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("test")
    val sc=new SparkContext(conf)
    val rdd1=sc.makeRDD(Array(("zha",10),("zha",20),("long",20),("wang",30)),3)
    val rdd2=sc.makeRDD(Array(("zha",100),("long",200),("xue",300)),2)
    
    val result=rdd1.join(rdd2)
    result.foreach(println)
    
    
    rdd1.union(rdd2).foreach(println)
    
    
    val result2=rdd1.distinct()
    result2.foreach(println)
    
    
    
    rdd1.mapPartitions(iter => {
      iter
    }, true).foreach(println)
    sc.stop()
  }
}
