Asynchronous Processing#
While many parallel applications can be described as maps, some are more complex. In this section we look at the asynchronous concurrent.futures interface, which provides a simple API for ad-hoc parallelism. This is useful when your computations don't fit a regular pattern.
Executor.submit#
The submit method starts a computation in a separate thread or process and immediately gives us a Future object that refers to the result. At first the future is pending; once the function completes, the future is finished. We collect the result of the task with the .result() method, which blocks until the result is available.
%%time
from time import sleep

def slowadd(a, b, delay=1):
    sleep(delay)
    return a + b

slowadd(1, 1)
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(1) as e:
    future = e.submit(slowadd, 1, 2)
    print(future.result())
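To observe the pending and finished states described above, we can poll the future with its done() method (a small illustrative sketch, reusing slowadd from above):
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(1) as e:
    future = e.submit(slowadd, 1, 2)
    print(future.done())    # probably False: the task is still pending or running
    print(future.result())  # blocks until the task finishes, then prints 3
    print(future.done())    # True: the future is now finished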
Submit many tasks, receive many futures#
Because submit returns immediately, we can submit many tasks at once and they will execute in parallel.
%%time
results = [slowadd(i, i, delay=1) for i in range(8)]
print(results)
%%time
e = ThreadPoolExecutor()
futures = [e.submit(slowadd, i, i, delay=1) for i in range(8)]
results = [f.result() for f in futures]
print(results)
- Submit fires off a single function call in the background, returning a future.
- When we combine submit with a single for loop we recover the functionality of map.
- When we want to collect our results we replace each of our futures f with a call to f.result().
- We can combine submit with multiple for loops and other general programming to get something more general than map, as sketched below.
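For example, futures let us consume results as they arrive rather than in submission order; here is a minimal sketch using concurrent.futures.as_completed (an aside not covered elsewhere in this section):
from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(4) as e:
    futures = [e.submit(slowadd, i, i, delay=1) for i in range(8)]
    for f in as_completed(futures):  # yields each future as soon as it finishes
        print(f.result())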
Exercise 7.1#
Parallelize the following code with e.submit
- Replace the results list with a list called futures.
- Replace calls to slowadd and slowsub with e.submit calls on those functions.
- At the end, block on the computation by recreating the results list, calling .result() on each future in the futures list (a sketch of this transformation follows the list).
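A sketch of what the parallel version might look like, assuming a slowsub function analogous to slowadd (the sequential code being parallelized is not shown above, so the loop bounds here are illustrative):
from time import sleep
from concurrent.futures import ThreadPoolExecutor

def slowsub(a, b, delay=1):
    # assumed counterpart to slowadd: sleep, then subtract
    sleep(delay)
    return a - b

with ThreadPoolExecutor(4) as e:
    futures = []
    for x in range(4):
        for y in range(4):
            if x < y:
                futures.append(e.submit(slowadd, x, y, delay=1))
            elif x > y:
                futures.append(e.submit(slowsub, x, y, delay=1))
    results = [f.result() for f in futures]  # block until all tasks complete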
Extract daily stock data from Google#
!rm -rf data/daily-stock
import os       # library to get directory and file paths
import tarfile  # this module makes it possible to read and write tar archives

def extract_data(name, where):
    datadir = os.path.join(where, name)
    if not os.path.exists(datadir):
        print("Extracting data...")
        tar_path = os.path.join(where, name + '.tgz')
        with tarfile.open(tar_path, mode='r:gz') as data:
            data.extractall(where)

extract_data('daily-stock', 'data')  # this function call will extract json files
Convert data to pandas DataFrames and save them in HDF5 files#
HDF5 is a data model, library, and file format for storing and managing data. This format is widely used and is supported by many languages and platforms.
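As a quick illustration of the pandas HDF5 calls used below, a DataFrame can be written to and read back from an HDF5 file under a named key (a minimal sketch; the filename is illustrative):
import pandas as pd

df = pd.DataFrame({'close': [1.0, 2.0, 3.0]})
df.to_hdf('example.h5', key='/data')   # write the DataFrame under the key '/data'
roundtrip = pd.read_hdf('example.h5')  # read it back (the single key is found automatically)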
import json
import pandas as pd
import os, glob
here = os.getcwd()
datadir = os.path.join(here, 'data', 'daily-stock')
filenames = sorted(glob.glob(os.path.join(datadir, '*.json')))
filenames
Sequential version#
!rm -f data/daily-stock/*.h5
%%time
import json
import pandas as pd
from tqdm import tqdm

for fn in tqdm(filenames):
    with open(fn) as f:
        data = [json.loads(line) for line in f]  # load
    df = pd.DataFrame(data)                      # parse
    out_filename = fn[:-5] + '.h5'
    df.to_hdf(out_filename, key='/data')         # store
!rm -f data/daily-stock/*.h5
Exercise 7.2#
Parallelize the loop above using ThreadPoolExecutor and map.
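One possible shape of the solution, assuming the loop body is wrapped in a function (load_parse_store is a hypothetical helper name):
from concurrent.futures import ThreadPoolExecutor
import json
import pandas as pd

def load_parse_store(fn):
    # same body as one iteration of the sequential loop above
    with open(fn) as f:
        data = [json.loads(line) for line in f]  # load
    df = pd.DataFrame(data)                      # parse
    df.to_hdf(fn[:-5] + '.h5', key='/data')      # store

with ThreadPoolExecutor(4) as e:
    # consume the map iterator to surface any exceptions raised by the tasks
    list(e.map(load_parse_store, filenames))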
Read files and load DataFrames#
import os, glob, pandas as pd

filenames = sorted(glob.glob(os.path.join('data', 'daily-stock', '*.h5')))

series = {}
for fn in filenames:
    series[fn] = pd.read_hdf(fn)['close']
Application#
Given our HDF5 files from the last section we want to find the two datasets with the greatest pair-wise correlation. This forces us to consider all \(n\times(n-1)\) possibilities.
We use matplotlib to visually inspect the highly correlated time series.
%matplotlib inline
import matplotlib.pyplot as plt

# a and b are the filenames of the most-correlated pair, found in the analysis below
plt.figure(figsize=(10, 4))
plt.plot(series[a] / series[a].max())
plt.plot(series[b] / series[b].max())
plt.xticks(visible=False);
Analysis#
This computation starts out by loading data from disk. We already know how to parallelize it:
series = {}
for fn in filenames:
    series[fn] = pd.read_hdf(fn)['close']
It follows with a doubly nested for loop with an if statement.
results = {}
for a in filenames:
    for b in filenames:
        if a != b:
            results[a, b] = series[a].corr(series[b])
It is possible to solve this problem with map, but it requires some cleverness. Instead we'll use submit, an interface that starts individual function calls asynchronously.
It finishes with a reduction on small data. This part is fast enough.
((a, b), corr) = max(results.items(), key=lambda kv: kv[1])
Exercise 7.3#
Parallelize the pair-wise correlations with e.submit. Implement two versions: one using threads, another using processes, by replacing e with a ProcessPoolExecutor:
Threads#
from concurrent.futures import ThreadPoolExecutor
e = ThreadPoolExecutor(4)
Processes#
Be careful: a ProcessPoolExecutor generally does not work from inside a Jupyter notebook cell, because the worker processes must be able to import the functions they run. Put your code in a script and run it from a terminal.
from concurrent.futures import ProcessPoolExecutor
e = ProcessPoolExecutor(4)
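A sketch of the script structure this requires; the if __name__ == '__main__': guard keeps worker processes, which import the file, from re-running the pool setup (work and the script name are placeholders):
# stocks_parallel.py -- hypothetical script name; run with: python stocks_parallel.py
from concurrent.futures import ProcessPoolExecutor

def work(x):
    # placeholder for whatever function you submit
    return x * x

if __name__ == '__main__':
    # pool creation and submission live under this guard so that worker
    # processes importing the module do not create pools of their own
    with ProcessPoolExecutor(4) as e:
        futures = [e.submit(work, i) for i in range(8)]
        print([f.result() for f in futures])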
How does performance vary?
Some conclusions about futures#
- submit functions can help us to parallelize more complex applications.
- It didn't actually speed up the code very much.
- Threads and processes give some performance differences.
- This is not very robust.