Unfortunately there is no direct way to create Spark data objects (RDDs, DataFrames) from these arrays, though it is possible to create Spark objects from sequences of rows, as sketched below.
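For example, in the Spark shell you can parallelize a plain Scala sequence (a minimal sketch with made-up values):
val demo_rdd = sc.parallelize(Seq((1, "alice"), (2, "bob")))
demo_rdd.collect()
That doesn't help here, though, because our data lives on the Python side.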
The easiest route is therefore to load the data from CSV files. So load the arrays into Python, as shown on the python-tab, and then save them to file like this:
udf.to_csv('udf.csv',header=False, index=False,encoding='utf-8')
tdf.to_csv('tdf.csv',header=False, index=False,encoding='utf-8')
Then start up the Spark shell and run this code to load the RDDs:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import java.time.LocalDate
import java.time.format.DateTimeFormatter
// Case classes describing the two datasets
case class Udf(uid: Int, name: String)
case class Tdf(xid: Int, uid: Int, amount: Double, date: java.sql.Timestamp)
// Timestamp format used in the CSV files
val dtfmt0 = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
// Parse a CSV line into a Udf record; empty fields get default values.
// The -1 limit keeps trailing empty fields that split would otherwise drop.
def parse_udf(line: String) = {
  val _field = line.split(",", -1)
  val uid  = if (_field(0).isEmpty) 0 else _field(0).toInt
  val name = if (_field(1).isEmpty) "" else _field(1)
  Udf(uid, name)
}
// Parse a CSV line into a Tdf record; empty fields get default values.
def parse_tdf(line: String) = {
  val _field = line.split(",", -1)
  val xid    = if (_field(0).isEmpty) 0 else _field(0).toInt
  val uid    = if (_field(1).isEmpty) 0 else _field(1).toInt
  val amount = if (_field(2).isEmpty) 0.0 else _field(2).toDouble
  val date   = if (_field(3).isEmpty) new java.sql.Timestamp(0L)
               else new java.sql.Timestamp(dtfmt0.parse(_field(3)).getTime())
  Tdf(xid, uid, amount, date)
}
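// Optional sanity check of the parsers on hypothetical sample lines:
println(parse_udf("1,alice"))                        // Udf(1,alice)
println(parse_tdf("1,10,9.99,2016-06-13 12:00:00"))  // Tdf(1,10,9.99,2016-06-13 12:00:00.0)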
// udf ---------------------------------------------
val udf_rdd=sc.textFile("file:///home/dmn/20160613_framed_data/udf.csv").
map(parse_udf(_))
// tdf ---------------------------------------------
val tdf_rdd=sc.textFile("file:///home/dmn/20160613_framed_data/tdf.csv").
map(parse_tdf(_))
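As a quick (optional) check, count the records and peek at a few parsed entries:
udf_rdd.count()
tdf_rdd.take(5).foreach(println)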
Same remark as for the Spark RDDs: unfortunately there is no direct way to create Spark data objects from these arrays, though it is possible to create Spark objects from sequences of rows. The easiest route is to load them from CSV files. So load the arrays into Python, as shown on the python-tab, and then save them to file like this:
udf.to_csv('udf.csv',header=False, index=False,encoding='utf-8')
tdf.to_csv('tdf.csv',header=False, index=False,encoding='utf-8')
Run the following in the Spark shell:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.Row
import org.apache.spark.sql._
import org.apache.spark.sql.types._
// Schemas for the two DataFrames; field order and types must match the
// Row objects produced by the parsers below.
val udf_schema = StructType(
  StructField("uid",  IntegerType, false) ::
  StructField("name", StringType,  false) :: Nil
)
val tdf_schema = StructType(
  StructField("xid",    IntegerType,   false) ::
  StructField("uid",    IntegerType,   false) ::
  StructField("amount", DoubleType,    false) ::
  StructField("date",   TimestampType, false) :: Nil
)
// Timestamp format used in the CSV files
val dtfmt0 = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
// Parse a CSV line into an untyped Row matching udf_schema; the -1 limit
// keeps trailing empty fields that split would otherwise drop.
def parse_udf(line: String) = {
  val _field = line.split(",", -1)
  val uid  = if (_field(0).isEmpty) 0 else _field(0).toInt
  val name = if (_field(1).isEmpty) "" else _field(1)
  Row(uid, name)
}
// Parse a CSV line into an untyped Row matching tdf_schema.
def parse_tdf(line: String) = {
  val _field = line.split(",", -1)
  val xid    = if (_field(0).isEmpty) 0 else _field(0).toInt
  val uid    = if (_field(1).isEmpty) 0 else _field(1).toInt
  val amount = if (_field(2).isEmpty) 0.0 else _field(2).toDouble
  val date   = if (_field(3).isEmpty) new java.sql.Timestamp(0L)
               else new java.sql.Timestamp(dtfmt0.parse(_field(3)).getTime())
  Row(xid, uid, amount, date)
}
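// Optional sanity check on a hypothetical sample line; an untyped Row
// prints as [1,10,9.99,2016-06-13 12:00:00.0]
println(parse_tdf("1,10,9.99,2016-06-13 12:00:00"))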
// SQL context, used to create the DataFrames and register the tables
val sx = new org.apache.spark.sql.SQLContext(sc)
// udf ---------------------------------------------
val in_udf_rdd=sc.textFile("file:///home/dmn/20160613_framed_data/udf.csv").
map(parse_udf(_))
val udf_df = sx.createDataFrame(in_udf_rdd, udf_schema)
udf_df.registerTempTable("t_udf")
// tdf ---------------------------------------------
val in_tdf_rdd=sc.textFile("file:///home/dmn/20160613_framed_data/tdf.csv").
map(parse_tdf(_))
val tdf_df = sx.createDataFrame(in_tdf_rdd, tdf_schema)
tdf_df.registerTempTable("t_tdf")
Et voilà: you now have the DataFrames udf_df and tdf_df, and the relational tables t_udf and t_tdf.
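As a quick check, you can query the registered tables; for example, this hypothetical join totals the transaction amounts per user:
sx.sql("""
  SELECT u.name, COUNT(t.xid) AS num_tx, SUM(t.amount) AS total
  FROM t_udf u JOIN t_tdf t ON u.uid = t.uid
  GROUP BY u.name
""").show()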