This is an example of a map-reduce implementation for IEnumerable. The code is not optimized: I think we shouldn’t create a separate Task for each enumerated item. Partitioning the enumeration and processing larger chunks of data would probably be much more efficient.
Browse source code: Google Code
MapReduce function for IEnumerable:
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace MapReduceCode
{
    /// <summary>
    /// Map-reduce helpers for non-generic <see cref="IEnumerable"/> sequences.
    /// </summary>
    public static class MapReduce
    {
        /// <summary>
        /// Starts a background task that maps every element of <paramref name="items"/> in parallel
        /// and then combines all mapped values with a single call to <paramref name="reduce"/>.
        /// </summary>
        /// <typeparam name="TItem">Element type; every element of <paramref name="items"/> is cast to it.</typeparam>
        /// <typeparam name="TSubResult">Type produced by <paramref name="map"/> for one element.</typeparam>
        /// <typeparam name="TResult">Type produced by <paramref name="reduce"/>.</typeparam>
        /// <typeparam name="TParam">Type of the extra argument forwarded to every <paramref name="map"/> call.</typeparam>
        /// <param name="items">Source sequence; enumerated once, inside the returned task.</param>
        /// <param name="map">Applied to each element together with <paramref name="param"/>. May run concurrently on several threads.</param>
        /// <param name="reduce">Called once with all mapped values, in the same order as the source elements
        /// (an empty array when <paramref name="items"/> is empty).</param>
        /// <param name="param">Extra argument passed unchanged to every <paramref name="map"/> call.</param>
        /// <returns>A task that completes with the value returned by <paramref name="reduce"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="items"/>, <paramref name="map"/> or
        /// <paramref name="reduce"/> is null (thrown synchronously, before any task is started).</exception>
        /// <remarks>
        /// The sequence is partitioned into chunks by PLINQ rather than spawning one Task per element.
        /// If <paramref name="map"/> throws (or an element cannot be cast to <typeparamref name="TItem"/>),
        /// the returned task faults with an <see cref="AggregateException"/> and the remaining
        /// elements may not be mapped.
        /// </remarks>
        public static Task<TResult> ForEachParallel<TItem, TSubResult, TResult, TParam>(this IEnumerable items, Func<TItem, TParam, TSubResult> map, Func<TSubResult[], TResult> reduce, TParam param)
        {
            // Validate eagerly so misuse surfaces at the call site instead of inside the task.
            if (items == null) { throw new ArgumentNullException("items"); }
            if (map == null) { throw new ArgumentNullException("map"); }
            if (reduce == null) { throw new ArgumentNullException("reduce"); }

            return Task<TResult>.Factory.StartNew(() =>
            {
                // PLINQ processes chunks of the sequence on worker threads instead of allocating
                // a Task and a state Tuple per element. AsOrdered keeps the sub-results in
                // source order, matching the order in which elements were enumerated.
                TSubResult[] subResults = items
                    .Cast<TItem>()
                    .AsParallel()
                    .AsOrdered()
                    .Select(item => map(item, param))
                    .ToArray();

                return reduce(subResults);
            });
        }
    }
}
Usage example:
using System;
using System.Collections;
using MapReduceCode;
using System.Threading;
using System.Threading.Tasks;
namespace MapReduce
{
    /// <summary>
    /// Console demo of <c>ForEachParallel</c>: multiplies 0..99 by 5 in parallel,
    /// logs each mapped value, and prints the sum of all products.
    /// </summary>
    class Program
    {
        // One generator shared by every map call. Creating a new Random seeded with
        // DateTime.Now.Millisecond inside each concurrent call gave calls started at the
        // same moment identical seeds, and therefore identical "random" delays.
        // Random is not thread-safe, so all access goes through s_randomLock.
        private static readonly Random s_random = new Random();
        private static readonly object s_randomLock = new object();

        static void Main(string[] args)
        {
            var a = Generate().ForEachParallel<int, int, int, int>(
                // Map: multiply the element by the parameter, log it with the current task id,
                // then sleep a random 0-499 ms to simulate work of varying length.
                (element, param) =>
                {
                    var result = element * param;
                    Console.WriteLine("Map: {0}, {1}", result, Task.CurrentId);
                    Thread.Sleep(NextDelay(500));
                    return result;
                },
                // Reduce: sum all mapped values.
                (subresult) =>
                {
                    var sum = 0;
                    foreach (var item in subresult)
                    {
                        sum += item;
                    }
                    Console.WriteLine("Reduce: {0}", sum);
                    return sum;
                },
                5);

            // Blocks until the map-reduce task completes; acceptable in a console entry point.
            Console.WriteLine(a.Result);
        }

        /// <summary>Returns a value in [0, maxExclusive) from the shared, lock-protected generator.</summary>
        static int NextDelay(int maxExclusive)
        {
            lock (s_randomLock)
            {
                return s_random.Next(maxExclusive);
            }
        }

        /// <summary>Lazily yields the integers 0 through 99 (boxed, via non-generic IEnumerable).</summary>
        static IEnumerable Generate()
        {
            for (int i = 0; i < 100; i++)
            {
                yield return i;
            }
        }
    }
}
Result:
Map: 0, 1
Map: 5, 2
Map: 10, 3
Map: 20, 4
Map: 15, 5
Map: 25, 6
Map: 30, 7
Map: 35, 8
Map: 40, 9
Map: 45, 10
Map: 50, 11
Map: 55, 12
Map: 60, 13
Map: 65, 14
Map: 70, 15
Map: 75, 16
Map: 80, 17
Map: 85, 18
Map: 90, 19
Map: 95, 20
Map: 100, 21
Map: 105, 22
Map: 110, 23
Map: 115, 24
Map: 120, 25
Map: 125, 26
Map: 130, 27
Map: 135, 28
Map: 140, 29
Map: 145, 30
Map: 150, 31
Map: 155, 32
Map: 160, 33
Map: 165, 34
Map: 170, 35
Map: 175, 36
Map: 180, 37
Map: 185, 38
Map: 190, 39
Map: 195, 40
Map: 200, 41
Map: 205, 42
Map: 210, 43
Map: 215, 44
Map: 225, 45
Map: 230, 47
Map: 220, 46
Map: 235, 48
Map: 245, 50
Map: 240, 49
Map: 250, 51
Map: 255, 52
Map: 260, 53
Map: 265, 54
Map: 275, 56
Map: 270, 55
Map: 285, 57
Map: 280, 58
Map: 290, 59
Map: 295, 60
Map: 300, 62
Map: 305, 61
Map: 310, 63
Map: 315, 64
Map: 320, 65
Map: 325, 66
Map: 330, 67
Map: 335, 68
Map: 340, 69
Map: 355, 72
Map: 350, 71
Map: 360, 73
Map: 365, 74
Map: 345, 70
Map: 370, 75
Map: 380, 77
Map: 375, 76
Map: 390, 79
Map: 395, 80
Map: 385, 78
Map: 400, 81
Map: 405, 82
Map: 415, 84
Map: 425, 86
Map: 410, 83
Map: 420, 85
Map: 430, 87
Map: 440, 89
Map: 435, 88
Map: 450, 90
Map: 445, 91
Map: 455, 92
Map: 460, 93
Map: 470, 95
Map: 480, 97
Map: 475, 96
Map: 485, 98
Map: 465, 94
Map: 490, 99
Map: 495, 100
Reduce: 24750
24750