Spark DataFrames#

  • Enable wider audiences beyond “Big Data” engineers to leverage the power of distributed processing

  • Inspired by data frames in R and Python (Pandas)

  • Designed from the ground-up to support modern big data and data science applications

  • Extension to the existing RDD API

DataFrames are:#

  • The preferred abstraction in Spark

  • Strongly typed collection of distributed elements

  • Built on Resilient Distributed Datasets (RDD)

  • Immutable once constructed

With DataFrames you can:#

  • Track lineage information to efficiently recompute lost data

  • Enable operations on collections of elements in parallel

You construct DataFrames#

  • by parallelizing existing collections (e.g., Pandas DataFrames)

  • by transforming an existing DataFrame

  • from files in HDFS or any other storage system (e.g., Parquet)
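
A minimal sketch of these three routes, assuming a SparkSession named spark and an existing Pandas DataFrame pandas_df (the names and paths here are illustrative, not from the course files):

# 1. Parallelize an existing local collection (e.g., a Pandas DataFrame)
df1 = spark.createDataFrame(pandas_df)

# 2. Transform an existing DataFrame
df2 = df1.select("name")

# 3. Read files from HDFS or another storage system (e.g., Parquet)
df3 = spark.read.parquet("hdfs:///tmp/people.parquet")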

Features#

  • Ability to scale from kilobytes of data on a single laptop to petabytes on a large cluster

  • Support for a wide array of data formats and storage systems

  • Seamless integration with all big data tooling and infrastructure via Spark

  • APIs for Python, Java, Scala, and R

DataFrames versus RDDs#

  • A friendly API for new users who already know data frames from other languages

  • For existing Spark users, the API makes Spark easier to program than raw RDDs

  • For both groups, DataFrames improve performance through intelligent optimizations and code generation

PySpark Shell#

Run the Spark shell:

pyspark

Output similar to the following will be displayed, followed by a >>> REPL prompt:

Python 3.6.5 |Anaconda, Inc.| (default, Apr 29 2018, 16:14:56)
[GCC 7.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
2018-09-18 17:13:13 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/

Using Python version 3.6.5 (default, Apr 29 2018 16:14:56)
SparkSession available as 'spark'.
>>>
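
Inside the shell, the SparkContext and SparkSession are already created, so you can use sc and spark directly. For example (the exact output depends on your installation):

>>> spark.version
'2.3.1'
>>> sc.master
'local[*]'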

Read data and convert to a DataFrame#

df = sqlContext.read.csv("/tmp/irmar.csv", sep=';', header=True)
df.show()
+---+--------------------+------------+------+------------+--------+-----+---------+--------+
|_c0|                name|       phone|office|organization|position|  hdr|    team1|   team2|
+---+--------------------+------------+------+------------+--------+-----+---------+--------+
|  0|      Alphonse Paul |+33223235223|   214|          R1|     DOC|False|      EDP|      NA|
|  1|        Ammari Zied |+33223235811|   209|          R1|      MC| True|      EDP|      NA|
.
.
.
| 18|    Bernier Joachim |+33223237558|   214|          R1|     DOC|False|   ANANUM|      NA|
| 19|   Berthelot Pierre |+33223236043|   601|          R1|      PE| True|       GA|      NA|
+---+--------------------+------------+------+------------+--------+-----+---------+--------+
only showing top 20 rows
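
By default the CSV reader keeps every column as a string. If you want Spark to guess numeric and boolean column types, you can pass inferSchema=True (a sketch; this option triggers an extra pass over the file):

df = sqlContext.read.csv("/tmp/irmar.csv", sep=';', header=True, inferSchema=True)
df.printSchema()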

Transformations, Actions, Laziness#

Like RDDs, DataFrames are lazy. Transformations contribute to the query plan, but they don’t execute anything. Actions cause the execution of the query.

Transformation examples#

  • filter

  • select

  • drop

  • intersect

  • join

Action examples#

  • count

  • collect

  • show

  • head

  • take
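
A small sketch of this laziness, reusing the irmar DataFrame df loaded above: the filter call returns immediately without reading any data; only the count action launches a Spark job.

subset = df.filter(df["position"] == "DOC")  # transformation: only builds the query plan
subset.count()                               # action: triggers the actual execution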

Creating a DataFrame in Python#

import sys
import os

os.environ["PYSPARK_PYTHON"] = sys.executable
from pyspark import SparkContext, SparkConf, SQLContext

# The following lines are not necessary in the pyspark shell,
# where sc and sqlContext already exist
conf = SparkConf().setAppName("people").setMaster("local[*]")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
sqlContext = SQLContext(sc)

df = sqlContext.read.json("data/people.json")  # get a DataFrame from a JSON file

df.show(24)
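
Spark's JSON reader expects one JSON object per line. A hypothetical data/people.json (illustrative content, not the actual course file) could look like:

{"name": "Michael"}
{"name": "Andy", "age": 30}
{"name": "Justin", "age": 19}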

Schema Inference#

In this exercise, let’s explore schema inference. We’re going to be using a file called irmar.csv. The data is structured, but it has no self-describing schema, and it’s not JSON, so Spark can’t infer the schema automatically. Let’s create an RDD and look at the first few rows of the file.

rdd = sc.textFile("data/irmar.csv")
for line in rdd.take(10):
  print(line)
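
As an alternative sketch (the hands-on exercise below takes a namedtuple route instead), you could declare the schema explicitly with a StructType and let the CSV reader apply it, assuming the same eight semicolon-separated columns and no header line:

from pyspark.sql.types import StructType, StructField, StringType, BooleanType

# One StructField per column of irmar.csv
schema = StructType([
    StructField("name", StringType(), True),
    StructField("phone", StringType(), True),
    StructField("office", StringType(), True),
    StructField("organization", StringType(), True),
    StructField("position", StringType(), True),
    StructField("hdr", BooleanType(), True),
    StructField("team1", StringType(), True),
    StructField("team2", StringType(), True),
])

df = sqlContext.read.csv("data/irmar.csv", sep=";", schema=schema)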

Hands-on Exercises#

You can look at the DataFrames API documentation.

Let’s take a look at the file “data/irmar.csv”. Each line contains the same information about a person:

  • name

  • phone

  • office

  • organization

  • position

  • hdr

  • team1

  • team2

from collections import namedtuple

rdd = sc.textFile("data/irmar.csv")

Person = namedtuple('Person', ['name', 'phone', 'office', 'organization',
                               'position', 'hdr', 'team1', 'team2'])

def str_to_bool(s):
    """Convert the string 'True'/'False' to a Python bool."""
    return s == 'True'

def map_to_person(line):
    """Split one semicolon-separated line into a Person record."""
    cols = line.split(";")
    return Person(name         = cols[0],
                  phone        = cols[1],
                  office       = cols[2],
                  organization = cols[3],
                  position     = cols[4],
                  hdr          = str_to_bool(cols[5]),
                  team1        = cols[6],
                  team2        = cols[7])

people_rdd = rdd.map(map_to_person)
df = people_rdd.toDF()   # namedtuples carry field names, so Spark can infer the schema
df.show()

Schema#

df.printSchema()

display#

Note: display() is a helper provided by notebook environments such as Databricks; it is not part of the PySpark API, so in a plain shell use df.show() instead.

display(df)

select#

df.select(df["name"], df["position"], df["organization"])
df.select(df["name"], df["position"], df["organization"]).show()

filter#

df.filter(df["organization"] == "R2").show()

filter + select#

df2 = df.filter(df["organization"] == "R2").select(df['name'],df['team1'])
df2.show()

orderBy#

(df.filter(df["organization"] == "R2")
   .select(df["name"],df["position"])
   .orderBy("position")).show()

groupBy#

df.groupby(df["hdr"])
df.groupby(df["hdr"]).count().show()

WARNING: Don’t confuse GroupedData.count() with DataFrame.count(). GroupedData.count() is not an action. DataFrame.count() is an action.

df.filter(df["hdr"]).count()
df.filter(df['hdr']).select("name").show()
df.groupBy(df["organization"]).count().show()

Exercises#

  • How many teachers from INSA (PR+MC)?

  • How many MC in the STATS team?

  • How many MC+CR with HDR?

  • What is the ratio of student supervision (DOC / HDR)?

  • List the number of people for every organization.

  • List the number of HDR people for every team.

  • Which team contains the most HDR?

  • List the number of DOC students for every organization.

  • Which team contains the most DOC?

  • List people from CNRS who are neither CR nor DR.
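
As a hint for one of these, here is a possible sketch for counting HDR people per team (assuming, as above, that hdr is a boolean column and team1 holds the primary team):

# Keep only people holding an HDR, then count them per primary team
df.filter(df["hdr"]).groupBy("team1").count().orderBy("count", ascending=False).show()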

sc.stop()