Spark wordcount

首页 » Spark » Spark wordcount
package spark;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class sparkwordcount {
public static void main(String[] args) {
 /*
  * conf
  * 1.可以设置spark的运行模式
  * 2.可是设置在web中显示的application名称
  * 3.可以设置当前spark application 运行的资源(内存+core)
  * 
  * spark运行模式
  * 1.local 测试
  * 2.standalone      spark自带的资源调度框架,spark任务可以依赖standalone调度资源
  * 3.yarn			 hadoop生态圈的资源调度框架,spark也可以基于yarn调度资源
  */
 SparkConf conf = new SparkConf();
 conf.setMaster("local");
 conf.setAppName("任务名称");
 conf.set("内存", "10G");
 
 /*
  * sparkContext 是通往集群的唯一通道,创建一个RDD
  */
 JavaSparkContext sc = new JavaSparkContext(conf);
 //一行数据
 JavaRDD<String> lines=sc.textFile("words");
 //切分
 JavaRDD<String> words=lines.flatMap(new FlatMapFunction<String,String>() {
 //<进String(一行的数据) 出String类型(一个单词)>

  @Override
  //第一个String 对应出的String 第二个String对应进的String
  public Iterable<String> call(String line) throws Exception {
   // TODO Auto-generated method stub
   return Arrays.asList(line.split(" "));
  }
 });
 
 //在Java中如果想让某个RDD转成K,V格式,使用xxxToPair  eg:mapToPair
 //计数
              //String ->单词        String->k   Integer->v
 JavaPairRDD<String, Integer> pairwords=words.mapToPair(new PairFunction<String,String,Integer>() {
  @Override
  //<k,v>                              (单词)
  public Tuple2<String, Integer> call(String word) throws Exception {
   // TODO Auto-generated method stub
   return new Tuple2<String, Integer>(word, 1);
  }
 });
 
 
 /*
  * 将相同的key分组
  * 对每一组的key对应的value按照自定义去处理
  */
 //聚合(懒执行)								1        1         2
 JavaPairRDD<String, Integer> reduce=pairwords.reduceByKey(new Function2<Integer, Integer, Integer>() {
  /*
   * 1 1 1
   * 1+1=2 1
   * 2 1
   * 3
   */
  @Override
  //			2			1			1
  public Integer call(Integer v1, Integer v2) throws Exception {
   // TODO Auto-generated method stub
   return v1+v2;
  }
 });
 
 //排序(懒执行)
 /*
  * (hello,5)------->(5,hello)
  */
  JavaPairRDD<Integer, String>mapToPair=reduce.mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {

  @Override
  public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception {
   // TODO Auto-generated method stub
   return tuple.swap();//换位置
  }
 
 });
 JavaPairRDD<Integer, String>sorBykey=mapToPair.sortByKey(false);
 //换回去(懒执行)
 JavaPairRDD<String, Integer> reducesort=sorBykey.mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {

  @Override
  public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple) throws Exception {
   // TODO Auto-generated method stub
   return tuple.swap();//换位置
  }
 
 });
 //action算子,触发(Transformation算子)懒执行
 reducesort.foreach(new VoidFunction<Tuple2<String,Integer>>() {
  
  @Override
  public void call(Tuple2<String, Integer> arg0) throws Exception {
   // TODO Auto-generated method stub
   System.out.println(arg0);
  }
 });
 sc.stop();
}
}

算子

package spark;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class suanzi {
public static void main(String[] args) {
 SparkConf conf = new SparkConf();
 conf.setMaster("local");
 conf.setAppName("任务名称");
 JavaSparkContext sc = new JavaSparkContext(conf);
 JavaRDD<String>lines=sc.textFile("./words");
 
 //取前5个
 List<String> list=lines.take(5);
 for (String string : list) {
  System.out.println(list);
 }
 
 //过滤
 JavaRDD<String> data=lines.filter(new Function<String, Boolean>() {
  
  @Override
  public Boolean call(String arg0) throws Exception {
   // TODO Auto-generated method stub
   return arg0.equals("hello zhazhalong");
  }
 });
}
}

分享到:
赞(0) 打赏

评论 5

评论前必须登录!

 

  1. #1

    以后多多交流

    我是你哥6个月前 (03-27)
  2. #2

    给你点赞

    hello6个月前 (03-28)
  3. #3

    挺明白的

    靓妹6个月前 (03-28)
  4. #4

    你哥6个月前 (03-28)
  5. #5

    hello6个月前 (03-29)

觉得文章有用就打赏一下弟弟吧

支付宝扫一扫打赏

微信扫一扫打赏

Vieu4.5主题
专业打造轻量级个人企业风格博客主题!专注于前端开发,全站响应式布局自适应模板。
正在播放:

作者想对您说:

累了就停下来听首歌吧

听完后会给您一个好心情

最后

等到您不容易

还希望您能多待一会儿

      00:00/00:00