Spark DataFrames

DataFrames are:

  • The preferred abstraction in Spark
  • A distributed collection of elements organized into named columns
  • Built on Resilient Distributed Datasets (RDDs)
  • Immutable once constructed

With DataFrames you can:

  • Track lineage information to efficiently recompute lost data
  • Enable operations on collections of elements in parallel

You construct DataFrames (each route is sketched below):

  • by parallelizing existing collections (e.g., Pandas DataFrames)
  • by transforming an existing DataFrame
  • from files in HDFS or any other storage system (e.g., Parquet)
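
A minimal sketch of the three routes, assuming the pyspark shell's SparkSession is bound to the name spark (the Parquet path is a placeholder):

import pandas as pd

# 1. Parallelize an existing local collection (here a Pandas DataFrame)
pdf = pd.DataFrame({"name": ["Alice", "Bob"], "age": [34, 45]})
df = spark.createDataFrame(pdf)

# 2. Transform an existing DataFrame; the result is a new, immutable DataFrame
adults = df.filter(df["age"] > 18)

# 3. Read from files in HDFS or another storage system
parquet_df = spark.read.parquet("hdfs:///data/people.parquet")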

Features

  • Ability to scale from kilobytes of data on a single laptop to petabytes on a large cluster
  • Support for a wide array of data formats and storage systems
  • Seamless integration with other big data tooling and infrastructure via Spark
  • APIs for Python, Java, Scala, and R

DataFrames versus RDDs

  • A familiar API for new users who have worked with data frames in other languages (e.g., R, Pandas)
  • For existing Spark users, the API makes Spark easier to program than raw RDDs
  • For both groups, DataFrames improve performance through intelligent query optimization and code generation

PySpark Shell

Run the Spark shell:

pyspark

Output similar to the following will be displayed, followed by a >>> REPL prompt:

Python 3.6.5 |Anaconda, Inc.| (default, Apr 29 2018, 16:14:56)
[GCC 7.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
2018-09-18 17:13:13 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/

Using Python version 3.6.5 (default, Apr 29 2018 16:14:56)
SparkSession available as 'spark'.
>>>
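
The banner shows that a SparkSession is already bound to the name spark (the shell also provides sc and sqlContext). A quick sanity check:

>>> spark.range(3).show()
+---+
| id|
+---+
|  0|
|  1|
|  2|
+---+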

Read data into a DataFrame

>>> df = sqlContext.read.csv("/tmp/irmar.csv", sep=';', header=True)
>>> df.show()
+---+--------------------+------------+------+------------+--------+-----+---------+--------+
|_c0|                name|       phone|office|organization|position|  hdr|    team1|   team2|
+---+--------------------+------------+------+------------+--------+-----+---------+--------+
|  0|      Alphonse Paul |+33223235223|   214|          R1|     DOC|False|      EDP|      NA|
|  1|        Ammari Zied |+33223235811|   209|          R1|      MC| True|      EDP|      NA|
...
| 18|    Bernier Joachim |+33223237558|   214|          R1|     DOC|False|   ANANUM|      NA|
| 19|   Berthelot Pierre |+33223236043|   601|          R1|      PE| True|       GA|      NA|
+---+--------------------+------------+------+------------+--------+-----+---------+--------+
only showing top 20 rows
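
Read this way, every column is a string. If you want Spark to guess column types from the data itself, the CSV reader accepts an inferSchema option, at the cost of an extra pass over the file:

>>> df = sqlContext.read.csv("/tmp/irmar.csv", sep=';', header=True, inferSchema=True)
>>> df.printSchema()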

Transformations, Actions, Laziness

Like RDDs, DataFrames are lazy. Transformations only contribute to the query plan; they don't execute anything. Actions force execution of the query. A short demonstration follows the lists below.

Transformation examples

  • filter
  • select
  • drop
  • intersect
  • join

Action examples

  • count
  • collect
  • show
  • head
  • take
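
A short demonstration of this laziness, using the irmar DataFrame df from above: the filter and select calls return immediately without touching the data; only count launches a job.

plan = df.filter(df["organization"] == "R1").select("name")  # nothing executes yet
plan.explain()   # print the query plan built by the transformations
plan.count()     # action: triggers the actual computation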

Creating a DataFrame in Python

import os
import sys

# Make the workers use the same Python interpreter as the driver
os.environ["PYSPARK_PYTHON"] = sys.executable

from pyspark import SparkConf, SparkContext, SQLContext

# The following four lines are not necessary in the pyspark shell,
# which already provides sc and sqlContext
conf = SparkConf().setAppName("people").setMaster("local[*]")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
sqlContext = SQLContext(sc)

df = sqlContext.read.json("data/people.json")  # build a DataFrame from a JSON file

df.show(24)
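
Note that Spark's JSON reader expects one JSON object per line (JSON Lines format), not a single pretty-printed array. A people.json in this format would look like the following (contents illustrative):

{"name": "Alice", "age": 34}
{"name": "Bob", "age": 45}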

Schema Inference

In this exercise, let's explore schema inference. We're going to use a file called irmar.csv. The data is structured, but the file carries no self-describing schema, and reading it as plain text gives Spark nothing to infer a schema from, so we will describe the rows ourselves. Let's create an RDD and look at the first few lines of the file.

rdd = sc.textFile("data/irmar.csv")
for line in rdd.take(10):
  print(line)

Hands-on Exercises

You can look at the DataFrames API documentation.

Let's take a look at the file "/tmp/irmar.csv". Each line contains the same fields for one person:

  • name
  • phone
  • office
  • organization
  • position
  • hdr
  • team1
  • team2

from collections import namedtuple

rdd = sc.textFile("data/irmar.csv")

Person = namedtuple('Person', ['name', 'phone', 'office', 'organization', 
                               'position', 'hdr', 'team1', 'team2'])
def str_to_bool(s):
    # the file stores booleans as the strings 'True' / 'False'
    return s == 'True'

def map_to_person(line):
    cols = line.split(";")
    return Person(name         = cols[0],
                  phone        = cols[1],
                  office       = cols[2],
                  organization = cols[3],
                  position     = cols[4],
                  hdr          = str_to_bool(cols[5]),
                  team1        = cols[6],
                  team2        = cols[7])

header = rdd.first()  # drop the header line before parsing
people_rdd = rdd.filter(lambda line: line != header).map(map_to_person)
df = people_rdd.toDF()
df.show()
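
Instead of letting toDF() infer the column types from the namedtuples, you can also declare the schema explicitly. A sketch, reusing people_rdd from above:

from pyspark.sql.types import (StructType, StructField,
                               StringType, BooleanType)

schema = StructType([
    StructField("name", StringType(), True),
    StructField("phone", StringType(), True),
    StructField("office", StringType(), True),
    StructField("organization", StringType(), True),
    StructField("position", StringType(), True),
    StructField("hdr", BooleanType(), True),
    StructField("team1", StringType(), True),
    StructField("team2", StringType(), True),
])
df = sqlContext.createDataFrame(people_rdd, schema)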

Schema

df.printSchema()
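
Since every field of Person except hdr is a string, the output should look similar to:

root
 |-- name: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- office: string (nullable = true)
 |-- organization: string (nullable = true)
 |-- position: string (nullable = true)
 |-- hdr: boolean (nullable = true)
 |-- team1: string (nullable = true)
 |-- team2: string (nullable = true)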

display

display(df) renders the DataFrame as a rich table in notebook environments that define it (e.g., Databricks); in the plain pyspark shell, use df.show() instead.

display(df)

select

df.select(df["name"], df["position"], df["organization"])
df.select(df["name"], df["position"], df["organization"]).show()

filter

df.filter(df["organization"] == "R2").show()

filter + select

df2 = df.filter(df["organization"] == "R2").select(df["name"], df["team1"])
df2.show()

orderBy

(df.filter(df["organization"] == "R2")
   .select(df["name"],df["position"])
   .orderBy("position")).show()
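
orderBy sorts ascending by default; to sort descending, use the column's desc() method:

(df.filter(df["organization"] == "R2")
   .select(df["name"], df["position"])
   .orderBy(df["position"].desc())).show()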

groupBy

df.groupBy(df["hdr"])
df.groupBy(df["hdr"]).count().show()

WARNING: Don't confuse GroupedData.count() with DataFrame.count(). GroupedData.count() is a transformation that adds a count column; DataFrame.count() is an action that returns the number of rows.

df.filter(df["hdr"]).count()
df.filter(df['hdr']).select("name").show()
df.groupBy(df["organization"]).count().show()
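
Beyond count(), grouped data supports general aggregations through agg() and the pyspark.sql.functions module. A small sketch counting people and HDR holders per organization:

from pyspark.sql import functions as F

(df.groupBy(df["organization"])
   .agg(F.count("*").alias("people"),
        F.sum(F.col("hdr").cast("int")).alias("hdr_count"))
   .show())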

Exercises

  • How many teachers (PR + MC) are from INSA?
  • How many MC are in the STATS team?
  • How many MC + CR have an HDR?
  • What is the student supervision ratio (DOC / HDR)?
  • List the number of people in each organization.
  • List the number of HDR people in each team.
  • Which team has the most HDR?
  • List the number of DOC students in each organization.
  • Which team has the most DOC?
  • List the people from CNRS who are neither CR nor DR.
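
As a hint for the style these answers take, here is one possible way to attack the first question, assuming "INSA" appears in the organization column:

(df.filter(df["organization"] == "INSA")
   .filter(df["position"].isin("PR", "MC"))
   .count())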

When you are done, stop the SparkContext:

sc.stop()