Parallel Computation#

The objective of this notebook is to learn how to parallelize applications with Python.

Parallel computers#

  • Multiprocessor/multicore: several processors work on data stored in shared memory

  • Cluster: several processor/memory units work together by exchanging data over a network

  • Co-processor: a general-purpose processor delegates specific tasks to a special-purpose processor (GPU)

Parallel Programming#

  • Decomposition of the complete task into independent subtasks and the data flow between them.

  • Distribution of the subtasks over the processors to minimize the total execution time.

  • For clusters: distribution of the data over the nodes to minimize the communication time.

  • For multiprocessors: optimization of the memory access patterns to minimize waiting times.

  • Synchronization of the individual processes.

MapReduce#

from time import sleep

def f(x):
    "simulate one second of work, then return the square"
    sleep(1)
    return x*x

L = list(range(8))
L
# both calls below run sequentially: 8 calls of about one second each
%time sum(f(x) for x in L)
%time sum(map(f,L))

Multiprocessing#

multiprocessing is a package that supports spawning processes.

We can use it to find out how many processes can run concurrently on this computer.

from multiprocessing import cpu_count

cpu_count()

Futures#

The concurrent.futures module provides a high-level interface for asynchronously executing callables.

The asynchronous execution can be performed with:

  • threads, using ThreadPoolExecutor,

  • separate processes, using ProcessPoolExecutor.

Both implement the same interface, which is defined by the abstract Executor class.
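
As a minimal sketch of this common interface (the function and values are purely illustrative): submit schedules a callable and returns a Future whose result() blocks until the call completes, while map behaves like the built-in map.

from concurrent.futures import ThreadPoolExecutor

def double(x):   # illustrative callable
    return 2 * x

with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(double, 21)          # schedule one call, get a Future back
    print(future.result())                        # blocks until the result is available: 42
    print(list(executor.map(double, range(4))))   # [0, 2, 4, 6]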

concurrent.futures cannot launch new processes from a notebook on Windows (functions defined interactively cannot be sent to the worker processes), which is why the example below is written to a file and run as a script. Windows users can also install loky as an alternative.

%%file pmap.py
from concurrent.futures import ProcessPoolExecutor
from time import sleep, time

def f(x):
    sleep(1)
    return x*x

L = list(range(8))

if __name__ == '__main__':   # guard required where worker processes are spawned (e.g. Windows, macOS)

    begin = time()
    with ProcessPoolExecutor() as pool:

        result = sum(pool.map(f, L))
    end = time()
    
    print(f"result = {result} and time = {end-begin}")
import sys
!{sys.executable} pmap.py

  • ProcessPoolExecutor launches one worker process per core (os.cpu_count() by default).

  • pool.map divides the input list into chunks and puts the tasks (function + chunk) on a queue.

  • Each worker process takes a task (function + a chunk of data), runs map(function, chunk), and puts the result on a result list.

  • pool.map on the main process waits until all tasks are handled and returns the concatenation of the result lists.
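
The size of the chunks can be tuned through the chunksize argument of Executor.map. A minimal sketch, meant to be run as a script like pmap.py above:

from concurrent.futures import ProcessPoolExecutor
from time import sleep

def f(x):
    sleep(1)
    return x * x

if __name__ == '__main__':
    with ProcessPoolExecutor() as pool:
        # chunksize=2 groups the 8 inputs into 4 tasks of 2 items each,
        # reducing the number of round trips through the task queue
        print(sum(pool.map(f, range(8), chunksize=2)))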

%%time
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as pool:

    results = sum(pool.map(f, L))
    
print(results)

Thread and Process: Differences#

  • A process is an instance of a running program.

  • A process may contain one or more threads, but a thread cannot contain a process.

  • A process has a self-contained execution environment: it has its own memory space.

  • An application running on your computer may be a set of cooperating processes.

  • Processes do not share memory; communication between processes implies data serialization.

  • A thread exists within a process; every process has at least one thread.

  • Multiple threads in a process share resources, which allows efficient communication between threads.

  • Threads can run concurrently on a multi-core system, with each core executing a separate thread simultaneously.
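
A small illustration of the difference (the function and list below are purely illustrative): threads launched with ThreadPoolExecutor all read and write the same list object, whereas worker processes would each work on their own copy of it.

from concurrent.futures import ThreadPoolExecutor

shared = []   # a single list object in the process's memory space

def append_square(x):
    shared.append(x * x)   # every thread appends to the very same list

with ThreadPoolExecutor() as pool:
    list(pool.map(append_square, range(8)))

print(sorted(shared))   # the main thread sees all the updates: [0, 1, 4, ..., 49]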

The Global Interpreter Lock (GIL)#

  • The Python interpreter is not thread safe.

  • A few critical internal data structures may only be accessed by one thread at a time. Access to them is protected by the GIL.

  • Attempts at removing the GIL from Python have failed until now. The main difficulty is maintaining the C API for extension modules.

  • Multiprocessing avoids the GIL by having separate processes which each have an independent copy of the interpreter data structures.

  • The price to pay: serialization of tasks, arguments, and results.
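
A short experiment illustrating the GIL (countdown below is an arbitrary CPU-bound function): threads bring no speed-up for pure-Python computation, because only one thread at a time can execute Python bytecode.

from concurrent.futures import ThreadPoolExecutor
from time import perf_counter

def countdown(n=5_000_000):
    # pure-Python loop: it holds the GIL while it runs
    while n > 0:
        n -= 1

start = perf_counter()
for _ in range(4):
    countdown()
print(f"sequential: {perf_counter() - start:.2f} s")

start = perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(lambda _: countdown(), range(4)))
print(f"4 threads : {perf_counter() - start:.2f} s")   # about the same: the GIL serializes the work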

Parallelize text file downloads#

Exercise 6.1#

Use ThreadPoolExecutor to parallelize the download code below.

%mkdir books
%%time
import urllib.request as url
source = "https://mmassd.github.io/"  # "http://svmass2.mass.uhb.fr/hub/static/datasets/"
url.urlretrieve(source+"books/hugo.txt",     filename="books/hugo.txt")
url.urlretrieve(source+"books/proust.txt",   filename="books/proust.txt")
url.urlretrieve(source+"books/zola.txt",     filename="books/zola.txt")
url.urlretrieve(source+"books/stendhal.txt", filename="books/stendhal.txt")

Wordcount#

from glob import glob
from collections import defaultdict
from operator import itemgetter
from itertools import chain
from concurrent.futures import ThreadPoolExecutor

def mapper(filename):
    " split text to list of key/value pairs (word,1)"

    with open(filename) as f:
        data = f.read()
        
    data = data.strip().replace(".","").lower().split()
        
    return sorted([(w,1) for w in data])

def partitioner(mapped_values):
    """ get lists from mapper and create a dict with
    (word,[1,1,1])"""
    
    res = defaultdict(list)
    for w, c in mapped_values:
        res[w].append(c)
        
    return res.items()

def reducer(item):
    """ Compute the number of occurrences of a word from the dict
    computed by partitioner
    """
    w, v = item
    return (w, len(v))
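
The three functions chain together into a sequential wordcount; a small sketch, assuming the books/*.txt files downloaded earlier are present:

from glob import glob
from itertools import chain
from operator import itemgetter

files = glob("books/*.txt")

mapped = chain.from_iterable(map(mapper, files))   # (word, 1) pairs from every file
partitioned = partitioner(mapped)                  # word -> [1, 1, ...]
counts = map(reducer, partitioned)                 # (word, number of occurrences)

print(sorted(counts, key=itemgetter(1), reverse=True)[:10])   # ten most frequent words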

Parallel map#

  • Let’s improve the mapper function by printing the name of the current process inside the function.

Example

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

def process_name(n):
    " prints out the current process name "
    print(f"{mp.current_process().name} ")

with ProcessPoolExecutor() as e:
    _ = e.map(process_name, range(mp.cpu_count()))

Exercise 6.2#

  • Modify the mapper function by adding this print.

Parallel reduce#

  • For a parallel reduce operation, the data must first be grouped in a container. The partitioner function above returns such a container (see the sketch below).
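
A minimal sketch of the reduce step alone (here on a single file, books/hugo.txt, assumed to have been downloaded earlier): each (word, list) item produced by partitioner is reduced independently, so the items can simply be mapped over an executor.

from concurrent.futures import ThreadPoolExecutor

partitioned = partitioner(mapper("books/hugo.txt"))   # (word, [1, 1, ...]) items

with ThreadPoolExecutor() as pool:
    counts = list(pool.map(reducer, partitioned))     # reduce every item in parallel

print(sorted(counts, key=lambda item: item[1], reverse=True)[:5])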

Exercise 6.3#

Write a parallel program that uses the three functions above with ThreadPoolExecutor. It should read all the “sample*.txt” files; both the map and the reduce steps are parallel.

Increase volume of data#

Due to the proxy, the code above is not runnable on the workstations.

Getting the data#

  • The Latin Library contains a huge collection of freely accessible Latin texts. We collect the links from The Latin Library’s homepage, ignoring some links that are not associated with a particular author.

from bs4 import BeautifulSoup  # web scraping library
from urllib.request import urlopen

base_url = "http://www.thelatinlibrary.com/"
home_content = urlopen(base_url)

soup = BeautifulSoup(home_content, "lxml")
author_page_links = soup.find_all("a")
# keep only the first 49 links; the remaining ones are not associated with a particular author
author_pages = [ap["href"] for i, ap in enumerate(author_page_links) if i < 49]
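
The download cell below expects a list book_links of <a> tags pointing to individual works, an intermediate step that is missing here. A hedged sketch of how it could be built (the filtering rule, keeping links that contain the author's name, is an assumption, not the original notebook's exact code):

book_links = list()
for ap in author_pages:
    author_name = ap.split(".")[0]
    author_soup = BeautifulSoup(urlopen(base_url + ap), "lxml")
    # assumption: on an author page, links containing the author's name point to individual works
    book_links += [a for a in author_soup.find_all("a", href=True)
                   if author_name in a["href"]]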

Download webpages content#

from urllib.error import HTTPError

num_pages = 100

for i, bl in enumerate(book_links[:num_pages]):
    print("Getting content " + str(i + 1) + " of " + str(num_pages), end="\r", flush=True)
    try:
        content = urlopen(base_url + bl["href"]).read()
        with open(f"book-{i:03d}.dat","wb") as f:
            f.write(content)
    except HTTPError as err:
        print("Unable to retrieve " + bl["href"] + ".")
        continue

Extract data files#

  • I have already put the content of the pages into files named book-*.txt.

  • You can extract the data from the archive by running the cell below.

import os  # library to get directory and file paths
import tarfile # this module makes possible to read and write tar archives

def extract_data():
    datadir = os.path.join('data', 'latinbooks')
    if not os.path.exists(datadir):
        print("Extracting data...")
        tar_path = os.path.join('data', 'latinbooks.tgz')
        with tarfile.open(tar_path, mode='r:gz') as books:
            books.extractall('data')

extract_data() # this function call will extract text files in data/latinbooks

Read data files#

from glob import glob
files = glob('book*.dat')
texts = list()
for file in files:
    with open(file,'rb') as f:
        text = f.read()
    texts.append(text)

Extract the text from HTML and split it at periods to convert it into sentences#

%%time
from bs4 import BeautifulSoup

sentences = list()

for i, text in enumerate(texts):
    print("Document " + str(i + 1) + " of " + str(len(texts)), end="\r", flush=True)
    textSoup = BeautifulSoup(text, "lxml")
    paragraphs = textSoup.find_all("p", attrs={"class": None})
    # drop the first and last paragraphs, join the rest and lowercase the text
    prepared = "".join([p.text.strip().lower() for p in paragraphs[1:-1]])
    for t in prepared.split("."):
        # keep only letters and whitespace
        part = "".join([c for c in t if c.isalpha() or c.isspace()])
        sentences.append(part.strip())

# print first and last sentence to check the results
print(sentences[0])
print(sentences[-1])

Exercise 6.4#

Parallelize this last process using concurrent.futures.

References#