async_node

[flow_graph.async_node]

A node that allows a flow graph to communicate with an external activity managed by the user or another runtime.

// Defined in header <tbb/flow_graph.h>

namespace tbb {
namespace flow {

    template < typename Input, typename Output, typename Policy = /*implementation-defined*/ >
    class async_node : public graph_node, public receiver<Input>, public sender<Output> {
    public:
        template<typename Body>
        async_node( graph &g, size_t concurrency, Body body, Policy /*unspecified*/ = Policy(),
                    node_priority_t priority = no_priority );
        template<typename Body>
        async_node( graph &g, size_t concurrency, Body body, node_priority_t priority = no_priority );

        async_node( const async_node& src );
        ~async_node();

        using gateway_type = /*implementation-defined*/;
        gateway_type& gateway();

        bool try_put( const input_type& v );
        bool try_get( output_type& v );
    };

} // namespace flow
} // namespace tbb

Requirements:

async_node executes a user-provided body on incoming messages. The body submits input messages to an external activity for processing outside of the task scheduler. This node also provides the gateway_type interface, which allows the external activity to communicate with the flow graph.

async_node is a graph_node, a receiver<Input>, and a sender<Output>.

async_node has discarding and broadcast-push properties.

async_node has a user-settable concurrency limit. It can be set to one of the predefined values. The user can also provide a value of type std::size_t to limit concurrency to a value between 1 and tbb::flow::unlimited.

The body object passed to an async_node is copied. Therefore, updates to member variables do not affect the original object used to construct the node. If the state held within a body object must be inspected from outside the node, use the copy_body function to obtain an updated copy.
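The copy semantics described above can be illustrated without TBB. The sketch below is a minimal model in standard C++ only: toy_node, body_snapshot, and counting_body are hypothetical names introduced for illustration and are not part of the library. body_snapshot plays a role similar in spirit to copy_body.

```cpp
#include <cassert>
#include <utility>

// Stateful body: counts how many times it has been invoked.
struct counting_body {
    int calls = 0;
    void operator()(int /*message*/) { ++calls; }
};

// Hypothetical stand-in for a node (NOT a TBB class).
// It stores its own copy of the body, as the text above describes.
template <typename Body>
class toy_node {
public:
    explicit toy_node(Body body) : my_body(std::move(body)) {}

    // Invokes the node's private copy of the body.
    void put(int message) { my_body(message); }

    // Returns a copy of the node's current body state.
    Body body_snapshot() const { return my_body; }

private:
    Body my_body;
};

// Returns {calls seen by the original body, calls seen by the node's copy}.
inline std::pair<int, int> run_copy_demo() {
    counting_body original;
    toy_node<counting_body> node(original);  // the body is copied here
    node.put(1);
    node.put(2);
    node.put(3);
    assert(original.calls == 0);  // the original object is never invoked
    return {original.calls, node.body_snapshot().calls};
}
```

In this model the original object keeps its initial state, and only a snapshot taken from the node reflects the three invocations.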

Member types

gateway_type meets the GatewayType requirements.

Member functions

template<typename Body>
async_node( graph &g, size_t concurrency, Body body,
               node_priority_t priority = no_priority );

Constructs an async_node that will invoke a copy of body. At most concurrency calls to body may be made concurrently.

Allows you to specify the node priority.


template<typename Body>
async_node( graph &g, size_t concurrency, Body body, Policy /*unspecified*/ = Policy(),
               node_priority_t priority = no_priority );

Constructs an async_node that will invoke a copy of body. At most concurrency calls to body may be made concurrently.

Allows you to specify a policy and the node priority.


async_node( const async_node &src )

Constructs an async_node that has the same initial state that src had when it was constructed. The async_node that is constructed will have a reference to the same graph object as src, will have a copy of the initial body used by src, and have the same concurrency threshold as src. The predecessors and successors of src will not be copied.

The new body object is copy-constructed from a copy of the original body provided to src at its construction. Therefore changes made to member variables in src’s body after the construction of src will not affect the body of the new async_node.
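One way to picture this rule is a node that keeps the body it was constructed with alongside the copy it actually runs. The sketch below is a model in standard C++ under that assumption. It does not show how the library is implemented, and replayable_node and tally_body are hypothetical names.

```cpp
#include <cassert>

// Stateful body: counts its invocations.
struct tally_body {
    int calls = 0;
    void operator()(int /*message*/) { ++calls; }
};

// Hypothetical stand-in for a node (NOT a TBB class).
// Keeps the initial body next to the working copy that is actually invoked.
template <typename Body>
class replayable_node {
public:
    explicit replayable_node(const Body& body)
        : my_init_body(body), my_body(body) {}

    // Copy construction starts from the initial body of src,
    // not from the state src's working body has reached.
    replayable_node(const replayable_node& src)
        : my_init_body(src.my_init_body), my_body(src.my_init_body) {}

    void put(int message) { my_body(message); }
    int calls() const { return my_body.calls; }

private:
    Body my_init_body;
    Body my_body;
};

// Returns the number of calls recorded by a copy made after src processed messages.
inline int calls_seen_by_copy() {
    tally_body initial;
    replayable_node<tally_body> src(initial);
    src.put(1);
    src.put(2);
    replayable_node<tally_body> copy(src);  // taken after src has run twice
    assert(src.calls() == 2);
    return copy.calls();
}
```

Here the copy reports zero calls even though src has already processed two messages, which matches the statement that later changes to src's body do not carry over.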


gateway_type& gateway()

Returns a reference to the gateway_type interface.


bool try_put( const input_type& v )

Spawns a task that invokes the body with v and a reference to the node's gateway.

Returns: always true. The body is responsible for passing v to an external activity. If the body does not process a message properly, the message is lost.


bool try_get( output_type& v )

Returns: false

Example

The example below shows an async_node that submits some work to AsyncActivity for processing by a user thread.

#include "tbb/flow_graph.h"
#include "tbb/concurrent_queue.h"
#include <thread>

using namespace tbb::flow;
typedef int input_type;
typedef int output_type;
typedef tbb::flow::async_node<input_type, output_type> async_node_type;

class AsyncActivity {
public:
    typedef async_node_type::gateway_type gateway_type;

    struct work_type {
        input_type input;
        gateway_type* gateway;
    };

    AsyncActivity() : service_thread( [this]() {
        while( !end_of_work() ) {
            work_type w;
            while( my_work_queue.try_pop(w) ) {
                output_type result = do_work( w.input );
                //send the result back to the graph
                w.gateway->try_put( result );
                // signal that work is done
                w.gateway->release_wait();
            }
        }
    } ) {}

    void submit( input_type i, gateway_type* gateway ) {
        work_type w = {i, gateway};
        gateway->reserve_wait();
        my_work_queue.push(w);
    }
private:
    bool end_of_work() {
        // indicates that the thread should exit
    }

    output_type do_work( input_type& v ) {
        // performs the work on input converting it to output
    }

    tbb::concurrent_queue<work_type> my_work_queue;
    std::thread service_thread;
};

int main() {
    AsyncActivity async_activity;
    tbb::flow::graph g;

    async_node_type async_consumer( g, unlimited,
        // user functor to initiate async processing
        [&] ( input_type input, async_node_type::gateway_type& gateway ) {
            async_activity.submit( input, &gateway );
        } );

    tbb::flow::input_node<input_type> s( g, [](tbb::flow_control& fc) -> input_type { /* produce data for async work, or call fc.stop() when there is no more data */ } );
    tbb::flow::function_node<output_type> f( g, unlimited, [](const output_type& v) { /* consume data from async work */ } );

    tbb::flow::make_edge( s, async_consumer );
    tbb::flow::make_edge( async_consumer, f );

    s.activate();
    g.wait_for_all();
}
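
In the example above, every reserve_wait() call in submit() is paired with a release_wait() call on the service thread, so that g.wait_for_all() does not return while work is still outstanding. The sketch below models only that bookkeeping in standard C++. toy_gateway and its wait_for_all member are hypothetical stand-ins, not the library's gateway or graph types, and the counter-plus-condition-variable mechanism is an assumption made for illustration.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for a gateway (NOT a TBB class).
// Models only the pairing of reserve_wait() and release_wait().
class toy_gateway {
public:
    // Records that one more piece of external work is outstanding.
    void reserve_wait() {
        std::lock_guard<std::mutex> lock(my_mutex);
        ++my_pending;
    }

    // Records that one piece of external work has finished.
    void release_wait() {
        {
            std::lock_guard<std::mutex> lock(my_mutex);
            --my_pending;
        }
        my_cv.notify_all();
    }

    // Accepts a result coming back from the external activity.
    bool try_put(int result) {
        std::lock_guard<std::mutex> lock(my_mutex);
        my_results.push_back(result);
        return true;
    }

    // Plays the role of graph::wait_for_all(): blocks until nothing is pending.
    std::vector<int> wait_for_all() {
        std::unique_lock<std::mutex> lock(my_mutex);
        my_cv.wait(lock, [this] { return my_pending == 0; });
        return my_results;
    }

private:
    std::mutex my_mutex;
    std::condition_variable my_cv;
    int my_pending = 0;
    std::vector<int> my_results;
};

// Hands one item to an external thread and waits for the result to come back.
inline std::vector<int> run_gateway_demo(int input) {
    toy_gateway gateway;
    gateway.reserve_wait();  // reserve before the work leaves this thread
    std::thread worker([&gateway, input] {
        gateway.try_put(input * 2);  // send the result back
        gateway.release_wait();      // signal that the work is done
    });
    std::vector<int> results = gateway.wait_for_all();
    worker.join();
    return results;
}
```

Reserving before the work is handed off matters in this model: if the reservation were made on the worker thread instead, wait_for_all() could observe a zero count and return before the result arrives.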