updateStateByKey

首页 » Spark » updateStateByKey

1、为Spark Streaming中每一个key维护一份state状态,state类型可以是任意类型的,可以是一个自定义的对象
2、通过更新函数对该key的状态不断更新,对于每个新的batch而言,spark streaming会在使用updatestate的时候为已经存在的key进行state
如果要不断的更新每个key的state,就一定涉及到了状态的保存和容错,所以需要开启checkpoinnt机制和功能

package spark;

import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.google.common.base.Optional;

import scala.Tuple2;

/*
 * 功能
 * 1、为Spark Streaming中每一个key维护一份state状态,state类型可以是任意类型的,可以是一个自定义的对象
 * 2、通过更新函数对该key的状态不断更新,对于每个新的batch而言,spark streaming会在使用updatestate的时候为已经存在的key进行state
 * 如果要不断的更新每个key的state,就一定涉及到了状态的保存和容错,所以需要开启checkpoinnt机制和功能
 */
public class updatestate {
public static void main(String[] args) {
 SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("updatestate");
 JavaStreamingContext jsc=new JavaStreamingContext(conf,Durations.seconds(15));
 /*
  * 设置checkpoint目录
  * 将内存中的数据(每一个key所对应的状态)写入到磁盘一份的间隔时间
  * 1、如果设置的bacthInterval小于10s,那么每10s会将内存中的数据写入磁盘一份
  * 2、如果bacthInterval大于10s,那么就以bacthInterval 15s为准
  * 这样可以防止频繁的写入hdfs
  */
 jsc.checkpoint("./checkpoint");
//	jsc.checkpoint("hdfs://node01:9000:/spark");
 
JavaReceiverInputDStream<String> socketTextStream = jsc.socketTextStream("192.168.2.14", 9999);
 
 JavaDStream<String> words = socketTextStream.flatMap(new FlatMapFunction<String, String>() {

  @Override
  public Iterable<String> call(String line) throws Exception {
   // TODO Auto-generated method stub
   return Arrays.asList(line.split(" "));
  }
 });
 JavaPairDStream<String, Integer> pairword = words.mapToPair(new PairFunction<String, String, Integer>() {

  @Override
  public Tuple2<String, Integer> call(String word) throws Exception {
   // TODO Auto-generated method stub
   return new Tuple2<String, Integer>(word,1);
  }
 });
 JavaPairDStream<String,Integer> counts=pairword.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
  private static final long serialVersionUID = 1L;

  @Override
  public Optional<Integer> call(List<Integer> values, Optional<Integer> state) throws Exception {
   // TODO Auto-generated method stub
      /*
       * values:经过分组最后这个key所对应的value【1,1,1,1,1】
       * state:这个key在本次之前的状态
       */
      Integer update=0;
      if(state.isPresent()) {
       //判断state上一次是否有值
       update=state.get();
      }
      for(Integer value : values) {
       //累加
       update+=value;
      }
      return Optional.of(update);
  }

 });
 counts.print();
 jsc.start();
 jsc.awaitTermination();
}
}

 

分享到:
赞(0) 打赏

评论 4

评论前必须登录!

 

  1. #1

    最好再详细点

    沥青8个月前 (04-06)
  2. #2

    奥利给

    小蚯蚓8个月前 (04-06)
  3. #3

    不错

    笨鸟先飞8个月前 (04-06)
  4. #4

    以后多发点哦

    渣渣辉8个月前 (04-06)

觉得文章有用就打赏一下弟弟吧

支付宝扫一扫打赏

微信扫一扫打赏

Vieu4.5主题
专业打造轻量级个人企业风格博客主题!专注于前端开发,全站响应式布局自适应模板。
正在播放:

作者想对您说:

累了就停下来听首歌吧

听完后会给您一个好心情

最后

等到您不容易

还希望您能多待一会儿

      00:00/00:00