updateStateByKey

1年前 (2020-04-04) 401次浏览 已收录 4个评论

1、为Spark Streaming中每一个key维护一份state状态,state类型可以是任意类型的,可以是一个自定义的对象
2、通过更新函数对该key的状态不断更新,对于每个新的batch而言,spark streaming会在使用updatestate的时候为已经存在的key进行state
如果要不断的更新每个key的state,就一定涉及到了状态的保存和容错,所以需要开启checkpoinnt机制和功能

package spark;

import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.google.common.base.Optional;

import scala.Tuple2;

/*
 * 功能
 * 1、为Spark Streaming中每一个key维护一份state状态,state类型可以是任意类型的,可以是一个自定义的对象
 * 2、通过更新函数对该key的状态不断更新,对于每个新的batch而言,spark streaming会在使用updatestate的时候为已经存在的key进行state
 * 如果要不断的更新每个key的state,就一定涉及到了状态的保存和容错,所以需要开启checkpoinnt机制和功能
 */
public class updatestate {
public static void main(String[] args) {
 SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("updatestate");
 JavaStreamingContext jsc=new JavaStreamingContext(conf,Durations.seconds(15));
 /*
  * 设置checkpoint目录
  * 将内存中的数据(每一个key所对应的状态)写入到磁盘一份的间隔时间
  * 1、如果设置的bacthInterval小于10s,那么每10s会将内存中的数据写入磁盘一份
  * 2、如果bacthInterval大于10s,那么就以bacthInterval 15s为准
  * 这样可以防止频繁的写入hdfs
  */
 jsc.checkpoint("./checkpoint");
//	jsc.checkpoint("hdfs://node01:9000:/spark");
 
JavaReceiverInputDStream<String> socketTextStream = jsc.socketTextStream("192.168.2.14", 9999);
 
 JavaDStream<String> words = socketTextStream.flatMap(new FlatMapFunction<String, String>() {

  @Override
  public Iterable<String> call(String line) throws Exception {
   // TODO Auto-generated method stub
   return Arrays.asList(line.split(" "));
  }
 });
 JavaPairDStream<String, Integer> pairword = words.mapToPair(new PairFunction<String, String, Integer>() {

  @Override
  public Tuple2<String, Integer> call(String word) throws Exception {
   // TODO Auto-generated method stub
   return new Tuple2<String, Integer>(word,1);
  }
 });
 JavaPairDStream<String,Integer> counts=pairword.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
  private static final long serialVersionUID = 1L;

  @Override
  public Optional<Integer> call(List<Integer> values, Optional<Integer> state) throws Exception {
   // TODO Auto-generated method stub
      /*
       * values:经过分组最后这个key所对应的value【1,1,1,1,1】
       * state:这个key在本次之前的状态
       */
      Integer update=0;
      if(state.isPresent()) {
       //判断state上一次是否有值
       update=state.get();
      }
      for(Integer value : values) {
       //累加
       update+=value;
      }
      return Optional.of(update);
  }

 });
 counts.print();
 jsc.start();
 jsc.awaitTermination();
}
}

 


渣渣龙, 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:updateStateByKey
喜欢 (0)

您必须 登录 才能发表评论!

(4)个小伙伴在吐槽
  1. 最好再详细点
    沥青2020-04-06 09:57
  2. 奥利给
    小蚯蚓2020-04-06 10:00
  3. 不错
    笨鸟先飞2020-04-06 10:05
  4. 以后多发点哦
    渣渣辉2020-04-06 10:06