Boost C++ Libraries

...one of the most highly regarded and expertly designed C++ library projects in the world. Herb Sutter and Andrei Alexandrescu, C++ Coding Standards

This is the documentation for an old version of Boost. Click here to view this page for the latest version.
PrevUpHomeNext

when_all, return values

As soon as we want to collect return values from all the task functions, we can see right away how to reuse wait_first_value()'s queue<T> for the purpose. All we have to do is avoid closing it after the first value!

But in fact, collecting multiple values raises an interesting question: do we really want to wait until the slowest of them has arrived? Wouldn't we rather process each result as soon as it becomes available?

Fortunately we can present both APIs. Let's define wait_all_values_source() to return shared_ptr<buffered_channel<T>>.

Given wait_all_values_source(), it's straightforward to implement wait_all_values():

template< typename Fn, typename ... Fns >
std::vector< typename std::result_of< Fn() >::type >
wait_all_values( Fn && function, Fns && ... functions) {
    std::size_t count( 1 + sizeof ... ( functions) );
    typedef typename std::result_of< Fn() >::type return_t;
    typedef std::vector< return_t > vector_t;
    vector_t results;
    results.reserve( count);

    // get channel
    std::shared_ptr< boost::fibers::buffered_channel< return_t > > chan =
        wait_all_values_source( std::forward< Fn >( function),
                                std::forward< Fns >( functions) ... );
    // fill results vector
    return_t value;
    while ( boost::fibers::channel_op_status::success == chan->pop(value) ) {
        results.push_back( value);
    }
    // return vector to caller
    return results;
}

It might be called like this:

std::vector< std::string > values =
    wait_all_values(
            [](){ return sleeper("wav_late",   150); },
            [](){ return sleeper("wav_middle", 100); },
            [](){ return sleeper("wav_early",   50); });

As you can see from the loop in wait_all_values(), instead of requiring its caller to count values, we define wait_all_values_source() to buffered_channel::close() the queue when done. But how do we do that? Each producer fiber is independent. It has no idea whether it is the last one to buffered_channel::push() a value.

We can address that problem with a counting façade for the queue<>. In fact, our façade need only support the producer end of the queue.

[wait_nqueue]

Armed with nqueue<>, we can implement wait_all_values_source(). It starts just like wait_first_value(). The difference is that we wrap the queue<T> with an nqueue<T> to pass to the producer fibers.

Then, of course, instead of popping the first value, closing the queue and returning it, we simply return the shared_ptr<queue<T>>.

// Return a shared_ptr<buffered_channel<T>> from which the caller can
// retrieve each new result as it arrives, until 'closed'.
template< typename Fn, typename ... Fns >
std::shared_ptr< boost::fibers::buffered_channel< typename std::result_of< Fn() >::type > >
wait_all_values_source( Fn && function, Fns && ... functions) {
    std::size_t count( 1 + sizeof ... ( functions) );
    typedef typename std::result_of< Fn() >::type return_t;
    typedef boost::fibers::buffered_channel< return_t > channel_t;
    // make the channel
    auto chanp( std::make_shared< channel_t >( 64) );
    // and make an nchannel facade to close it after 'count' items
    auto ncp( std::make_shared< nchannel< return_t > >( chanp, count) );
    // pass that nchannel facade to all the relevant fibers
    wait_all_values_impl< return_t >( ncp,
                                      std::forward< Fn >( function),
                                      std::forward< Fns >( functions) ... );
    // then return the channel for consumer
    return chanp;
}

For example:

std::shared_ptr< boost::fibers::buffered_channel< std::string > > chan =
    wait_all_values_source(
            [](){ return sleeper("wavs_third",  150); },
            [](){ return sleeper("wavs_second", 100); },
            [](){ return sleeper("wavs_first",   50); });
std::string value;
while ( boost::fibers::channel_op_status::success == chan->pop(value) ) {
    std::cout << "wait_all_values_source() => '" << value
              << "'" << std::endl;
}

wait_all_values_impl() really is just like wait_first_value_impl() except for the use of nqueue<T> rather than queue<T>:

template< typename T, typename Fn >
void wait_all_values_impl( std::shared_ptr< nchannel< T > > chan,
                           Fn && function) {
    boost::fibers::fiber( [chan, function](){
                              chan->push(function());
                          }).detach();
}


PrevUpHomeNext