This is an example of a map-reduce implementation for IEnumerable. This code is not optimized: I think we shouldn’t create a separate Task for each enumerated item. Partitioning the enumeration and processing larger chunks of data would probably be much more efficient.
Browse source code: Google Code
MapReduce function for IEnumerable:
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace MapReduceCode
{
    /// <summary>
    /// Task-based map-reduce helper for non-generic sequences.
    /// </summary>
    public static class MapReduce
    {
        /// <summary>
        /// Runs <paramref name="map"/> on every element of <paramref name="items"/>, one task per
        /// element, then combines the partial results with a single call to <paramref name="reduce"/>.
        /// </summary>
        /// <typeparam name="TItem">Element type; each element of <paramref name="items"/> is cast to it.</typeparam>
        /// <typeparam name="TSubResult">Result type of a single <paramref name="map"/> call.</typeparam>
        /// <typeparam name="TResult">Result type of <paramref name="reduce"/>.</typeparam>
        /// <typeparam name="TParam">Type of the extra argument forwarded to every <paramref name="map"/> call.</typeparam>
        /// <param name="items">Source sequence. It is enumerated on a background task, not on the caller's thread.</param>
        /// <param name="map">Called once per element with that element and <paramref name="param"/>.</param>
        /// <param name="reduce">Receives all sub-results in the same order as the source elements.</param>
        /// <param name="param">Value passed unchanged to every <paramref name="map"/> call.</param>
        /// <returns>
        /// A task that completes with the value returned by <paramref name="reduce"/>. Because the map
        /// tasks are attached children, it does not complete before every map task has finished.
        /// </returns>
        /// <exception cref="ArgumentNullException">
        /// Thrown synchronously when <paramref name="items"/>, <paramref name="map"/> or <paramref name="reduce"/> is null.
        /// </exception>
        /// <remarks>
        /// Any of the following faults the returned task instead of being thrown to the caller:
        /// an element that cannot be cast to <typeparamref name="TItem"/>, or an exception from
        /// <paramref name="map"/> or <paramref name="reduce"/>.
        /// All work is scheduled on <see cref="TaskScheduler.Default"/>.
        /// </remarks>
        public static Task<TResult> ForEachParallel<TItem, TSubResult, TResult, TParam>(this IEnumerable items, Func<TItem, TParam, TSubResult> map, Func<TSubResult[], TResult> reduce, TParam param)
        {
            if (items == null)
            {
                throw new ArgumentNullException("items");
            }
            if (map == null)
            {
                throw new ArgumentNullException("map");
            }
            if (reduce == null)
            {
                throw new ArgumentNullException("reduce");
            }

            // StartNew without an explicit scheduler uses TaskScheduler.Current. That may be the
            // caller's custom (e.g. UI or exclusive) scheduler rather than the thread pool, so pass
            // TaskScheduler.Default explicitly here and for every child task.
            return Task<TResult>.Factory.StartNew(() =>
            {
                var tasks = new List<Task<TSubResult>>();
                foreach (TItem item in items)
                {
                    // The element travels in the task's state object rather than a closure, so each
                    // task is bound to its own element. Before C# 5, all closures shared one foreach
                    // variable.
                    tasks.Add(Task<TSubResult>.Factory.StartNew(
                        state =>
                        {
                            var args = (Tuple<TItem, TParam>)state;
                            return map(args.Item1, args.Item2);
                        },
                        Tuple.Create(item, param),
                        CancellationToken.None,
                        TaskCreationOptions.AttachedToParent,
                        TaskScheduler.Default));
                }

                // Collect the sub-results in source order. Result blocks until each map task is done
                // and rethrows (wrapped) any exception thrown by map.
                var results = new TSubResult[tasks.Count];
                for (int i = 0; i < results.Length; i++)
                {
                    results[i] = tasks[i].Result;
                }
                return reduce(results);
            }, CancellationToken.None, TaskCreationOptions.None, TaskScheduler.Default);
        }
    }
}
Usage example:
using System;
using System.Collections;
using MapReduceCode;
using System.Threading;
using System.Threading.Tasks;

namespace MapReduce
{
    /// <summary>
    /// Console demo for MapReduceCode.MapReduce.ForEachParallel: multiplies each of 0..99 by 5 in
    /// parallel, then sums the products.
    /// </summary>
    class Program
    {
        // One shared generator for the simulated work delays. Creating a new Random seeded with
        // DateTime.Now.Millisecond inside every map call gave all calls that started in the same
        // millisecond identical "random" delays. Random is not thread-safe, and map runs concurrently,
        // so every access goes through DelayRandomLock.
        private static readonly Random DelayRandom = new Random();
        private static readonly object DelayRandomLock = new object();

        static void Main(string[] args)
        {
            var a = Generate().ForEachParallel<int, int, int, int>(
                (element, param) =>
                {
                    var result = element * param;
                    Console.WriteLine("Map: {0}, {1}", result, Task.CurrentId);
                    // Simulate a variable amount of work per element.
                    Thread.Sleep(NextDelay());
                    return result;
                },
                (subresult) =>
                {
                    var sum = 0;
                    foreach (var item in subresult)
                    {
                        sum += item;
                    }
                    Console.WriteLine("Reduce: {0}", sum);
                    return sum;
                },
                5);

            // Blocking on Result is acceptable here: a console Main has no synchronization context.
            Console.WriteLine(a.Result);
        }

        /// <summary>
        /// Returns a delay in milliseconds in the range [0, 500), drawn from the shared generator.
        /// </summary>
        private static int NextDelay()
        {
            lock (DelayRandomLock)
            {
                return DelayRandom.Next(500);
            }
        }

        /// <summary>
        /// Lazily yields the integers 0 through 99.
        /// </summary>
        static IEnumerable Generate()
        {
            for (int i = 0; i < 100; i++)
            {
                yield return i;
            }
        }
    }
}
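Sample output (Map lines and task IDs can appear in a different order on every run, because the map tasks execute concurrently):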
Map: 0, 1
Map: 5, 2
Map: 10, 3
Map: 20, 4
Map: 15, 5
Map: 25, 6
Map: 30, 7
Map: 35, 8
Map: 40, 9
Map: 45, 10
Map: 50, 11
Map: 55, 12
Map: 60, 13
Map: 65, 14
Map: 70, 15
Map: 75, 16
Map: 80, 17
Map: 85, 18
Map: 90, 19
Map: 95, 20
Map: 100, 21
Map: 105, 22
Map: 110, 23
Map: 115, 24
Map: 120, 25
Map: 125, 26
Map: 130, 27
Map: 135, 28
Map: 140, 29
Map: 145, 30
Map: 150, 31
Map: 155, 32
Map: 160, 33
Map: 165, 34
Map: 170, 35
Map: 175, 36
Map: 180, 37
Map: 185, 38
Map: 190, 39
Map: 195, 40
Map: 200, 41
Map: 205, 42
Map: 210, 43
Map: 215, 44
Map: 225, 45
Map: 230, 47
Map: 220, 46
Map: 235, 48
Map: 245, 50
Map: 240, 49
Map: 250, 51
Map: 255, 52
Map: 260, 53
Map: 265, 54
Map: 275, 56
Map: 270, 55
Map: 285, 57
Map: 280, 58
Map: 290, 59
Map: 295, 60
Map: 300, 62
Map: 305, 61
Map: 310, 63
Map: 315, 64
Map: 320, 65
Map: 325, 66
Map: 330, 67
Map: 335, 68
Map: 340, 69
Map: 355, 72
Map: 350, 71
Map: 360, 73
Map: 365, 74
Map: 345, 70
Map: 370, 75
Map: 380, 77
Map: 375, 76
Map: 390, 79
Map: 395, 80
Map: 385, 78
Map: 400, 81
Map: 405, 82
Map: 415, 84
Map: 425, 86
Map: 410, 83
Map: 420, 85
Map: 430, 87
Map: 440, 89
Map: 435, 88
Map: 450, 90
Map: 445, 91
Map: 455, 92
Map: 460, 93
Map: 470, 95
Map: 480, 97
Map: 475, 96
Map: 485, 98
Map: 465, 94
Map: 490, 99
Map: 495, 100
Reduce: 24750
24750