Sunday, October 31, 2010

Simple Map Reduce Example in C#

This is an example of a map-reduce implementation for IEnumerable. This code is not optimized: I don't think we should create a separate Task for each enumerated item. Partitioning the enumeration and processing bigger chunks of data per task would probably be much more efficient.

Browse source code: Google Code

MapReduce function for IEnumerable:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Collections;

namespace MapReduceCode
{
    /// <summary>
    /// Map-reduce helper for non-generic <see cref="IEnumerable"/> sequences, built on the Task Parallel Library.
    /// </summary>
    public static class MapReduce
    {
        /// <summary>
        /// Starts one child task per enumerated item to compute <paramref name="map"/>, then passes
        /// all sub-results to <paramref name="reduce"/>.
        /// </summary>
        /// <typeparam name="TItem">Type each enumerated element is cast to.</typeparam>
        /// <typeparam name="TSubResult">Type produced by <paramref name="map"/> for a single item.</typeparam>
        /// <typeparam name="TResult">Type produced by <paramref name="reduce"/>.</typeparam>
        /// <typeparam name="TParam">Type of the extra argument handed to every <paramref name="map"/> call.</typeparam>
        /// <param name="items">Sequence to process; it is enumerated inside the returned task.</param>
        /// <param name="map">Function applied to each item together with <paramref name="param"/>.</param>
        /// <param name="reduce">Function that receives the sub-results, in enumeration order, and produces the final value.</param>
        /// <param name="param">Value passed unchanged as the second argument of every <paramref name="map"/> call.</param>
        /// <returns>A started task whose result is the value returned by <paramref name="reduce"/>.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="items"/>, <paramref name="map"/> or <paramref name="reduce"/> is null.
        /// </exception>
        public static Task<TResult> ForEachParallel<TItem, TSubResult, TResult, TParam>(this IEnumerable items, Func<TItem, TParam, TSubResult> map, Func<TSubResult[], TResult> reduce, TParam param)
        {
            if (items == null)
            {
                throw new ArgumentNullException("items");
            }

            if (map == null)
            {
                throw new ArgumentNullException("map");
            }

            if (reduce == null)
            {
                throw new ArgumentNullException("reduce");
            }

            return Task<TResult>.Factory.StartNew(() =>
            {
                var pending = new List<Task<TSubResult>>();

                foreach (TItem element in items)
                {
                    // Copy to a per-iteration local so each closure sees its own item,
                    // regardless of how the compiler scopes the foreach variable.
                    TItem current = element;

                    pending.Add(Task<TSubResult>.Factory.StartNew(
                        () => map(current, param),
                        TaskCreationOptions.AttachedToParent));
                }

                // Collect in start order; reading Result blocks until that map task has finished.
                var subResults = new TSubResult[pending.Count];
                for (int i = 0; i < subResults.Length; i++)
                {
                    subResults[i] = pending[i].Result;
                }

                return reduce(subResults);
            });
        }
    }
}

Usage example:

using System;
using System.Collections;
using MapReduceCode;
using System.Threading;
using System.Threading.Tasks;

namespace MapReduce
{
    /// <summary>
    /// Console demo: multiplies the numbers 0..99 by 5 in parallel map tasks and sums the products in the reduce step.
    /// </summary>
    class Program
    {
        // One generator shared by all map tasks. Creating a new Random seeded with
        // DateTime.Now.Millisecond on every call allows only 1000 distinct seeds and gives
        // identical delays to tasks that start within the same millisecond.
        private static readonly Random DelaySource = new Random();

        // Random is not thread-safe, so concurrent map tasks take this lock before using DelaySource.
        private static readonly object DelayGate = new object();

        /// <summary>
        /// Runs the map-reduce demo and prints each map result, the reduce result and the final value.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            var a = Generate().ForEachParallel<int, int, int, int>(
                (element, param) =>
                {
                    var result = element * param;
                    Console.WriteLine("Map: {0}, {1}", result, Task.CurrentId);

                    // Simulate work of varying duration (0-499 ms).
                    Thread.Sleep(NextDelay(500));
                    return result;
                },
                (subresult) =>
                {
                    var sum = 0;
                    foreach (var item in subresult)
                    {
                        sum += item;
                    }
                    Console.WriteLine("Reduce: {0}", sum);
                    return sum;
                },
                5);

            // Blocks until every map task and the reduce step have completed.
            Console.WriteLine(a.Result);
        }

        /// <summary>
        /// Returns a random delay in the range [0, <paramref name="maxExclusive"/>) milliseconds.
        /// </summary>
        /// <param name="maxExclusive">Exclusive upper bound of the delay.</param>
        /// <returns>A non-negative number of milliseconds below <paramref name="maxExclusive"/>.</returns>
        private static int NextDelay(int maxExclusive)
        {
            lock (DelayGate)
            {
                return DelaySource.Next(maxExclusive);
            }
        }

        /// <summary>
        /// Lazily yields the integers 0 through 99.
        /// </summary>
        /// <returns>A non-generic sequence of boxed <see cref="int"/> values.</returns>
        static IEnumerable Generate()
        {
            for (int i = 0; i < 100; i++)
            {
                yield return i;
            }
        }
    }
}

Result:

Map: 0, 1
Map: 5, 2
Map: 10, 3
Map: 20, 4
Map: 15, 5
Map: 25, 6
Map: 30, 7
Map: 35, 8
Map: 40, 9
Map: 45, 10
Map: 50, 11
Map: 55, 12
Map: 60, 13
Map: 65, 14
Map: 70, 15
Map: 75, 16
Map: 80, 17
Map: 85, 18
Map: 90, 19
Map: 95, 20
Map: 100, 21
Map: 105, 22
Map: 110, 23
Map: 115, 24
Map: 120, 25
Map: 125, 26
Map: 130, 27
Map: 135, 28
Map: 140, 29
Map: 145, 30
Map: 150, 31
Map: 155, 32
Map: 160, 33
Map: 165, 34
Map: 170, 35
Map: 175, 36
Map: 180, 37
Map: 185, 38
Map: 190, 39
Map: 195, 40
Map: 200, 41
Map: 205, 42
Map: 210, 43
Map: 215, 44
Map: 225, 45
Map: 230, 47
Map: 220, 46
Map: 235, 48
Map: 245, 50
Map: 240, 49
Map: 250, 51
Map: 255, 52
Map: 260, 53
Map: 265, 54
Map: 275, 56
Map: 270, 55
Map: 285, 57
Map: 280, 58
Map: 290, 59
Map: 295, 60
Map: 300, 62
Map: 305, 61
Map: 310, 63
Map: 315, 64
Map: 320, 65
Map: 325, 66
Map: 330, 67
Map: 335, 68
Map: 340, 69
Map: 355, 72
Map: 350, 71
Map: 360, 73
Map: 365, 74
Map: 345, 70
Map: 370, 75
Map: 380, 77
Map: 375, 76
Map: 390, 79
Map: 395, 80
Map: 385, 78
Map: 400, 81
Map: 405, 82
Map: 415, 84
Map: 425, 86
Map: 410, 83

Map: 420, 85
Map: 430, 87
Map: 440, 89
Map: 435, 88
Map: 450, 90
Map: 445, 91
Map: 455, 92
Map: 460, 93
Map: 470, 95
Map: 480, 97
Map: 475, 96
Map: 485, 98
Map: 465, 94
Map: 490, 99
Map: 495, 100
Reduce: 24750
24750