HomeData EngineeringData EducationIntroducing MapReduce in C from Scratch Using Threads

Introducing MapReduce in C from Scratch Using Threads

[ad_1]

Luciano Strika

Luciano Strika

Hadoop’s MapReduce is not just a Framework, it’s also a problem-solving philosophy.

Borrowing from functional programming, the MapReduce team realized a lot of different problems could be divided into two common operations: map, and reduce.

Both mapping and reducing steps can be done in parallel.

This meant as long as you could frame your problem in that specific way, there would be a solution to it that could easily be run in parallel. This will usually result in a big performance boost.

That all sounds good, and running things on parallel is usually a good thing, especially when working at scale. But, some of you on the back may be wondering, what are Map and Reduce?

What is MapReduce?

In order to understand the MapReduce framework, we need to understand its two basic operations: Map and Reduce.

They’re both high order functions: Meaning they are functions that can take other functions as their argument.

Specifically, when you need to convert a certain sequence of elements of type A into a result, or series of results of type B, you will:

  • Map all your inputs to a different domain: that means you will transform each of them with a chosen function, applying it to each element.
  • Group the mapped elements by some criterion, usually a grouping key.
  • Reduce the mapped elements on each group with some other function. This function needs to take two arguments and return a single one of the same type, successively running an operation between an accumulator and each value in our collection. It should be commutative and associative, as parallel execution won’t guarantee any order for the operations.

To make this clearer, let’s see an example.

Example of a MapReduce solution

Suppose you’re working for an e-commerce company, and they give you a log file of this form:

John Surname bought 2 apples 
Alice Challice bought 3 bananas
John Surname bought 5 pineapples

Then they ask you to tell them how many fruits each customer bought.

In this case, after parsing this file to turn it into an actual format, like CSV, you could easily go through each line, and add the number of bought fruits on a dictionary under each name.

You could even solve it with a bit of Bash scripting, or load the CSV on a Pandas DataFrame and get some statistics.

However, if the log file was a trillion lines long, bash scripting wouldn’t really cut it. Especially not if you’re not immortal.

You would need to run this in parallel. Let me propose a MapReduce-y way of doing it:

  • Map each line to a Pair of the form <Name, Quantity> by parsing each string.
  • Group by Name.
  • Reduce by summing the quantities.

If you’re familiar with SQL and relational databases, you may have thought of a similar solution. The query would look something like

select user, sum(bought_fruits)
from fruit_transactions group by user;

Why MapReduce scales

Notice how the mapper doesn’t need to see the whole file, just some of the lines. The reducer, on the other hand, only needs to have the lines that have the same Name (the ones that belong to the same group).

You could do this with many different threads on the same computer, and then just join the results.

Or, you could have many different processes running the map jobs, and feeding their output to another set running the reducing job.

If the log was big enough, you could even be running Mapper and Reducer processes on many different computers (say, on a cluster), and then joining their results on some lake in the end.

This kind of solution is very common in ETL jobs and other data-intensive applications, but I won’t delve any further into applications.

If you wish to learn more about this kind of scalable solutions, I recommend you check this O’Reilly book on designing applications at scale.

Programming MapReduce in C

Now that you have an understanding of what MapReduce is, and why MapReduce scales, let’s cut to the chase.

For this first article, we will program two different implementations of the Map function.

One of them will be single-threaded, to introduce a few concepts and show a simple solution. The other one will use the pthread library to make an actually multi-threaded, and much faster version of Map. Finally, we will compare the two and run some benchmarks.

As usual, all the code is available on this C GitHub project.

Single threaded implementation of Map in C

First of all, let’s remember what Map does.

The Map function receives a sequence and a function, and returns the result of applying that function to each element in the sequence.

Since this is C, representing a sequence can be very straight forward: we can just use a pointer to whatever type we’re mapping over!

However, there’s a catch. C is statically typed, and we would like our Map function to be as generic as possible. We want it to be able to map over a sequence of elements of any type (provided they all share a type. Let’s not get carried away here, boys).

How do we solve this? There are probably a few different solutions to this problem. I chose the one that looked like the most simple one, but feel free to pitch in with other ideas.

