NYC Flights 2013 data with weather data#

Reference: rich-iannone/so-many-pyspark-examples

from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .config("spark.executor.memory", "8g") \
        .appName("Convert CSV to parquet") \
        .master("spark://b2-120-gra11:7077") \
        .getOrCreate()
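If you are not running against a standalone cluster like the one above, a local-mode session works just as well for following along (a sketch; the master URL and executor memory above are specific to the original environment):

# Local-mode alternative (assumption: no cluster available)
spark = SparkSession.builder \
        .appName("Convert CSV to parquet") \
        .master("local[*]") \
        .getOrCreate()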

Joins#

Joins are easily performed with Spark DataFrames. The expression is:

join(other, on = None, how = None)

where:

  • other: a DataFrame that serves as the right side of the join

  • on: typically a join expression

  • how: the type of join to perform; the default is inner, but outer, left_outer, right_outer, and leftsemi are also available (see the sketch below)
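As a minimal sketch, here is how the how argument changes the result for two tiny, made-up DataFrames:

left = spark.createDataFrame([(1, 'a'), (2, 'b')], ['id', 'val_l'])
right = spark.createDataFrame([(2, 'x'), (3, 'y')], ['id', 'val_r'])

left.join(right, on='id', how='inner').show()       # only id == 2 survives
left.join(right, on='id', how='left_outer').show()  # id == 1 kept, val_r is NULL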

Let’s load in some more data so that we have two DataFrames to join. The CSV file weather.csv contains hourly meteorological data from EWR during 2013, and nycflights13.csv contains flights data for the same period. Let’s create nycflights13 using a schema object made with `pyspark.sql.types`.

from pyspark.sql.types import *  # Necessary for creating schemas
from pyspark.sql.functions import * # Importing PySpark functions

nycflights_schema = StructType([
  StructField('year', IntegerType(), True),
  StructField('month', IntegerType(), True),
  StructField('day', IntegerType(), True),
  StructField('dep_time', StringType(), True),
  StructField('dep_delay', IntegerType(), True),
  StructField('arr_time', StringType(), True),
  StructField('arr_delay', IntegerType(), True),
  StructField('carrier', StringType(), True),
  StructField('tailnum', StringType(), True),
  StructField('flight', StringType(), True),  
  StructField('origin', StringType(), True),
  StructField('dest', StringType(), True),
  StructField('air_time', IntegerType(), True),
  StructField('distance', IntegerType(), True),
  StructField('hour', IntegerType(), True),
  StructField('minute', IntegerType(), True)
  ])

# ...and then read the CSV with the schema
nycflights13_csv = spark.read.csv("file:///srv/data/nycflights/nycflights13.csv", schema = nycflights_schema)
nycflights13_csv.show()
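As an aside, Spark can also infer a schema directly from the file. The inferred types may not match the hand-written schema above (and inference costs an extra pass over the data), so treat this as a sketch rather than a drop-in replacement:

nycflights13_inferred = spark.read.csv("file:///srv/data/nycflights/nycflights13.csv",
                                       inferSchema = True)
nycflights13_inferred.printSchema()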

Create a proper timestamp#

We have all the components: year, month, day, hour, and minute.

Use concat_ws() (concatenate with separator) to combine column data into StringType columns, forming dates (- separator, YYYY-MM-DD) and times (: separator, 24-hour time).

nycflights13 = \
(nycflights13_csv
 .withColumn('date',
             concat_ws('-',
                       nycflights13_csv.year,
                       nycflights13_csv.month,
                       nycflights13_csv.day))
 .withColumn('time',
             concat_ws(':',
                       nycflights13_csv.hour,
                       nycflights13_csv.minute)))

In a second step, use concat_ws() to concatenate the date and time strings (the separator is a space); then drop several columns.

nycflights13 = \
(nycflights13
 .withColumn('timestamp',
             concat_ws(' ',
                       nycflights13.date,
                       nycflights13.time))
 .drop('date')     # `drop()` doesn't accept a Python list of column names; here we
 .drop('minute')   # chain one `drop()` statement per column we would like to remove
 .drop('time'))    # (see the equivalent one-liner below)
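In recent PySpark versions, drop() also accepts several column names as separate arguments (still not as a Python list), so the three chained calls above can be collapsed; drop() silently ignores names that are absent, so this is safe to run either way:

# Equivalent one-liner: names are passed as separate arguments, not a list
nycflights13 = nycflights13.drop('date', 'minute', 'time')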

# In the final step, convert the `timestamp` from
# a StringType into a TimestampType
nycflights13 = \
(nycflights13
 .withColumn('timestamp',
             to_utc_timestamp(nycflights13.timestamp, 'GMT')))
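Note that to_utc_timestamp() here leans on Spark's implicit string-to-timestamp cast (the 'GMT' argument applies no offset). A more explicit route is to_timestamp() with a format string; the standalone sketch below (assuming Spark ≥ 2.2) shows a pattern that tolerates the one-digit months, days, and hours produced by concat_ws():

# Standalone illustration of an explicit parse
spark.range(1).select(
    to_timestamp(lit('2013-1-1 5:15'), 'yyyy-M-d H:m').alias('ts')
).show()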

Create a schema object for the weather data and then read the CSV with the schema

weather_schema = StructType([  
  StructField('year', IntegerType(), True),
  StructField('month', IntegerType(), True),
  StructField('day', IntegerType(), True),
  StructField('hour', IntegerType(), True),
  StructField('temp', FloatType(), True),
  StructField('dewp', FloatType(), True),
  StructField('humid', FloatType(), True),
  StructField('wind_dir', IntegerType(), True),
  StructField('wind_speed', FloatType(), True),
  StructField('wind_gust', FloatType(), True),
  StructField('precip', FloatType(), True),
  StructField('pressure', FloatType(), True),
  StructField('visib', FloatType(), True)
  ])


weather = spark.read.csv("file:///srv/data/nycflights/weather.csv", schema = weather_schema)
weather.show()

Join the nycflights DF with the weather DF

nycflights_all_columns = \
(nycflights13
 .join(weather,
       [nycflights13.month == weather.month, # three join conditions: month,
        nycflights13.day == weather.day,     #                        day,
        nycflights13.hour == weather.hour],  #                        hour
       'left_outer')) # left outer join: keep all rows from the left DF (flights), with the matching rows in the right DF (weather);
                      # NULLs are created where there is no match in the right DF
nycflights_all_columns.printSchema()
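Because the join keys have the same names on both sides, passing a list of column names instead of boolean expressions keeps a single copy of month, day, and hour in the result (a sketch of the alternative):

# Name-based equi-join: each key column appears only once in the output
nycflights_single_keys = nycflights13.join(weather,
                                           on = ['month', 'day', 'hour'],
                                           how = 'left_outer')
nycflights_single_keys.printSchema()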

One way to reduce the number of extraneous columns is to use a select() statement

nycflights_wind_visib = \
(nycflights_all_columns
 .select(['timestamp', 'carrier', 'flight',
          'origin', 'dest', 'wind_dir',
          'wind_speed', 'wind_gust', 'visib']))
nycflights_wind_visib.schema.fields

Let’s load in even more data so we can determine if any takeoffs occurred in very windy weather.

The CSV beaufort_land.csv contains Beaufort scale values (the force column), wind speed ranges in mph, and the name for each wind force.

# Create a schema object... 
beaufort_land_schema = StructType([  
  StructField('force', IntegerType(), True),
  StructField('speed_mi_h_lb', IntegerType(), True),
  StructField('speed_mi_h_ub', IntegerType(), True),
  StructField('name', StringType(), True)
  ])

# ...and then read the CSV with the schema
beaufort_land = spark.read.csv('file:///srv/data/nycflights/beaufort_land.csv', 
                               header = True, schema = beaufort_land_schema)
beaufort_land.show()

Join the current working DF with the beaufort_land DF, using join expressions based on the wind speed ranges

nycflights_wind_visib_beaufort = \
(nycflights_wind_visib
 .join(beaufort_land,
      [nycflights_wind_visib.wind_speed >= beaufort_land.speed_mi_h_lb,
       nycflights_wind_visib.wind_speed < beaufort_land.speed_mi_h_ub],
       'left_outer')
 .withColumn('month', month(nycflights_wind_visib.timestamp)) # Create a month column from `timestamp` values
 .drop('speed_mi_h_lb')
 .drop('speed_mi_h_ub')
)
nycflights_wind_visib_beaufort.printSchema()
# These two filters express the same predicate with different SQL syntax
nycflights_wind_visib_beaufort.filter("name IS NOT NULL").show()
nycflights_wind_visib_beaufort.filter("NOT name IS NULL").show()
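The same predicate can also be written with the Column API rather than a SQL string:

nycflights_wind_visib_beaufort.filter(col('name').isNotNull()).show()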

We can inspect the number of potentially dangerous takeoffs (i.e., where the Beaufort force is high) month-by-month through the use of the crosstab() function

crosstab_month_force = \
(nycflights_wind_visib_beaufort
 .crosstab('month', 'force'))
crosstab_month_force.show()

After creating the crosstab DataFrame, use a few functions to clean up the resultant DataFrame

crosstab_month_force = \
(crosstab_month_force
 .withColumn('month_force',
             crosstab_month_force.month_force.cast('int')) # the column is initially a string but recasting as
                                                           # an `int` will aid ordering in the next expression
 .orderBy('month_force')
 .drop('null'))
crosstab_month_force.show()
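As an aside, crosstab() is essentially a groupBy()/pivot()/count(); the sketch below produces the same table while keeping month as an integer from the start, which avoids the cast above:

# Contingency table via pivot; empty cells are NULL here rather than 0
(nycflights_wind_visib_beaufort
 .groupBy('month')
 .pivot('force')
 .count()
 .orderBy('month')
 .show())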

User Defined Functions (UDFs)#

UDFs compute a value for every input row of the DataFrame. They let you write your own functions and import functionality from other Python libraries.

Define a function to convert velocity from miles per hour (mph) to meters per second (mps)

def mph_to_mps(mph):
    try:
        mps = mph * 0.44704
    except TypeError:  # a NULL wind speed arrives as None
        mps = 0.0
    return mps

# Register this function as a UDF using `udf()`
mph_to_mps = udf(mph_to_mps, FloatType()) # An output type was specified
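On Spark 3.x with pyarrow installed, a vectorized pandas UDF is usually much faster than a row-at-a-time UDF, since it operates on whole batches of rows. A sketch of the same conversion (the decorator and type-hint form assume Spark ≥ 3.0):

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('double')
def mph_to_mps_vec(mph: pd.Series) -> pd.Series:
    # NULL inputs stay NULL here, unlike the 0.0 fallback above
    return mph * 0.44704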

Create two new columns that are conversions of wind speeds from mph to mps

(
  nycflights_wind_visib_beaufort
  .withColumn('wind_speed_mps', mph_to_mps('wind_speed'))
  .withColumn('wind_gust_mps', mph_to_mps('wind_gust'))
  .withColumnRenamed('wind_speed', 'wind_speed_mph')
  .withColumnRenamed('wind_gust', 'wind_gust_mph')
  .show()
)
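It's worth noting that this particular conversion doesn't strictly need a UDF at all: plain column arithmetic compiles to native Spark expressions and avoids the per-row Python round-trip entirely:

# Built-in expressions; note that NULL wind speeds stay NULL here,
# unlike the UDF's 0.0 fallback
(nycflights_wind_visib_beaufort
 .withColumn('wind_speed_mps', col('wind_speed') * 0.44704)
 .withColumn('wind_gust_mps', col('wind_gust') * 0.44704)
 .show())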