Dask

import dask
- process data that doesn’t fit into memory by breaking it into blocks and specifying task chains
- parallelize execution of tasks across cores and even nodes of a cluster
- move computation to the data rather than the other way around, to minimize communication overheads
http://dask.pydata.org/en/latest/
Define two slow functions
from time import sleep

def slowinc(x, delay=1):
    sleep(delay)
    return x + 1

def slowadd(x, y, delay=1):
    sleep(delay)
    return x + y
%%time
x = slowinc(1)
y = slowinc(2)
z = slowadd(x, y)
Parallelize with dask.delayed
- Functions wrapped by dask.delayed don’t run immediately, but instead put those functions and arguments into a task graph.
- The result is computed separately by calling the .compute() method.
from dask import delayed

x = dask.delayed(slowinc)(1)
y = dask.delayed(slowinc)(2)
z = dask.delayed(slowadd)(x, y)
%%time
z.compute()
Some questions to consider:
- Why did we go from 3s to 2s? Why weren’t we able to parallelize down to 1s?
- What would have happened if the inc and add functions didn’t include the sleep(1)? Would Dask still be able to speed up this code?
- What if we have multiple outputs or also want to get access to x or y?
Dask graph
- Contains description of the calculations necessary to produce the result.
- The z object is a lazy Delayed object. This object holds everything we need to compute the final result. We can compute the result with .compute() as above or we can visualize the task graph for this value with .visualize().
z.visualize()
Parallelize a loop
%%time
data = list(range(8))

tasks = []
for x in data:
    y = slowinc(x)
    tasks.append(y)

total = sum(tasks)
total
Exercise 8.1
- Parallelize this by appending the delayed slowinc calls to the list results (a sketch of one possible solution follows this exercise).
- Display the graph of the total computation.
- Compute the time elapsed for the computation.
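A minimal sketch of one possible solution (not the reference solution; it assumes data, slowinc and dask are defined as above):

results = []
for x in data:
    results.append(dask.delayed(slowinc)(x))  # build a task instead of running slowinc

total = dask.delayed(sum)(results)  # sum is delayed too, so nothing has run yet
total.visualize()                   # display the task graph

%%time
total.compute()                     # the eight slowinc calls run in parallel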
Decorator
It is also common to see the delayed function used as a decorator. Same example:
%%time
@dask.delayed
def slowinc(x, delay=1):
    sleep(delay)
    return x + 1

@dask.delayed
def slowadd(x, y, delay=1):
    sleep(delay)
    return x + y

x = slowinc(1)
y = slowinc(2)
z = slowadd(x, y)
z.compute()
z.visualize()
Control flow
- Delay only some functions, running a few of them immediately. This is helpful when those functions are fast and help us to determine what other slower functions we should call.
- In the example below we iterate through a list of inputs. If the input is even then we want to call half. If the input is odd then we want to call odd_process. This is_even decision to call half or odd_process has to be made immediately (not lazily) in order for our graph-building Python code to proceed.
from random import randint
import dask.delayed

def half(x):
    sleep(1)
    return x // 2

def odd_process(x):
    sleep(1)
    return 3 * x + 1

def is_even(x):
    return not x % 2

data = [randint(0, 100) for i in range(8)]

result = []
for x in data:
    if is_even(x):
        result.append(half(x))
    else:
        result.append(odd_process(x))

total = sum(result)
Exercise 8.2
- Parallelize the sequential code above using dask.delayed
- You will need to delay some functions, but not all
- Visualize and check the computed result
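A possible sketch for this exercise (not the reference solution; is_even stays immediate because the branch must be decided while the graph is built):

results = []
for x in data:
    if is_even(x):                                    # evaluated immediately
        results.append(dask.delayed(half)(x))         # lazy call
    else:
        results.append(dask.delayed(odd_process)(x))  # lazy call

total = dask.delayed(sum)(results)
total.visualize()
print(total.compute())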
Exercise 8.3
- Parallelize the HDF5 conversion from the JSON files.
- Create a function convert_to_hdf.
- Use the dask.compute function on a list of delayed calls of the created function (a sketch of one possible solution follows the setup code below).
- Is it really faster, as expected?
Hint: Read Delayed Best Practices
import os      # library to get directory and file paths
import tarfile # this module makes it possible to read and write tar archives

def extract_data(name, where):
    datadir = os.path.join(where)  # directory where to extract all data files
    if os.path.exists(datadir):    # check if this directory exists
        print("Extracting data...")
        tar_path = os.path.join(name)  # path to the tgz file
        with tarfile.open(tar_path, mode='r:gz') as data:  # open the tgz file
            data.extractall(datadir)   # extract all data files into datadir

extract_data('data/daily-stock.tgz', 'data')  # this function call will extract json files
import dask
import os, sys
from glob import glob
import pandas as pd
import json

here = os.getcwd()  # get the current directory
filenames = sorted(glob(os.path.join(here, 'data', 'daily-stock', '*.json')))
filenames[:5]
%rm data/daily-stock/*.h5
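A possible sketch for Exercise 8.3 (not the reference solution; it assumes each JSON file stores one record per line, and writing .h5 files with to_hdf requires the pytables package):

@dask.delayed
def convert_to_hdf(fn):
    with open(fn) as f:  # assumption: one json record per line
        records = [json.loads(line) for line in f]
    df = pd.DataFrame(records)
    out = fn.replace('.json', '.h5')  # write the h5 file next to the source
    df.to_hdf(out, key='/data')       # to_hdf requires the pytables package
    return out

results = [convert_to_hdf(fn) for fn in filenames]  # list of delayed calls

%%time
dask.compute(*results)  # run all conversions in parallel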
Exercise: Parallelizing a Pandas Groupby Reduction
In this exercise we read several CSV files and perform a groupby operation in parallel. We are given sequential code to do this and parallelize it with dask.delayed.
The computation we will parallelize is to compute the mean departure delay per airport from some historical flight data. We will do this by using dask.delayed together with pandas. In a future section we will do this same exercise with dask.dataframe.
Prep data
First, run this code to prep some data. You don’t need to understand this code.
This extracts some historical flight data for flights out of NYC between 1990 and 2000. The data is taken from here. This should only take a few seconds to run.
Inspect data
Data are in the file data/nycflights.tar.gz. You can extract them with the command

tar zxvf nycflights.tar.gz

Depending on your operating system, double-clicking on the file may also do the job.

extract_data('data/nycflights.tar.gz', 'data')
Read one file with pandas.read_csv and compute mean departure delay
import pandas as pd

df = pd.read_csv(os.path.join('data', 'nycflights', '1990.csv'))
df.head()
# What is the schema?
df.dtypes
# What originating airports are in the data?
df.Origin.unique()
# Mean departure delay per-airport for one year
df.groupby('Origin').DepDelay.mean()
Sequential code: Mean Departure Delay Per Airport
The above cell computes the mean departure delay per-airport for one year. Here we expand that to all years using a sequential for loop.
from glob import glob
filenames = sorted(glob(os.path.join('data', 'nycflights', '*.csv')))
filenames
%%time
sums = []
counts = []
for fn in filenames:
    # Read in file
    df = pd.read_csv(fn)

    # Groupby origin airport
    by_origin = df.groupby('Origin')

    # Sum of all departure delays by origin
    total = by_origin.DepDelay.sum()

    # Number of flights by origin
    count = by_origin.DepDelay.count()

    # Save the intermediates
    sums.append(total)
    counts.append(count)

# Combine intermediates to get total mean-delay-per-origin
total_delays = sum(sums)
n_flights = sum(counts)
mean = total_delays / n_flights

mean
Exercise: Parallelize the code above
Use dask.delayed to parallelize the code above. Some extra things you will need to know:
Methods and attribute access on delayed objects work automatically, so if you have a delayed object you can perform normal arithmetic, slicing, and method calls on it and it will produce the correct delayed calls.
x = delayed(np.arange)(10)
y = (x + 1)[::2].sum()  # everything here was delayed
Calling the .compute() method works well when you have a single output. When you have multiple outputs you might want to use the dask.compute function:

>>> x = delayed(np.arange)(10)
>>> y = x ** 2
>>> min, max = compute(y.min(), y.max())
(0, 81)

This way Dask can share the intermediate values (like y = x**2).
So your goal is to parallelize the code above (which has been copied below) using dask.delayed. You may also want to visualize a bit of the computation to see if you’re doing it correctly.
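A possible solution sketch (not the reference solution; it delays pd.read_csv, relies on method calls on Delayed objects staying lazy, and uses dask.compute so the intermediate file reads are shared between the two aggregates):

sums = []
counts = []
for fn in filenames:
    df = dask.delayed(pd.read_csv)(fn)  # lazy read of one file
    by_origin = df.groupby('Origin')    # method calls on a Delayed stay lazy
    sums.append(by_origin.DepDelay.sum())
    counts.append(by_origin.DepDelay.count())

# Compute both lists in a single pass so intermediate values are shared
sums, counts = dask.compute(sums, counts)
mean = sum(sums) / sum(counts)
mean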