We will use sequences of void*, and cast everything to this type. This means every element will be represented as a pointer to a random memory address, without specifying a type (or size).

We will trust whatever function we are calling over all these sequence elements knows how to cast them to the right type before using them. We’re effectively delegating that problem away.

A smaller problem we need to solve is sequence length. A pointer to void doesn’t carry the information of how many elements the sequence has. It only knows where it starts, not where it ends.

We will solve this other problem by passing sequence length as a second argument. Knowing that, our Map function becomes pretty straightfoward.

// generic single-threaded map implementation using void* and pointer arithmetics.
void** map(void** things, void* (*f)(void*), int length){

    void** results = malloc(sizeof(void*)*length);
    
    for(int i = 0; i < length; i++){
        void* thing = things[i];
        void* result = (*f)(thing);
        results[i] = result;
    }
    
    return results;
}

 

You see, the function receives a void** to represent the sequence it will map over, and a void* (*f)(void*) function that transforms elements of some generic type to another (or the same) one.

After that, we can use our Map function on any sequence. We only need to do some awkward wrapping and pointer arithmetic beforehand.

Here’s an example, using a function that returns 1 for prime numbers and 0 for the others.

// returns 1 if the number is a prime, 0 otherwise
void* naivePrime(void* number){
    int n = *((int*) number);
    int* res = malloc(sizeof(int));
    *res = 1;
    for(int i = 2; i < n; i++){
        if(n%i==0){ 
            *res=0;
            return res;
            }
    }
    return res;
}

int main(int argc, char** argv){ 
  int N = 1000;
  
  int** numbers = malloc(sizeof(int*)*N);
  for(int i = 0 ; i < N ; i++){
      int* n = malloc(sizeof(int));
      *n = i;
      numbers[i] = n;
  }

  int** resulting_numbers = NULL;
  void** is_prime = map((void**) numbers, naivePrime, N);
  resulting_numbers = (int**) is_prime;
}

As expected, the resulting pointer points to a sequence of integers: 1 corresponds to prime numbers, 0 to composite ones.

Now we’ve gone through the single-threaded Map function, let’s see how to make this run on parallel in C.

Multi-threaded Map function in C

(If you want to add a benchmark using processes and forking, feel free to make a pull request!)

In order to use parallel execution in C, we can either turn to processes, or threads.

For this project, we will be using threads, as they’re more lightweight and, in my opinion, their API is a bit more intuitive for this kind of tutorial.

How to use threads in C

Threads’ API in C is quite intuitive, if only a bit obscure at first.

  • A pointer to a pthread_t: the actual thread.
  • A configuration struct. In this case, we will use NULL for default config.
  • The function we want the thread to run. Unlike a process, a thread will only run a function until it returns, rather than continuing the execution of arbitrary code. This function must take a single void* argument and return another void* value.
  • The input of the aforementioned function. It must be cast to void*.

To use them, we will have to #include <pthread.h>Pthreads‘ man page explains their interface quite nicely. However, for this tutorial, all we will use is the pthread_create function.

pthread_create takes four arguments:

After calling on pthread_create, a parallel thread of execution will begin running the given function.

Once we call pthread_create for each of the chunks we wish to map, we will have to call pthread_join on each of them, which makes the parent (original) thread wait until all the threads it spun finish running.

Otherwise, the program would end before the mapping was done.

Now, let’s feast our eyes on some code.

Using pthread for Parallel MapReduce in C

To code MapReduce’s Map function in C, the first thing we are going to do is define a struct that can store the generic inputs and outputs for it, as well as the function we will be mapping with.

struct map_argument {
    void** things;
    void** results;
    void* (*f)(void*);
    int from;
    int to;
    };

Since parallel execution requires some manner of slicing and partitioning, we will store that logic inside this structure as well, using two different indices for the start and end of our slice.

Next, we will code the function that actually does the mapping: it will cycle the inputs from start to end, storing the result of applying the mapped function to each input in the outputs’ pointer.

void* chunk_map(void* argument){
    struct map_argument* arg = (struct map_argument*) argument;
    for(int i = arg->from; i < arg->to; i++){
        arg->results[i] = (*(arg->f))(arg->things[i]);
    }
    return NULL;
}

