NYC Flights data analysis with Spark#

Reference: so many pyspark examples

In this notebook, we will extract some historical data for flights out of NYC between 1990 and 2000. The data is taken from here.

Variable descriptions

  1. Year: 1987-2008

  2. Month: 1-12

  3. DayofMonth: 1-31

  4. DayOfWeek: 1 (Monday) - 7 (Sunday)

  5. DepTime: actual departure time (local, hhmm)

  6. CRSDepTime: scheduled departure time (local, hhmm)

  7. ArrTime: actual arrival time (local, hhmm)

  8. CRSArrTime: scheduled arrival time (local, hhmm)

  9. UniqueCarrier: unique carrier code

  10. FlightNum: flight number

  11. TailNum: plane tail number

  12. ActualElapsedTime: in minutes

  13. CRSElapsedTime: in minutes

  14. AirTime: in minutes

  15. ArrDelay: arrival delay, in minutes

  16. DepDelay: departure delay, in minutes

  17. Origin: origin IATA airport code

  18. Dest: destination IATA airport code

  19. Distance: in miles

  20. TaxiIn: taxi in time, in minutes

  21. TaxiOut: taxi out time, in minutes

  22. Cancelled: was the flight cancelled?

  23. CancellationCode: reason for cancellation (A = carrier, B = weather, C = NAS, D = security)

  24. Diverted: 1 = yes, 0 = no

  25. CarrierDelay: in minutes

  26. WeatherDelay: in minutes

  27. NASDelay: in minutes

  28. SecurityDelay: in minutes

  29. LateAircraftDelay: in minutes

from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .config("spark.cores.max", "4") \
        .appName("NYCFlights") \
        .master("spark://b2-120-gra11:7077") \
        .getOrCreate()
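If you don't have that standalone cluster available, the same analysis runs in local mode; this is a minimal sketch, assuming you just want Spark worker threads inside the current process:

from pyspark.sql import SparkSession

# Local-mode alternative: no cluster needed, 4 worker threads in-process
spark = SparkSession.builder \
        .appName("NYCFlights") \
        .master("local[4]") \
        .getOrCreate()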


nycflights = spark.read.parquet("hdfs://localhost:54310/data/nycflights.parquet")
nycflights.show()

Let’s take a look at the DataFrame schema

nycflights.printSchema()

Let’s group and aggregate. groupBy() groups one or more DataFrame columns and prepares them for aggregation functions

(nycflights
 .groupby('Origin') # creates 'GroupedData'
 .count() # creates a new column with aggregate `count` values
 .show())

Use the agg() function to perform multiple aggregations

(nycflights
 .groupby('Origin')
 .agg({'DepDelay': 'avg', 'ArrDelay': 'avg'}) # note the new column names
 .show())
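If you prefer to control the output names yourself, agg() also accepts explicit Column expressions with alias(); a sketch of the same aggregation:

from pyspark.sql import functions as F

(nycflights
 .groupby('Origin')
 .agg(F.avg('DepDelay').alias('mean_dep_delay'),  # named columns instead of avg(DepDelay)
      F.avg('ArrDelay').alias('mean_arr_delay'))
 .show())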

With the dict form of agg(), you can’t perform multiple aggregations on the same column: the duplicate keys collide, so only the last one is performed

(nycflights
 .groupby('DayOfWeek')
 .agg({'DepDelay': 'min', 'DepDelay': 'max'})
 .show())
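The Column-expression form shown above sidesteps the collision, since each aggregation is a separate argument; a sketch:

from pyspark.sql import functions as F

(nycflights
 .groupby('DayOfWeek')
 .agg(F.min('DepDelay').alias('min_dep_delay'),  # both aggregations on the same column
      F.max('DepDelay').alias('max_dep_delay'))
 .show())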

Use groupBy() with a few columns, then aggregate

(
  nycflights
  .groupby(['DayOfWeek', 'Origin', 'Dest']) # group by these unique combinations
  .count()                              # perform a 'count' aggregation on the groups
  .orderBy(['DayOfWeek', 'count'],
           ascending = [1, 0])          # order by `DayOfWeek` ascending, `count` descending
  .show(40)
) 
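The same ordering can be written with per-column asc()/desc() calls instead of the ascending list; a sketch:

from pyspark.sql.functions import col

(
  nycflights
  .groupby(['DayOfWeek', 'Origin', 'Dest'])
  .count()
  .orderBy(col('DayOfWeek').asc(), col('count').desc())
  .show(40)
)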

Use groupBy() + pivot() + an aggregation function to make a pivot table! Get a table of flights by day of week for each carrier

(
  nycflights
  .groupBy('DayOfWeek') # group the data for aggregation by day of week
  .pivot('UniqueCarrier') # provide columns of data by `carrier` abbreviation
  .count()          # create aggregations as a count of rows
  .show()
)
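pivot() also accepts an explicit list of values; providing it spares Spark the extra job it otherwise runs to discover the distinct carrier codes. A sketch with two carrier codes assumed to be in the data; substitute ones from your own UniqueCarrier column:

(
  nycflights
  .groupBy('DayOfWeek')
  .pivot('UniqueCarrier', ['AA', 'UA']) # only these two pivot columns are produced
  .count()
  .show()
)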

Column Operations#

Column instances can be created in two ways (a quick sketch of both follows this list):

(1) Selecting a column from a DataFrame

  • df.colName

  • df["colName"]

  • col("colName") (using col() from pyspark.sql.functions)

(Note that df.select() and df.withColumn() consume Column instances rather than create them: select() returns a DataFrame, and withColumn() expects a new column name plus a Column expression.)

(2) Creating one from an expression

  • df.colName + 1

  • 1 / df.colName
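Here is the promised sketch of both forms; the three references below all point at the same underlying column:

from pyspark.sql.functions import col

# (1) three equivalent references to the Distance column
d1 = nycflights.Distance
d2 = nycflights['Distance']
d3 = col('Distance')

# (2) expressions on a Column yield new Columns; alias() names the result
nycflights.select((nycflights.Distance + 1).alias('dist_plus_one')).show(5)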

Once you have a Column instance, you can apply a wide range of functions. Some of the functions covered here are listed below; concat_ws() and to_utc_timestamp() are sketched right after the list:

  • format_number(): apply formatting to a number, rounded to d decimal places, and return the result as a string

  • when() & otherwise(): when() evaluates a list of conditions and returns one of multiple possible result expressions; if otherwise() is not invoked, None is returned for unmatched conditions

  • concat_ws(): concatenates multiple input string columns together into a single string column, using the given separator

  • to_utc_timestamp(): assumes the given timestamp is in given timezone and converts to UTC

  • year(): extracts the year of a given date as integer

  • month(): extracts the month of a given date as integer

  • dayofmonth(): extracts the day of the month of a given date as integer

  • hour(): extracts the hour of a given date as integer

  • minute(): extracts the minute of a given date as integer
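Here is that sketch on the flights data; it assumes the Date column read from the parquet file and, purely for illustration, treats its timestamps as US Eastern local time:

from pyspark.sql.functions import concat_ws, to_utc_timestamp

(nycflights
 .select(concat_ws('-', 'Origin', 'Dest').alias('route'),            # e.g. JFK-LAX
         to_utc_timestamp('Date', 'America/New_York').alias('utc'))  # assumed-local time shifted to UTC
 .show(5))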

Perform 2 different aggregations, rename the new columns, then round the aggregate values

from pyspark.sql.functions import *

(
  nycflights
  .groupby('DayOfWeek')
  .agg({'DepDelay': 'avg', 'ArrDelay': 'avg'})
  .withColumnRenamed('avg(DepDelay)', 'mean_dep_delay')
  .withColumnRenamed('avg(ArrDelay)', 'mean_arr_delay')
  .withColumn('mean_arr_delay', format_number('mean_arr_delay', 1))
  .withColumn('mean_dep_delay', format_number('mean_dep_delay', 1))
  .show()
)
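Keep in mind that format_number() returns a string column. If the rounded values need to stay numeric for later arithmetic, round() from pyspark.sql.functions is the alternative; a sketch:

from pyspark.sql.functions import round as spark_round  # aliased to avoid shadowing Python's round()

(
  nycflights
  .groupby('DayOfWeek')
  .agg({'DepDelay': 'avg'})
  .withColumn('mean_dep_delay', spark_round('avg(DepDelay)', 1)) # still a numeric column
  .select('DayOfWeek', 'mean_dep_delay')
  .show()
)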

Add a new column (far_or_near) whose string value depends on a comparison against a numeric column; this uses withColumn(), when(), and otherwise()

from pyspark.sql.functions import * # Importing PySpark functions such as when()

(
  nycflights
  .withColumn('far_or_near',
              when(nycflights.Distance > 1000, 'far') # the `if-then` statement
              .otherwise('near'))                     # the `else` statement
  .select(["Origin", "Dest", "far_or_near"])
  .distinct()
  .show()
)
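when() calls can also be chained before the final otherwise() to produce more than two buckets; a sketch with arbitrary distance cut-offs:

from pyspark.sql.functions import when

(
  nycflights
  .withColumn('distance_band',
              when(nycflights.Distance > 2000, 'long')    # checked first
              .when(nycflights.Distance > 1000, 'medium') # checked only if the first test fails
              .otherwise('short'))
  .select(['Origin', 'Dest', 'distance_band'])
  .distinct()
  .show()
)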

Perform a few numerical computations across columns

(
  nycflights
  .withColumn('dist_per_minute',
              nycflights.Distance / nycflights.AirTime) # create new column with division of values
  .withColumn('dist_per_minute',
              format_number('dist_per_minute', 2))       # round that new column's float value to 2 decimal places
  .select(["Origin", "Dest", "dist_per_minute"])
  .distinct()
  .show()
)

You can split a date into its components if you need to. Use the year(), month(), and dayofmonth() functions with withColumn(); hour() and minute() work the same way on timestamps (see the note after the example)

(
  nycflights
  .withColumn('Year', year(nycflights.Date))
  .withColumn('Month', month(nycflights.Date))
  .withColumn('Day', dayofmonth(nycflights.Date))
  .select(["Day", "Month", "Year"])
  .distinct()
  .show()
)
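hour() and minute() need a timestamp to work on, but the departure times in this data are plain hhmm integers (see the variable descriptions above). Assuming CRSDepTime follows that layout, the components can be recovered with integer arithmetic instead; a sketch:

from pyspark.sql.functions import floor

(
  nycflights
  .withColumn('dep_hour', floor(nycflights.CRSDepTime / 100)) # hh part of hhmm
  .withColumn('dep_minute', nycflights.CRSDepTime % 100)      # mm part of hhmm
  .select(['CRSDepTime', 'dep_hour', 'dep_minute'])
  .show(5)
)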

There are more time-based functions; a few of them are sketched after the list:

  • date_sub(): subtract an integer number of days from a Date or Timestamp

  • date_add(): add an integer number of days to a Date or Timestamp

  • datediff(): get the difference between two dates

  • add_months(): add an integer number of months

  • months_between(): get the number of months between two dates

  • next_day(): returns the first date later than the given date that falls on the specified day of the week

  • last_day(): returns the last day of the month which the given date belongs to

  • dayofmonth(): extract the day of the month of a given date as integer

  • dayofyear(): extract the day of the year of a given date as integer

  • weekofyear(): extract the week number of a given date as integer

  • quarter(): extract the quarter of a given date
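A few of these sketched against the Date column; the exact values depend on your data, but the calls look like this:

from pyspark.sql.functions import date_add, date_sub, datediff, last_day, current_date

(
  nycflights
  .select('Date',
          date_add('Date', 7).alias('one_week_later'),
          date_sub('Date', 7).alias('one_week_earlier'),
          last_day('Date').alias('end_of_month'),
          datediff(current_date(), 'Date').alias('days_since_flight'))
  .show(5)
)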

Let’s transform the Date in the first few records of nycflights with two of the extraction functions

(
  nycflights
  .limit(10)
  .select('Date')
  .withColumn('dayofyear', dayofyear(nycflights.Date))
  .withColumn('weekofyear', weekofyear(nycflights.Date))
  .show()
)

spark.stop()