Dask#
process data that doesn’t fit into memory by breaking it into blocks and specifying task chains
parallelize execution of tasks across cores and even nodes of a cluster
move computation to the data rather than the other way around, to minimize communication overheads
http://dask.pydata.org/en/latest/
import dask
Define two slow functions#
from time import sleep
def slowinc(x, delay=1):
    sleep(delay)
    return x + 1

def slowadd(x, y, delay=1):
    sleep(delay)
    return x + y
%%time
x = slowinc(1)
y = slowinc(2)
z = slowadd(x, y)
Parallelize with dask.delayed#
Functions wrapped by dask.delayed don’t run immediately. Instead, the function and its arguments are recorded in a task graph. The result is computed separately by calling the .compute() method.
from dask import delayed
x = dask.delayed(slowinc)(1)
y = dask.delayed(slowinc)(2)
z = dask.delayed(slowadd)(x, y)
%%time
z.compute()
Some questions to consider:#
Why did we go from 3s to 2s? Why weren’t we able to parallelize down to 1s?
What would have happened if the inc and add functions didn’t include the sleep(1)? Would Dask still be able to speed up this code?
What if we have multiple outputs or also want to get access to x or y?
Dask graph#
Contains description of the calculations necessary to produce the result.
The z object is a lazy Delayed object. This object holds everything we need to compute the final result. We can compute the result with .compute() as above or we can visualize the task graph for this value with .visualize().
z.visualize()
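.visualize() depends on the optional Graphviz package. When it is not available, a rough alternative is to look at the task graph as a plain mapping from task keys to tasks. The sketch below assumes the Dask collection-protocol method __dask_graph__(); inc and add are sleep-free stand-ins for slowinc and slowadd, introduced here only for illustration.

```python
import dask

# Sleep-free stand-ins for slowinc/slowadd (hypothetical names, illustration only)
def inc(x):
    return x + 1

def add(x, y):
    return x + y

x = dask.delayed(inc)(1)
y = dask.delayed(inc)(2)
z = dask.delayed(add)(x, y)

# The graph behaves like a mapping {task key: task}
graph = dict(z.__dask_graph__())

# Keys look like "<function name>-<unique token>"; keep only the function name
task_names = sorted(key.split("-", 1)[0] for key in graph)
print(len(graph), task_names)

result = z.compute()
print(result)
```

Each delayed call contributes one entry to the graph, which is the same information the picture from .visualize() conveys.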
Parallelize a loop#
%%time
data = list(range(8))
tasks = []
for x in data:
    y = slowinc(x)
    tasks.append(y)
total = sum(tasks)
total
Exercise 8.1#
Parallelize this by appending the delayed slowinc calls to a list named results.
Display the graph of the total computation.
Compute the time elapsed for the computation.
Decorator#
It is also common to see the delayed function used as a decorator. Same example:
%%time
@dask.delayed
def slowinc(x, delay=1):
    sleep(delay)
    return x + 1

@dask.delayed
def slowadd(x, y, delay=1):
    sleep(delay)
    return x + y
x = slowinc(1)
y = slowinc(2)
z = slowadd(x, y)
z.compute()
z.visualize()
Control flow#
Delay only some functions, running a few of them immediately. This is helpful when those functions are fast and help us to determine what other slower functions we should call.
In the example below we iterate through a list of inputs. If an input is even, we want to call half. If the input is odd, we want to call odd_process. This is_even decision to call half or odd_process has to be made immediately (not lazily) in order for our graph-building Python code to proceed.
from random import randint
import dask.delayed
def half(x):
    sleep(1)
    return x // 2

def odd_process(x):
    sleep(1)
    return 3*x+1

def is_even(x):
    return not x % 2

data = [randint(0,100) for i in range(8)]

result = []
for x in data:
    if is_even(x):
        result.append(half(x))
    else:
        result.append(odd_process(x))
total = sum(result)
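To see why the is_even test cannot be lazy, it may help to try branching on a delayed value. The sketch below is only an illustration: a Delayed object has no truth value until it is computed, so Python cannot decide which branch to take while the graph is still being built.

```python
import dask

def is_even(x):
    return not x % 2

# Wrapping the test makes it lazy: this is a Delayed object, not a bool
lazy_flag = dask.delayed(is_even)(4)

try:
    if lazy_flag:          # Python needs a concrete True/False here
        pass
    branch_error = None
except TypeError as err:   # Dask refuses to give a Delayed a truth value
    branch_error = err

print(type(lazy_flag).__name__)
print(branch_error)

# Calling the plain function gives a real bool that an if statement can use
eager_flag = is_even(4)
print(eager_flag)
```

This is why cheap decision functions are left undelayed while the slow functions they select are delayed.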
Exercise 8.2#
Parallelize the sequential code above using dask.delayed
You will need to delay some functions, but not all
Visualize and check the computed result
Exercise 8.3#
Parallelize the hdf5 conversion from json files
Create a function convert_to_hdf
Use the dask.compute function on a list of delayed calls to the function you created
Is it really faster, as expected?
Hint: Read Delayed Best Practices
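The pattern the exercise asks for can be sketched as follows: build the whole list of delayed calls first, then hand them to dask.compute in a single call, rather than calling .compute() inside the loop. The function fake_convert and the file names are made up for this sketch; they stand in for convert_to_hdf and the real JSON files, and no file is read or written.

```python
import dask

def fake_convert(filename):
    """Hypothetical stand-in for convert_to_hdf: only maps the file name."""
    return filename.replace(".json", ".h5")

names = ["a.json", "b.json", "c.json"]   # made-up file names

# Build all the lazy calls first; nothing runs yet
tasks = [dask.delayed(fake_convert)(fn) for fn in names]

# One call to dask.compute lets the scheduler run the tasks together
outputs = dask.compute(*tasks)
print(outputs)
```

dask.compute returns a tuple with one entry per argument, in the same order as the tasks.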
import os # library to get directory and file paths
import tarfile # this module makes possible to read and write tar archives
def extract_data(name, where):
    datadir = os.path.join(where) # directory where extract all datafile
    if os.path.exists(datadir): # check if this directory exists
        print("Extracting data...")
        tar_path = os.path.join(name) # path to the tgz file
        with tarfile.open(tar_path, mode='r:gz') as data: # open the tgz file
            data.extractall(datadir) # extract all data file in datadir
extract_data('data/daily-stock.tgz','data') # this function call will extract json files
import dask
import os, sys
from glob import glob
import pandas as pd
import json
here = os.getcwd() # get the current directory
filenames = sorted(glob(os.path.join(here,'data', 'daily-stock', '*.json')))
filenames[:5]
%rm data/daily-stock/*.h5
Exercise: Parallelizing a Pandas Groupby Reduction#
In this exercise we read several CSV files and perform a groupby operation in parallel. We are given sequential code to do this and parallelize it with dask.delayed.
The computation we will parallelize is to compute the mean departure delay per airport from some historical flight data. We will do this by using dask.delayed together with pandas. In a future section we will do this same exercise with dask.dataframe.
Prep data#
First, run this code to prep some data. You don’t need to understand this code.
This extracts some historical flight data for flights out of NYC between 1990 and 2000. The data is taken from here. This should only take a few seconds to run.
Inspect data#
Data are in the file data/nycflights.tar.gz. You can extract them with the command
tar zxvf nycflights.tar.gz
Depending on your operating system, double-clicking the file may also do the job.
extract_data('data/nycflights.tar.gz','data')
Read one file with pandas.read_csv and compute mean departure delay#
import pandas as pd
df = pd.read_csv(os.path.join("data", "nycflights",'1990.csv'))
df.head()
# What is the schema?
df.dtypes
# What originating airports are in the data?
df.Origin.unique()
# Mean departure delay per-airport for one year
df.groupby('Origin').DepDelay.mean()
Sequential code: Mean Departure Delay Per Airport#
The above cell computes the mean departure delay per-airport for one year. Here we expand that to all years using a sequential for loop.
from glob import glob
filenames = sorted(glob(os.path.join('data', "nycflights", '*.csv')))
filenames
%%time
sums = []
counts = []
for fn in filenames:
    # Read in file
    df = pd.read_csv(fn)
    # Groupby origin airport
    by_origin = df.groupby('Origin')
    # Sum of all departure delays by origin
    total = by_origin.DepDelay.sum()
    # Number of flights by origin
    count = by_origin.DepDelay.count()
    # Save the intermediates
    sums.append(total)
    counts.append(count)
# Combine intermediates to get total mean-delay-per-origin
total_delays = sum(sums)
n_flights = sum(counts)
mean = total_delays / n_flights
mean
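The loop keeps per-file sums and counts instead of per-file means because files with different numbers of flights must not be weighted equally. The sketch below checks this on two tiny made-up tables (the values are invented for illustration, not taken from the flight data): dividing the summed delays by the summed counts matches the mean over the pooled rows, while averaging the per-file means does not.

```python
import pandas as pd

# Two toy "years" with invented values
year_a = pd.DataFrame({"Origin": ["EWR", "EWR", "JFK"],
                       "DepDelay": [10.0, 20.0, 6.0]})
year_b = pd.DataFrame({"Origin": ["EWR", "JFK", "JFK"],
                       "DepDelay": [0.0, 0.0, 12.0]})

sums, counts, means = [], [], []
for df in (year_a, year_b):
    delays = df.groupby("Origin").DepDelay
    sums.append(delays.sum())
    counts.append(delays.count())
    means.append(delays.mean())

# Same combination as in the sequential code: total delay / total flights
exact = sum(sums) / sum(counts)

# Averaging the per-file means ignores how many flights each file holds
naive = sum(means) / len(means)

# Reference: mean over all rows pooled together
pooled = pd.concat([year_a, year_b]).groupby("Origin").DepDelay.mean()

print(exact)    # EWR 10.0, JFK 6.0
print(naive)    # EWR 7.5,  JFK 6.0
print(pooled)   # EWR 10.0, JFK 6.0
```

For EWR, the first table has two flights and the second only one, so the naive average gives 7.5 instead of the correct 10.0.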
Exercise : Parallelize the code above#
Use dask.delayed to parallelize the code above. Some extra things you will need to know:
Methods and attribute access on delayed objects work automatically, so if you have a delayed object you can perform normal arithmetic, slicing, and method calls on it and it will produce the correct delayed calls.
x = delayed(np.arange)(10)
y = (x + 1)[::2].sum() # everything here was delayed
Calling the .compute() method works well when you have a single output. When you have multiple outputs you might want to use the dask.compute function:
>>> x = delayed(np.arange)(10)
>>> y = x ** 2
>>> min_, max_ = compute(y.min(), y.max())
>>> min_, max_
(0, 81)
This way Dask can share the intermediate values (like y = x**2).
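The sharing of intermediate values can be made visible by counting how often the intermediate step runs. This is a minimal sketch using plain Python lists instead of NumPy; square_all and the calls list are hypothetical helpers added only to record executions, and it assumes the default local scheduler.

```python
import dask
from dask import delayed

calls = []   # records each execution of the intermediate step

def square_all(values):
    calls.append("square_all")
    return [v ** 2 for v in values]

y = delayed(square_all)(list(range(10)))   # shared intermediate
lo = delayed(min)(y)
hi = delayed(max)(y)

# One dask.compute call: the graphs are merged, so y is evaluated once
lo_val, hi_val = dask.compute(lo, hi)
calls_shared = len(calls)

# Separate .compute() calls: each one re-evaluates y
calls.clear()
lo.compute()
hi.compute()
calls_separate = len(calls)

print(lo_val, hi_val)
print(calls_shared, calls_separate)
```

With a single dask.compute call the intermediate runs once; with two separate .compute() calls it runs twice.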
So your goal is to parallelize the code above (which has been copied below) using dask.delayed. You may also want to visualize a bit of the computation to see if you’re doing it correctly.