Finally the star of the show, the function that starts the threads, assigns a map_argument to each of them, and waits for all the map jobs to run, finally returning the results.

void** concurrent_map(void** things, void* (*f)(void*), int length,
                      int nthreads){

    void** results = malloc(sizeof(void*)*length);
    struct map_argument arguments[nthreads];
    pthread_t threads[nthreads];
    int chunk_size = length/nthreads;

    for(int j = 0 ; j < nthreads; j++){
        int from = j*chunk_size;
        struct map_argument* argument = &arguments[j];        
        init_map_argument(argument, things, results, f, from, from+chunk_size);
        pthread_create(&threads[j], NULL, chunk_map, (void*) argument);
    }


    for(int i = 0; i < length % nthreads; i++){
        int idx = chunk_size*nthreads + i;
        results[idx] = (*f)(things[idx]);
    }

    for(int k = 0; k < nthreads; k++){
        pthread_join(threads[k], NULL);
    }

    return results;
}

Notice how this function allows us to choose how many threads we want, and partitions the data accordingly. It also handles pthreads ‘ creation and joining.

Finally, the way we would call this function in main looks something like this:

concurrent_map( (void**) numbers, twice, N, NTHREADS)

Where NTHREADS is the number of threads we want, and N is how many elements numbers has.

Now the code is done, let’s run some benchmarks! Is this really going to be faster? Will all this wrapper code make things a lot slower? Let’s find out!

Map in C, Benchmarks: Single-threaded vs Multi-threaded

In order to measure performance improvements from using parallel Map, I tested some single-threaded algorithms against their multi-threaded counterparts.

First benchmark: slow_twice

For my first test, I used the slow_twice function, which simply multiplies each number by 2.

You may be wondering, ‘why is it called slow?’. The answer is simple: we will double each number 1000 times.

This makes the operation slower, so we can measure time differences without having to use so many numbers that initialization takes too long. It also lets us benchmark the case of many memory writes.

Since execution time for each number is constant, the non-parallel algorithm’s time grows pretty much linearly on input size.

I then ran it with 2, 4 and 8 threads. My laptop has 4 cores, and I found that to be the optimum number of threads to use as well. For some other algorithms, I’ve found using a multiple of my quantity of cores to be optimum, but this hasn’t been the case.

Benchmark Results

I ran each benchmark 10 times and took the average, just in case.

Here are the results:

Introducing MapReduce in C from Scratch Using Threads 1

For both test cases, using 4 threads was about three times faster than the single-threaded implementation. This proves using Parallel Map is a lot faster than using a common single-threaded version.

There was also a cost to adding more than 4 threads, probably due to the overhead of initialization and context switching.

Second benchmark: is_prime

For this benchmark I coded a naive prime testing function: it simply iterates through all the numbers smaller than the input, and returns 1 if any divides it, 0 otherwise.

Notice how this function takes O(n) instead of O(1) for each element, so a few partitions of our data (which is ordered) will be a lot slower than the others. I wonder how this will affect running times?

Introducing MapReduce in C from Scratch Using Threads 2
Introducing MapReduce in C from Scratch Using Threads 4

In this case, again the parallel algorithm beats the single-threaded one. No big surprises there. However, this time there’s an improvement when using over 4 threads!

I think this is because when partitioning our inputs, dividing it into smaller chunks makes the slowest partition take less time, thus making our bottleneck smaller.

Conclusion

I had a lot of fun running this experiment.

Picking how many threads to use turns out to be a lot harder than just “use the same amount as cores”, and depends a lot on our input even for very dumb algorithms.

This may help us understand why optimizing a cluster’s configuration can be such a daunting task for a big application.

In the future, I may add a parallel reduce implementation to complete this little framework.

A few other benchmarks that might’ve been fun and I may run in the future are Map in C vs Python List Comprehensions, and C vs SIMD-Assembly.

If you want to level up as a Data scientist, check out my best Machine Learning books list and my Bash tutorial.

Remember you can use this code any way you like, or run your own experiments, and if you do please don’t forget to let me know your results in the comments!

[ad_2]

This article has been published from the source link without modifications to the text. Only the headline has been changed.

Source link

 

Most Popular