Map and reduce

Last updated on 2023-01-11

Estimated time: 90 minutes

Overview

Questions

  • What abstractions does Dask offer?
  • What programming patterns exist in the parallel universe?

Objectives

  • Recognize map, filter and reduce patterns
  • Create programs using these building blocks
  • Use the visualize method to create dependency graphs

In computer science, bags are unordered collections of data (also called multisets). In Dask, a bag is a collection that is split internally into chunks, called partitions. When you perform operations on a bag, these operations are automatically parallelized over the chunks inside the bag.
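To make "chunked internally" concrete, here is a minimal sketch that splits a small bag into two partitions and prints what ends up in each chunk. It assumes a recent Dask version; `npartitions=2` is passed explicitly only to make the split easy to see, and `scheduler="synchronous"` runs the computation in the current process so the snippet also works as a plain script.

```python
import dask
import dask.bag as db

# Ask for two partitions explicitly so the chunking is easy to inspect
bag = db.from_sequence(["mary", "had", "a", "little", "lamb"], npartitions=2)
print(bag.npartitions)  # 2

# to_delayed() returns one Delayed object per partition; computing them
# shows how the five words were divided over the two chunks
chunks = dask.compute(*bag.to_delayed(), scheduler="synchronous")
print(chunks)
```

Every bag operation in this episode is applied to these chunks independently, which is where the parallelism comes from.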

Dask bags let you compose functionality using several primitive patterns: the most important of these are map, filter, groupby, flatten, and reduction.

Discussion

Open the Dask documentation on bags and discuss the map, filter, flatten and reduction methods.

In this set of operations, reduction is rather special: all other operations on bags could be written in terms of a reduction.
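The claim that other operations can be written as reductions can be checked in plain Python. The sketch below uses `functools.reduce` from the standard library (not Dask); the helper names `map_via_reduce` and `filter_via_reduce` are hypothetical, and copying the accumulator list at every step makes them quadratic, so they are for illustration only.

```python
from functools import reduce

words = ["mary", "had", "a", "little", "lamb"]

# Hypothetical helper: map as a reduction. The accumulator is the output
# list, and each step appends f(x) to it.
def map_via_reduce(f, xs):
    return reduce(lambda acc, x: acc + [f(x)], xs, [])

# Hypothetical helper: filter as a reduction. Append x only when pred(x) holds.
def filter_via_reduce(pred, xs):
    return reduce(lambda acc, x: acc + [x] if pred(x) else acc, xs, [])

print(map_via_reduce(str.upper, words))             # ['MARY', 'HAD', 'A', 'LITTLE', 'LAMB']
print(filter_via_reduce(lambda w: "a" in w, words))  # ['mary', 'had', 'a', 'lamb']
```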

Operations on this level fall into several categories:

  • map (N to N) applies a function one-to-one to a list of arguments. This operation is embarrassingly parallel.
  • filter (N to <N) selects a subset of the data.
  • reduce (N to 1) computes an aggregate from a sequence of data; if the operation permits it (summing, maximizing, etc.), this can be done in parallel by reducing chunks of data and then further processing the results of those chunks.
  • groupby (1 bag to N bags) groups data into subcategories.
  • flatten (N bags to 1 bag) combines many bags into one.
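As a quick reference, the sketch below shows one Dask bag call for four of these categories on a small bag of numbers (groupby is left out to keep it short). It assumes a recent Dask version; the results are computed with `scheduler="synchronous"` so everything runs in the current process.

```python
import dask
import dask.bag as db

numbers = db.from_sequence(list(range(6)), npartitions=2)

# map (N to N): one output element per input element
squares = numbers.map(lambda x: x * x)

# filter (N to <N): keep only the even numbers
evens = numbers.filter(lambda x: x % 2 == 0)

# reduction (N to 1): sum() adds up each partition, then combines the partial sums
total = numbers.sum()

# flatten: concatenate the lists held in a bag into one bag of numbers
nested = db.from_sequence([[0, 1], [2, 3, 4], [5]])
flat = nested.flatten()

# Compute all four lazily built results in one call, in the current process
print(dask.compute(squares, evens, total, flat, scheduler="synchronous"))
# ([0, 1, 4, 9, 16, 25], [0, 2, 4], 15, [0, 1, 2, 3, 4, 5])
```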

Let’s see an example of it in action:

First, let’s create the bag containing the elements we want to work with (in this case, the words of “mary had a little lamb”).

PYTHON

import dask.bag as db

bag = db.from_sequence(['mary', 'had', 'a', 'little', 'lamb'])


Map

To illustrate the concept of map, we’ll need a mapping function. In the example below we’ll use a function that converts its argument to upper case:

PYTHON

# Create a function for mapping
def f(x):
    return x.upper()

# Create the map and compute it
bag.map(f).compute()

OUTPUT

out: ['MARY', 'HAD', 'A', 'LITTLE', 'LAMB']
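Note that `bag.map(f)` on its own does not do any work: it returns a new, lazy bag, and only `compute()` runs the tasks. The sketch below shows this and checks that map is N to N. It uses the built-in `str.upper` in place of `f` (they behave the same here) and `scheduler="synchronous"` so it runs in a single process.

```python
import dask.bag as db

bag = db.from_sequence(["mary", "had", "a", "little", "lamb"])

# map() only records the operation and returns a new, lazy Bag
upper = bag.map(str.upper)
print(type(upper).__name__)  # Bag

# map is N to N: the mapped bag has as many elements as the input bag
print(upper.count().compute(scheduler="synchronous"))  # 5
print(upper.compute(scheduler="synchronous"))          # ['MARY', 'HAD', 'A', 'LITTLE', 'LAMB']
```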

We can also visualize the task graph of the mapping (this needs the graphviz package to be installed):

PYTHON

# Visualize the map
bag.map(f).visualize()
A map operation.

Filter

To illustrate the concept of filter, it is useful to have a function that returns a boolean. In this case, we’ll use a function that returns True if the argument contains the letter ‘a’, and False if it doesn’t.

PYTHON

# Return True if x contains the letter 'a', False if not
def pred(x):
    return 'a' in x

bag.filter(pred).compute()

OUTPUT

[out]: ['mary', 'had', 'a', 'lamb']
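Because every operation returns a new bag, filter and map can be chained into a small pipeline. A minimal sketch, using a lambda equivalent to `pred` and `str.upper` in place of `f`; Dask builds a single task graph for the whole chain and only runs it on `compute()` (here in the current process, via `scheduler="synchronous"`).

```python
import dask.bag as db

bag = db.from_sequence(["mary", "had", "a", "little", "lamb"])

# Keep the words that contain an "a", then upper-case the survivors
result = bag.filter(lambda w: "a" in w).map(str.upper).compute(scheduler="synchronous")
print(result)  # ['MARY', 'HAD', 'A', 'LAMB']
```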

Difference between filter and map

Without executing it, try to predict the output of bag.map(pred).compute().

The output will be [True, True, True, False, True].
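The two results are related: map keeps all five elements and replaces each with a boolean, while filter keeps only the elements for which that boolean is True. A short check of this, assuming the same `pred` as above and running in a single process with `scheduler="synchronous"`:

```python
import dask.bag as db

bag = db.from_sequence(["mary", "had", "a", "little", "lamb"])

def pred(x):
    return "a" in x

# map: one boolean per word
flags = bag.map(pred).compute(scheduler="synchronous")
# filter: only the words for which pred is True
kept = bag.filter(pred).compute(scheduler="synchronous")

# The number of True flags equals the length of the filtered result
print(sum(flags), len(kept))  # 4 4
```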

Reduction

PYTHON

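# count_chars receives one partition (a chunk of words) and returns its total length
def count_chars(x):
    per_word = [len(w) for w in x]

    return sum(per_word)

# count_chars is applied per partition; sum then combines the partition totals
bag.reduction(count_chars, sum).visualize()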
A reduction.
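`Bag.reduction` takes two functions: the first (`perpartition`) is applied to each chunk separately, and the second (`aggregate`) receives the list of per-chunk results. A minimal sketch that actually computes the reduction, assuming two partitions for illustration and the synchronous scheduler; the character counts are 4 + 3 + 1 + 6 + 4 = 18.

```python
import dask.bag as db

bag = db.from_sequence(["mary", "had", "a", "little", "lamb"], npartitions=2)

def count_chars(x):
    # x is one partition: an iterable of words
    return sum(len(w) for w in x)

# count_chars runs once per chunk; sum adds up the list of chunk totals
total = bag.reduction(count_chars, sum).compute(scheduler="synchronous")
print(total)  # 18

# The same number, written as a map followed by the built-in sum reduction
print(bag.map(len).sum().compute(scheduler="synchronous"))  # 18
```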

Challenge: consider pluck

We previously discussed some generic operations on bags. In the documentation, look up the pluck method. How would you implement it if pluck weren’t there?

Hint: try pluck on some example data.

PYTHON

import dask.bag as db

data = [
   { "name": "John", "age": 42 },
   { "name": "Mary", "age": 35 },
   { "name": "Paul", "age": 78 },
   { "name": "Julia", "age": 10 }
]

bag = db.from_sequence(data)
...

The pluck method is a mapping. The input is supposed to be a bag of dictionaries.

PYTHON

from operator import itemgetter

# itemgetter("name") builds a function that returns d["name"] for a dictionary d
bag.map(itemgetter("name")).compute()
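The same idea can be wrapped in a small helper and compared with the built-in `Bag.pluck`. The helper name `my_pluck` is hypothetical (it is not part of Dask), and it does not support the `default` argument that the real `pluck` accepts for missing keys. The synchronous scheduler keeps everything in one process.

```python
import dask.bag as db

data = [
    {"name": "John", "age": 42},
    {"name": "Mary", "age": 35},
    {"name": "Paul", "age": 78},
    {"name": "Julia", "age": 10},
]
bag = db.from_sequence(data)

# Hypothetical helper, not part of Dask: map each dictionary to its value under `key`
def my_pluck(bag, key):
    return bag.map(lambda record: record[key])

names = my_pluck(bag, "name").compute(scheduler="synchronous")
builtin = bag.pluck("name").compute(scheduler="synchronous")
print(names)            # ['John', 'Mary', 'Paul', 'Julia']
print(names == builtin)  # True
```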


Challenge: Dask version of Pi estimation

Use the map and mean methods of Dask bags to estimate \(\pi\).
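The reasoning behind the estimator in the code that follows can be written out as follows. Points drawn uniformly from the square \([-1, 1]^2\) (area 4) land inside the unit disc (area \(\pi\)) with probability \(\pi/4\); averaging equally sized estimates is the same as pooling all samples. The standard-error figure is a back-of-the-envelope value for \(K = 24\) estimates of \(N = 10^7\) samples each.

```latex
% M = number of the N samples with x^2 + y^2 < 1
P\left(x^2 + y^2 < 1\right) = \frac{\pi}{4}
\quad\Longrightarrow\quad
\hat\pi_N = \frac{4M}{N} \approx \pi

% Mean of K independent estimates, each from N samples
\bar\pi = \frac{1}{K}\sum_{k=1}^{K} \frac{4M_k}{N} = \frac{4\sum_{k} M_k}{KN},
\qquad
\mathrm{SE}(\bar\pi) = 4\sqrt{\frac{p(1-p)}{KN}}, \quad p = \frac{\pi}{4}

% With K = 24 and N = 10^7: SE \approx 4 \cdot 0.41 / \sqrt{2.4 \times 10^8} \approx 1.1 \times 10^{-4}
```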

PYTHON

import dask.bag
from numpy import repeat
import random

def calc_pi(N):
    """Computes the value of pi using N random samples."""
    M = 0
    for i in range(N):
        # take a sample
        x = random.uniform(-1, 1)
        y = random.uniform(-1, 1)
        if x*x + y*y < 1.:
            M += 1
    return 4 * M / N

bag = dask.bag.from_sequence(repeat(10**7, 24))
shots = bag.map(calc_pi)
estimate = shots.mean()
estimate.compute()

Note

By default, Dask computes bags with the multiprocessing scheduler. Running tasks in separate processes sidesteps the GIL, but also means a larger overhead, because data has to be serialized and passed between processes.
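The scheduler can be chosen per call with the `scheduler=` keyword of `compute()`. The sketch below times a scaled-down version of the \(\pi\) estimate (8 × 10^5 samples instead of 24 × 10^7) on the single-process and threaded schedulers. The default `"processes"` scheduler is deliberately left out so the snippet also runs as a standalone script, where a process-based run may need an `if __name__ == "__main__":` guard; try `scheduler="processes"` in a notebook to compare. No timings are claimed here, but for a pure-Python loop like this one the GIL is expected to keep the threaded run from being much faster.

```python
import random
import time

import dask.bag as db

def calc_pi(N):
    """Monte Carlo estimate of pi from N random samples (same logic as above)."""
    M = 0
    for _ in range(N):
        x = random.uniform(-1, 1)
        y = random.uniform(-1, 1)
        if x * x + y * y < 1.0:
            M += 1
    return 4 * M / N

# A much smaller workload than in the exercise, so the comparison is quick
bag = db.from_sequence([10**5] * 8)
estimate = bag.map(calc_pi).mean()

for scheduler in ["synchronous", "threads"]:
    start = time.perf_counter()
    value = estimate.compute(scheduler=scheduler)
    elapsed = time.perf_counter() - start
    print(f"{scheduler:>11}: pi ~ {value:.3f} in {elapsed:.2f} s")
```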

Key Points

  • Use abstractions to keep programs manageable