NYC Flights 2013 Data with Weather Data#
Reference: rich-iannone/so-many-pyspark-examples
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.executor.memory", "8g") \
    .appName("Convert CSV to parquet") \
    .master("spark://b2-120-gra11:7077") \
    .getOrCreate()
Joins#
Joins are easily performed with Spark DataFrames. The expression is:
join(other, on=None, how=None)
where:
- `other`: a DataFrame that serves as the right side of the join
- `on`: typically a join expression (or a list of them)
- `how`: the default is `inner`, but `outer`, `left_outer`, `right_outer`, and `leftsemi` joins are also available
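For example, a minimal sketch with two small throwaway DataFrames (`left_df` and `right_df` are hypothetical, built in place with `createDataFrame()`):
left_df = spark.createDataFrame([(1, 'a'), (2, 'b')], ['id', 'val_l'])
right_df = spark.createDataFrame([(1, 'x'), (3, 'y')], ['id', 'val_r'])

# The row with id=2 has no match on the right, so its `val_r` comes back NULL
left_df.join(right_df, on=left_df.id == right_df.id, how='left_outer').show()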
Let’s load in some more data so that we can have two DataFrames to join. The CSV file `weather.csv` contains hourly meteorological data from EWR during 2013, and `nycflights13.csv` contains flights data from the same period.
Let’s create the `nycflights13` DataFrame using a schema object made with `pyspark.sql.types`.
from pyspark.sql.types import *      # Necessary for creating schemas
from pyspark.sql.functions import *  # Importing PySpark functions

nycflights_schema = StructType([
    StructField('year', IntegerType(), True),
    StructField('month', IntegerType(), True),
    StructField('day', IntegerType(), True),
    StructField('dep_time', StringType(), True),
    StructField('dep_delay', IntegerType(), True),
    StructField('arr_time', StringType(), True),
    StructField('arr_delay', IntegerType(), True),
    StructField('carrier', StringType(), True),
    StructField('tailnum', StringType(), True),
    StructField('flight', StringType(), True),
    StructField('origin', StringType(), True),
    StructField('dest', StringType(), True),
    StructField('air_time', IntegerType(), True),
    StructField('distance', IntegerType(), True),
    StructField('hour', IntegerType(), True),
    StructField('minute', IntegerType(), True)
])
# ...and then read the CSV with the schema
nycflights13_csv = spark.read.csv("file:///srv/data/nycflights/nycflights13.csv", schema=nycflights_schema)
nycflights13_csv.show()
Create a proper timestamp#
We have all the components: `year`, `month`, `day`, `hour`, and `minute`.
Use `concat_ws()` (concatenate with separator) to combine column data into StringType columns such that dates (`-` separator, YYYY-MM-DD) and times (`:` separator, 24-hour time) are formed.
nycflights13 = \
    (nycflights13_csv
     .withColumn('date',
                 concat_ws('-',
                           nycflights13_csv.year,
                           nycflights13_csv.month,
                           nycflights13_csv.day))
     .withColumn('time',
                 concat_ws(':',
                           nycflights13_csv.hour,
                           nycflights13_csv.minute)))
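As a quick sanity check (a sketch; output omitted), compare the new string columns against their source columns:
nycflights13.select('year', 'month', 'day', 'date', 'time').show(5)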
In a second step, use `concat_ws()` to concatenate the `date` and `time` strings (the separator is a space); then drop several columns.
nycflights13 = \
    (nycflights13
     .withColumn('timestamp',
                 concat_ws(' ',
                           nycflights13.date,
                           nycflights13.time))
     .drop('date')    # `drop()` also accepts several column names at once
     .drop('minute')  # (e.g., `.drop('date', 'minute', 'time')`); chained
     .drop('time'))   # single-column `drop()` calls work just as well
# In the final step, convert the `timestamp` column from
# a StringType into a TimestampType
nycflights13 = \
    (nycflights13
     .withColumn('timestamp',
                 to_utc_timestamp(nycflights13.timestamp, 'GMT')))
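To confirm the cast (a sketch; output omitted), inspect the schema and a few values:
nycflights13.printSchema()  # `timestamp` should now be a TimestampType
nycflights13.select('timestamp').show(5, truncate=False)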
Now create a schema object for the weather data and then read the CSV with it.
weather_schema = StructType([
    StructField('year', IntegerType(), True),
    StructField('month', IntegerType(), True),
    StructField('day', IntegerType(), True),
    StructField('hour', IntegerType(), True),
    StructField('temp', FloatType(), True),
    StructField('dewp', FloatType(), True),
    StructField('humid', FloatType(), True),
    StructField('wind_dir', IntegerType(), True),
    StructField('wind_speed', FloatType(), True),
    StructField('wind_gust', FloatType(), True),
    StructField('precip', FloatType(), True),
    StructField('pressure', FloatType(), True),
    StructField('visib', FloatType(), True)
])
weather = spark.read.csv("file:///srv/data/nycflights/weather.csv", schema=weather_schema)
weather.show()
Join the `nycflights13` DF with the `weather` DF.
nycflights_all_columns = \
    (nycflights13
     .join(weather,
           [nycflights13.month == weather.month,  # three join conditions:
            nycflights13.day == weather.day,      # month, day,
            nycflights13.hour == weather.hour],   # and hour
           'left_outer'))  # left outer join: keep all rows from the left DF (flights)
                           # with the matching rows in the right DF (weather);
                           # NULLs are produced where there is no match in the right DF
nycflights_all_columns.printSchema()
One way to reduce the number of extraneous columns is to use a `select()` statement.
nycflights_wind_visib = \
    (nycflights_all_columns
     .select(['timestamp', 'carrier', 'flight',
              'origin', 'dest', 'wind_dir',
              'wind_speed', 'wind_gust', 'visib']))
nycflights_wind_visib.schema.fields
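An alternative to `select()` is to drop the duplicated join keys by column reference, which disambiguates between the two source DataFrames (a sketch; `nycflights_dedup` is an illustrative name):
# Drop the weather-side copies of the join keys, one column per `drop()` call
nycflights_dedup = (nycflights_all_columns
                    .drop(weather.month)
                    .drop(weather.day)
                    .drop(weather.hour))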
Let’s load in even more data so we can determine if any takeoffs occurred in very windy weather.
The CSV `beaufort_land.csv` contains Beaufort scale values (the `force` column), wind speed ranges in mph, and the `name` for each wind force.
# Create a schema object...
beaufort_land_schema = StructType([
    StructField('force', IntegerType(), True),
    StructField('speed_mi_h_lb', IntegerType(), True),
    StructField('speed_mi_h_ub', IntegerType(), True),
    StructField('name', StringType(), True)
])
# ...and then read the CSV with the schema
beaufort_land = spark.read.csv('file:///srv/data/nycflights/beaufort_land.csv',
                               header=True, schema=beaufort_land_schema)
beaufort_land.show()
Join the current working DF with the `beaufort_land` DF, using join expressions based on the wind-speed ranges.
nycflights_wind_visib_beaufort = \
    (nycflights_wind_visib
     .join(beaufort_land,
           [nycflights_wind_visib.wind_speed >= beaufort_land.speed_mi_h_lb,
            nycflights_wind_visib.wind_speed < beaufort_land.speed_mi_h_ub],
           'left_outer')
     .withColumn('month', month(nycflights_wind_visib.timestamp))  # create a `month` column from `timestamp` values
     .drop('speed_mi_h_lb')
     .drop('speed_mi_h_ub'))
nycflights_wind_visib_beaufort.printSchema()
# Two equivalent SQL expressions filter out the rows that didn't match a Beaufort range
nycflights_wind_visib_beaufort.filter("name IS NOT NULL").show()
nycflights_wind_visib_beaufort.filter("NOT name IS NULL").show()
We can inspect the number of potentially dangerous takeoffs (i.e., where the Beaufort force is high) month-by-month through the use of the `crosstab()` function.
crosstab_month_force = \
    (nycflights_wind_visib_beaufort
     .crosstab('month', 'force'))
crosstab_month_force.show()
After creating the crosstab DataFrame, use a few functions to clean up the result.
crosstab_month_force = \
    (crosstab_month_force
     .withColumn('month_force',
                 crosstab_month_force.month_force.cast('int'))  # the column starts out as a string; recasting it
                                                                # to `int` aids ordering in the next expression
     .orderBy('month_force')
     .drop('null'))  # drop the column counting rows with no Beaufort match
crosstab_month_force.show()
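The same table can also be built with `groupBy()` plus `pivot()` (a sketch), which keeps `month` as an integer from the start and so avoids the recast above:
(nycflights_wind_visib_beaufort
 .groupBy('month')
 .pivot('force')  # one output column per distinct `force` value
 .count()         # cell values are row counts, as in `crosstab()`
 .orderBy('month')
 .show())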
User Defined Functions (UDFs)#
UDFs let you compute values row by row with your own Python functions, and those functions can import functionality from other Python libraries.
Define a function to convert velocity from miles per hour (mph) to meters per second (mps)
def mph_to_mps(mph):
    try:
        mps = mph * 0.44704  # 1 mph = 0.44704 m/s
    except TypeError:        # e.g., a NULL (None) input
        mps = 0.0
    return mps

# Register this function as a UDF using `udf()`
mph_to_mps = udf(mph_to_mps, FloatType())  # an output type was specified
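Equivalently, `udf()` can be used as a decorator (a sketch; `mph_to_mps_udf` is a hypothetical name, chosen so the plain Python function isn't shadowed):
@udf(returnType=FloatType())
def mph_to_mps_udf(mph):
    # NULL inputs arrive as None; pass them through rather than converting
    return mph * 0.44704 if mph is not None else None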
Create two new columns that are conversions of wind speeds from mph to mps
(
    nycflights_wind_visib_beaufort
    .withColumn('wind_speed_mps', mph_to_mps('wind_speed'))
    .withColumn('wind_gust_mps', mph_to_mps('wind_gust'))
    .withColumnRenamed('wind_speed', 'wind_speed_mph')
    .withColumnRenamed('wind_gust', 'wind_gust_mph')
    .show()
)
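Row-at-a-time UDFs serialize every value between the JVM and Python. When pandas and PyArrow are installed (an assumption about the cluster), a vectorized `pandas_udf` performs the same conversion on whole columns at once (a sketch):
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf(FloatType())
def mph_to_mps_vec(mph: pd.Series) -> pd.Series:
    # Operates on a whole pandas Series at a time; NULLs propagate as NaN
    return mph * 0.44704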