SparkStreaming

2020-03-30 272次浏览 已收录 3个评论

使用 Spark Streaming 对接收到的文本进行单词计数，

并且演示如何在每个批次中重新创建广播变量，从而实现“动态改变广播变量”的效果。

nc -lk 9999

先运行上面的 nc 命令开启监听端口，再在该终端中输入若干个以空格分隔的单词作为测试数据。

package spark;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class sparkstreaming {

    /**
     * Word count over a socket text stream in 5-second micro-batches.
     *
     * <p>Also demonstrates re-creating a {@code Broadcast} variable inside
     * {@code foreachRDD} on every batch — the usual trick for "dynamically
     * updating" a broadcast value in Spark Streaming (e.g. by re-reading a
     * file here each batch).
     *
     * @param args optional overrides: {@code args[0]} = source host,
     *             {@code args[1]} = source port. With no args the original
     *             hard-coded endpoint {@code 192.168.2.14:9999} is used,
     *             so existing invocations behave exactly as before.
     */
    public static void main(String[] args) {
        // Generalized: endpoint is configurable, defaults preserve old behavior.
        final String host = args.length > 0 ? args[0] : "192.168.2.14";
        final int port = args.length > 1 ? Integer.parseInt(args[1]) : 9999;

        SparkConf conf = new SparkConf();
        // "local[2]": at least two threads — one is consumed by the socket
        // receiver, the other actually processes the batches.
        conf.setMaster("local[2]").setAppName("sparkstreaming");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // One micro-batch every 5 seconds.
        JavaStreamingContext jsc = new JavaStreamingContext(sc, Durations.seconds(5));
        JavaReceiverInputDStream<String> socketTextStream = jsc.socketTextStream(host, port);

        // Split every input line on single spaces into words.
        JavaDStream<String> words = socketTextStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        // (word) -> (word, 1)
        JavaPairDStream<String, Integer> pairword = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // Sum the counts per word within each batch.
        JavaPairDStream<String, Integer> reduceByKey = pairword.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer left, Integer right) throws Exception {
                return left + right;
            }
        });

        reduceByKey.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
            /*
             * foreachRDD exposes the RDD of each micro-batch so regular RDD
             * operators can be applied. An action must be invoked on the RDD
             * or nothing executes. Code outside the RDD operators (this call
             * body) runs on the Driver once per batch.
             */
            @Override
            public void call(JavaPairRDD<String, Integer> rdd) throws Exception {
                System.out.println("Driver");

                // Runs every 5 seconds on the driver, so the broadcast value
                // can be refreshed here each batch (e.g. re-read from a file)
                // to "dynamically change" what the executors see.
                SparkContext context = rdd.context();
                JavaSparkContext javasparkcontext = new JavaSparkContext(context);
                Broadcast<String> broadcast = javasparkcontext.broadcast("hello"); // re-broadcast each batch
                String value = broadcast.value();
                System.out.println(value);

                // Tag each word with "~"; this closure runs on the executors.
                JavaPairRDD<String, Integer> mapToPair =
                        rdd.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
                            @Override
                            public Tuple2<String, Integer> call(Tuple2<String, Integer> tuple) throws Exception {
                                // Fixed misspelled log label: "excuter" -> "Executor".
                                System.out.println("Executor");
                                return new Tuple2<String, Integer>(tuple._1 + "~", tuple._2);
                            }
                        });

                // foreach is the action that triggers the pipeline above.
                mapToPair.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                    @Override
                    public void call(Tuple2<String, Integer> pair) throws Exception {
                        System.out.println(pair);
                    }
                });
            }
        });

        jsc.start();
        jsc.awaitTermination();
    }
}

 


渣渣龙, 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:SparkStreaming
喜欢 (0)

您必须 登录 才能发表评论!

(3)个小伙伴在吐槽
  1. 我加你了哦
    小白2020-04-03 09:40
  2. 以后多发点哦
    奋斗2020-04-03 09:41
  3. 看了那么多博客,就你的能看懂
    白云2020-04-03 09:41