%%time
from time import sleep

def slowadd(a, b, delay=1):
    sleep(delay)
    return a + b

slowadd(1, 1)
Asynchronous Processing
While many parallel applications can be described as maps, some can be more complex. In this section we look at the asynchronous concurrent.futures interface, which provides a simple API for ad-hoc parallelism. This is useful when your computations don’t fit a regular pattern.
Executor.submit
The submit method starts a computation in a separate thread or process and immediately gives us a Future object that refers to the result. At first, the future is pending. Once the function completes, the future is finished.
We collect the result of the task with the .result() method, which does not return until the result is available.
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(1) as e:
    future = e.submit(slowadd, 1, 2)
    print(future.result())
Submit many tasks, receive many futures
Because submit returns immediately we can submit many tasks all at once and they will execute in parallel.
%%time
results = [slowadd(i, i, delay=1) for i in range(8)]
print(results)
%%time
e = ThreadPoolExecutor()
futures = [e.submit(slowadd, i, i, delay=1) for i in range(8)]
results = [f.result() for f in futures]
print(results)
- Submit fires off a single function call in the background, returning a future.
- When we combine submit with a single for loop we recover the functionality of map (see the sketch after this list).
- When we want to collect our results we replace each of our futures, f, with a call to f.result().
- We can combine submit with multiple for loops and other general programming to get something more general than map.
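For comparison, here is a minimal sketch of the same computation written with Executor.map, using the slowadd defined earlier:

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as e:
    # map returns results in input order, blocking on each as needed
    results = list(e.map(slowadd, range(8), range(8)))
print(results)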
Exercise 7.1
Parallelize the following code with e.submit:
- Replace the results list with a list called futures.
- Replace calls to slowadd and slowsub with e.submit calls on those functions.
- At the end, block on the computation by recreating the results list by calling .result() on each future in the futures list.
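A minimal sketch of the target pattern, assuming a slowsub defined analogously to slowadd (only slowadd appears above):

from time import sleep
from concurrent.futures import ThreadPoolExecutor

def slowsub(a, b, delay=1):
    # assumed counterpart to slowadd, shown only for illustration
    sleep(delay)
    return a - b

e = ThreadPoolExecutor()
futures = [e.submit(slowadd, i, i) for i in range(4)]
futures += [e.submit(slowsub, i, i) for i in range(4)]
results = [f.result() for f in futures]
print(results)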
Extract daily stock data from Google
!rm -rf data/daily-stock
import os       # library to get directory and file paths
import tarfile  # this module makes it possible to read and write tar archives

def extract_data(name, where):
    datadir = os.path.join(where, name)
    if not os.path.exists(datadir):
        print("Extracting data...")
        tar_path = os.path.join(where, name + '.tgz')
        with tarfile.open(tar_path, mode='r:gz') as data:
            data.extractall(where)

extract_data('daily-stock', 'data')  # this function call will extract json files
Convert data to pandas DataFrames and save them in HDF5 files
HDF5 is a data model, library, and file format for storing and managing data. This format is widely used and is supported by many languages and platforms.
import json
import pandas as pd
import os, glob
here = os.getcwd()
datadir = os.path.join(here, 'data', 'daily-stock')
filenames = sorted(glob.glob(os.path.join(datadir, '*.json')))
filenames
Sequential version
!rm -f data/daily-stock/*.h5
%%time
import json
import pandas as pd
from tqdm import tqdm
for fn in tqdm(filenames):
    with open(fn) as f:
        data = [json.loads(line) for line in f]  # load

    df = pd.DataFrame(data)  # parse

    out_filename = fn[:-5] + '.h5'
    df.to_hdf(out_filename, key='/data')  # store
!rm -f data/daily-stock/*.h5
Exercise 7.2
Parallelize the loop above using ThreadPoolExecutor and map.
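One possible shape of the solution, as a sketch only: move the loop body into a function and hand it to Executor.map.

from concurrent.futures import ThreadPoolExecutor
import json
import pandas as pd

def convert_one(fn):
    # load, parse and store a single file, as in the sequential loop above
    with open(fn) as f:
        data = [json.loads(line) for line in f]
    df = pd.DataFrame(data)
    df.to_hdf(fn[:-5] + '.h5', key='/data')

with ThreadPoolExecutor() as e:
    # consume the iterator so every conversion has finished before we continue
    list(e.map(convert_one, filenames))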
Read files and load DataFrames.
import os, glob, pandas as pd
filenames = sorted(glob.glob(os.path.join('data', 'daily-stock', '*.h5')))

series = {}
for fn in filenames:
    series[fn] = pd.read_hdf(fn)['close']
Application
Given our HDF5 files from the last section we want to find the two datasets with the greatest pair-wise correlation. This forces us to consider all \(n\times(n-1)\) possibilities.
We use matplotlib to visually inspect the highly correlated timeseries.
%matplotlib inline
import matplotlib.pyplot as plt
# a and b are the filenames of the most correlated pair, found below
plt.figure(figsize=(10, 4))
plt.plot(series[a] / series[a].max())
plt.plot(series[b] / series[b].max())
plt.xticks(visible=False);
Analysis
This computation starts out by loading data from disk. We already know how to parallelize it:
series = {}
for fn in filenames:
    series[fn] = pd.read_hdf(fn)['close']
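A minimal sketch of that parallel load with submit, assuming a thread pool e like the one created for the exercise below:

futures = {fn: e.submit(pd.read_hdf, fn) for fn in filenames}
series = {fn: f.result()['close'] for fn, f in futures.items()}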
It follows with a doubly nested for loop with an if statement.
results = {}

for a in filenames:
    for b in filenames:
        if a != b:
            results[a, b] = series[a].corr(series[b])
It is possible to solve this problem with map, but it requires some cleverness. Instead we’ll use submit, an interface to start individual function calls asynchronously.
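The general shape of the submit version looks like the following sketch, where corr_pair is a hypothetical helper (Exercise 7.3 below asks for the full implementation):

def corr_pair(a, b):
    # compute one pair-wise correlation; submitted as an individual task
    return series[a].corr(series[b])

futures = {}
for a in filenames:
    for b in filenames:
        if a != b:
            futures[a, b] = e.submit(corr_pair, a, b)

results = {pair: f.result() for pair, f in futures.items()}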
It finishes with a reduction on small data. This part is fast enough.
((a, b), corr) = max(results.items(), key=lambda kv: kv[1])
Exercise 7.3
- Parallelize pair-wise correlations with e.submit.
- Implement two versions: one with threads, another with processes, by replacing e with a ProcessPoolExecutor:
Threads
from concurrent.futures import ThreadPoolExecutor
e = ThreadPoolExecutor(4)
Processes
Be careful: a ProcessPoolExecutor may not work from a Jupyter notebook cell, because worker processes must be able to import the functions they run. Put your code in a file and run it from a terminal; a minimal script sketch follows the exercise.
from concurrent.futures import ProcessPoolExecutor
e = ProcessPoolExecutor(4)
- How does performance vary?
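A minimal sketch of such a standalone script (the filename is hypothetical); worker processes re-import the main module, so the entry point must be guarded:

# pool_demo.py -- hypothetical standalone script
from concurrent.futures import ProcessPoolExecutor
from time import sleep

def slowadd(a, b, delay=1):
    sleep(delay)
    return a + b

if __name__ == '__main__':
    # the __main__ guard keeps workers from re-running the pool setup on import
    with ProcessPoolExecutor(4) as e:
        futures = [e.submit(slowadd, i, i) for i in range(8)]
        print([f.result() for f in futures])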
Some conclusions about futures
- submit can help us parallelize more complex applications.
- It didn’t actually speed up the code very much.
- Threads and processes give some performance differences.
- This approach is not very robust.