data = list(range(1,9))
data
Dask bag
Dask proposes “big data” collections with a small set of high-level primitives like map, filter, groupby, and join. With these common patterns we can often handle computations that are more complex than map, but are still structured.
- Dask bag excels at processing data that can be represented as a sequence of arbitrary inputs (“messy” data).
- Use it when you encounter a set of data whose format does not enforce strict structure and datatypes, as in the sketch below.
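As a quick illustration, here is a minimal sketch (with made-up records, not part of the examples that follow) of how a bag can hold heterogeneous dictionaries and still be cleaned up with the primitives above:

import dask.bag as db

# hypothetical "messy" records: keys and types are not uniform
records = db.from_sequence([
    {"name": "alice", "amount": 100},
    {"name": "bob"},                     # missing field
    {"name": "carol", "amount": "120"},  # amount stored as a string
])

(records.filter(lambda d: "amount" in d)   # drop unusable records
        .map(lambda d: int(d["amount"]))   # normalize the type
        .sum()                             # reduction
        .compute())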
import dask.bag as db
b = db.from_sequence(data)
b.compute()  # Gather results back to local process
b.map(lambda x: x // 2).compute()  # halve each element and collect results
from time import sleep

def slow_half(x):
    sleep(1)
    return x // 2

res = b.map(slow_half)
res
%%time
res.compute()
res.visualize()
b.product(b).compute()  # Cartesian product of each pair of elements in two sequences (or the same sequence in this case)
Chain operations to construct more complex computations
(b.filter(lambda x: x % 2 > 0)
  .product(b)
  .filter(lambda v: v[0] % v[1] == 0 and v[0] != v[1])
  .compute())
Daily stock example
Let’s use the bag interface to read the json files containing time series.
Each line is a JSON encoded dictionary with the following keys:
- timestamp: Day.
- close: Stock value at the end of the day.
- high: Highest value.
- low: Lowest value.
- open: Opening price.
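To make the format concrete, here is a minimal sketch of decoding one such line with json.loads (the values are invented for illustration only):

import json

# a hypothetical line from one of the json files
line = '{"timestamp": "2015-01-02", "close": 101.3, "high": 102.5, "low": 99.8, "open": 100.0}'

record = json.loads(line)  # -> a Python dict with the keys listed above
record["close"]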
# preparing data
import os       # library to get directory and file paths
import tarfile  # this module makes it possible to read and write tar archives

def extract_data(name, where):
    datadir = os.path.join(where, name)
    if not os.path.exists(datadir):
        print("Extracting data...")
        tar_path = os.path.join(where, name + '.tgz')
        with tarfile.open(tar_path, mode='r:gz') as data:
            data.extractall(where)

extract_data('daily-stock', 'data')  # this function call will extract json files
%ls data/daily-stock/*.json
import dask.bag as db
import json
stocks = db.read_text('data/daily-stock/*.json')
stocks.npartitions
stocks.visualize()
import json
js = stocks.map(json.loads)
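Before going further it can help to peek at a few decoded records; a small sketch using the take and pluck methods of the bag API:

js.take(1)                 # materialize the first record only
js.pluck('close').take(3)  # extract the 'close' field of the first few records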
import os, sys
from glob import glob
import pandas as pd
import json

here = os.getcwd()  # get the current directory
filenames = sorted(glob(os.path.join(here, 'data', 'daily-stock', '*.json')))
filenames
!rm data/daily-stock/*.h5
from tqdm import tqdm

for fn in tqdm(filenames):
    with open(fn) as f:
        data = [json.loads(line) for line in f]

    df = pd.DataFrame(data)

    out_filename = fn[:-5] + '.h5'
    df.to_hdf(out_filename, '/data')
filenames = sorted(glob(os.path.join(here, 'data', 'daily-stock', '*.h5')))
filenames
Serial version
%%time

series = {}
for fn in filenames:   # Simple map over filenames
    series[fn] = pd.read_hdf(fn)['close']

results = {}

for a in filenames:    # Doubly nested loop over the same collection
    for b in filenames:
        if a != b:     # Filter out bad elements
            results[a, b] = series[a].corr(series[b])  # Apply function

(a, b), corr = max(results.items(), key=lambda kv: kv[1])  # Reduction
a, b, corr
Dask.bag methods
We can construct most of the above computation with the following dask.bag methods:
- collection.map(function): apply function to each element in collection
- collection.product(collection): create a new collection with every pair of inputs
- collection.filter(predicate): keep only elements of collection that match the predicate function
- collection.max(): compute the maximum element
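As a toy sketch (small integers rather than the stock data), chaining these four methods looks like this:

import dask.bag as db

nums = db.from_sequence([1, 2, 3, 4])

(nums.map(lambda x: x * 10)              # map: scale every element
     .product(nums)                      # product: every (scaled, original) pair
     .filter(lambda ab: ab[1] % 2 == 0)  # filter: keep pairs whose second item is even
     .max()                              # max: lexicographic maximum of the remaining pairs
     .compute())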
%%time
import dask.bag as db

b = db.from_sequence(filenames)
series = b.map(lambda fn: pd.read_hdf(fn)['close'])

corr = (series.product(series)
              .filter(lambda ab: not (ab[0] == ab[1]).all())
              .map(lambda ab: ab[0].corr(ab[1]))
              .max())
%%time
result = corr.compute()
result
Wordcount with Dask bag
import lorem

for i in range(20):
    with open(f"sample{i:02d}.txt", "w") as f:
        f.write(lorem.text())
%ls *.txt
import glob
glob.glob('sample*.txt')
import dask.bag as db
import glob

b = db.read_text(glob.glob('sample*.txt'))
wordcount = (b.str.replace(".", "")       # remove dots
              .str.lower()                # lower text
              .str.strip()                # remove \n and trailing spaces
              .str.split()                # split into words
              .flatten()                  # chain all words lists
              .frequencies()              # compute occurrences
              .topk(10, lambda x: x[1]))  # sort and return top 10 words

wordcount.compute()  # Run all tasks and return result
Genome example
We will use a Dask bag to calculate the frequencies of sequences of five bases, and then sort the sequences into descending order ranked by their frequency.
- First we will define some functions to split the bases into sequences of a certain size
Exercise 9.1
- Implement a function group_characters(line, n=5) to group n characters together and return an iterator. line is a text line from the genome.txt file. A possible sketch is given at the end of this exercise.

>>> line = "abcdefghijklmno"
>>> for seq in group_characters(line, 5):
...     print(seq)
"abcde"
"fghij"
"klmno"
- Implement group_and_split(line).

>>> group_and_split('abcdefghijklmno')
['abcde', 'fghij', 'klmno']
- Use the dask bag to compute the frequencies of sequences of five bases.
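One possible sketch of the two helpers and of the bag pipeline (not the only valid solution; it assumes non-overlapping, fixed-size groups and drops a trailing group shorter than n):

def group_characters(line, n=5):
    """Yield successive non-overlapping groups of n characters from line."""
    iterators = [iter(line)] * n
    for group in zip(*iterators):   # a trailing group shorter than n is dropped (assumption)
        yield "".join(group)

def group_and_split(line):
    """Return the groups as a list, e.g. ['abcde', 'fghij', 'klmno']."""
    return list(group_characters(line))

import dask.bag as db

counts = (db.read_text('data/genome.txt')
            .str.strip()            # remove newlines
            .map(group_and_split)   # one list of 5-base sequences per line
            .flatten()              # chain all lists
            .frequencies())         # compute occurrences
counts.topk(10, lambda x: x[1]).compute()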
Exercise 9.2
The FASTA file format is used to write several genome sequences.
- Create a function that can read a FASTA file and compute the frequencies for n = 5 of a given sequence.
%%file testset.fasta
>SEQUENCE_1
MTEITAAMVKELRESTGAGMMDCKNALSETNGDFDKAVQLLREKGLGKAAKKADRLAAEG
LVSVKVSDDFTIAAMRPSYLSYEDLDMTFVENEYKALVAELEKENEERRRLKDPNKPEHK
IPQFASRKQLSDAILKEAEEKIKEELKAQGKPEKIWDNIIPGKMNSFIADNSQLDSKLTL
MGQFYVMDDKKTVEQVIAEKEKEFGGKIKIVEFICFEVGEGLEKKTEDFAAEVAAQL
>SEQUENCE_2
SATVSEINSETDFVAKNDQFIALTKDTTAHIQSNSLQSVEELHSSTINGVKFEEYLKSQI
ATIGENLVVRRFATLKAGANGVVNGYIHTNGRVGVVIAAACDSAEVASKSRDLLRQICMH
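A possible sketch for this exercise (the function names read_fasta and sequence_frequencies are ours, and it reuses group_characters from Exercise 9.1):

from collections import Counter

def read_fasta(filename):
    """Return a dict {sequence_name: sequence_string} from a FASTA file."""
    sequences, name = {}, None
    with open(filename) as f:
        for line in f:
            line = line.strip()
            if line.startswith(">"):        # a header line introduces a new sequence
                name = line[1:]
                sequences[name] = ""
            elif line and name is not None:
                sequences[name] += line     # concatenate the sequence body
    return sequences

def sequence_frequencies(seq, n=5):
    """Frequencies of non-overlapping groups of n characters in a sequence."""
    return Counter(group_characters(seq, n))

{name: sequence_frequencies(seq) for name, seq in read_fasta("testset.fasta").items()}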
Exercise 9.3
Write a program that uses the function implemented above to read several FASTA files stored in a Dask bag.
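A possible sketch, assuming the helpers above and that the files match the pattern *.fasta (the pattern is an assumption):

import dask.bag as db
from glob import glob

# one bag element per FASTA file
fasta_files = db.from_sequence(glob("*.fasta"))

results = (fasta_files
           .map(read_fasta)                # {name: sequence} for each file
           .map(lambda sequences: {name: sequence_frequencies(seq)
                                   for name, seq in sequences.items()}))
results.compute()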
%cat data/genome.txt
Some remarks about bag
- Higher level dask collections include functions for common patterns
- Move data to collection, construct lazy computation, trigger at the end
- Use Dask.bag (product + map) to handle nested for loops
Bags have the following known limitations:
- Bag operations tend to be slower than array/dataframe computations in the same way that Python tends to be slower than NumPy/Pandas.
- Bag.groupby is slow. You should try to use Bag.foldby if possible. Check the API.
- dask.dataframe can be faster than dask.bag, but sometimes it is easier to load and clean messy data with a bag. We will see later how to transform a bag into a dask.dataframe with the to_dataframe method, as in the sketch below.
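As a preview, a minimal sketch of to_dataframe reusing the daily-stock bag from above (the column layout is inferred from the dictionary keys of the records):

import dask.bag as db
import json

# the bag of decoded JSON records built earlier in this notebook
stocks = db.read_text('data/daily-stock/*.json')
js = stocks.map(json.loads)

df = js.to_dataframe()  # each dict key becomes a dataframe column
df.head()               # triggers computation on the first partitions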