任何人都可以告诉我,如果有任何方式在apache的火花存储在mysql数据库的JavaRDD?我从2个CSV文件中获取输入,然后在对其内容进行连接操作之后,我需要将输出(输出JavaRDD)保存在mysql数据库中。我已经能够在hdfs上成功保存输出,但是我没有找到任何有关apache Spark-MYSQL连接的信息。下面我发布spark sql的代码。这可能会成为那些正在寻找spark-sql示例的人员的参考。
package attempt1;
import java.io.Serializable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;
public class Spark_Mysql {
@SuppressWarnings("serial")
public static class CompleteSample implements Serializable {
private String ASSETNUM;
private String ASSETTAG;
private String CALNUM;
public String getASSETNUM() {
return ASSETNUM;
}
public void setASSETNUM(String aSSETNUM) {
ASSETNUM = aSSETNUM;
}
public String getASSETTAG() {
return ASSETTAG;
}
public void setASSETTAG(String aSSETTAG) {
ASSETTAG = aSSETTAG;
}
public String getCALNUM() {
return CALNUM;
}
public void setCALNUM(String cALNUM) {
CALNUM = cALNUM;
}
}
@SuppressWarnings("serial")
public static class ExtendedSample implements Serializable {
private String ASSETNUM;
private String CHANGEBY;
private String CHANGEDATE;
public String getASSETNUM() {
return ASSETNUM;
}
public void setASSETNUM(String aSSETNUM) {
ASSETNUM = aSSETNUM;
}
public String getCHANGEBY() {
return CHANGEBY;
}
public void setCHANGEBY(String cHANGEBY) {
CHANGEBY = cHANGEBY;
}
public String getCHANGEDATE() {
return CHANGEDATE;
}
public void setCHANGEDATE(String cHANGEDATE) {
CHANGEDATE = cHANGEDATE;
}
}
@SuppressWarnings("serial")
public static void main(String[] args) throws Exception {
JavaSparkContext ctx = new JavaSparkContext("local[2]", "JavaSparkSQL");
JavaSQLContext sqlCtx = new JavaSQLContext(ctx);
JavaRDD cs = ctx.textFile("C:/Users/cyg_server/Documents/bigDataExample/AssetsImportCompleteSample.csv").map(
new Function() {
public CompleteSample call(String line) throws Exception {
String[] parts = line.split(",");
CompleteSample cs = new CompleteSample();
cs.setASSETNUM(parts[0]);
cs.setASSETTAG(parts[1]);
cs.setCALNUM(parts[2]);
return cs;
}
});
JavaRDD es = ctx.textFile("C:/Users/cyg_server/Documents/bigDataExample/AssetsImportExtendedSample.csv").map(
new Function() {
public ExtendedSample call(String line) throws Exception {
String[] parts = line.split(",");
ExtendedSample es = new ExtendedSample();
es.setASSETNUM(parts[0]);
es.setCHANGEBY(parts[1]);
es.setCHANGEDATE(parts[2]);
return es;
}
});
JavaSchemaRDD complete = sqlCtx.applySchema(cs, CompleteSample.class);
complete.registerAsTable("cs");
JavaSchemaRDD extended = sqlCtx.applySchema(es, ExtendedSample.class);
extended.registerAsTable("es");
JavaSchemaRDD fs= sqlCtx.sql("SELECT cs.ASSETTAG, cs.CALNUM, es.CHANGEBY, es.CHANGEDATE FROM cs INNER JOIN es ON cs.ASSETNUM=es.ASSETNUM;");
JavaRDD result = fs.map(new Function() {
public String call(Row row) {
return row.getString(0);
}
});
result.saveAsTextFile("hdfs://path/to/hdfs/dir-name"); //instead of hdfs I need to save it on mysql database, but I am not able to find any Spark-MYSQL connection
}
}这里最后我将结果保存在HDFS中。但现在我想保存到MYSQL数据库中。请帮助我。谢谢