1、为Spark Streaming中每一个key维护一份state状态,state类型可以是任意类型的,可以是一个自定义的对象
2、通过更新函数对该key的状态不断更新,对于每个新的batch而言,spark streaming会在使用updatestate的时候为已经存在的key进行state
如果要不断的更新每个key的state,就一定涉及到了状态的保存和容错,所以需要开启checkpoinnt机制和功能
package spark; import java.util.Arrays; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import com.google.common.base.Optional; import scala.Tuple2; /* * 功能 * 1、为Spark Streaming中每一个key维护一份state状态,state类型可以是任意类型的,可以是一个自定义的对象 * 2、通过更新函数对该key的状态不断更新,对于每个新的batch而言,spark streaming会在使用updatestate的时候为已经存在的key进行state * 如果要不断的更新每个key的state,就一定涉及到了状态的保存和容错,所以需要开启checkpoinnt机制和功能 */ public class updatestate { public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("updatestate"); JavaStreamingContext jsc=new JavaStreamingContext(conf,Durations.seconds(15)); /* * 设置checkpoint目录 * 将内存中的数据(每一个key所对应的状态)写入到磁盘一份的间隔时间 * 1、如果设置的bacthInterval小于10s,那么每10s会将内存中的数据写入磁盘一份 * 2、如果bacthInterval大于10s,那么就以bacthInterval 15s为准 * 这样可以防止频繁的写入hdfs */ jsc.checkpoint("./checkpoint"); // jsc.checkpoint("hdfs://node01:9000:/spark"); JavaReceiverInputDStream<String> socketTextStream = jsc.socketTextStream("192.168.2.14", 9999); JavaDStream<String> words = socketTextStream.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String line) throws Exception { // TODO Auto-generated method stub return Arrays.asList(line.split(" ")); } }); JavaPairDStream<String, Integer> pairword = words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String word) throws Exception { // TODO Auto-generated method stub return new Tuple2<String, Integer>(word,1); } }); JavaPairDStream<String,Integer> counts=pairword.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { private static final long serialVersionUID = 1L; @Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) throws Exception { // TODO Auto-generated method stub /* * values:经过分组最后这个key所对应的value【1,1,1,1,1】 * state:这个key在本次之前的状态 */ Integer update=0; if(state.isPresent()) { //判断state上一次是否有值 update=state.get(); } for(Integer value : values) { //累加 update+=value; } return Optional.of(update); } }); counts.print(); jsc.start(); jsc.awaitTermination(); } }
最好再详细点
奥利给
不错
以后多发点哦