import os
import pandas as pd
"max.rows", 10)
pd.set_option( os.getcwd()
Dask Dataframes
Dask is a flexible parallel computing library for analytic computing, written in Python. Dask is similar to Spark in that it lazily constructs a directed acyclic graph (DAG) of tasks and splits large datasets into small portions called partitions. See the image below, taken from Dask’s web page, for an illustration.
It has three main interfaces:
- Array, which works like NumPy arrays;
- Bag, which is similar to the RDD interface in Spark;
- DataFrame, which works like Pandas DataFrame.
While it can run on a distributed cluster, Dask also works very well on a single-CPU machine.
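For orientation, here is a minimal sketch (toy data, unrelated to the flight example below) showing how each of the three collections can be created; in every case, building the object is lazy and nothing is computed until .compute() is called:

import dask.array as da
import dask.bag as db
import dask.dataframe as dd
import pandas as pd

x = da.ones((1000, 1000), chunks=(100, 100))    # Array: NumPy-like, split into 100x100 chunks
b = db.from_sequence(range(10), npartitions=2)  # Bag: unordered collection, similar to a Spark RDD
ddf = dd.from_pandas(pd.DataFrame({"a": range(10)}), npartitions=2)  # DataFrame: pandas-like, split along the index

x.sum().compute(), b.sum().compute(), ddf.a.sum().compute()  # the work happens only here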
DataFrames
Dask dataframes look and feel (mostly) like Pandas dataframes, but they run on the same infrastructure that powers dask.delayed. The dask.dataframe module implements a blocked parallel DataFrame object that mimics a large subset of the Pandas DataFrame. One dask DataFrame is composed of many in-memory pandas DataFrames separated along the index. One operation on a dask DataFrame triggers many pandas operations on the constituent pandas DataFrames in a way that is mindful of potential parallelism and memory constraints.
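As a small sketch of this idea (toy data again), a dask DataFrame built from a pandas DataFrame is split into several pandas partitions, and a reduction such as a sum runs on each partition before the partial results are combined:

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"x": range(8)})
ddf = dd.from_pandas(pdf, npartitions=4)  # four in-memory pandas DataFrames, split along the index

ddf.npartitions        # 4
ddf.x.sum().compute()  # each partition is summed, then the partial sums are combined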
Related Documentation
In this notebook, we will extract some historical flight data for flights out of NYC between 1990 and 2000. The data is taken from here. This should only take a few seconds to run.
We will use dask.dataframe to construct our computations for us. The dask.dataframe.read_csv function can take a globstring like "data/nycflights/*.csv" and build parallel computations on all of our data at once.
Variable descriptions
| Name | Description |
|------|-------------|
| Year | 1987-2008 |
| Month | 1-12 |
| DayofMonth | 1-31 |
| DayOfWeek | 1 (Monday) - 7 (Sunday) |
| DepTime | actual departure time (local, hhmm) |
| CRSDepTime | scheduled departure time (local, hhmm) |
| ArrTime | actual arrival time (local, hhmm) |
| CRSArrTime | scheduled arrival time (local, hhmm) |
| UniqueCarrier | unique carrier code |
| FlightNum | flight number |
| TailNum | plane tail number |
| ActualElapsedTime | in minutes |
| CRSElapsedTime | in minutes |
| AirTime | in minutes |
| ArrDelay | arrival delay, in minutes |
| DepDelay | departure delay, in minutes |
| Origin | origin IATA airport code |
| Dest | destination IATA airport code |
| Distance | in miles |
| TaxiIn | taxi in time, in minutes |
| TaxiOut | taxi out time, in minutes |
| Cancelled | was the flight cancelled? |
| CancellationCode | reason for cancellation (A = carrier, B = weather, C = NAS, D = security) |
| Diverted | 1 = yes, 0 = no |
| CarrierDelay | in minutes |
| WeatherDelay | in minutes |
| NASDelay | in minutes |
| SecurityDelay | in minutes |
| LateAircraftDelay | in minutes |
Prep the Data
import os       # library to get directory and file paths
import tarfile  # this module makes it possible to read and write tar archives

def extract_flight():
    here = os.getcwd()
    flightdir = os.path.join(here, 'data', 'nycflights')
    if not os.path.exists(flightdir):
        print("Extracting flight data")
        tar_path = os.path.join('data', 'nycflights.tar.gz')
        with tarfile.open(tar_path, mode='r:gz') as flights:
            flights.extractall('data/')

extract_flight()  # this function call will extract 10 csv files in data/nycflights
Load Data from CSVs in Dask Dataframes
import os

here = os.getcwd()
filenames = os.path.join(here, 'data', 'nycflights', '*.csv')
filenames

import dask
import dask.dataframe as dd

df = dd.read_csv(filenames,
                 parse_dates={'Date': [0, 1, 2]})
Let’s take a look at the dataframe
df
### Get the first 5 rows
df.head(5)
import traceback  # we use traceback because we expect an error.

try:
    df.tail(5)  # Get the last 5 rows
except Exception:
    traceback.print_exc()
What just happened?
Unlike pandas.read_csv
which reads in the entire file before inferring datatypes, dask.dataframe.read_csv
only reads in a sample from the beginning of the file (or first file if using a glob). These inferred datatypes are then enforced when reading all partitions.
In this case, the datatypes inferred in the sample are incorrect. The first n
rows have no value for CRSElapsedTime
(which pandas infers as a float
), and later on turn out to be strings (object
dtype). When this happens you have a few options:
- Specify dtypes directly using the dtype keyword. This is the recommended solution, as it’s the least error prone (better to be explicit than implicit) and also the most performant.
- Increase the size of the sample keyword (in bytes).
- Use assume_missing to make dask assume that columns inferred to be int (which don’t allow missing values) are actually floats (which do allow missing values). In our particular case this doesn’t apply.
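For reference, the other two options would look roughly like this (a sketch only, shown for the syntax; the sample size is an arbitrary illustrative value, and assume_missing is not needed for this dataset):

df = dd.read_csv(filenames,
                 parse_dates={'Date': [0, 1, 2]},
                 sample=1_000_000)     # read a larger sample (in bytes) before inferring dtypes

df = dd.read_csv(filenames,
                 parse_dates={'Date': [0, 1, 2]},
                 assume_missing=True)  # treat columns inferred as int as floats that allow missing values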
In our case we’ll use the first option and directly specify the dtypes
of the offending columns.
df.dtypes
df = dd.read_csv(filenames,
                 parse_dates={'Date': [0, 1, 2]},
                 dtype={'TailNum': object,
                        'CRSElapsedTime': float,
                        'Cancelled': bool})
df.tail(5)
Let’s look at one more example to make the idea concrete.
len(df)
Why is df ten times longer than a single CSV file?
- Dask investigated the input path and found that there are ten matching files.
- A set of jobs was intelligently created for each chunk - one per original CSV file in this case.
- Each file was loaded into a pandas dataframe, and had len() applied to it.
- The subtotals were combined to give you the final grand total.
Computations with dask.dataframe
We compute the maximum of the DepDelay
column. With dask.delayed
we could create this computation as follows:
from glob import glob  # expand the glob pattern into a list of file paths

maxes = []
for fn in glob(filenames):
    part = dask.delayed(pd.read_csv)(fn)  # a lazy pandas.read_csv for one file
    maxes.append(part.DepDelay.max())

final_max = dask.delayed(max)(maxes)
final_max.compute()
Now we just use the normal Pandas syntax as follows:
%time df.DepDelay.max().compute()
This writes the delayed computation for us and then runs it. Recall that the delayed computation is a dask graph made up of key-value pairs.
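If you are curious, you can peek at that graph directly; a quick sketch (the exact key names and task count depend on your data and Dask version):

graph = df.DepDelay.max().__dask_graph__()  # a mapping from keys to tasks
len(graph)                                  # number of tasks in the graph
list(graph.keys())[:5]                      # a few of the key names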
Some things to note:
- As with dask.delayed, we need to call .compute() when we’re done. Up until this point everything is lazy.
- Dask will delete intermediate results (like the full pandas dataframe for each file) as soon as possible.
- This lets us handle datasets that are larger than memory
- This means that repeated computations will have to load all of the data in each time (run the code above again, is it faster or slower than you would expect?)
As with Delayed
objects, you can view the underlying task graph using the .visualize
method:
df.DepDelay.max().visualize()
If you are already familiar with the Pandas API then you already know how to use dask.dataframe. There are a couple of small changes.
As noted above, computations on dask DataFrame
objects don’t perform work, instead they build up a dask graph. We can evaluate this dask graph at any time using the .compute()
method.
result = df.DepDelay.mean()  # create the task graph
%time result.compute()       # perform the actual computation
Store Data in Apache Parquet Format
Dask encourages dataframe users to store and load data using Parquet instead of CSV. Apache Parquet is a columnar binary format that is easy to split into multiple files (easier for parallel loading) and is generally much simpler to deal with than HDF5 (from the Dask library’s perspective). It is also a common format used by other big data systems like Apache Spark and Apache Impala, and so is useful for interchange with other systems.
"TailNum", axis=1).to_parquet("nycflights/") # save csv files using parquet format df.drop(
It is possible to specify dtypes and compression when converting. This can definitely help give you significantly greater speedups, but just using the default settings will still be a large improvement.
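For example, the same conversion can be written with an explicit engine and compression codec; a sketch (assumes pyarrow is installed, and snappy is just one common choice):

df.drop("TailNum", axis=1).to_parquet("nycflights/",
                                      engine="pyarrow",      # or "fastparquet"
                                      compression="snappy")  # compression codec applied per column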
df.size.compute()
import dask.dataframe as dd

df = dd.read_parquet("nycflights/")
df.head()
result = df.DepDelay.mean()
%time result.compute()
The computation is much faster because pulling out the DepDelay column is easy for Parquet.
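You can make this explicit by asking read_parquet for only the columns you need; a sketch (df_delay is just an illustrative name):

df_delay = dd.read_parquet("nycflights/", columns=["DepDelay"])  # only this column is read from disk
df_delay.head()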
Parquet advantages:
- Binary representation of data, allowing for speedy conversion of bytes-on-disk to bytes-in-memory
- Columnar storage, meaning that you can load in as few columns as you need without loading the entire dataset
- Row-chunked storage so that you can pull out data from a particular range without touching the others
- Per-chunk statistics so that you can find subsets quickly
- Compression
Exercise 15.1
If you don’t remember how to use pandas, please read the pandas documentation.
- Use the head() method to get the first ten rows
- How many rows are in our dataset?
- Use selections df[...] to find how many positive (late) and negative (early) departure times there are
- In total, how many non-cancelled flights were taken? (To invert a boolean pandas Series s, use ~s.)
Divisions and the Index
The Pandas index associates a value with each record/row of your data. Operations that align with the index, like loc, can be a bit faster as a result.
In dask.dataframe
this index becomes even more important. Recall that one dask DataFrame
consists of several Pandas DataFrame
s. These dataframes are separated along the index by value. For example, when working with time series we may partition our large dataset by month.
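As a small illustrative sketch (a toy time series, not the flight data; freq accepts the usual pandas offset aliases), partitioning by month might look like this:

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"value": range(365)},
                   index=pd.date_range("2000-01-01", periods=365, freq="D"))
ts = dd.from_pandas(pdf, npartitions=1).repartition(freq="MS")  # roughly one pandas DataFrame per calendar month
ts.divisions  # the month boundaries that separate the underlying pandas DataFrames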
Recall that these many partitions of our data may not all live in memory at the same time, instead they might live on disk; we simply have tasks that can materialize these pandas DataFrames
on demand.
Partitioning your data can greatly improve efficiency. Operations like loc
, groupby
, and merge/join
along the index are much more efficient than operations along other columns. You can see how your dataset is partitioned with the .divisions
attribute. Note that data that comes out of simple data sources like CSV files isn’t intelligently indexed by default. In these cases the values for .divisions will be None.
df = dd.read_csv(filenames,
                 dtype={'TailNum': str,
                        'CRSElapsedTime': float,
                        'Cancelled': bool})
df.divisions
However, if we set the index to some new column, then dask will divide our data roughly evenly along that column and create new divisions for us. Warning: set_index triggers immediate computation.
df2 = df.set_index('Year')
df2.divisions
We see here the minimum and maximum values (1990 and 1999) as well as the intermediate values that separate our data well. This dataset has ten partitions, as the final value is assumed to be the inclusive right-side for the last bin.
df2.npartitions
df2.head()
One of the benefits of this is that operations like loc only need to load the relevant partitions:
df2.loc[1991]
df2.loc[1991].compute()
Exercises 15.2
In this section we do a few dask.dataframe
computations. If you are comfortable with Pandas then these should be familiar. You will have to think about when to call compute
.
- In total, how many non-cancelled flights were taken from each airport?
Hint: use df.groupby, for example df.groupby(df.A).B.func().
- What was the average departure delay from each airport?
Note: this is the same computation you did in the previous notebook (is this approach faster or slower?)
- What day of the week has the worst average departure delay?