Spark 监控目录时,监控的对象是程序运行起来之后新添加到该目录中的文件;对目录中已有文件进行修改并保存,是不会被监控到的。
package spark; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.Function; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.dstream.DStream; public class 监控目录 { public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local").setAppName("监控目录"); JavaStreamingContext jsc= new JavaStreamingContext(conf,Durations.seconds(5)); JavaDStream<String> textFileStream=jsc.textFileStream("data"); textFileStream.filter(new Function<String, Boolean>() { @Override public Boolean call(String data) throws Exception { // TODO Auto-generated method stub return data.startsWith("hello zha");//过滤 } }).print(100); DStream<String> dstream = textFileStream.dstream(); dstream.saveAsTextFiles("save/前缀", "后缀"); jsc.start(); jsc.awaitTermination(); jsc.close(); } }

挺明白的