from time import sleep

def f(x):
    sleep(1)
    return x*x

L = list(range(8))
L
Parallel Computation
The objective of this notebook is to learn how to parallelize applications with Python.
Parallel computers
- Multiprocessor/multicore: several processors work on data stored in shared memory
- Cluster: several processor/memory units work together by exchanging data over a network
- Co-processor: a general-purpose processor delegates specific tasks to a special-purpose processor (GPU)
Parallel Programming
- Decomposition of the complete task into independent subtasks and the data flow between them.
- Distribution of the subtasks over the processors minimizing the total execution time.
- For clusters: distribution of the data over the nodes minimizing the communication time.
- For multiprocessors: optimization of the memory access patterns minimizing waiting times.
- Synchronization of the individual processes.
MapReduce
%time sum(f(x) for x in L)
%time sum(map(f,L))
Multiprocessing
multiprocessing
is a package that supports spawning processes.
We can use it to find out how many concurrent processes we can launch on this computer.
from multiprocessing import cpu_count
cpu_count()
Futures
The concurrent.futures
module provides a high-level interface for asynchronously executing callables.
The asynchronous execution can be performed with:
- threads, using ThreadPoolExecutor,
- separate processes, using ProcessPoolExecutor.
Both implement the same interface, which is defined by the abstract Executor class.
concurrent.futures
can’t launch processes on Windows. Windows users must install loky.
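Both pool classes share the Executor interface. A minimal sketch of its submit method, which schedules a single callable and returns a Future (the square function here is just an illustrative callable, not part of the notebook):

```python
from concurrent.futures import ThreadPoolExecutor

def square(x):  # illustrative callable
    return x * x

with ThreadPoolExecutor(max_workers=2) as executor:
    # submit() schedules the callable and returns a Future immediately;
    # result() blocks until the value is available.
    future = executor.submit(square, 7)
    print(future.result())  # 49
```

submit is the lower-level counterpart of map: it gives back one Future per call, which can be polled or waited on individually.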
%%file pmap.py
from concurrent.futures import ProcessPoolExecutor
from time import sleep, time

def f(x):
    sleep(1)
    return x*x

L = list(range(8))

if __name__ == '__main__':
    begin = time()
    with ProcessPoolExecutor() as pool:
        result = sum(pool.map(f, L))
    end = time()
    print(f"result = {result} and time = {end-begin}")
import sys
!{sys.executable} pmap.py
- ProcessPoolExecutor launches one worker process per physical core on the computer.
- pool.map divides the input list into chunks and puts the tasks (function + chunk) on a queue.
- Each worker process takes a task (function + a chunk of data), runs map(function, chunk), and puts the result on a result list.
- pool.map on the master process waits until all tasks are handled and returns the concatenation of the result lists.
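The chunking step can be controlled explicitly. A minimal sketch, assuming a hypothetical double task function; the chunksize argument of Executor.map sets how many items are bundled into each task:

```python
from concurrent.futures import ProcessPoolExecutor

def double(x):  # illustrative task function
    return 2 * x

if __name__ == '__main__':
    data = list(range(8))
    with ProcessPoolExecutor() as pool:
        # chunksize=4 sends two tasks of four items each instead of
        # eight single-item tasks, reducing queue and pickling overhead
        results = list(pool.map(double, data, chunksize=4))
    print(results)  # [0, 2, 4, 6, 8, 10, 12, 14]
```

The default chunksize is 1 for ProcessPoolExecutor, so for long input lists of cheap tasks a larger chunksize usually pays off.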
%%time
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as pool:
    results = sum(pool.map(f, L))
print(results)
Thread and Process: Differences
- A process is an instance of a running program.
- A process may contain one or more threads, but a thread cannot contain a process.
- A process has a self-contained execution environment: it has its own memory space.
- An application running on your computer may be a set of cooperating processes.
- Processes do not share their memory; communication between processes implies data serialization.
- A thread is made of and exists within a process; every process has at least one thread.
- Multiple threads in a process share resources, which makes communication between threads efficient.
- Threads can run concurrently on a multi-core system, with each core executing a separate thread simultaneously.
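The memory-sharing difference can be seen in a small sketch (assuming the fork start method, the default on Linux): a thread updates the parent's data in place, while a child process only updates its own copy:

```python
import threading
import multiprocessing

counter = {"value": 0}

def increment():
    counter["value"] += 1

# the thread shares the parent's memory: the update is visible here
t = threading.Thread(target=increment)
t.start(); t.join()
print(counter["value"])  # 1

# the child process works on its own copy: the parent's counter is unchanged
p = multiprocessing.Process(target=increment)
p.start(); p.join()
print(counter["value"])  # still 1
```

To make the process update visible in the parent, the value would have to be sent back explicitly, e.g. through a multiprocessing.Queue or shared Value, which is exactly the serialization cost mentioned above.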
The Global Interpreter Lock (GIL)
- The Python interpreter is not thread safe.
- A few critical internal data structures may only be accessed by one thread at a time. Access to them is protected by the GIL.
- Attempts at removing the GIL from Python have failed until now. The main difficulty is maintaining the C API for extension modules.
- Multiprocessing avoids the GIL by having separate processes which each have an independent copy of the interpreter data structures.
- The price to pay: serialization of tasks, arguments, and results.
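A small timing sketch of the GIL's effect on CPU-bound code; the count_down function and the iteration count N are arbitrary choices for the illustration:

```python
from concurrent.futures import ThreadPoolExecutor
from time import perf_counter

def count_down(n):
    # pure-Python CPU-bound loop: the thread holds the GIL while it runs
    while n > 0:
        n -= 1
    return n

N = 2_000_000

begin = perf_counter()
for _ in range(2):
    count_down(N)
serial = perf_counter() - begin

begin = perf_counter()
with ThreadPoolExecutor(max_workers=2) as pool:
    list(pool.map(count_down, [N, N]))
threaded = perf_counter() - begin

# because of the GIL, the threaded run is typically no faster than the serial one
print(f"serial: {serial:.2f}s  threaded: {threaded:.2f}s")
```

For I/O-bound work such as sleep or network calls the GIL is released while waiting, which is why the ThreadPoolExecutor example above still shows a speedup.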
Parallelize text files downloads
- Victor Hugo http://www.gutenberg.org/files/135/135-0.txt
- Marcel Proust http://www.gutenberg.org/files/7178/7178-8.txt
- Emile Zola http://www.gutenberg.org/files/1069/1069-0.txt
- Stendhal http://www.gutenberg.org/files/44747/44747-0.txt
Exercise 6.1
Use ThreadPoolExecutor to parallelize these downloads.
%mkdir books
%%time
import urllib.request as url

source = "https://mmassd.github.io/"  # "http://svmass2.mass.uhb.fr/hub/static/datasets/"

url.urlretrieve(source+"books/hugo.txt", filename="books/hugo.txt")
url.urlretrieve(source+"books/proust.txt", filename="books/proust.txt")
url.urlretrieve(source+"books/zola.txt", filename="books/zola.txt")
url.urlretrieve(source+"books/stendhal.txt", filename="books/stendhal.txt")
Wordcount
from glob import glob
from collections import defaultdict
from operator import itemgetter
from itertools import chain
from concurrent.futures import ThreadPoolExecutor

def mapper(filename):
    """ split text into a list of key/value pairs (word, 1) """
    with open(filename) as f:
        data = f.read()
    data = data.strip().replace(".", "").lower().split()
    return sorted([(w, 1) for w in data])

def partitioner(mapped_values):
    """ get lists from mapper and create a dict with
    (word, [1,1,1]) """
    res = defaultdict(list)
    for w, c in mapped_values:
        res[w].append(c)
    return res.items()

def reducer(item):
    """ Compute word occurrences from the dict computed
    by partitioner
    """
    w, v = item
    return (w, len(v))
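A serial run chaining the three functions, with their definitions repeated so the cell is self-contained; the sample file name and its text are made up for the illustration:

```python
from collections import defaultdict
from itertools import chain

def mapper(filename):
    "split text into a list of (word, 1) pairs"
    with open(filename) as f:
        data = f.read()
    data = data.strip().replace(".", "").lower().split()
    return sorted((w, 1) for w in data)

def partitioner(mapped_values):
    "group the (word, 1) pairs into (word, [1, 1, ...])"
    res = defaultdict(list)
    for w, c in mapped_values:
        res[w].append(c)
    return res.items()

def reducer(item):
    "count the occurrences of one word"
    w, v = item
    return (w, len(v))

# write a tiny sample file so the example is self-contained
with open("sample00.txt", "w") as f:
    f.write("the quick brown fox jumps over the lazy dog. the end.")

# map over all files, flatten the pair lists, partition, then reduce
mapped = chain.from_iterable(map(mapper, ["sample00.txt"]))
counts = dict(map(reducer, partitioner(mapped)))
print(counts["the"])  # 3
```

Because both map calls above are plain map, they are the natural places to substitute an Executor.map in the exercises below.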
Parallel map
- Let’s improve the mapper function by printing the current process name inside the function.
Example
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

def process_name(n):
    " prints out the current process name "
    print(f"{mp.current_process().name} ")

with ProcessPoolExecutor() as e:
    _ = e.map(process_name, range(mp.cpu_count()))
Exercise 6.2
- Modify the mapper function by adding this print.
Parallel reduce
- For a parallel reduce operation, data must be aligned in a container. We already created a partitioner function that returns this container.
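A minimal sketch of the idea with ThreadPoolExecutor, using a hand-built container in place of the partitioner's output; the words and counts are made up:

```python
from concurrent.futures import ThreadPoolExecutor

def reducer(item):  # same reducer as above
    w, v = item
    return (w, len(v))

# stand-in for partitioner output: each word mapped to its list of 1s
partitioned = {"ave": [1, 1, 1], "caesar": [1]}.items()

# each (word, [1, ...]) item is independent, so the reduce step
# can be dispatched across workers with map
with ThreadPoolExecutor() as pool:
    counts = dict(pool.map(reducer, partitioned))

print(counts)  # {'ave': 3, 'caesar': 1}
```

The same pattern works with ProcessPoolExecutor, at the cost of serializing each item to the worker processes.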
Exercise 6.3
Write a parallel program that uses the three functions above with ThreadPoolExecutor. It reads all the “sample*.txt” files. The map and reduce steps are parallel.
Increase volume of data
Due to the proxy, the code below is not runnable on the workstations.
Getting the data
- The Latin Library contains a huge collection of freely accessible Latin texts. We get the links on the Latin Library’s homepage, ignoring some links that are not associated with a particular author.
from bs4 import BeautifulSoup  # web scraping library
from urllib.request import *

base_url = "http://www.thelatinlibrary.com/"
home_content = urlopen(base_url)

soup = BeautifulSoup(home_content, "lxml")
author_page_links = soup.find_all("a")
author_pages = [ap["href"] for i, ap in enumerate(author_page_links) if i < 49]
Generate html links
- Create a list of all links pointing to Latin texts. The Latin Library uses a special format which makes it easy to find the corresponding links: All of these links contain the name of the text author.
ap_content = list()
for ap in author_pages:
    ap_content.append(urlopen(base_url + ap))

book_links = list()
for path, content in zip(author_pages, ap_content):
    author_name = path.split(".")[0]
    ap_soup = BeautifulSoup(content, "lxml")
    book_links += [link for link in ap_soup.find_all("a", {"href": True}) if author_name in link["href"]]
Download webpages content
from urllib.error import HTTPError

num_pages = 100

for i, bl in enumerate(book_links[:num_pages]):
    print("Getting content " + str(i + 1) + " of " + str(num_pages), end="\r", flush=True)
    try:
        content = urlopen(base_url + bl["href"]).read()
        with open(f"book-{i:03d}.dat", "wb") as f:
            f.write(content)
    except HTTPError as err:
        print("Unable to retrieve " + bl["href"] + ".")
        continue
Extract data files
- I already put the content of the pages in files named book-*.dat
- You can extract data from the archive by running the cell below
import os       # library to get directory and file paths
import tarfile  # this module makes it possible to read and write tar archives

def extract_data():
    datadir = os.path.join('data', 'latinbooks')
    if not os.path.exists(datadir):
        print("Extracting data...")
        tar_path = os.path.join('data', 'latinbooks.tgz')
        with tarfile.open(tar_path, mode='r:gz') as books:
            books.extractall('data')

# this function call will extract text files in data/latinbooks
extract_data()
Read data files
from glob import glob

files = glob('book*.dat')
texts = list()
for file in files:
    with open(file, 'rb') as f:
        text = f.read()
    texts.append(text)
Extract the text from html and split the text at periods to convert it into sentences.
%%time
from bs4 import BeautifulSoup

sentences = list()

for i, text in enumerate(texts):
    print("Document " + str(i + 1) + " of " + str(len(texts)), end="\r", flush=True)
    textSoup = BeautifulSoup(text, "lxml")
    paragraphs = textSoup.find_all("p", attrs={"class": None})
    prepared = "".join([p.text.strip().lower() for p in paragraphs[1:-1]])
    for t in prepared.split("."):
        part = "".join([c for c in t if c.isalpha() or c.isspace()])
        sentences.append(part.strip())

# print the first and last sentence to check the results
print(sentences[0])
print(sentences[-1])
Exercise 6.4
Parallelize this last process using concurrent.futures.