SparkStreaming


Count words with Spark Streaming, and show how to dynamically change a broadcast variable while the stream is running.

nc -lk 9999

Open the port with nc on the host the job connects to (192.168.2.14 in the code), then type in some words.
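For example, assuming you type the following two lines into the nc session during one 5-second batch (sample input, not from the original post):

hello spark
hello streaming

the job then prints, among its "Driver"/"executor" log lines, pairs along the lines of (hello~,2), (spark~,1) and (streaming~,1), because the foreachRDD step below appends "~" to every word before printing it.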

package spark;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class sparkstreaming {
public static void main(String[] args) {
 SparkConf conf = new SparkConf();
 // local[2]: one thread for the socket receiver and one for processing
 conf.setMaster("local[2]").setAppName("sparkstreaming");
 JavaSparkContext sc = new JavaSparkContext(conf);
 JavaStreamingContext jsc = new JavaStreamingContext(sc, Durations.seconds(5)); // one batch every 5 seconds
 JavaReceiverInputDStream<String> socketTextStream = jsc.socketTextStream("192.168.2.14", 9999);
 
 JavaDStream<String> words = socketTextStream.flatMap(new FlatMapFunction<String, String>() {

  @Override
  public Iterable<String> call(String line) throws Exception {
    // split each line into words on spaces
   return Arrays.asList(line.split(" "));
  }
 });
 JavaPairDStream<String, Integer> pairword = words.mapToPair(new PairFunction<String, String, Integer>() {

  @Override
  public Tuple2<String, Integer> call(String word) throws Exception {
    // map each word to a (word, 1) pair
    return new Tuple2<String, Integer>(word, 1);
  }
 });
 JavaPairDStream<String, Integer> reduceByKey = pairword.reduceByKey(new Function2<Integer, Integer, Integer>() {
  
  @Override
  public Integer call(Integer arg0, Integer arg1) throws Exception {
    // sum the counts for the same word
    return arg0 + arg1;
  }
 });
 
//	reduceByKey.print();
 reduceByKey.foreachRDD(new VoidFunction<JavaPairRDD<String,Integer>>() {
 /*
  * foreachRDD exposes the RDD behind each batch of the DStream, so regular RDD operators can be used on it.
  * An action must be called on that RDD to trigger execution.
  * Inside foreachRDD's call method, code outside the RDD operators runs on the Driver.
  */
  @Override
  public void call(JavaPairRDD<String, Integer> rdd) throws Exception {
   System.out.println("Driver");
    /*
     * This block runs on the Driver once per batch (every 5 seconds).
     * Re-reading a file here and re-broadcasting its contents is one way
     * to change a broadcast variable dynamically.
     */
   
    // get the SparkContext from the batch RDD and wrap it as a JavaSparkContext
    SparkContext context = rdd.context();
    JavaSparkContext javasparkcontext = new JavaSparkContext(context);
    Broadcast<String> broadcast = javasparkcontext.broadcast("hello"); // broadcast the value to the executors
    String value = broadcast.value();
    System.out.println(value);
   
    // transformations on the batch RDD; the function bodies run on the executors
   JavaPairRDD<String, Integer> mapToPair = rdd.mapToPair(new PairFunction<Tuple2<String,Integer>, String, Integer>() {

    @Override
    public Tuple2<String, Integer> call(Tuple2<String, Integer> tuple) throws Exception {
      // runs on the executors for every record
      System.out.println("executor");
      return new Tuple2<String, Integer>(tuple._1 + "~", tuple._2);
    }
   });
   mapToPair.foreach(new VoidFunction<Tuple2<String,Integer>>() {

    @Override
    public void call(Tuple2<String, Integer> arg0) throws Exception {
      // foreach is an action, which triggers execution of the pipeline above
     System.out.println(arg0);
    }
   });
  }
  
 });
 
 jsc.start();
 jsc.awaitTermination();
 
}
}
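The foreachRDD above always broadcasts the hard-coded string "hello". The point hinted at in its comment is that, because this block runs on the Driver once per batch, you can re-read a file there and broadcast its fresh contents every 5 seconds. A minimal sketch of that idea follows; the file path /tmp/blacklist.txt and the use of the broadcast list as a word blacklist are illustrative assumptions, not part of the original code.

reduceByKey.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {

 @Override
 public void call(JavaPairRDD<String, Integer> rdd) throws Exception {
  // Driver side, once per batch: re-read the file and broadcast its current contents.
  // /tmp/blacklist.txt is a hypothetical path; any source the Driver can read works.
  JavaSparkContext driverContext = new JavaSparkContext(rdd.context());
  final Broadcast<java.util.List<String>> blacklist = driverContext.broadcast(
    java.nio.file.Files.readAllLines(
      java.nio.file.Paths.get("/tmp/blacklist.txt"),
      java.nio.charset.StandardCharsets.UTF_8));

  // Executor side: drop any word currently in the broadcast list, then print the rest.
  rdd.filter(new org.apache.spark.api.java.function.Function<Tuple2<String, Integer>, Boolean>() {

   @Override
   public Boolean call(Tuple2<String, Integer> tuple) throws Exception {
    return !blacklist.value().contains(tuple._1);
   }
  }).foreach(new VoidFunction<Tuple2<String, Integer>>() {

   @Override
   public void call(Tuple2<String, Integer> tuple) throws Exception {
    System.out.println(tuple);
   }
  });
 }
});

Since a new Broadcast is created every batch, the executors pick up edits to the file at the next 5-second interval; the previous broadcast can be released with unpersist() if needed.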

 
