Overview

std.parallelism implements high-level primitives for SMP parallelism. These include parallel foreach, parallel reduce, parallel eager map, pipelining and future/promise parallelism. std.parallelism is recommended when the same operation is to be executed in parallel on different data, or when a function is to be executed in a background thread and its result returned to a well-defined main thread. For communication between arbitrary threads, see std.concurrency.

std.parallelism is based on the concept of a Task. A Task is an object that represents the fundamental unit of work in this library and may be executed in parallel with any other Task. Using Task directly allows programming with a future/promise paradigm. All other supported parallelism paradigms (parallel foreach, map, reduce, pipelining) represent an additional level of abstraction over Task. They automatically create one or more Task objects, or closely related types that are conceptually identical but not part of the public API.

After creation, a Task may be executed in a new thread, or submitted to a TaskPool for execution. A TaskPool encapsulates a task queue and its worker threads. Its purpose is to efficiently map a large number of Tasks onto a smaller number of threads. A task queue is a FIFO queue of Task objects that have been submitted to the TaskPool and are awaiting execution. A worker thread is a thread that is associated with exactly one task queue. It executes the Task at the front of its queue when the queue has work available, or sleeps when no work is available. Each task queue is associated with zero or more worker threads. If the result of a Task is needed before execution by a worker thread has begun, the Task can be removed from the task queue and executed immediately in the thread where the result is needed.
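
The life cycle described above can be sketched as follows. This is a minimal illustration, not part of the original documentation; sumTo and sumInPool are hypothetical helper names, and the global taskPool is assumed to be in its default configuration.

```d
import std.parallelism;

// Hypothetical workload used only for illustration.
ulong sumTo(ulong n)
{
    ulong total = 0;
    foreach (i; 1 .. n + 1)
        total += i;
    return total;
}

ulong sumInPool(ulong n)
{
    // Create a Task on the GC heap and append it to the global pool's queue.
    auto t = task!sumTo(n);
    taskPool.put(t);

    // If no worker thread has started the Task yet, yieldForce runs it in
    // this thread; otherwise it waits for the worker to finish.
    return t.yieldForce;
}

void main()
{
    assert(sumInPool(1_000) == 500_500);
}
```

Either way the caller gets the same result; which thread actually ran the Task depends on timing.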

Warning

Unless marked as @trusted or @safe, artifacts in this module allow implicit data sharing between threads and cannot guarantee that client code is free from low level data races.

Usage example

import std.algorithm, std.parallelism, std.range;
void main() {
    // Parallel reduce can be combined with std.algorithm.map to interesting
    // effect. The following example (thanks to Russel Winder) calculates
    // pi by quadrature using std.algorithm.map and TaskPool.reduce.
    // getTerm is evaluated in parallel as needed by TaskPool.reduce.
    //
    // Timings on an Athlon 64 X2 dual core machine:
    //
    // TaskPool.reduce: 12.170 s
    // std.algorithm.reduce: 24.065 s
    immutable n = 1_000_000_000;
    immutable delta = 1.0 / n;
    
    real getTerm(int i) {
        immutable x = ( i - 0.5 ) * delta;
        return delta / ( 1.0 + x * x ) ;
    }
    
    immutable pi = 4.0 * taskPool.reduce!"a + b"(
        std.algorithm.map!getTerm(iota(n))
    );
}

Author

David Simcha

License

Boost License 1.0

struct Task(alias fun, Args...);

Task represents the fundamental unit of work. A Task may be executed in parallel with any other Task. Using this struct directly allows future/promise parallelism. In this paradigm, a function (or delegate or other callable) is executed in a thread other than the one it was called from. The calling thread does not block while the function is being executed. A call to workForce, yieldForce, or spinForce is used to ensure that the Task has finished executing and to obtain the return value, if any. These functions and done also act as full memory barriers, meaning that any memory writes made in the thread that executed the Task are guaranteed to be visible in the calling thread after one of these functions returns.

The std.parallelism.task and std.parallelism.scopedTask functions can be used to create an instance of this struct. See task for usage examples.

Function results are returned from yieldForce, spinForce and workForce by ref. If fun returns by ref, the reference will point to the returned reference of fun. Otherwise it will point to a field in this struct.

Copying of this struct is disabled, since it would provide no useful semantics. If you want to pass this struct around, you should do so by reference or pointer.

  • BUGS:
    Changes to ref and out arguments are not propagated to the call site, only to args in this struct.

alias args;

The arguments the function was called with. Changes to out and ref arguments will be visible here.

alias ReturnType;

The return type of the function called by this Task. This can be void.

@trusted ReturnType spinForce();

If the Task isn't started yet, execute it in the current thread. If it's done, return its return value, if any. If it's in progress, busy spin until it's done, then return the return value. If it threw an exception, rethrow that exception.

This function should be used when you expect the result of the Task to be available on a timescale shorter than that of an OS context switch.

@trusted ReturnType yieldForce();

If the Task isn't started yet, execute it in the current thread. If it's done, return its return value, if any. If it's in progress, wait on a condition variable. If it threw an exception, rethrow that exception.

This function should be used for expensive functions, as waiting on a condition variable introduces latency, but avoids wasted CPU cycles.

@trusted ReturnType workForce();

If this Task was not started yet, execute it in the current thread. If it is finished, return its result. If it is in progress, execute any other Task from the TaskPool instance that this Task was submitted to until this one is finished. If it threw an exception, rethrow that exception. If no other tasks are available or this Task was executed using executeInNewThread, wait on a condition variable.
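
As a rough sketch of where workForce fits, the divide-and-conquer sum below submits one half of the data to the pool and keeps the calling thread busy with other queued Tasks while it waits. The function name parallelSum and the cutoff of 1_000 elements are assumptions made for this illustration only.

```d
import std.array : array;
import std.parallelism;
import std.range : iota;

// Hypothetical recursive sum; the serial cutoff is an arbitrary choice.
long parallelSum(const(long)[] data)
{
    if (data.length <= 1_000)
    {
        long total = 0;
        foreach (x; data)
            total += x;
        return total;
    }

    // Submit the right half to the global pool, then recurse on the left
    // half in the current thread.
    auto rightTask = task!parallelSum(data[$ / 2 .. $]);
    taskPool.put(rightTask);
    immutable left = parallelSum(data[0 .. $ / 2]);

    // While waiting for the right half, workForce lets this thread execute
    // other Tasks from the same pool instead of sitting idle.
    return left + rightTask.workForce;
}

void main()
{
    auto data = iota(1L, 100_001L).array;
    assert(parallelSum(data) == 5_000_050_000L);
}
```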

@trusted bool done();

Returns true if the Task is finished executing.

  • Throws:
    Rethrows any exception thrown during the execution of the Task.
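
One possible use of done is to poll a Task while the calling thread does something else. The sketch below is illustrative only; slowSquare and pollForResult are hypothetical names, and the sleep intervals are arbitrary.

```d
import core.thread : Thread;
import core.time : msecs;
import std.parallelism;

// Hypothetical slow function used only for illustration.
int slowSquare(int x)
{
    Thread.sleep(20.msecs);
    return x * x;
}

int pollForResult(int x)
{
    auto t = task!slowSquare(x);
    t.executeInNewThread();

    // Poll until the Task has finished. A real program would do useful
    // work here instead of sleeping.
    while (!t.done)
        Thread.sleep(1.msecs);

    // The Task is finished, so this returns without waiting.
    return t.yieldForce;
}

void main()
{
    assert(pollForResult(12) == 144);
}
```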

@trusted void executeInNewThread();
@trusted void executeInNewThread(int priority);

Create a new thread for executing this Task, execute it in the newly created thread, then terminate the thread. This can be used for future/promise parallelism. An explicit priority may be given to the Task. If one is provided, its value is forwarded to core.thread.Thread.priority. See std.parallelism.task for usage example.

auto task(alias fun, Args...)(Args args);

Creates a Task on the GC heap that calls an alias. This may be executed via Task.executeInNewThread or by submitting to a std.parallelism.TaskPool. A globally accessible instance of TaskPool is provided by std.parallelism.taskPool.

  • Returns:
    A pointer to the Task.
  • Examples:
    
    // Read two files into memory at the same time.
    import std.file, std.parallelism;
     
    void main() {
        // Create and execute a Task for reading foo.txt.
        auto file1Task = task!read("foo.txt");
        file1Task.executeInNewThread();
        
        // Read bar.txt in parallel.
        auto file2Data = read("bar.txt");
        
        // Get the results of reading foo.txt.
        auto file1Data = file1Task.yieldForce();
    }
    
    import std.algorithm, std.parallelism;
     
    // Sorts an array using a parallel quick sort algorithm.  The first partition
    // is done serially.  Both recursion branches are then executed in
    // parallel.
    //
    // Timings for sorting an array of 1,000,000 doubles on an Athlon 64 X2
    // dual core machine:
    //
    // This implementation:               176 milliseconds.
    // Equivalent serial implementation:  280 milliseconds
    void parallelSort(T)(T[] data) {
        // Sort small subarrays serially.
        if(data.length < 100) {
             std.algorithm.sort(data);
             return;
        }
     
        // Partition the array.
        swap(data[$ / 2], data[$ - 1]);
        auto pivot = data[$ - 1];
        bool lessThanPivot(T elem) { return elem < pivot; }
     
        auto greaterEqual = partition!lessThanPivot(data[0..$ - 1]);
        swap(data[$ - greaterEqual.length - 1], data[$ - 1]);
     
        auto less = data[0..$ - greaterEqual.length - 1];
        greaterEqual = data[$ - greaterEqual.length..$];
     
        // Execute both recursion branches in parallel.
        auto recurseTask = task!(parallelSort)(greaterEqual);
        taskPool.put(recurseTask);
        parallelSort(less);
        recurseTask.yieldForce();
    }
  • Notes:
    This function takes a non-scope delegate, meaning it can be used with closures. If you can't allocate a closure due to objects on the stack that have scoped destruction, see scopedTask, which takes a scope delegate.

@trusted auto task(F, Args...)(F fun, Args args);

Version of task usable from @safe code. Usage mechanics are identical to the non-@safe case, but safety introduces some restrictions.

  1. fun must be @safe or @trusted.
  2. F must not have any unshared aliasing as defined by std.traits.hasUnsharedAliasing. This means it may not be an unshared delegate or a non-shared class or struct with overloaded opCall. This also precludes accepting template alias parameters.
  3. Args must not have unshared aliasing.
  4. fun must not return by reference.
  5. The return type must not have unshared aliasing unless fun is pure or the Task is executed via executeInNewThread instead of using a TaskPool.
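
A call that should satisfy the restrictions above might look like the following sketch: the callable is a plain @safe function pointer, the argument and the return type are value types with no aliasing, and nothing is returned by reference. The names square and squareInBackground are hypothetical.

```d
import std.parallelism;

// @safe, takes and returns plain values, does not return by ref.
int square(int x) @safe pure nothrow
{
    return x * x;
}

int squareInBackground(int x)
{
    // Pass the function as a run-time function pointer rather than as a
    // template alias parameter.
    auto t = task(&square, x);
    t.executeInNewThread();
    return t.yieldForce;
}

void main()
{
    assert(squareInBackground(7) == 49);
}
```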

auto scopedTask(alias fun, Args...)(Args args);
auto scopedTask(F, Args...)(scope F delegateOrFp, Args args);
@trusted auto scopedTask(F, Args...)(F fun, Args args);

These functions allow the creation of Task objects on the stack rather than the GC heap. The lifetime of a Task created by scopedTask cannot exceed the lifetime of the scope it was created in.

scopedTask might be preferred over task:

  1. When a Task that calls a delegate is being created and a closure cannot be allocated due to objects on the stack that have scoped destruction. The delegate overload of scopedTask takes a scope delegate.
  2. As a micro-optimization, to avoid the heap allocation associated with task or with the creation of a closure.

Usage is otherwise identical to task.

  • Notes:
    Task objects created using scopedTask will automatically call Task.yieldForce in their destructor if necessary to ensure the Task is complete before the stack frame they reside on is destroyed.
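
A minimal sketch of the stack-allocated form, assuming the alias overload of scopedTask; sumOf and sumOnStack are hypothetical names. The Task lives in the stack frame of sumOnStack and is completed before that frame is left.

```d
import std.parallelism;

// Hypothetical workload used only for illustration.
long sumOf(const(int)[] xs)
{
    long total = 0;
    foreach (x; xs)
        total += x;
    return total;
}

long sumOnStack(const(int)[] data)
{
    // The Task object is a local variable here; no GC allocation is made
    // for it, unlike with task.
    auto t = scopedTask!sumOf(data);
    t.executeInNewThread();

    // Force the result before the stack frame holding the Task goes away.
    return t.yieldForce;
}

void main()
{
    assert(sumOnStack([1, 2, 3, 4]) == 10);
}
```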

immutable uint totalCPUs;

The total number of CPU cores available on the current machine, as reported by the operating system.

class TaskPool;

This class encapsulates a task queue and a set of worker threads. Its purpose is to efficiently map a large number of Tasks onto a smaller number of threads. A task queue is a FIFO queue of Task objects that have been submitted to the TaskPool and are awaiting execution. A worker thread is a thread that executes the Task at the front of the queue when one is available and sleeps when the queue is empty.

This class should usually be used via the global instantiation available via the std.parallelism.taskPool property. Occasionally it is useful to explicitly instantiate a TaskPool:

  1. When you want TaskPool instances with multiple priorities, for example a low priority pool and a high priority pool.
  2. When the threads in the global task pool are waiting on a synchronization primitive (for example a mutex), and you want to parallelize the code that needs to run before these threads can be resumed.

@trusted this();

Default constructor that initializes a TaskPool with totalCPUs - 1 worker threads. The minus 1 is included because the main thread will also be available to do work.

  • Note:
    On single-core machines, the primitives provided by TaskPool operate transparently in single-threaded mode.

@trusted this(uint nWorkers);

Allows for custom number of worker threads.
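
A private pool with an explicit worker count can be sketched as below. The names half and halveWithPrivatePool are hypothetical, and the sketch assumes TaskPool.finish, which is not described in this section, is the way to let the pool's worker threads terminate once their queue is empty.

```d
import std.parallelism;

// Hypothetical element-wise function used only for illustration.
double half(double x)
{
    return x / 2;
}

double[] halveWithPrivatePool(double[] input, uint nWorkers)
{
    // A pool separate from the global taskPool, with a chosen thread count.
    auto pool = new TaskPool(nWorkers);

    // Assumption: finish() signals the workers to exit when the queue is
    // empty, so the private pool does not keep the program alive.
    scope (exit) pool.finish();

    return pool.amap!half(input);
}

void main()
{
    assert(halveWithPrivatePool([2.0, 4.0, 6.0], 2) == [1.0, 2.0, 3.0]);
}
```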

ParallelForeach!(R) parallel(R)(R range, size_t workUnitSize);
ParallelForeach!(R) parallel(R)(R range);

Implements a parallel foreach loop over a range. This works by implicitly creating and submitting one Task to the TaskPool for each worker thread. A work unit is a set of consecutive elements of range to be processed by a worker thread between communication with any other thread. The number of elements processed per work unit is controlled by the workUnitSize parameter. Smaller work units provide better load balancing, but larger work units avoid the overhead of communicating with other threads frequently to fetch the next work unit. Large work units also avoid false sharing in cases where the range is being modified. The less time a single iteration of the loop takes, the larger workUnitSize should be. For very expensive loop bodies, workUnitSize should be 1. An overload that chooses a default work unit size is also available.

  • Examples:
    
    import std.math, std.parallelism;
     
    // Find the logarithm of every number from 1 to 10_000_000 in parallel.
    auto logs = new double[10_000_000];
     
    // Parallel foreach works with or without an index variable.  It can
    // iterate by ref if range.front returns by ref.
     
    // Iterate over logs using work units of size 100.
    foreach(i, ref elem; taskPool.parallel(logs, 100)) {
        elem = log(i + 1.0);
    }
     
    // Same thing, but use the default work unit size.
    //
    // Timings on an Athlon 64 X2 dual core machine:
    //
    // Parallel foreach:  388 milliseconds
    // Regular foreach:   619 milliseconds
    foreach(i, ref elem; taskPool.parallel(logs)) {
        elem = log(i + 1.0);
    }
  • Notes:
    The memory usage of this implementation is guaranteed to be constant in range.length.

Breaking from a parallel foreach loop via a break, labeled break, labeled continue, return or goto statement throws a ParallelForeachError.

In the case of non-random access ranges, parallel foreach buffers lazily to an array of size workUnitSize before executing the parallel portion of the loop. The exception is that, if a parallel foreach is executed over a range returned by asyncBuf or map, the copying is elided and the buffers are simply swapped. In this case workUnitSize is ignored and the work unit size is set to the buffer size of range.

A memory barrier is guaranteed to be executed on exit from the loop, so that results produced by all threads are visible in the calling thread.

  • Exception Handling:
    When at least one exception is thrown from inside a parallel foreach loop, the submission of additional Task objects is terminated as soon as possible, in a non-deterministic manner. All executing or enqueued work units are allowed to complete. Then, all exceptions that were thrown by any work unit are chained using Throwable.next and rethrown. The order of the exception chaining is non-deterministic.
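
The behaviour described above can be observed from the calling thread with an ordinary try/catch around the loop. In this sketch only one iteration throws, so there is a single exception and no chain to walk; firstFailure is a hypothetical helper name.

```d
import std.conv : to;
import std.parallelism;
import std.range : iota;

// Returns the message of the exception rethrown by the loop, or an empty
// string if no iteration threw.
string firstFailure(int badValue)
{
    try
    {
        foreach (i; parallel(iota(100)))
        {
            if (i == badValue)
                throw new Exception("bad element " ~ i.to!string);
        }
    }
    catch (Exception e)
    {
        // With several failing iterations, further exceptions would be
        // reachable through e.next, in no particular order.
        return e.msg;
    }
    return "";
}

void main()
{
    assert(firstFailure(42) == "bad element 42");
    assert(firstFailure(-1) == "");
}
```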

template amap(functions...)

Eager parallel map. The eagerness of this function means it has less overhead than the lazily evaluated TaskPool.map and should be preferred where the memory requirements of eagerness are acceptable. functions are the functions to be evaluated, passed as template alias parameters in a style similar to std.algorithm.map. The first argument must be a random access range.

    auto numbers = iota(100_000_000);
    
    // Find the square roots of numbers.
    //
    // Timings on an Athlon 64 X2 dual core machine:
    //
    // Parallel eager map:                   0.802 s
    // Equivalent serial implementation:     1.768 s
    auto squareRoots = taskPool.amap!sqrt(numbers);

Immediately after the range argument, an optional work unit size argument may be provided. Work units as used by amap are identical to those defined for parallel foreach. If no work unit size is provided, the default work unit size is used.

    // Same thing, but make work unit size 100.
    auto squareRoots = taskPool.amap!sqrt(numbers, 100);

An output range for returning the results may be provided as the last argument. If one is not provided, an array of the proper type will be allocated on the garbage collected heap. If one is provided, it must be a random access range with assignable elements, must have reference semantics with respect to assignment to its elements, and must have the same length as the input range. Writing to adjacent elements from different threads must be safe.

    // Same thing, but explicitly allocate an array to return the results in.
    // The element type of the array may be either the exact type returned by
    // functions or an implicit conversion target.
    auto squareRoots = new float[numbers.length];
    taskPool.amap!sqrt(numbers, squareRoots);
    
    // Multiple functions, explicit output range, and explicit work unit size.
    auto results = new Tuple!(float, real)[numbers.length];
    taskPool.amap!(sqrt, log)(numbers, 100, results);
  • Note:
    A memory barrier is guaranteed to be executed after all results are written but before returning so that results produced by all threads are visible in the calling thread.
  • Tips:
    To perform the mapping operation in place, provide the same range for the input and output range.

To parallelize the copying of a range with expensive to evaluate elements to an array, pass an identity function (a function that just returns whatever argument is provided to it) to amap.
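
The in-place tip can be sketched as follows, passing the same array as both the input range and the output buffer. The names twice and doubledInPlace are hypothetical.

```d
import std.parallelism;

// Hypothetical element-wise function used only for illustration.
double twice(double x)
{
    return 2 * x;
}

double[] doubledInPlace(double[] values)
{
    // The same array is the source and the destination, so each element is
    // overwritten with its mapped value and nothing new is allocated.
    taskPool.amap!twice(values, values);
    return values;
}

void main()
{
    assert(doubledInPlace([1.0, 2.0, 3.0]) == [2.0, 4.0, 6.0]);
}
```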

  • Exception Handling:
    When at least one exception is thrown from inside the map functions, the submission of additional Task objects is terminated as soon as possible, in a non-deterministic manner. All currently executing or enqueued work units are allowed to complete. Then, all exceptions that were thrown from any work unit are chained using Throwable.next and rethrown. The order of the exception chaining is non-deterministic.

auto amap(Args...)(Args args);

template map(functions...)

A semi-lazy parallel map that can be used for pipelining. The map functions are evaluated for the first bufSize elements and stored in a buffer and made available to popFront. Meanwhile, in the background a second buffer of the same size is filled. When the first buffer is exhausted, it is swapped with the second buffer and filled while the values from what was originally the second buffer are read. This implementation allows for elements to be written to the buffer without the need for atomic operations or synchronization for each write, and enables the mapping function to be evaluated efficiently in parallel.

map has more overhead than the simpler procedure used by amap but avoids the need to keep all results in memory simultaneously and works with non-random access ranges.

  • Parameters:
    source
    The input range to be mapped. If source is not random access it will be lazily buffered to an array of size bufSize before the map function is evaluated. (For an exception to this rule, see Notes.)
    bufSize
    The size of the buffer to store the evaluated elements.
    workUnitSize
    The number of elements to evaluate in a single Task. Must be less than or equal to bufSize, and should be a fraction of bufSize such that all worker threads can be used. If the default of size_t.max is used, workUnitSize will be set to the pool-wide default.
  • Returns:
    An input range representing the results of the map. This range has a length iff source has a length.
  • Notes:
    If a range returned by map or asyncBuf is used as an input to map, then as an optimization the copying from the output buffer of the first range to the input buffer of the second range is elided, even though the ranges returned by map and asyncBuf are non-random access ranges. This means that the bufSize parameter passed to the current call to map will be ignored and the size of the buffer will be the buffer size of source.
  • Examples:
    
        // Pipeline reading a file, converting each line to a number, taking the
        // logarithms of the numbers, and performing the additions necessary to
        // find the sum of the logarithms.
     
        auto lineRange = File("numberList.txt").byLine();
        auto dupedLines = std.algorithm.map!"a.idup"(lineRange);
        auto nums = taskPool.map!(to!double)(dupedLines);
        auto logs = taskPool.map!log10(nums);
     
        double sum = 0;
        foreach(elem; logs) {
            sum += elem;
        }
  • Exception Handling:
    Any exceptions thrown while iterating over source or computing the map function are re-thrown on a call to popFront or, if thrown during construction, are simply allowed to propagate to the caller. In the case of exceptions thrown while computing the map function, the exceptions are chained as in TaskPool.amap.
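
For reference, a small sketch that passes bufSize and workUnitSize explicitly. The values 64 and 16 are arbitrary choices for illustration (the work unit size only has to be no larger than the buffer size), and cube and sumOfCubes are hypothetical names.

```d
import std.parallelism;
import std.range : iota;

// Hypothetical mapping function used only for illustration.
int cube(int x)
{
    return x * x * x;
}

long sumOfCubes(int n)
{
    // Buffers of 64 results, each filled by Tasks of 16 elements.
    auto cubes = taskPool.map!cube(iota(n), 64, 16);

    // The result is an input range; consume it like any other range.
    long total = 0;
    foreach (c; cubes)
        total += c;
    return total;
}

void main()
{
    assert(sumOfCubes(100) == 24_502_500);
}
```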

auto map(S)(S source, size_t bufSize = 100, size_t workUnitSize = size_t.max);

auto asyncBuf(S)(S source, size_t bufSize = 100);

Given a source range that is expensive to iterate over, returns an input range that asynchronously buffers the contents of source into a buffer of bufSize elements in a worker thread, while making previously buffered elements from a second buffer, also of size bufSize, available via the range interface of the returned object. The returned range has a length iff hasLength!(S). asyncBuf is useful, for example, when performing expensive operations on the elements of ranges that represent data on a disk or network.

  • Examples:
    
    import std.algorithm, std.array, std.conv, std.parallelism, std.stdio;
     
    void main() {
        // Fetch lines of a file in a background thread while processing
        // previously fetched lines, dealing with byLine's buffer recycling by
        // eagerly duplicating every line.
        auto lines = File("foo.txt").byLine();
        auto duped = std.algorithm.map!"a.idup"(lines);
        
        // Fetch more lines in the background while we process the lines already
        // read into memory into a matrix of doubles.
        double[][] matrix;
        auto asyncReader = taskPool.asyncBuf(duped);
        
        foreach(line; asyncReader) {
            auto ls = line.split("\t");
            matrix ~= to!(double[])(ls);
        }
    }
  • Exception Handling:
    Any exceptions thrown while iterating over source are re-thrown on a call to popFront or, if thrown during construction, simply allowed to propagate to the caller.

auto asyncBuf(C1, C2)(C1 next, C2 empty, size_t initialBufSize = 0, size_t nBuffers = 100);

Given a callable object next that writes to a user-provided buffer and a second callable object empty that determines whether more data is available to write via next, returns an input range that asynchronously calls next with a set of size nBuffers of buffers and makes the results available in the order they were obtained via the input range interface of the returned object. Similarly to the input range overload of asyncBuf, the first half of the buffers are made available via the range interface while the second half are filled and vice-versa.

  • Parameters:
    next
    A callable object that takes a single argument that must be an array with mutable elements. When called, next writes data to the array provided by the caller.
    empty
    A callable object that takes no arguments and returns a type implicitly convertible to bool. This is used to signify that no more data is available to be obtained by calling next.
    initialBufSize
    The initial size of each buffer. If next takes its array by reference, it may resize the buffers.
    nBuffers
    The number of buffers to cycle through when calling next.
  • Examples:
    
        // Fetch lines of a file in a background thread while processing previously
        // fetched lines, without duplicating any lines.
        auto file = File("foo.txt");
     
        void next(ref char[] buf) {
            file.readln(buf);
        }
     
        // Fetch more lines in the background while we process the lines already
        // read into memory into a matrix of doubles.
        double[][] matrix;
        auto asyncReader = taskPool.asyncBuf(&next, &file.eof);
     
        foreach(line; asyncReader) {
            auto ls = line.split("\t");
            matrix ~= to!(double[])(ls);
        }
  • Exception Handling:
    Any exceptions thrown while iterating over range are re-thrown on a call to popFront.
  • Warning:
    Using the range returned by this function in a parallel foreach loop will not work because buffers may be overwritten while the task that processes them is in queue. This is checked for at compile time and will result in a static assertion failure.

template reduce(functions...)

Parallel reduce on a random access range. Except as otherwise noted, usage is similar to std.algorithm.reduce. This function works by splitting the range to be reduced into work units, which are slices to be reduced in parallel. Once the results from all work units are computed, a final serial reduction is performed on these results to compute the final answer. Therefore, care must be taken to choose the seed value appropriately.

Because the reduction is being performed in parallel, functions must be associative. For notational simplicity, let # be an infix operator representing functions. Then, (a # b) # c must equal a # (b # c). Floating point addition is not associative even though addition in exact arithmetic is. Summing floating point numbers using this function may give different results than summing serially. However, for many practical purposes floating point addition can be treated as associative.

Note that, since functions are assumed to be associative, additional optimizations are made to the serial portion of the reduction algorithm. These take advantage of the instruction level parallelism of modern CPUs, in addition to the thread-level parallelism that the rest of this module exploits. This can lead to better than linear speedups relative to std.algorithm.reduce, especially for fine-grained benchmarks like dot products.

An explicit seed may be provided as the first argument. If provided, it is used as the seed for all work units and for the final reduction of results from all work units. Therefore, if it is not the identity value for the operation being performed, results may differ from those generated by std.algorithm.reduce or depending on how many work units are used. The next argument must be the range to be reduced.
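
The seed caveat can be made concrete with a small sketch. With the identity seed 0, the serial and parallel sums agree; with a seed of 1, the parallel result includes the seed once per work unit plus once for the final reduction, so it is at least as large as the serial result. The function names and the work unit size of 100 are assumptions made for this illustration.

```d
import std.algorithm : reduce;
import std.parallelism;
import std.range : iota;

// Sum of 1 .. 1000 with std.algorithm.reduce; the seed is added once.
int serialSum(int seed)
{
    return reduce!"a + b"(seed, iota(1, 1_001));
}

// The same sum with TaskPool.reduce and a work unit size of 100; the seed
// is reused for every work unit and for the final reduction.
int parallelSum(int seed)
{
    return taskPool.reduce!"a + b"(seed, iota(1, 1_001), 100);
}

void main()
{
    // The identity value for addition gives identical results.
    assert(serialSum(0) == 500_500);
    assert(parallelSum(0) == 500_500);

    // A non-identity seed is counted at least once, possibly many times.
    assert(parallelSum(1) >= serialSum(1));
}
```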

    // Find the sum of squares of a range in parallel, using an explicit seed.
    //
    // Timings on an Athlon 64 X2 dual core machine:
    //
    // Parallel reduce:                     72 milliseconds
    // Using std.algorithm.reduce instead:  181 milliseconds
    auto nums = iota(10_000_000.0f);
    auto sumSquares = taskPool.reduce!"a + b"(
        0.0, std.algorithm.map!"a * a"(nums)
    );

If no explicit seed is provided, the first element of each work unit is used as a seed. For the final reduction, the result from the first work unit is used as the seed.

    // Find the sum of a range in parallel, using the first element of each
    // work unit as the seed.
    auto sum = taskPool.reduce!"a + b"(nums);

An explicit work unit size may be specified as the last argument. Specifying too small a work unit size will effectively serialize the reduction, as the final reduction of the result of each work unit will dominate computation time. If TaskPool.size for this instance is zero, this parameter is ignored and one work unit is used.

    // Use a work unit size of 100.
    auto sum2 = taskPool.reduce!"a + b"(nums, 100);
 
    // Work unit size of 100 and explicit seed.
    auto sum3 = taskPool.reduce!"a + b"(0.0, nums, 100);

Parallel reduce supports multiple functions, like std.algorithm.reduce.

    // Find both the min and max of nums.
    auto minMax = taskPool.reduce!(min, max)(nums);
    assert(minMax[0] == reduce!min(nums));
    assert(minMax[1] == reduce!max(nums));
  • Exception Handling:
    After this function is finished executing, any exceptions thrown are chained together via Throwable.next and rethrown. The chaining order is non-deterministic.

auto reduce(Args...)(Args args);

const nothrow @property @safe size_t workerIndex();

Gets the index of the current thread relative to this TaskPool. Any thread not in this pool will receive an index of 0. The worker threads in this pool receive unique indices of 1 through this.size.

This function is useful for maintaining worker-local resources.

  • Examples:
        // Execute a loop that computes the greatest common divisor of every
        // number from 0 through 999 with 42 in parallel.  Write the results out to
        // a set of files, one for each thread.  This allows results to be written
        // out without any synchronization.

        import std.stdio, std.conv, std.range, std.numeric, std.parallelism;

        void main() {
            // One handle per worker thread, plus slot 0 for threads outside the pool.
            auto fileHandles = new File[taskPool.size + 1];
            scope(exit) {
                foreach(ref handle; fileHandles) {
                    handle.close();
                }
            }

            foreach(i, ref handle; fileHandles) {
                // Open for writing; File opens in read mode by default.
                handle = File("workerResults" ~ to!string(i) ~ ".txt", "w");
            }

            foreach(num; parallel(iota(1_000))) {
                auto outHandle = fileHandles[taskPool.workerIndex];
                outHandle.writeln(num, '\t', gcd(num, 42));
            }
        }

struct WorkerLocalStorage(T);

Struct for creating worker-local storage. Worker-local storage is thread-local storage that exists only for worker threads in a given TaskPool plus a single thread outside the pool. It is allocated on the garbage collected heap in a way that avoids false sharing, and doesn't necessarily have global scope within any thread. It can be accessed from any worker thread in the TaskPool that created it, and one thread outside this TaskPool. All threads outside the pool that created a given instance of worker-local storage share a single slot.

Since the underlying data for this struct is heap-allocated, this struct has reference semantics when passed between functions.

The main use cases for WorkerLocalStorage are:

  1. Performing parallel reductions with an imperative, as opposed to functional, programming style. In this case, it's useful to treat WorkerLocalStorage as local to each thread for only the parallel portion of an algorithm.
  2. Recycling temporary buffers across iterations of a parallel foreach loop.

Examples:

    // Calculate pi as in our synopsis example, but use an imperative instead
    // of a functional style.
    immutable n = 1_000_000_000;
    immutable delta = 1.0L / n;
 
    auto sums = taskPool.workerLocalStorage(0.0L);
    foreach(i; parallel(iota(n))) {
        immutable x = ( i - 0.5L ) * delta;
        immutable toAdd = delta / ( 1.0 + x * x );
        sums.get = sums.get + toAdd;
    }
 
    // Add up the results from each worker thread.
    real pi = 0;
    foreach(threadResult; sums.toRange) {
        pi += 4.0L * threadResult;
    }

@property T get();

Get the current thread's instance. Returns by ref. Note that calling get from any thread outside the TaskPool that created this instance will return the same reference, so an instance of worker-local storage should only be accessed from one thread outside the pool that created it. If this rule is violated, undefined behavior will result.

If assertions are enabled and toRange has been called, then this WorkerLocalStorage instance is no longer worker-local and an assertion failure will result when calling this method. This is not checked when assertions are disabled for performance reasons.

@property void get(T val);

Assign a value to the current thread's instance. This function has the same caveats as its overload.

@property WorkerLocalStorageRange!(T) toRange();

Returns a range view of the values for all threads, which can be used to further process the results of each thread after running the parallel part of your algorithm. Do not use this method in the parallel portion of your algorithm.

Calling this function sets a flag indicating that this struct is no longer worker-local, and attempting to use the get method again will result in an assertion failure if assertions are enabled.

WorkerLocalStorage!(T) workerLocalStorage(T)(lazy T initialVal = T.init);

Creates an instance of worker-local storage, initialized with a given value. The value is lazy so that you can, for example, easily create one instance of a class for each worker. For usage example, see the WorkerLocalStorage struct.
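As a rough illustration of why the initial value is lazy, the sketch below gives each slot its own scratch buffer and then checks that no two slots share memory. It assumes a private two-worker pool and a buffer length of 16; `buffersAreDistinct` is a hypothetical helper name.

```d
import std.parallelism : TaskPool;
import std.range : iota;

// Hypothetical helper: returns true if every slot of the worker-local
// storage ended up with its own buffer.
bool buffersAreDistinct()
{
    auto pool = new TaskPool(2);
    scope(exit) pool.finish();

    // The lazy argument is evaluated once per slot, so each worker (and the
    // slot shared by threads outside the pool) gets a separate array.
    auto buffers = pool.workerLocalStorage(new int[16]);

    foreach (i; pool.parallel(iota(1_000)))
    {
        // Reuse this thread's scratch buffer instead of allocating per iteration.
        int[] scratch = buffers.get;
        scratch[] = i;
    }

    // Only inspect the slots after the parallel portion is over.
    auto slots = buffers.toRange;
    foreach (a; 0 .. slots.length)
        foreach (b; a + 1 .. slots.length)
            if (slots[a].ptr is slots[b].ptr)
                return false;
    return true;
}

void main()
{
    assert(buffersAreDistinct());
}
```

Had the argument been evaluated eagerly, every slot would refer to the same array and the worker threads would overwrite each other's data.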

@trusted void stop();

Signals to all worker threads to terminate as soon as they are finished with their current Task, or immediately if they are not executing a Task. Tasks that were in queue will not be executed unless a call to Task.workForce, Task.yieldForce or Task.spinForce causes them to be executed.

Use only if you have waited on every Task and therefore know the queue is empty, or if you speculatively executed some tasks and no longer need the results.

@trusted void finish();

Signals worker threads to terminate when the queue becomes empty. Does not block.
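A minimal sketch of the lifecycle of a manually created pool, assuming two workers and a hypothetical `square` function as the workload. Because finish does not block, the results are still collected with yieldForce afterwards.

```d
import std.parallelism : TaskPool, task;

// Hypothetical workload.
int square(int x) { return x * x; }

// Queues three tasks, asks the pool to shut down once the queue drains,
// then collects the results.
int sumAfterFinish()
{
    auto pool = new TaskPool(2);

    auto tasks = [task!square(1), task!square(2), task!square(3)];
    foreach (t; tasks)
        pool.put(t);

    // Returns immediately; tasks already in the queue are still executed.
    pool.finish();

    int total = 0;
    foreach (t; tasks)
        total += t.yieldForce;   // waits for (or runs) each task
    return total;
}

void main()
{
    assert(sumAfterFinish() == 14);   // 1 + 4 + 9
}
```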

const pure nothrow @property @safe size_t size();

Returns the number of worker threads in the pool.

void put(alias fun, Args...)(ref Task!(fun,Args) task);
void put(alias fun, Args...)(Task!(fun,Args)* task);

Put a Task object on the back of the task queue. The Task object may be passed by pointer or reference.

  • Example:
        import std.file;
        
        // Create a task.
        auto t = task!read("foo.txt");
        
        // Add it to the queue to be executed.
        taskPool.put(t);
  • Notes:
    @trusted overloads of this function are called for Tasks if std.traits.hasUnsharedAliasing is false for the Task's return type or the function the Task executes is pure. Task objects that meet all other requirements specified in the @trusted overloads of task and scopedTask may be created and executed from @safe code via Task.executeInNewThread but not via TaskPool.

While this function takes the address of variables that may be on the stack, some overloads are marked as @trusted. Task includes a destructor that waits for the task to complete before destroying the stack frame it is allocated on. Therefore, it is impossible for the stack frame to be destroyed before the task is complete and no longer referenced by a TaskPool.
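The two overloads can be contrasted in a short sketch: task returns a pointer to a heap-allocated Task, while scopedTask returns a Task by value that lives on the caller's stack. The pool size and the `square` workload are assumptions made for illustration.

```d
import std.parallelism : TaskPool, scopedTask, task;

// Hypothetical workload.
int square(int x) { return x * x; }

// Submits one heap-allocated and one stack-allocated Task to a private
// pool and returns the sum of their results.
int submitBothWays()
{
    auto pool = new TaskPool(2);
    scope(exit) pool.finish();

    // Pointer overload: the Task lives on the heap.
    auto heapTask = task!square(3);
    pool.put(heapTask);

    // Reference overload: the Task lives in this stack frame. Its destructor
    // waits for completion before the frame is torn down.
    auto stackTask = scopedTask!square(4);
    pool.put(stackTask);

    return heapTask.yieldForce + stackTask.yieldForce;
}

void main()
{
    assert(submitBothWays() == 25);   // 9 + 16
}
```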

@property @trusted bool isDaemon();
@property @trusted void isDaemon(bool newVal);

These properties control whether the worker threads are daemon threads. A daemon thread is automatically terminated when all non-daemon threads have terminated. A non-daemon thread will prevent a program from terminating as long as it has not terminated.

If any TaskPool with non-daemon threads is active, either stop or finish must be called on it before the program can terminate.

The worker threads in the TaskPool instance returned by the taskPool property are daemon by default. The worker threads of manually instantiated task pools are non-daemon by default.

Note:
For a size zero pool, the getter arbitrarily returns true and the setter has no effect.
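The defaults described above can be observed directly. This is a small sketch assuming a manually created pool of two workers; `daemonDefaultsHold` is a hypothetical helper name.

```d
import std.parallelism : TaskPool, taskPool;

// Hypothetical helper: checks the documented defaults and the setter.
bool daemonDefaultsHold()
{
    // Global pool: daemon workers (a size zero pool also reports true).
    immutable globalIsDaemon = taskPool.isDaemon;

    // Manually created pool: non-daemon workers until told otherwise.
    auto pool = new TaskPool(2);
    scope(exit) pool.finish();   // still shut the pool down explicitly
    immutable privateStartsNonDaemon = !pool.isDaemon;

    pool.isDaemon = true;
    immutable setterTookEffect = pool.isDaemon;

    return globalIsDaemon && privateStartsNonDaemon && setterTookEffect;
}

void main()
{
    assert(daemonDefaultsHold());
}
```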

@property @trusted int priority();
@property @trusted void priority(int newPriority);

These functions allow getting and setting the OS scheduling priority of the worker threads in this TaskPool. They forward to core.thread.Thread.priority, so a given priority value here means the same thing as an identical priority value in core.thread.

  • Note:
    For a size zero pool, the getter arbitrarily returns core.thread.Thread.PRIORITY_MIN and the setter has no effect.

@property @trusted TaskPool taskPool();

Returns a lazily initialized global instantiation of TaskPool. This function can safely be called concurrently from multiple non-worker threads. The worker threads in this pool are daemon threads, meaning that it is not necessary to call TaskPool.stop or TaskPool.finish before terminating the main thread.

@property @trusted uint defaultPoolThreads();
@property @trusted void defaultPoolThreads(uint newVal);

These properties get and set the number of worker threads in the TaskPool instance returned by taskPool. The default value is totalCPUs - 1. Calling the setter after the first call to taskPool does not change the number of worker threads in the instance returned by taskPool.
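The ordering requirement can be sketched as follows. The example assumes that nothing else in the program has touched taskPool before the first setter call; `sizeAfterLateSetter` and the thread counts 3 and 1 are chosen only for illustration.

```d
import std.parallelism : defaultPoolThreads, taskPool;

// Hypothetical helper: configures the global pool, then shows that a later
// call to the setter leaves the existing instance unchanged.
size_t sizeAfterLateSetter()
{
    // Takes effect only if taskPool has not been used yet.
    defaultPoolThreads = 3;
    immutable before = taskPool.size;   // first use creates the pool

    // Too late: the global instance already exists.
    defaultPoolThreads = 1;
    assert(taskPool.size == before);

    return before;
}

void main()
{
    assert(sizeAfterLateSetter() == 3);
}
```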

ParallelForeach!(R) parallel(R)(R range);
ParallelForeach!(R) parallel(R)(R range, size_t workUnitSize);

Convenience functions that forward to taskPool.parallel. The purpose of these is to make parallel foreach less verbose and more readable.

  • Example:
    // Find the logarithm of every number from 1 to 1_000_000 in parallel,
    // using the default TaskPool instance.
    auto logs = new double[1_000_000];
     
    foreach(i, ref elem; parallel(logs)) {
        elem = log(i + 1.0);
    }

First published: 2011-12-21 (Wed) 20:14:48
Last updated: 2011-12-21 (Wed) 20:14:48 (JST) by SHOO