Framed Data
 
98_outtake
20160601

OUTTAKE

This was first filed under '02a create data', back when I was convinced that it was not possible to create Spark objects from columns of values; it is, however, possible to create them from rows of values.

This section is just kept for 'historical reference'...

Create dataframe from arrays

Here's how to create the dataframes udf and tdf from CSV files (and not from an array, as shown in section 2a).

Unfortunately there is no way to create Spark data objects (RDDs, DataFrames) directly from arrays; it is, however, possible to create Spark objects from sequences of rows, as sketched below.
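
For the record, a minimal sketch of that "sequence of rows" approach, assuming a Spark 1.x shell where sc is the SparkContext (the case class and values here are purely illustrative, not the data used in this section):

case class User(uid:Int, name:String)

val sqlctx = new org.apache.spark.sql.SQLContext(sc)

// a local sequence of case-class instances, i.e. "rows of values" ...
val users = Seq( User(9000,"Anna"), User(9001,"Bertil") )

// ... can be turned into a dataframe directly
val user_df = sqlctx.createDataFrame(users)
user_df.show()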

The easiest approach, however, is to load them from CSV files. So load the arrays into Python, as shown on the python-tab, and then save them to file like this:

udf.to_csv('udf.csv',header=False, index=False,encoding='utf-8')
tdf.to_csv('tdf.csv',header=False, index=False,encoding='utf-8')

Then start up the Spark shell and run this code to load the RDDs:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.log4j.Logger

import java.time.LocalDate
import java.time.format.DateTimeFormatter

case class Udf( uid:Int, name:String)
case class Tdf( xid:Int, uid:Int, amount:Double, date:java.sql.Timestamp)

val dtfmt0= new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

def parse_udf(line:String) = {
    val _field=line.split(',')
    val uid = if (_field(0).isEmpty) 0 else _field(0).toInt
    val name = if (_field(1).isEmpty) "" else _field(1)
    Udf ( uid, name)
}

def parse_tdf(line:String) = {
    val _field=line.split(',')
    val xid = if (_field(0).isEmpty) 0 else _field(0).toInt
    val uid = if (_field(1).isEmpty) 0 else _field(1).toInt
    val amount = if (_field(2).isEmpty) 0.0 else _field(2).toDouble
    val date = if (_field(3).isEmpty) new java.sql.Timestamp(0l) else new java.sql.Timestamp(dtfmt0.parse(_field(3)).getTime())
    Tdf ( xid, uid, amount, date)
}

// udf ---------------------------------------------
val udf_rdd=sc.textFile("file:///home/dmn/20160613_framed_data/udf.csv").
    map(parse_udf(_))

// tdf ---------------------------------------------
val tdf_rdd=sc.textFile("file:///home/dmn/20160613_framed_data/tdf.csv").
    map(parse_tdf(_))
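
A quick sanity check in the same shell session could look like this:

// inspect a few parsed records and count the transactions
udf_rdd.take(3).foreach(println)
tdf_rdd.count()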

The same remark applies to Spark DataFrames: unfortunately there is no way to create them directly from arrays (though it is possible to create them from sequences of rows). The easiest approach is to load them from CSV files. So load the arrays into Python, as shown on the python-tab, and then save them to file like this:

udf.to_csv('udf.csv',header=False, index=False,encoding='utf-8')
tdf.to_csv('tdf.csv',header=False, index=False,encoding='utf-8')

Run the following in the Spark shell:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.log4j.Logger

import java.time.LocalDate
import java.time.format.DateTimeFormatter

import org.apache.spark.sql.Row
import org.apache.spark.sql._
import org.apache.spark.sql.types._

val udf_schema = StructType(
    StructField("uid",IntegerType,false)  ::
    StructField("name",StringType,false) :: Nil
    )

val tdf_schema = StructType(
    StructField("xid",IntegerType,false)  ::
    StructField("uid",IntegerType,false)  ::
    StructField("amount",DoubleType,false)  ::
    StructField("date",TimestampType,false) :: Nil
    )

val dtfmt0= new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

def parse_udf(line:String) = {
    val _field=line.split(',')
    val uid = if (_field(0).isEmpty) 0 else _field(0).toInt
    val name = if (_field(1).isEmpty) "" else _field(1)
    Row ( uid, name) 
}

def parse_tdf(line:String) = {
    val _field=line.split(',')
    val xid = if (_field(0).isEmpty) 0 else _field(0).toInt
    val uid = if (_field(1).isEmpty) 0 else _field(1).toInt
    val amount = if (_field(2).isEmpty) 0.0 else _field(2).toDouble
    val date = if (_field(3).isEmpty) new java.sql.Timestamp(0l) 
               else new java.sql.Timestamp(dtfmt0.parse(_field(3)).getTime())
    Row ( xid, uid, amount, date)
}


val sx= new org.apache.spark.sql.SQLContext(sc)

// udf ---------------------------------------------
val in_udf_rdd=sc.textFile("file:///home/dmn/20160613_framed_data/udf.csv").
    map(parse_udf(_))
val udf_df = sx.createDataFrame(in_udf_rdd, udf_schema)
udf_df.registerTempTable("t_udf")


// tdf ---------------------------------------------
val in_tdf_rdd=sc.textFile("file:///home/dmn/20160613_framed_data/tdf.csv").
    map(parse_tdf(_))
val tdf_df = sx.createDataFrame(in_tdf_rdd, tdf_schema)
tdf_df.registerTempTable("t_tdf")

Et voilà: the dataframes udf_df and tdf_df, and the temporary tables t_udf and t_tdf.
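
The temp tables can then be queried with plain SQL; for example, something along these lines (assuming the same shell session) gives the total transaction amount per user:

// join transactions to users and sum the amounts
sx.sql("""
    select u.name, sum(t.amount) as total
    from t_tdf t join t_udf u on t.uid = u.uid
    group by u.name
""").show()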

..

Sidenote: create fake data

The above user and transaction data was created using the following script, which employs the fake-factory package ('pip install fake-factory') to generate random data.

The Python code:

from faker import Factory
import pandas as pd
import random

fake = Factory.create('sv_SE')

# create user data
uid_v=[]
name_v=[]
for i in range(0,9):
    uid_v.append(9000+i)
    name_v.append(fake.name())
udf=pd.DataFrame({'uid':uid_v,'name':name_v}, columns=['uid','name'])

# create transaction data
xid_v=[]
tuid_v=[]      # user id per transaction (kept separate from the user list uid_v)
amount_v=[]
date_v=[]
sign=[-1,1]
for i in range(0,21):
    xid_v.append(5000+i)
    amount_v.append(sign[random.randint(0,1)]*random.randint(80,900))
    tuid_v.append(uid_v[random.randint(0,len(uid_v)-1)])   # pick a random existing user
    date_v.append(str(fake.date_time_this_year()))
tdf=pd.DataFrame({'xid':xid_v,'uid':tuid_v,'amount':amount_v,'date':date_v},
                 columns=['xid','uid','amount','date'])
 
Notes by Data Munging Ninja. Generated on nini:sync/20151223_datamungingninja/frameddata at 2016-10-18 07:18