900字范文,内容丰富有趣,生活中的好帮手!
900字范文 > mysql schema 保存数据_如何在mysql数据库中保存apache spark schema输出

mysql schema 保存数据_如何在mysql数据库中保存apache spark schema输出

时间:2020-07-31 19:26:28

相关推荐

mysql schema 保存数据_如何在mysql数据库中保存apache spark schema输出

任何人都可以告诉我,如果有任何方式在apache的火花存储在mysql数据库的JavaRDD?我从2个CSV文件中获取输入,然后在对其内容进行连接操作之后,我需要将输出(输出JavaRDD)保存在mysql数据库中。我已经能够在hdfs上成功保存输出,但是我没有找到任何有关apache Spark-MYSQL连接的信息。下面我发布spark sql的代码。这可能会成为那些正在寻找spark-sql示例的人员的参考。

package attempt1;

import java.io.Serializable;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.sql.api.java.JavaSQLContext;

import org.apache.spark.sql.api.java.JavaSchemaRDD;

import org.apache.spark.sql.api.java.Row;

public class Spark_Mysql {

@SuppressWarnings("serial")

public static class CompleteSample implements Serializable {

private String ASSETNUM;

private String ASSETTAG;

private String CALNUM;

public String getASSETNUM() {

return ASSETNUM;

}

public void setASSETNUM(String aSSETNUM) {

ASSETNUM = aSSETNUM;

}

public String getASSETTAG() {

return ASSETTAG;

}

public void setASSETTAG(String aSSETTAG) {

ASSETTAG = aSSETTAG;

}

public String getCALNUM() {

return CALNUM;

}

public void setCALNUM(String cALNUM) {

CALNUM = cALNUM;

}

}

@SuppressWarnings("serial")

public static class ExtendedSample implements Serializable {

private String ASSETNUM;

private String CHANGEBY;

private String CHANGEDATE;

public String getASSETNUM() {

return ASSETNUM;

}

public void setASSETNUM(String aSSETNUM) {

ASSETNUM = aSSETNUM;

}

public String getCHANGEBY() {

return CHANGEBY;

}

public void setCHANGEBY(String cHANGEBY) {

CHANGEBY = cHANGEBY;

}

public String getCHANGEDATE() {

return CHANGEDATE;

}

public void setCHANGEDATE(String cHANGEDATE) {

CHANGEDATE = cHANGEDATE;

}

}

@SuppressWarnings("serial")

public static void main(String[] args) throws Exception {

JavaSparkContext ctx = new JavaSparkContext("local[2]", "JavaSparkSQL");

JavaSQLContext sqlCtx = new JavaSQLContext(ctx);

JavaRDD cs = ctx.textFile("C:/Users/cyg_server/Documents/bigDataExample/AssetsImportCompleteSample.csv").map(

new Function() {

public CompleteSample call(String line) throws Exception {

String[] parts = line.split(",");

CompleteSample cs = new CompleteSample();

cs.setASSETNUM(parts[0]);

cs.setASSETTAG(parts[1]);

cs.setCALNUM(parts[2]);

return cs;

}

});

JavaRDD es = ctx.textFile("C:/Users/cyg_server/Documents/bigDataExample/AssetsImportExtendedSample.csv").map(

new Function() {

public ExtendedSample call(String line) throws Exception {

String[] parts = line.split(",");

ExtendedSample es = new ExtendedSample();

es.setASSETNUM(parts[0]);

es.setCHANGEBY(parts[1]);

es.setCHANGEDATE(parts[2]);

return es;

}

});

JavaSchemaRDD complete = sqlCtx.applySchema(cs, CompleteSample.class);

complete.registerAsTable("cs");

JavaSchemaRDD extended = sqlCtx.applySchema(es, ExtendedSample.class);

extended.registerAsTable("es");

JavaSchemaRDD fs= sqlCtx.sql("SELECT cs.ASSETTAG, cs.CALNUM, es.CHANGEBY, es.CHANGEDATE FROM cs INNER JOIN es ON cs.ASSETNUM=es.ASSETNUM;");

JavaRDD result = fs.map(new Function() {

public String call(Row row) {

return row.getString(0);

}

});

result.saveAsTextFile("hdfs://path/to/hdfs/dir-name"); //instead of hdfs I need to save it on mysql database, but I am not able to find any Spark-MYSQL connection

}

}这里最后我将结果保存在HDFS中。但现在我想保存到MYSQL数据库中。请帮助我。谢谢

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。