Dask dataframes on HDFS#

We can use Dask dataframes in parallel across an HDFS cluster to read CSV data, coordinating these computations with distributed and dask.dataframe.
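For example, reading CSV files directly from HDFS only requires an hdfs:// URL, as in the sketch below. The namenode address and file pattern are placeholders, and an HDFS-compatible filesystem library (such as pyarrow) must be installed on the workers.

```python
import dask.dataframe as dd

# Placeholder HDFS path: adapt the namenode address and file pattern to your cluster
df = dd.read_csv("hdfs://namenode:8020/data/nyc-taxi/yellow*.csv",
                 parse_dates=["pickup_datetime", "dropoff_datetime"])
df.head()
```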

Like Spark, Dask can work in cluster mode. There are several ways to launch a cluster.

Local cluster#

from dask.distributed import LocalCluster, Client

# Start a Dask scheduler and workers on the local machine
cluster = LocalCluster()
cluster
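You can then connect a Client to this cluster and check that the workers respond; the trivial task below is only a sanity check.

```python
from dask.distributed import Client

client = Client(cluster)   # connect to the LocalCluster created above

# Sanity check: run a trivial task on the workers
print(client.submit(lambda x: x + 1, 10).result())  # 11
print(client.dashboard_link)                        # URL of the diagnostics dashboard
```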

Remote clusters via SSH#

The code below can be used to launch a Dask SSH cluster on the svmass2 server.

from dask.distributed import SSHCluster

# The scheduler runs on the first host (localhost); workers run on svpe1 … svpe9
svpes = [f"svpe{i:1d}" for i in range(1, 10)]
print(svpes)
cluster = SSHCluster(["localhost"] + svpes)
cluster
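As with the local cluster, you can attach a Client to the SSH cluster and inspect the connected workers:

```python
from dask.distributed import Client

client = Client(cluster)                        # connect to the SSH cluster created above
print(len(client.scheduler_info()["workers"]))  # number of connected workers
```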

YARN cluster#

Follow these instructions to create the environment file.

from dask_yarn import YarnCluster
from dask.distributed import Client

# Create a cluster where each worker has two cores and eight GiB of memory
cluster = YarnCluster(environment='environment.tar.gz',
                      worker_vcores=2,
                      worker_memory="8GiB")
# Scale out to ten such workers
cluster.scale(10)

# Connect to the cluster
client = Client(cluster)
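Instead of a fixed number of workers, the cluster can also be scaled adaptively between bounds; the bounds in this sketch are arbitrary.

```python
# Let the cluster grow and shrink between 0 and 10 workers depending on load
cluster.adapt(minimum=0, maximum=10)
```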

SLURM cluster#

You can use the dask_jobqueue module to launch a Dask cluster through the SLURM job manager.

from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(cores=16,
                       queue='test',
                       project='myproject',
                       memory="16GB",
                       walltime="01:00:00")

The cluster generates a traditional job script and submits it an appropriate number of times to the job queue. You can see the job script that it will generate as follows:

print(cluster.job_script())
#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -p test
#SBATCH -A myproject
#SBATCH -n 1
#SBATCH --cpus-per-task=16
#SBATCH --mem=15G
#SBATCH -t 01:00:00

/opt/tljh/user/envs/big-data/bin/python -m distributed.cli.dask_worker tcp://192.168.2.54:40623 --nthreads 4 --nprocs 4 --memory-limit 4.00GB --name name --nanny --death-timeout 60

Use the script above to submit your Dask pipeline to the HPC server of your institution.
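Submission itself is handled by the cluster object: you request a number of jobs and connect a Client, as sketched below (the number of jobs is arbitrary).

```python
from dask.distributed import Client

cluster.scale(jobs=10)     # submit 10 such job scripts to the SLURM queue

client = Client(cluster)   # connect this session to the SLURM-backed scheduler
client
```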

New York City Taxi Cab Trip#

We look at the New York City Taxi Cab dataset. This includes every ride made in the city of New York since 2009.

On this website you can see the data for one random NYC yellow taxi on a single day.

In this post, you can see an analysis of this dataset. Postgres and R scripts are available on GitHub. There is also a dashboard available here that updates monthly with the latest taxi, Uber, and Lyft aggregate stats.

```python
from dask.distributed import Client, progress
import dask.dataframe as dd

client = Client(n_workers=2, threads_per_worker=1, memory_limit='1GB')
```

`nyc2014` is a dask.dataframe object: it presents a subset of the Pandas API to the user, but farms out all of the work to the many Pandas dataframes it controls across the network.

```python
nyc2014 = dd.read_csv('/opt/datasets/nyc-data/2014/yellow*.csv',
                      parse_dates=['pickup_datetime', 'dropoff_datetime'],
                      skipinitialspace=True)
nyc2014 = client.persist(nyc2014)
progress(nyc2014)
```
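Before starting the exercises, it can help to inspect the structure of the dataframe, for example:

```python
# Inspect the collection without triggering a full computation
print(nyc2014.npartitions)  # number of underlying pandas dataframes
print(nyc2014.columns)      # column names read from the CSV headers
nyc2014.dtypes              # inferred column types
```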

Exercises#

  • Display the head of the dataframe.

  • Display the number of rows of this dataframe.

  • Compute the total number of passengers.

  • Count occurrences in the payment_type column both for the full dataset, and filtered by zero tip (tip_amount == 0).

  • Create a new column, tip_fraction.

  • Plot the average of the new column tip_fraction grouped by day of week.

  • Plot the average of the new column tip_fraction grouped by hour of day.

Dask dataframe documentation