NYC Flights data analysis with Spark
Reference: so many pyspark examples
In this notebook, we extract some historical flight data for flights out of NYC between 1990 and 2000. The data is taken from here.
Variable descriptions

Name                Description
Year                1987-2008
Month               1-12
DayofMonth          1-31
DayOfWeek           1 (Monday) - 7 (Sunday)
DepTime             actual departure time (local, hhmm)
CRSDepTime          scheduled departure time (local, hhmm)
ArrTime             actual arrival time (local, hhmm)
CRSArrTime          scheduled arrival time (local, hhmm)
UniqueCarrier       unique carrier code
FlightNum           flight number
TailNum             plane tail number
ActualElapsedTime   in minutes
CRSElapsedTime      in minutes
AirTime             in minutes
ArrDelay            arrival delay, in minutes
DepDelay            departure delay, in minutes
Origin              origin IATA airport code
Dest                destination IATA airport code
Distance            in miles
TaxiIn              taxi in time, in minutes
TaxiOut             taxi out time, in minutes
Cancelled           was the flight cancelled?
CancellationCode    reason for cancellation (A = carrier, B = weather, C = NAS, D = security)
Diverted            1 = yes, 0 = no
CarrierDelay        in minutes
WeatherDelay        in minutes
NASDelay            in minutes
SecurityDelay       in minutes
LateAircraftDelay   in minutes
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.config("spark.cores.max", "4") \
.appName("NYCFlights") \
.master("spark://b2-120-gra11:7077") \
.getOrCreate()
nycflights = spark.read.parquet("hdfs://localhost:54310/data/nycflights.parquet")
nycflights.show()
Let’s take a look at the DataFrame schema
nycflights.printSchema()
Let’s group and aggregate. groupBy() will group one or more DataFrame columns and prepare them for aggregation functions.
(nycflights
.groupby('Origin') # creates 'GroupedData'
.count() # creates a new column with aggregate `count` values
.show())
Use the agg() function to perform multiple aggregations
(nycflights
.groupby('Origin')
.agg({'DepDelay': 'avg', 'ArrDelay': 'avg'}) # note the new column names
.show())
You can’t perform multiple aggregations on the same column with the dict syntax: duplicate keys collapse, so only the last one is applied (a workaround with explicit aggregation expressions is sketched after this example)
(nycflights
.groupby('DayOfWeek')
.agg({'DepDelay': 'min', 'DepDelay': 'max'})
.show())
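As a sketch of the workaround, pass explicit aggregation expressions from pyspark.sql.functions and name each result with alias():
from pyspark.sql import functions as F
(
nycflights
.groupby('DayOfWeek')
.agg(F.min('DepDelay').alias('min_dep_delay'),  # both aggregations target DepDelay
     F.max('DepDelay').alias('max_dep_delay'))
.show()
)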
Use groupBy() with a few columns, then aggregate
(
nycflights
.groupby(['DayOfWeek', 'Origin', 'Dest']) # group by these unique combinations
.count() # perform a 'count' aggregation on the groups
.orderBy(['DayOfWeek', 'count'],
ascending = [1, 0]) # order by `DayOfWeek` ascending, `count` descending
.show(40)
)
Use groupBy() + pivot() + an aggregation function to make a pivot table! Get a table of flight counts by day of week for each carrier
(
nycflights
    .groupBy('DayOfWeek') # group the data for aggregation by day of week
.pivot('UniqueCarrier') # provide columns of data by `carrier` abbreviation
.count() # create aggregations as a count of rows
.show()
)
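A small variant (sketch): pivot on UniqueCarrier again, but aggregate the mean departure delay instead of a row count:
(
nycflights
.groupBy('DayOfWeek')
.pivot('UniqueCarrier')   # one column per carrier abbreviation
.avg('DepDelay')          # mean departure delay per day-of-week / carrier cell
.show()
)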
Column Operations
Column instances can be created by:
(1) Selecting a column from a DataFrame
df.colName
df["colName"]
(note that df.select() and df.withColumn() return new DataFrames rather than Column instances)
(2) Creating one from an expression
df.colName + 1
1 / df.colName
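A minimal sketch of a Column expression in use; the Distance_km name is just illustrative:
(
nycflights
.select(nycflights.Distance,
        (nycflights.Distance * 1.609).alias('Distance_km'))  # expression -> new named Column
.show(5)
)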
Once you have a Column instance, you can apply a wide range of functions. Some of the functions covered here are:
format_number(): apply formatting to a number, rounded to d decimal places, and return the result as a string
when() & otherwise(): when() evaluates a list of conditions and returns one of multiple possible result expressions; if otherwise() is not invoked, None is returned for unmatched conditions
concat_ws(): concatenates multiple input string columns together into a single string column, using the given separator
to_utc_timestamp(): assumes the given timestamp is in the given timezone and converts it to UTC
year(): extracts the year of a given date as an integer
month(): extracts the month of a given date as an integer
dayofmonth(): extracts the day of the month of a given date as an integer
hour(): extracts the hour of a given date as an integer
minute(): extracts the minute of a given date as an integer
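A small sketch combining concat_ws() and format_number() (assuming Distance is numeric, as the divisions further down also assume):
from pyspark.sql.functions import concat_ws, format_number
(
nycflights
.withColumn('route', concat_ws(' -> ', 'Origin', 'Dest'))   # e.g. "JFK -> LAX"
.withColumn('distance_fmt', format_number('Distance', 0))   # Distance rendered as a string
.select('route', 'distance_fmt')
.distinct()
.show(5)
)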
Perform 2 different aggregations, rename those new columns, then do some rounding of the aggregate values
from pyspark.sql.functions import *
(
nycflights
.groupby('DayOfWeek')
.agg({'DepDelay': 'avg', 'ArrDelay': 'avg'})
    .withColumnRenamed('avg(DepDelay)', 'mean_dep_delay')
    .withColumnRenamed('avg(ArrDelay)', 'mean_arr_delay')
.withColumn('mean_arr_delay', format_number('mean_arr_delay', 1))
.withColumn('mean_dep_delay', format_number('mean_dep_delay', 1))
.show()
)
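An equivalent sketch that names the aggregates up front with alias(), avoiding the withColumnRenamed() calls:
from pyspark.sql.functions import avg, format_number
(
nycflights
.groupby('DayOfWeek')
.agg(avg('DepDelay').alias('mean_dep_delay'),
     avg('ArrDelay').alias('mean_arr_delay'))
.withColumn('mean_dep_delay', format_number('mean_dep_delay', 1))
.withColumn('mean_arr_delay', format_number('mean_arr_delay', 1))
.show()
)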
Add a new column (far_or_near) with a string based on a comparison on a numeric column; uses withColumn(), when(), and otherwise()
from pyspark.sql.types import * # Necessary for creating schemas
from pyspark.sql.functions import * # Importing PySpark functions
(
nycflights
.withColumn('far_or_near',
when(nycflights.Distance > 1000, 'far') # the `if-then` statement
.otherwise('near')) # the `else` statement
.select(["Origin", "Dest", "far_or_near"])
.distinct()
.show()
)
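A sketch with chained when() clauses, giving three buckets instead of two; the bucket names and thresholds are illustrative:
(
nycflights
.withColumn('dist_bucket',
            when(nycflights.Distance > 2000, 'long')    # first matching condition wins
            .when(nycflights.Distance > 1000, 'medium')
            .otherwise('short'))
.groupby('dist_bucket')
.count()
.show()
)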
Perform a few numerical computations across columns
(
nycflights
.withColumn('dist_per_minute',
nycflights.Distance / nycflights.AirTime) # create new column with division of values
.withColumn('dist_per_minute',
format_number('dist_per_minute', 2)) # round that new column's float value to 2 decimal places
.select(["Origin", "Dest", "dist_per_minute"])
.distinct()
.show()
)
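A related sketch: approximate ground speed in miles per hour, remembering that AirTime is in minutes:
from pyspark.sql.functions import col, format_number
(
nycflights
.withColumn('mph', format_number(col('Distance') / (col('AirTime') / 60), 1))
.select('Origin', 'Dest', 'mph')
.show(5)
)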
You can split the date if you need to. Use the year(), month(), dayofmonth(), hour(), and minute() functions with withColumn()
(
nycflights
.withColumn('Year', year(nycflights.Date))
.withColumn('Month', month(nycflights.Date))
.withColumn('Day', dayofmonth(nycflights.Date))
.select(["Day", "Month", "Year"])
.distinct()
.show()
)
There are more time-based functions:
date_sub(): subtract an integer number of days from a Date or Timestamp
date_add(): add an integer number of days to a Date or Timestamp
datediff(): get the difference between two dates
add_months(): add an integer number of months
months_between(): get the number of months between two dates
next_day(): returns the first date which is later than the value of the date column
last_day(): returns the last day of the month which the given date belongs to
dayofmonth(): extract the day of the month of a given date as an integer
dayofyear(): extract the day of the year of a given date as an integer
weekofyear(): extract the week number of a given date as an integer
quarter(): extract the quarter of a given date
Let’s transform the Date value of the first few records of nycflights with a couple of these functions
(
nycflights
.limit(10)
.select('Date')
.withColumn('dayofyear', dayofyear(nycflights.Date))
.withColumn('weekofyear', weekofyear(nycflights.Date))
.show()
)
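A sketch of a few of the arithmetic helpers on the same Date column; current_date() is only used to have a second date to compare against:
from pyspark.sql.functions import date_add, last_day, datediff, current_date
(
nycflights
.limit(5)
.select('Date')
.withColumn('plus_week', date_add('Date', 7))              # 7 days later
.withColumn('month_end', last_day('Date'))                 # last day of that month
.withColumn('days_ago', datediff(current_date(), 'Date'))  # days between then and today
.show()
)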
spark.stop()