使用 Spark Streaming 统计单词，并实现动态改变广播变量。
nc -lk 9999
开启端口,并且输入若干个单词
package spark;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

/**
 * Spark Streaming word count that re-creates a broadcast variable on every
 * micro-batch inside {@code foreachRDD}, demonstrating how a broadcast value
 * can be "dynamically" refreshed on the driver (e.g. by re-reading a file
 * each batch).
 *
 * <p>Feed input with {@code nc -lk 9999} and type space-separated words.
 *
 * <p>NOTE(review): the class name should be UpperCamelCase per Java
 * convention, but it is kept as-is so existing run scripts / spark-submit
 * invocations keep working.
 */
public class sparkstreaming {

    /**
     * Entry point.
     *
     * @param args optional overrides: {@code args[0]} = socket host
     *             (default "192.168.2.14"), {@code args[1]} = socket port
     *             (default 9999). Generalizes the previously hard-coded
     *             endpoint; running with no arguments behaves exactly as
     *             before.
     */
    public static void main(String[] args) {
        final String host = args.length > 0 ? args[0] : "192.168.2.14";
        final int port = args.length > 1 ? Integer.parseInt(args[1]) : 9999;

        SparkConf conf = new SparkConf();
        // Local mode needs at least 2 threads: one for the socket receiver,
        // one for batch processing.
        conf.setMaster("local[2]").setAppName("sparkstreaming");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // One micro-batch every 5 seconds.
        JavaStreamingContext jsc = new JavaStreamingContext(sc, Durations.seconds(5));

        JavaReceiverInputDStream<String> socketTextStream = jsc.socketTextStream(host, port);

        // Split each input line into words.
        JavaDStream<String> words = socketTextStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        // Pair every word with an initial count of 1.
        JavaPairDStream<String, Integer> pairword = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // Sum the counts per word within each batch.
        JavaPairDStream<String, Integer> reduceByKey = pairword.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer arg0, Integer arg1) throws Exception {
                return arg0 + arg1;
            }
        });
        // reduceByKey.print();

        /*
         * foreachRDD exposes the underlying RDD of each batch so ordinary RDD
         * operators can be applied; an action must be invoked on that RDD to
         * trigger execution. Inside call(), code OUTSIDE the RDD operators
         * runs on the driver, while code inside the operators runs on the
         * executors.
         */
        reduceByKey.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
            @Override
            public void call(JavaPairRDD<String, Integer> rdd) throws Exception {
                System.out.println("Driver");
                /*
                 * Executed once per 5-second batch on the driver, so the
                 * broadcast value can be refreshed here (e.g. by re-reading a
                 * config file) to simulate a dynamically updated broadcast
                 * variable.
                 */
                // Recover the SparkContext from the batch RDD to create the broadcast.
                SparkContext context = rdd.context();
                JavaSparkContext javasparkcontext = new JavaSparkContext(context);
                // NOTE(review): a new broadcast is created every batch and never
                // unpersisted/destroyed; a long-running job should call
                // unpersist() on the previous broadcast before replacing it.
                Broadcast<String> broadcast = javasparkcontext.broadcast("hello");
                String value = broadcast.value();
                System.out.println(value);

                JavaPairRDD<String, Integer> mapToPair =
                        rdd.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
                            @Override
                            public Tuple2<String, Integer> call(Tuple2<String, Integer> tuple) throws Exception {
                                // Runs on the executors (fixed typo: was "excuter").
                                System.out.println("executor");
                                return new Tuple2<String, Integer>(tuple._1 + "~", tuple._2);
                            }
                        });

                // Action: triggers execution of the pipeline above.
                mapToPair.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                    @Override
                    public void call(Tuple2<String, Integer> arg0) throws Exception {
                        System.out.println(arg0);
                    }
                });
            }
        });

        jsc.start();
        jsc.awaitTermination();
    }
}
我加你了哦
以后多发点哦
看了那么多博客,就你的能看懂