Sunday, October 31, 2010

Simple Map Reduce Example in C#

This is an example of a map-reduce implementation for IEnumerable. The code is not optimized: it creates a separate Task for each enumerated item, which I think it shouldn't. Partitioning the enumeration and processing bigger chunks of data would probably be much more efficient.
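
A minimal sketch of the partitioning idea, assuming the same map and reduce delegate shapes as the ForEachParallel function below: items are grouped into chunks of `chunkSize`, and each Task maps a whole chunk sequentially. The method name `ForEachParallelChunked`, the `chunkSize` parameter and the `MapReduceSketch` namespace are hypothetical. The sketch takes a generic `IEnumerable<TItem>` instead of the non-generic `IEnumerable`, and the chunk size is not tuned.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace MapReduceSketch
{
    public static class ChunkedMapReduce
    {
        // Hypothetical chunked variant: one Task per chunk of chunkSize items
        // instead of one Task per item. Sub-results keep the input order.
        public static Task<TResult> ForEachParallelChunked<TItem, TSubResult, TResult, TParam>(
            this IEnumerable<TItem> items,
            Func<TItem, TParam, TSubResult> map,
            Func<TSubResult[], TResult> reduce,
            TParam param,
            int chunkSize)
        {
            if (items == null) throw new ArgumentNullException("items");
            if (map == null) throw new ArgumentNullException("map");
            if (reduce == null) throw new ArgumentNullException("reduce");
            if (chunkSize < 1) throw new ArgumentOutOfRangeException("chunkSize");

            return Task.Factory.StartNew(() =>
            {
                var tasks = new List<Task<TSubResult[]>>();
                var chunk = new List<TItem>(chunkSize);

                foreach (TItem item in items)
                {
                    chunk.Add(item);
                    if (chunk.Count == chunkSize)
                    {
                        // ToArray copies the chunk, so clearing the buffer is safe.
                        tasks.Add(StartChunk(chunk.ToArray(), map, param));
                        chunk.Clear();
                    }
                }
                if (chunk.Count > 0)
                {
                    tasks.Add(StartChunk(chunk.ToArray(), map, param));
                }

                // Concatenate the per-chunk results in order, then reduce once.
                var results = new List<TSubResult>();
                foreach (var t in tasks)
                {
                    results.AddRange(t.Result);
                }
                return reduce(results.ToArray());
            });
        }

        // Maps every item of one chunk sequentially inside a single Task.
        private static Task<TSubResult[]> StartChunk<TItem, TSubResult, TParam>(
            TItem[] chunk, Func<TItem, TParam, TSubResult> map, TParam param)
        {
            return Task.Factory.StartNew(() =>
            {
                var mapped = new TSubResult[chunk.Length];
                for (int i = 0; i < chunk.Length; i++)
                {
                    mapped[i] = map(chunk[i], param);
                }
                return mapped;
            });
        }
    }
}
```

With the usage example further down, the non-generic `Generate()` sequence can be adapted with `Cast<int>()` from System.Linq before calling `ForEachParallelChunked<int, int, int, int>` with the same map and reduce lambdas. A chunk size that is too small brings back the per-item overhead; one that is too large can leave cores idle. In .NET 4, `Parallel.ForEach` and PLINQ already partition their input along these lines.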

Browse source code: Google Code

MapReduce function for IEnumerable:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Collections;

namespace MapReduceCode
{
    public static class MapReduce
    {
        // Runs map on every item in parallel (one Task per item) and passes the
        // collected sub-results, in input order, to reduce.
        public static Task<TResult> ForEachParallel<TItem, TSubResult, TResult, TParam>(this IEnumerable items, Func<TItem, TParam, TSubResult> map, Func<TSubResult[], TResult> reduce, TParam param)
        {
            if (items == null) { throw new ArgumentNullException("items"); }
            if (map == null) { throw new ArgumentNullException("map"); }
            if (reduce == null) { throw new ArgumentNullException("reduce"); }

            // The outer task enumerates the input, starts the map tasks and runs reduce.
            return Task<TResult>.Factory.StartNew(() =>
            {
                List<Task<TSubResult>> tasks = new List<Task<TSubResult>>();

                foreach (TItem item in items)
                {
                    // The item is passed as task state instead of being captured, so each
                    // task sees its own item (in C# 4 the foreach variable is shared by
                    // all closures created in the loop).
                    Task<TSubResult> t = Task<TSubResult>.Factory.StartNew(item2 =>
                        {
                            var mparam = (Tuple<TItem, TParam>)item2;
                            return map(mparam.Item1, mparam.Item2);
                        },
                        new Tuple<TItem, TParam>(item, param),
                        TaskCreationOptions.AttachedToParent);
                    tasks.Add(t);
                }

                // Task.Result blocks until that map task finishes; an exception thrown
                // by map surfaces here as an AggregateException.
                List<TSubResult> results = new List<TSubResult>();
                foreach (Task<TSubResult> task in tasks)
                {
                    results.Add(task.Result);
                }

                return reduce(results.ToArray());
            });
        }
    }
}
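
The outer task in ForEachParallel can hold a thread-pool thread while it waits on each `task.Result`. A sketch of an alternative, assuming the same parameters: start the map tasks and let `TaskFactory.ContinueWhenAll` (part of .NET 4) run reduce once they have all finished. `ForEachParallelContinuation` and `MapReduceSketch` are hypothetical names, and unlike the original this version enumerates the input on the calling thread.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace MapReduceSketch
{
    public static class ContinuationMapReduce
    {
        // Hypothetical variant with the same parameters as ForEachParallel.
        // Reduce runs as a continuation, so no thread blocks on task.Result.
        public static Task<TResult> ForEachParallelContinuation<TItem, TSubResult, TResult, TParam>(
            this IEnumerable items,
            Func<TItem, TParam, TSubResult> map,
            Func<TSubResult[], TResult> reduce,
            TParam param)
        {
            if (items == null) throw new ArgumentNullException("items");
            if (map == null) throw new ArgumentNullException("map");
            if (reduce == null) throw new ArgumentNullException("reduce");

            // The input is enumerated here, on the calling thread.
            var tasks = new List<Task<TSubResult>>();
            foreach (TItem item in items)
            {
                // Copy the loop variable so each lambda captures its own value
                // (needed before C# 5 changed foreach capture semantics).
                TItem current = item;
                tasks.Add(Task.Factory.StartNew(() => map(current, param)));
            }

            // ContinueWhenAll rejects an empty array, so reduce an empty input directly.
            if (tasks.Count == 0)
            {
                return Task.Factory.StartNew(() => reduce(new TSubResult[0]));
            }

            // Runs once every map task has finished; completed keeps the input order.
            return Task.Factory.ContinueWhenAll<TSubResult, TResult>(tasks.ToArray(), completed =>
            {
                var results = new TSubResult[completed.Length];
                for (int i = 0; i < completed.Length; i++)
                {
                    // Result throws an AggregateException if that map task faulted.
                    results[i] = completed[i].Result;
                }
                return reduce(results);
            });
        }
    }
}
```

Given `using MapReduceSketch;`, it is called exactly like ForEachParallel, and with the usage example's lambdas and parameter 5 it should reduce to the same value, 24750.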

Usage example:

using System;
using System.Collections;
using MapReduceCode;
using System.Threading;
using System.Threading.Tasks;

namespace MapReduce
{
    class Program
    {
        static void Main(string[] args)
        {
            // Map: multiply each element by param (5) and simulate up to 500 ms of work.
            // Reduce: sum all mapped values.
            var a = Generate().ForEachParallel<int, int, int, int>(
                (element, param) =>
                {
                    var result = element * param;
                    Console.WriteLine("Map: {0}, {1}", result, Task.CurrentId);
                    Thread.Sleep(new Random(DateTime.Now.Millisecond).Next(500));
                    return result;
                },
                (subresult) =>
                {
                    var sum = 0;
                    foreach (var item in subresult)
                    {
                        sum += item;
                    }
                    Console.WriteLine("Reduce: {0}", sum);
                    return sum;
                },
                5);

            // Result blocks until the reduce step has finished.
            Console.WriteLine(a.Result);
        }

        static IEnumerable Generate()
        {
            for (int i = 0; i < 100; i++)
            {
                yield return i;
            }
        }
    }
}
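
For comparison, PLINQ, which ships with .NET 4 alongside the Task Parallel Library, can express the same map (multiply by 5) and reduce (sum) without hand-written task management. This is a sketch; the `PlinqComparison` class and its method names are illustrative only.

```csharp
using System.Linq;

namespace MapReduceSketch
{
    public static class PlinqComparison
    {
        // Map (element * param) runs on PLINQ worker threads; Sum() is the reduce step.
        public static int MapReduceSum(int count, int param)
        {
            return Enumerable.Range(0, count)
                             .AsParallel()
                             .Select(element => element * param)
                             .Sum();
        }

        // Same computation with an explicit accumulator per partition, close to the
        // "bigger chunks" idea: each partition folds its own items, then the
        // partial sums are combined.
        public static int MapReduceAggregate(int count, int param)
        {
            return Enumerable.Range(0, count)
                             .AsParallel()
                             .Aggregate(
                                 () => 0,                                         // seed for each partition
                                 (partial, element) => partial + element * param, // map + local reduce
                                 (left, right) => left + right,                   // combine partitions
                                 total => total);                                 // final result
        }
    }
}
```

`PlinqComparison.MapReduceSum(100, 5)` returns 24750, the same value as the Reduce line in the output.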

Result:

Map: 0, 1
Map: 5, 2
Map: 10, 3
Map: 20, 4
Map: 15, 5
Map: 25, 6
Map: 30, 7
Map: 35, 8
Map: 40, 9
Map: 45, 10
Map: 50, 11
Map: 55, 12
Map: 60, 13
Map: 65, 14
Map: 70, 15
Map: 75, 16
Map: 80, 17
Map: 85, 18
Map: 90, 19
Map: 95, 20
Map: 100, 21
Map: 105, 22
Map: 110, 23
Map: 115, 24
Map: 120, 25
Map: 125, 26
Map: 130, 27
Map: 135, 28
Map: 140, 29
Map: 145, 30
Map: 150, 31
Map: 155, 32
Map: 160, 33
Map: 165, 34
Map: 170, 35
Map: 175, 36
Map: 180, 37
Map: 185, 38
Map: 190, 39
Map: 195, 40
Map: 200, 41
Map: 205, 42
Map: 210, 43
Map: 215, 44
Map: 225, 45
Map: 230, 47
Map: 220, 46
Map: 235, 48
Map: 245, 50
Map: 240, 49
Map: 250, 51
Map: 255, 52
Map: 260, 53
Map: 265, 54
Map: 275, 56
Map: 270, 55
Map: 285, 57
Map: 280, 58
Map: 290, 59
Map: 295, 60
Map: 300, 62
Map: 305, 61
Map: 310, 63
Map: 315, 64
Map: 320, 65
Map: 325, 66
Map: 330, 67
Map: 335, 68
Map: 340, 69
Map: 355, 72
Map: 350, 71
Map: 360, 73
Map: 365, 74
Map: 345, 70
Map: 370, 75
Map: 380, 77
Map: 375, 76
Map: 390, 79
Map: 395, 80
Map: 385, 78
Map: 400, 81
Map: 405, 82
Map: 415, 84
Map: 425, 86
Map: 410, 83
Map: 420, 85
Map: 430, 87
Map: 440, 89
Map: 435, 88
Map: 450, 90
Map: 445, 91
Map: 455, 92
Map: 460, 93
Map: 470, 95
Map: 480, 97
Map: 475, 96
Map: 485, 98
Map: 465, 94
Map: 490, 99
Map: 495, 100
Reduce: 24750
24750
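
The final value can be checked by hand: the map step multiplies each i from 0 to 99 by the parameter 5, and the reduce step sums the products. The Map lines themselves are not strictly in input order (for example, 20 is printed before 15) because the tasks run concurrently on the thread pool.

```latex
\sum_{i=0}^{99} 5i = 5 \cdot \frac{99 \cdot 100}{2} = 5 \cdot 4950 = 24750
```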

2 comments:

  1. How can this be run on multiple computers?

  2. It's a simple solution for systems where you don't have to deal with computer boundaries, such as a multicore system (SMP, see http://en.wikipedia.org/wiki/Symmetric_multiprocessing). For multiple computers, a much more sophisticated solution is needed, because the processing has to be synchronized between the computers. That means network communication, and you have to deal with memory coherence problems; in other words, clustering. Check ACM's Parallel computing tech pack for more information: http://techpack.acm.org/parallel/
