Dask bag#
Dask provides “big data” collections with a small set of high-level primitives like map, filter, groupby, and join. With these common patterns we can often handle computations that are more complex than map, but are still structured.
Dask bag excels at processing data that can be represented as a sequence of arbitrary inputs (“messy” data), i.e. when you encounter data whose format does not enforce strict structure and datatypes.
data = list(range(1,9))
data
import dask.bag as db
b = db.from_sequence(data)
b.compute() # Gather results back to local process
b.map(lambda x: x // 2).compute() # halve each element and collect the results
from time import sleep
def slow_half(x):
    sleep(1)
    return x // 2
res = b.map(slow_half)
res
%%time
res.compute()
res.visualize()
b.product(b).compute()  # Cartesian product: every pair of elements
                        # from two sequences (or the same sequence, as here)
Chain operations to construct more complex computations
(b.filter(lambda x: x % 2 > 0)
  .product(b)
  .filter(lambda v: v[0] % v[1] == 0 and v[0] != v[1])
  .compute())
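The opening list also mentions groupby; as an illustrative addition (not part of the original notebook), the same small bag can be grouped by a key function, here parity:
b.groupby(lambda x: x % 2).compute()  # e.g. [(1, [1, 3, 5, 7]), (0, [2, 4, 6, 8])] (group order may vary)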
Daily stock example#
Let’s use the bag interface to read the JSON files containing the time series.
Each line is a JSON-encoded dictionary with the following keys:
timestamp: Day.
close: Stock value at the end of the day.
high: Highest value.
low: Lowest value.
open: Opening price.
# preparing data
import os # library to get directory and file paths
import tarfile # this module makes it possible to read and write tar archives
def extract_data(name, where):
    datadir = os.path.join(where, name)
    if not os.path.exists(datadir):
        print("Extracting data...")
        tar_path = os.path.join(where, name + '.tgz')
        with tarfile.open(tar_path, mode='r:gz') as data:
            data.extractall(where)
extract_data('daily-stock','data') # this function call will extract json files
%ls data/daily-stock/*.json
import dask.bag as db
import json
stocks = db.read_text('data/daily-stock/*.json')
stocks.npartitions
stocks.visualize()
import json
js = stocks.map(json.loads)
import os, sys
from glob import glob
import pandas as pd
import json
here = os.getcwd() # get the current directory
filenames = sorted(glob(os.path.join(here,'data', 'daily-stock', '*.json')))
filenames
!rm data/daily-stock/*.h5
from tqdm import tqdm
for fn in tqdm(filenames):
    with open(fn) as f:
        data = [json.loads(line) for line in f]
        df = pd.DataFrame(data)
        out_filename = fn[:-5] + '.h5'
        df.to_hdf(out_filename, '/data')
filenames = sorted(glob(os.path.join(here,'data', 'daily-stock', '*.h5')))
filenames
Serial version#
%%time
series = {}
for fn in filenames:   # Simple map over filenames
    series[fn] = pd.read_hdf(fn)['close']

results = {}
for a in filenames:    # Doubly nested loop over the same collection
    for b in filenames:
        if a != b:     # Filter out bad elements
            results[a, b] = series[a].corr(series[b])  # Apply function

((a, b), corr) = max(results.items(), key=lambda kv: kv[1])  # Reduction
a, b, corr
Dask.bag methods#
We can construct most of the above computation with the following dask.bag methods:
collection.map(function): apply a function to each element of the collection
collection.product(collection): create a new collection with every pair of inputs
collection.filter(predicate): keep only the elements of the collection that match the predicate function
collection.max(): compute the maximum element
%%time
import dask.bag as db
b = db.from_sequence(filenames)
series = b.map(lambda fn: pd.read_hdf(fn)['close'])
corr = (series.product(series)
              .filter(lambda ab: not (ab[0] == ab[1]).all())
              .map(lambda ab: ab[0].corr(ab[1]))
              .max())
%%time
result = corr.compute()
result
Wordcount with Dask bag#
import lorem
for i in range(20):
    with open(f"sample{i:02d}.txt", "w") as f:
        f.write(lorem.text())
%ls *.txt
import glob
glob.glob('sample*.txt')
import dask.bag as db
import glob
b = db.read_text(glob.glob('sample*.txt'))
wordcount = (b.str.replace(".", "")       # remove dots
              .str.lower()                # lowercase the text
              .str.strip()                # remove \n and trailing spaces
              .str.split()                # split into words
              .flatten()                  # chain all word lists
              .frequencies()              # compute occurrences
              .topk(10, lambda x: x[1]))  # sort and return the top 10 words
wordcount.compute() # Run all tasks and return result
Genome example#
We will use a Dask bag to calculate the frequencies of sequences of five bases, and then sort the sequences into descending order ranked by their frequency.
First we will define some functions to split the bases into sequences of a given size.
Exercise 9.1#
Implement a function group_characters(line, n=5) to group n characters together and return an iterator. line is a text line of the genome.txt file.
>>> line = "abcdefghijklmno"
>>> for seq in group_characters(line, 5):
...     print(seq)
abcde
fghij
klmno
Implement group_and_split(line):
>>> group_and_split('abcdefghijklmno')
['abcde', 'fghij', 'klmno']
Use the dask bag to compute the frequencies of sequences of five bases.
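One possible solution sketch (an illustration, not the notebook’s reference solution; dropping any incomplete trailing group is an assumption):
def group_characters(line, n=5):
    # yield successive groups of n characters, dropping any incomplete trailing group
    for i in range(0, len(line) - len(line) % n, n):
        yield line[i:i + n]

def group_and_split(line):
    return list(group_characters(line))

group_and_split('abcdefghijklmno')  # ['abcde', 'fghij', 'klmno']

# Frequencies of 5-base groups with a bag, using the data/genome.txt file shown later in this section:
import dask.bag as db
genome = (db.read_text('data/genome.txt')
            .str.strip()
            .map(group_and_split)
            .flatten()
            .frequencies()
            .topk(10, lambda x: x[1]))
genome.compute()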
Exercise 9.2#
The FASTA file format is used to store several genome sequences in a single file.
Create a function that reads a FASTA file and computes the frequencies of 5-base groups (n = 5) for a given sequence.
%%file testset.fasta
>SEQUENCE_1
MTEITAAMVKELRESTGAGMMDCKNALSETNGDFDKAVQLLREKGLGKAAKKADRLAAEG
LVSVKVSDDFTIAAMRPSYLSYEDLDMTFVENEYKALVAELEKENEERRRLKDPNKPEHK
IPQFASRKQLSDAILKEAEEKIKEELKAQGKPEKIWDNIIPGKMNSFIADNSQLDSKLTL
MGQFYVMDDKKTVEQVIAEKEKEFGGKIKIVEFICFEVGEGLEKKTEDFAAEVAAQL
>SEQUENCE_2
SATVSEINSETDFVAKNDQFIALTKDTTAHIQSNSLQSVEELHSSTINGVKFEEYLKSQI
ATIGENLVVRRFATLKAGANGVVNGYIHTNGRVGVVIAAACDSAEVASKSRDLLRQICMH
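A possible sketch, reusing group_characters from Exercise 9.1 and assuming that header lines (starting with '>') are skipped and each sequence line is processed independently:
import dask.bag as db

def fasta_frequencies(filename, n=5):
    # count n-character groups over all sequence lines of one FASTA file
    return (db.read_text(filename)
              .str.strip()
              .filter(lambda line: line and not line.startswith('>'))
              .map(lambda line: list(group_characters(line, n)))
              .flatten()
              .frequencies()
              .compute())

fasta_frequencies('testset.fasta')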
Exercise 9.3#
Write a program that uses the function implemented above to read several FASTA files stored in a Dask bag.
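A sketch of one way to do it, assuming the FASTA files sit next to the notebook and match the pattern *.fasta, and reusing group_and_split from Exercise 9.1:
import dask.bag as db
from glob import glob

all_fasta = (db.read_text(glob('*.fasta'))   # one bag over every FASTA file
               .str.strip()
               .filter(lambda line: line and not line.startswith('>'))
               .map(group_and_split)
               .flatten()
               .frequencies()
               .topk(10, lambda x: x[1]))
all_fasta.compute()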
%cat data/genome.txt
Some remarks about bag#
Higher-level Dask collections include functions for common patterns.
Move data to the collection, construct the lazy computation, trigger it at the end.
Use dask.bag (product + map) to handle nested for loops.

Bags have the following known limitations:

Bag operations tend to be slower than array/dataframe computations, in the same way that Python tends to be slower than NumPy/Pandas.
Bag.groupby is slow. You should try to use Bag.foldby if possible. Check the API.
dask.dataframe can be faster than dask.bag. But sometimes it is easier to load and clean messy data with a bag. We will see later how to transform a bag into a dask.dataframe with the to_dataframe method.
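Two minimal sketches of these last points (illustrative, with made-up toy data): Bag.foldby performs a streaming per-key reduction instead of materializing full groups, and to_dataframe converts a bag of records into a dask.dataframe.
import dask.bag as db

nums = db.from_sequence(range(10))
# foldby: per-key reduction (here a sum by parity) without materializing full groups
nums.foldby(lambda x: x % 2, lambda acc, x: acc + x, 0).compute()  # e.g. [(0, 20), (1, 25)]

# to_dataframe: turn a bag of dicts into a dask.dataframe
records = db.from_sequence([{"name": "a", "value": 1}, {"name": "b", "value": 2}])
df = records.to_dataframe()
df.compute()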