package spark;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;

public class dfmysql {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("mysql");
        /*
         * Number of partitions used when shuffling data for joins or aggregations.
         */
        conf.set("spark.sql.shuffle.partitions", "100");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlcontext = new SQLContext(sc);

        // First way: pass all JDBC connection options at once as a Map.
        Map<String, String> opt = new HashMap<String, String>();
        opt.put("url", "jdbc:mysql://192.168.2.14:3306/spark");
        opt.put("driver", "com.mysql.jdbc.Driver");
        opt.put("user", "root");
        opt.put("password", "933339");
        opt.put("dbtable", "score");
        DataFrame score = sqlcontext.read().format("jdbc").options(opt).load();
        score.show();
        score.registerTempTable("linshibiao1");

        // Second way: set each option individually on a DataFrameReader.
        DataFrameReader reader = sqlcontext.read().format("jdbc");
        reader.option("url", "jdbc:mysql://192.168.2.14:3306/spark");
        reader.option("driver", "com.mysql.jdbc.Driver");
        reader.option("user", "root");
        reader.option("password", "933339");
        reader.option("dbtable", "person");
        DataFrame person = reader.load();
        person.show();
        person.registerTempTable("linshibiao2");

        // Join the two temp tables on the name column.
        DataFrame result = sqlcontext.sql(
                "select linshibiao2.id, linshibiao2.name, linshibiao2.age, linshibiao1.score "
                + "from linshibiao1, linshibiao2 "
                + "where linshibiao1.name = linshibiao2.name");
        result.show();

        // Save the DataFrame result back to MySQL.
        Properties properties = new Properties();
        properties.setProperty("user", "root");
        properties.setProperty("password", "933339");
        /*
         * SaveMode:
         * Overwrite: overwrite existing data
         * Append: append to existing data
         * ErrorIfExists: throw an error if the table already exists
         * Ignore: do nothing if the table already exists
         */
        result.write().mode(SaveMode.Overwrite).jdbc(
                "jdbc:mysql://192.168.2.14:3306/spark", "result2", properties);
        System.out.println("----finish");
        sc.stop();
    }
}