Data transformation, Scala collections, and SQL

Data transformation is one of the three steps in ETL (extract, transform, load), a process that takes raw data from heterogeneous sources (e.g. databases, text files), processes or transforms it, and then loads it into its final destination (e.g. in a format ready for further modelling or analysis). While there are plenty of languages for this task, this post describes a minimal set of operations that can be used for the purpose. Concrete examples are given using HiveQL, a variant of the popular query language SQL, and the Scala collections library.

Our running example will be a simple dataset containing student records.

1. Data representation

With SQL, it’s obvious that each student record will be represented as a row in a table. Suppose we know each student’s ID, name, year of birth, and enrolment year; we can then define the following Student table:

create table Student(
  id string,
  name string,
  birth_year int,
  enrol_year int
);

With Scala, each student record can simply be represented as a tuple or a case class. We’ll use a case class in this example because later on we want to access each field of a record by name.

case class Student(id: String, name: String, birthYear: Int, enrolYear: Int)

The entire set of student records is stored as a table in SQL, and as a collection in Scala. Note also that the type of each field is available in both representations.
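To illustrate the tuple versus case class choice, here is a minimal sketch (runnable as a Scala script or in the REPL). The names asTuple, asRecord and converted are purely illustrative. It shows the same record in both forms: a tuple only offers positional access (_1, _2, ...), whereas a case class lets you access each field by name.

```scala
// Same record, two representations (illustrative sketch, script/REPL style).
case class Student(id: String, name: String, birthYear: Int, enrolYear: Int)

// Tuple: fields are positional, so the reader must remember that _2 means "name".
val asTuple: (String, String, Int, Int) = ("1", "Alice", 1990, 2015)

// Case class: the same data, but each field is accessed by name.
val asRecord: Student = Student("1", "Alice", 1990, 2015)

println(asTuple._2)    // prints Alice (positional access)
println(asRecord.name) // prints Alice (named access)

// Building the case class from the tuple is straightforward because the fields line up.
val converted: Student = Student(asTuple._1, asTuple._2, asTuple._3, asTuple._4)
println(converted == asRecord) // prints true: case classes compare by value
```

Case classes also give structural equality and a readable toString for free. That is why the REPL output later in this post prints records such as Student(1,Alice,1990,2015).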

2. Example data

Let’s assume our data contains four students, as created by the code below.


val students = List[Student](Student("1", "Alice", 1990, 2015),
    Student("2", "Bob", 1991, 2016), 
    Student("3", "Cathy", 1990, 2015), 
    Student("4", "David", 1991, 2014))

The equivalent SQL code is omitted for brevity, as it’s not part of the operations we’re interested in.

3. Selection

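The first operation is Select. Let’s say we want to find all students with an ID less than “3”. Since id is a string, the comparison is lexicographic in both languages, so an ID such as “10” would also count as less than “3”.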

In SQL:

select * from Student where id < "3";

In Scala:

students.filter(s => s.id < "3")


The underlying implementation of this operation may simply traverse the list (in Scala) or read each row of the table (in SQL), checking whether each element satisfies the condition. A specialised database such as MySQL can add optimisations, for example an index on the ID field, to make the search more efficient. However, the high-level abstractions in both languages are very similar.
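To make the “simply traverse the list” idea concrete, here is a minimal sketch of a hand-written selection. The select helper is hypothetical and is not how the standard library implements filter. It walks the list once, keeps the elements that satisfy the predicate, and restores the original order at the end.

```scala
import scala.annotation.tailrec

case class Student(id: String, name: String, birthYear: Int, enrolYear: Int)

val students = List(
  Student("1", "Alice", 1990, 2015),
  Student("2", "Bob", 1991, 2016),
  Student("3", "Cathy", 1990, 2015),
  Student("4", "David", 1991, 2014))

// Hypothetical helper: one pass over the list, like scanning every row of a table.
def select[A](xs: List[A])(p: A => Boolean): List[A] = {
  @tailrec
  def loop(rest: List[A], acc: List[A]): List[A] = rest match {
    case Nil                     => acc.reverse             // done: restore original order
    case head :: tail if p(head) => loop(tail, head :: acc) // keep a matching element
    case _ :: tail               => loop(tail, acc)         // skip a non-matching element
  }
  loop(xs, Nil)
}

val smallIds = select(students)(s => s.id < "3")
println(smallIds.map(_.name)) // prints List(Alice, Bob)
```

The result is the same as students.filter(s => s.id < "3"). The point of the sketch is only that, without an index, selection has to look at every element.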

4. Projection

To select a subset of fields in SQL, we just need to specify which column names to keep. Let’s say for the previous query we only want the student name.

select name from Student where id < "3";


In Scala:

students.filter(s => s.id < "3").map(s => s.name)


If you’re new to Scala, it may not be easy to understand the above code. Let’s break it down into two expressions:


val studentsWithSmallIds: List[Student] = students.filter(s => s.id < "3")
val studentNames: List[String] = studentsWithSmallIds.map(s => s.name)

The first line filters the list, keeping each element s that satisfies the condition s.id < “3”. The second line then applies the function s => s.name to each remaining element s, so the result is of type List[String].
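The same projection can also be written as a for-comprehension, which reads closer to SQL. The Scala compiler rewrites the guard and the yield into withFilter and map calls. Projecting several columns (as in SQL select id, name ...) simply maps each record to a tuple. A short sketch:

```scala
case class Student(id: String, name: String, birthYear: Int, enrolYear: Int)

val students = List(
  Student("1", "Alice", 1990, 2015),
  Student("2", "Bob", 1991, 2016),
  Student("3", "Cathy", 1990, 2015),
  Student("4", "David", 1991, 2014))

// filter followed by map, as in the text.
val viaMethods: List[String] = students.filter(s => s.id < "3").map(s => s.name)

// The same query as a for-comprehension (desugared into withFilter + map).
val viaFor: List[String] = for (s <- students if s.id < "3") yield s.name

// Projecting two columns, like: select id, name from Student where id < "3";
val idAndName: List[(String, String)] =
  students.filter(s => s.id < "3").map(s => (s.id, s.name))

println(viaFor)    // prints List(Alice, Bob)
println(idAndName) // prints List((1,Alice), (2,Bob))
```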

5. Group By / Aggregations

Another common operation is to group the data by some fields and then perform aggregations, such as counting the number of elements in each group.

Let’s say we want to count the number of students that were born in each year. This can be done easily in SQL:

select birth_year, count(*) as cnt from Student group by birth_year;

In Scala:

scala> val groups: Map[Int, List[Student]] = students.groupBy(s => s.birthYear)
res27: scala.collection.immutable.Map[Int,List[Student]] = Map(1991 -> List(Student(2,Bob,1991,2016), Student(4,David,1991,2014)), 1990 -> List(Student(1,Alice,1990,2015), Student(3,Cathy,1990,2015)))
scala> val countByYear: Map[Int, Int] = groups.map{ case (birthYear, xs) => (birthYear, xs.length) }
res29: scala.collection.immutable.Map[Int,Int] = Map(1991 -> 2, 1990 -> 2)


The first line transforms the list into a map whose key is the field we want to group by (birthYear) and whose value is the list of Students sharing that key.
Then we apply the function case (birthYear, xs) => (birthYear, xs.length) to each entry of the resulting map (groups). The function returns a two-element tuple. Because calling map on a Map with a function that produces pairs builds another Map, the results are collected into countByYear.
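If you are on Scala 2.13 or later (an assumption, since older versions lack this method), groupMapReduce performs the group-then-aggregate step in a single call. It takes a key function, a function that maps each element to a value, and a function that combines the values of each group. The sketch below reproduces the count and adds a second aggregate, the earliest enrolment year per birth year (roughly select birth_year, min(enrol_year) ... group by birth_year):

```scala
case class Student(id: String, name: String, birthYear: Int, enrolYear: Int)

val students = List(
  Student("1", "Alice", 1990, 2015),
  Student("2", "Bob", 1991, 2016),
  Student("3", "Cathy", 1990, 2015),
  Student("4", "David", 1991, 2014))

// Requires Scala 2.13+: key = birthYear, map each student to 1, reduce by summing.
val countByYear: Map[Int, Int] =
  students.groupMapReduce(_.birthYear)(_ => 1)(_ + _)

// Same shape, different aggregate: minimum enrolment year within each group.
val firstEnrolByYear: Map[Int, Int] =
  students.groupMapReduce(_.birthYear)(_.enrolYear)((a, b) => a min b)

println(countByYear(1990))      // prints 2
println(firstEnrolByYear(1991)) // prints 2014
```

The groupBy-then-map version in the text is more portable across Scala versions. groupMapReduce avoids building the intermediate lists of Students.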

6. Join

The last operation we consider is join. Let’s introduce another type of record that some students loathe but others absolutely love: the GPA record. Each record contains a student ID and a GPA, like so:

case class GPA(id: String, gpa: Float)
val gpas = List(GPA("1", 1.0f), GPA("2", 2.0f), GPA("3", 3.0f), GPA("4", 4.0f))  // ids match the Student ids

Join is supported natively in SQL, so if we want to join Student and GPA we can simply write


select t1.*, t2.gpa from Student t1 join GPA t2 on t1.id = t2.id;

The Scala collections library has no built-in join operation, so we’ll implement one ourselves. For this particular example it’s easy: we can iterate through both the Student list and the GPA list and keep the pairs whose IDs match:

scala> for (s <- students; g <- gpas; if (s.id == g.id)) yield (s.id, s.name, s.birthYear, s.enrolYear, g.gpa)

res55: List[(String, String, Int, Int, Float)] = List((1,Alice,1990,2015,1.0), (2,Bob,1991,2016,2.0), (3,Cathy,1990,2015,3.0), (4,David,1991,2014,4.0))
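The nested loop above compares every student with every GPA record, so its cost grows with the product of the two list sizes. A common alternative, in the spirit of the indexing mentioned in the Selection section, is a hash join: index one side by key once, then look up each key from the other side. A minimal sketch, assuming the GPA ids are "1" to "4" as the output above implies (gpaById and hashed are illustrative names):

```scala
case class Student(id: String, name: String, birthYear: Int, enrolYear: Int)
case class GPA(id: String, gpa: Float)

val students = List(
  Student("1", "Alice", 1990, 2015),
  Student("2", "Bob", 1991, 2016),
  Student("3", "Cathy", 1990, 2015),
  Student("4", "David", 1991, 2014))
val gpas = List(GPA("1", 1.0f), GPA("2", 2.0f), GPA("3", 3.0f), GPA("4", 4.0f))

// Nested-loop join from the text: compares every (student, gpa) pair.
val nested = for (s <- students; g <- gpas; if s.id == g.id)
  yield (s.id, s.name, s.birthYear, s.enrolYear, g.gpa)

// Hash join sketch: build an index from id to GPA records once...
val gpaById: Map[String, List[GPA]] = gpas.groupBy(_.id)

// ...then look up each student's id; a missing id yields Nil, dropping the student (inner join).
val hashed = for {
  s <- students
  g <- gpaById.getOrElse(s.id, Nil)
} yield (s.id, s.name, s.birthYear, s.enrolYear, g.gpa)

println(hashed == nested) // prints true
```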

7. Generic Join in Scala

The code in the previous section for joining students with their GPAs is all well and good, except that it is not general enough. If we wanted to join two different collections, we’d have to repeat the code above, modifying it to match on the right key. In this section we’ll implement a generic join in Scala as a fun exercise.

First we will implement join for two Maps: m1 of type Map[K, List[V1]] and m2 of type Map[K, List[V2]]. Note that the maps share the same key type but can have different value types.

We can define the join function as:

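def join[K, V1, V2](m1: Map[K, List[V1]], m2: Map[K, List[V2]]): Map[K, List[(V1, V2)]] = {
  // Compare every key of m1 with every key of m2 and keep the matching pairs;
  // for each shared key, pair every value from m1 with every value from m2.
  for ((k1, v1) <- m1; (k2, v2) <- m2; if (k1 == k2))
    yield (k1, v1.flatMap(x => v2.map(y => (x, y))))
}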


A slightly more complicated but perhaps more functional way of implementing join (without the for expression), which also looks up each key of m1 directly in m2 rather than comparing every pair of keys, is:


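def join[K, V1, V2](m1: Map[K, List[V1]], m2: Map[K, List[V2]]): Map[K, List[(V1, V2)]] = {
  m1.flatMap {
    case (k, v1) => m2.get(k) match {
      // Key present in both maps: pair every value from m1 with every value from m2.
      case Some(v2) => Some((k, v1.flatMap(x => v2.map(y => (x, y)))))
      // Key missing from m2: drop it, as an inner join does.
      case None => None
    }
  }
}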

Now, to join two lists, we just need to specify the key of each element and convert both lists to Maps, where each map entry binds a key to all elements of the list with that key. Coming back to our Student example, the Scala code to use the above join would be:

scala> val studentMap = students.groupBy(s => s.id)
scala> val gpaMap = gpas.groupBy(g => g.id)
scala> val studentWithGPA = join(studentMap, gpaMap)
studentWithGPA: scala.collection.immutable.Map[String,List[(Student, GPA)]] = Map(2 -> List((Student(2,Bob,1991,2016),GPA(2,2.0))), 1 -> List((Student(1,Alice,1990,2015),GPA(1,1.0))), 4 -> List((Student(4,David,1991,2014),GPA(4,4.0))), 3 -> List((Student(3,Cathy,1990,2015),GPA(3,3.0))))
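The grouping step can itself be wrapped in a helper, so that joining two lists only requires a key function for each side. The sketch below is one way to do it. joinOn is a hypothetical name, and join is repeated (with the return type Map[K, List[(V1, V2)]], matching the REPL output above) so that the block runs on its own. It groups both lists, joins the maps, and flattens the result into a list of pairs.

```scala
case class Student(id: String, name: String, birthYear: Int, enrolYear: Int)
case class GPA(id: String, gpa: Float)

val students = List(
  Student("1", "Alice", 1990, 2015),
  Student("2", "Bob", 1991, 2016),
  Student("3", "Cathy", 1990, 2015),
  Student("4", "David", 1991, 2014))
val gpas = List(GPA("1", 1.0f), GPA("2", 2.0f), GPA("3", 3.0f), GPA("4", 4.0f))

// Inner join of two grouped maps; keys missing from m2 are dropped.
def join[K, V1, V2](m1: Map[K, List[V1]], m2: Map[K, List[V2]]): Map[K, List[(V1, V2)]] = {
  m1.flatMap {
    case (k, v1) => m2.get(k) match {
      case Some(v2) => Some((k, v1.flatMap(x => v2.map(y => (x, y)))))
      case None     => None
    }
  }
}

// Hypothetical helper: join two lists given a key function for each side.
def joinOn[A, B, K](xs: List[A], ys: List[B])(keyA: A => K, keyB: B => K): List[(A, B)] =
  join(xs.groupBy(keyA), ys.groupBy(keyB)).values.flatten.toList

val pairs: List[(Student, GPA)] = joinOn(students, gpas)(_.id, _.id)
println(pairs.size) // prints 4
```

Because the result comes from a Map, the order of the pairs is not guaranteed. Sort the result (e.g. by id) if order matters.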

Conclusion

We have seen in this post how Scala collections can be used to implement common data transformation operations, much like SQL. Scala also makes it convenient to write a DSL (domain-specific language) embedded in Scala itself. This is the case with Scalding, a data ETL framework built on top of Cascading and Hadoop MapReduce. In fact, the main difference between the operations in this post and those in Scalding is that the actual Scalding implementation targets distributed systems such as a Hadoop cluster.
