data = list(range(1,9))
data
Dask bag
Dask provides “big data” collections with a small set of high-level primitives like map, filter, groupby, and join. With these common patterns we can often handle computations that are more complex than map, but are still structured.
- Dask bag excels at processing data that can be represented as a sequence of arbitrary inputs (“messy” data).
- Use it when you encounter data whose format does not enforce strict structure and datatypes.
import dask.bag as db
b = db.from_sequence(data)
b.compute()                        # Gather results back to the local process
b.map(lambda x: x // 2).compute()  # halve each element and collect results
from time import sleep
def slow_half(x):
    sleep(1)
    return x // 2
res = b.map(slow_half)
res
%%time
res.compute()
res.visualize()
b.product(b).compute()  # Cartesian product of each pair of elements in two
                        # sequences (or the same sequence in this case)
Chain operations to construct more complex computations
(b.filter(lambda x: x % 2 > 0)
 .product(b)
 .filter(lambda v: v[0] % v[1] == 0 and v[0] != v[1])
 .compute())
Daily stock example
Let’s use the bag interface to read the json files containing time series.
Each line is a JSON-encoded dictionary with the following keys:
- timestamp: Day.
- close: Stock value at the end of the day.
- high: Highest value.
- low: Lowest value.
- open: Opening price.
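For illustration, a single line of one of these files might look like the following (the values here are invented, not taken from the actual dataset):
import json
line = '{"timestamp": "2015-01-02", "close": 101.3, "high": 102.1, "low": 99.8, "open": 100.0}'
json.loads(line)   # -> {'timestamp': '2015-01-02', 'close': 101.3, ...}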
# preparing data
import os       # library to get directory and file paths
import tarfile  # this module makes it possible to read and write tar archives

def extract_data(name, where):
    datadir = os.path.join(where, name)
    if not os.path.exists(datadir):
        print("Extracting data...")
        tar_path = os.path.join(where, name + '.tgz')
        with tarfile.open(tar_path, mode='r:gz') as data:
            data.extractall(where)
extract_data('daily-stock', 'data')  # this function call will extract the json files
%ls data/daily-stock/*.json
import dask.bag as db
import json
stocks = db.read_text('data/daily-stock/*.json')
stocks.npartitions
stocks.visualize()
import json
js = stocks.map(json.loads)
import os, sys
from glob import glob
import pandas as pd
import json
here = os.getcwd() # get the current directory
filenames = sorted(glob(os.path.join(here,'data', 'daily-stock', '*.json')))
filenames
!rm data/daily-stock/*.h5
from tqdm import tqdm
for fn in tqdm(filenames):
    with open(fn) as f:
        data = [json.loads(line) for line in f]
    df = pd.DataFrame(data)
    out_filename = fn[:-5] + '.h5'
    df.to_hdf(out_filename, '/data')
filenames = sorted(glob(os.path.join(here, 'data', 'daily-stock', '*.h5')))
filenames
Serial version
%%time
series = {}
for fn in filenames: # Simple map over filenames
    series[fn] = pd.read_hdf(fn)['close']

results = {}
for a in filenames:       # Doubly nested loop over the same collection
    for b in filenames:
        if a != b:        # Filter out bad elements
            results[a, b] = series[a].corr(series[b])  # Apply function

((a, b), corr) = max(results.items(), key=lambda kv: kv[1])  # Reduction
a, b, corr
Dask.bag methods
We can construct most of the above computation with the following dask.bag methods:
- collection.map(function): apply function to each element in the collection
- collection.product(collection): create a new collection with every pair of inputs
- collection.filter(predicate): keep only elements of the collection that match the predicate function
- collection.max(): compute the maximum element
%%time
import dask.bag as db
b = db.from_sequence(filenames)
series = b.map(lambda fn: pd.read_hdf(fn)['close'])
corr = (series.product(series)
              .filter(lambda ab: not (ab[0] == ab[1]).all())
              .map(lambda ab: ab[0].corr(ab[1]))
              .max())
%%time
result = corr.compute()
result
Wordcount with Dask bag
import lorem
for i in range(20):
    with open(f"sample{i:02d}.txt", "w") as f:
        f.write(lorem.text())
%ls *.txt
import glob
glob.glob('sample*.txt')
import dask.bag as db
import glob
b = db.read_text(glob.glob('sample*.txt'))
wordcount = (b.str.replace(".", "")       # remove dots
              .str.lower()                # lower text
              .str.strip()                # remove \n and trailing spaces
              .str.split()                # split into words
              .flatten()                  # chain all word lists
              .frequencies()              # compute occurrences
              .topk(10, lambda x: x[1]))  # sort and return the top 10 words
wordcount.compute()  # Run all tasks and return the result
Genome example
We will use a Dask bag to calculate the frequencies of sequences of five bases, and then sort the sequences into descending order ranked by their frequency.
- First we will define some functions to split the bases into sequences of a certain size
Exercise 9.1
- Implement a function group_characters(line, n=5) to group n characters together and return an iterator. line is a text line of the genome.txt file.
>>> line = "abcdefghijklmno"
>>> for seq in group_characters(line, 5):
...     print(seq)
"abcde"
"fghij"
"klmno"
- Implement group_and_split(line)
>>> group_and_split('abcdefghijklmno')
['abcde', 'fghij', 'klmno']
- Use the dask bag to compute the frequencies of sequences of five bases.
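A minimal sketch of one possible solution is shown below; the function names come from the exercise, while the exact grouping behaviour (incomplete trailing groups are dropped) and the use of data/genome.txt with one sequence per line are assumptions.
def group_characters(line, n=5):
    """Yield successive groups of n characters from line (incomplete trailing group is dropped)."""
    for i in range(0, len(line) - n + 1, n):
        yield line[i:i + n]

def group_and_split(line):
    """Return the groups as a list, e.g. 'abcdefghijklmno' -> ['abcde', 'fghij', 'klmno']."""
    return list(group_characters(line))

# Frequencies of five-base sequences with a dask bag
genome = db.read_text('data/genome.txt')
(genome.str.strip()
       .map(group_and_split)
       .flatten()
       .frequencies()
       .topk(10, lambda kv: kv[1])
       .compute())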
Exercise 9.2
The FASTA file format is used to store several genome sequences in a single file.
- Create a function that can read a FASTA file and compute the frequencies of groups of n = 5 characters for a given sequence.
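A possible sketch follows; the helper name fasta_frequencies is invented here, all sequences of the file are aggregated together, and it reuses group_characters from Exercise 9.1. It can be tried on the testset.fasta file written in the next cell.
from collections import Counter

def fasta_frequencies(filename, n=5):
    """Count groups of n characters in the sequences of a FASTA file (header lines start with '>')."""
    counts = Counter()
    with open(filename) as f:
        for line in f:
            line = line.strip()
            if line and not line.startswith('>'):   # skip FASTA header lines
                counts.update(group_characters(line, n))
    return counts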
%%file testset.fasta
>SEQUENCE_1
MTEITAAMVKELRESTGAGMMDCKNALSETNGDFDKAVQLLREKGLGKAAKKADRLAAEG
LVSVKVSDDFTIAAMRPSYLSYEDLDMTFVENEYKALVAELEKENEERRRLKDPNKPEHK
IPQFASRKQLSDAILKEAEEKIKEELKAQGKPEKIWDNIIPGKMNSFIADNSQLDSKLTL
MGQFYVMDDKKTVEQVIAEKEKEFGGKIKIVEFICFEVGEGLEKKTEDFAAEVAAQL
>SEQUENCE_2
SATVSEINSETDFVAKNDQFIALTKDTTAHIQSNSLQSVEELHSSTINGVKFEEYLKSQI
ATIGENLVVRRFATLKAGANGVVNGYIHTNGRVGVVIAAACDSAEVASKSRDLLRQICMH
Exercise 9.3
Write a program that uses the function implemented above to read several FASTA files stored in a Dask bag.
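One way to sketch it with a Dask bag is shown below; the '*.fasta' glob pattern is an assumption about where the FASTA files live, and group_and_split comes from Exercise 9.1.
import glob
fasta_bag = db.read_text(glob.glob('*.fasta'))
(fasta_bag.str.strip()
          .filter(lambda line: line and not line.startswith('>'))   # drop FASTA headers
          .map(group_and_split)
          .flatten()
          .frequencies()
          .topk(10, lambda kv: kv[1])
          .compute())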
%cat data/genome.txt
Some remarks about bag
- Higher level dask collections include functions for common patterns
- Move data to collection, construct lazy computation, trigger at the end
- Use Dask.bag (product + map) to handle nested for loops
Bags have the following known limitations:
- Bag operations tend to be slower than array/dataframe computations, in the same way that Python tends to be slower than NumPy/Pandas.
- Bag.groupby is slow. You should try to use Bag.foldby if possible (see the sketch below). Check the API.
- dask.dataframe can be faster than dask.bag, but sometimes it is easier to load and clean messy data with a bag. We will see later how to transform a bag into a dask.dataframe with the to_dataframe method.
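As a rough illustration of the Bag.foldby recommendation, the word count over the sample*.txt files above could also be written with foldby instead of frequencies (a sketch, not the only way to do it):
import glob
import dask.bag as db

words = (db.read_text(glob.glob('sample*.txt'))
           .str.lower()
           .str.split()
           .flatten())

# foldby(key, binop, initial, combine, combine_initial):
# binop accumulates within a partition, combine merges the per-partition results
counts = words.foldby(lambda w: w,                  # key: the word itself
                      lambda total, w: total + 1,   # add one per occurrence
                      0,
                      lambda a, b: a + b,           # merge partial counts
                      0)
counts.compute()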