Boost C++ Libraries

...one of the most highly regarded and expertly designed C++ library projects in the world. Herb Sutter and Andrei Alexandrescu, C++ Coding Standards

PrevUpHomeNext

Class reliable_message_queue

boost::log::ipc::reliable_message_queue — A reliable interprocess message queue.

Synopsis

// In header: <boost/log/utility/ipc/reliable_message_queue.hpp>


class reliable_message_queue {
public:
  // types
  typedef uint32_t size_type;  // Queue message size type. 

  // Result codes for various operations on the queue. 
  enum operation_result { succeeded, no_space, aborted };

  // Interprocess queue overflow policies. 
  enum overflow_policy { block_on_overflow, fail_on_overflow, 
                         throw_on_overflow };

  // construct/copy/destruct
  reliable_message_queue() noexcept;
  reliable_message_queue(open_mode::create_only_tag, object_name const &, 
                         uint32_t, size_type, 
                         overflow_policy = block_on_overflow, 
                         permissions const & = permissions());
  reliable_message_queue(open_mode::open_or_create_tag, object_name const &, 
                         uint32_t, size_type, 
                         overflow_policy = block_on_overflow, 
                         permissions const & = permissions());
  reliable_message_queue(open_mode::open_only_tag, object_name const &, 
                         overflow_policy = block_on_overflow, 
                         permissions const & = permissions());
  template<typename... Args> explicit reliable_message_queue(Args const &...);
  reliable_message_queue(reliable_message_queue &&) noexcept;
  reliable_message_queue & operator=(reliable_message_queue &&) noexcept;
  ~reliable_message_queue();

  // public member functions
  void swap(reliable_message_queue &) noexcept;
  void create(object_name const &, uint32_t, size_type, 
              overflow_policy = block_on_overflow, 
              permissions const & = permissions());
  void open_or_create(object_name const &, uint32_t, size_type, 
                      overflow_policy = block_on_overflow, 
                      permissions const & = permissions());
  void open(object_name const &, overflow_policy = block_on_overflow, 
            permissions const & = permissions());
  bool is_open() const noexcept;
  void clear();
  object_name const  & name() const;
  uint32_t capacity() const;
  size_type block_size() const;
  void stop_local();
  void reset_local();
  void close() noexcept;
  operation_result send(void const *, size_type);
  bool try_send(void const *, size_type);
  operation_result receive(void *, size_type, size_type &);
  template<typename ElementT, size_type SizeV> 
    operation_result receive(ElementT(&), size_type &);
  template<typename ContainerT> operation_result receive(ContainerT &);
  bool try_receive(void *, size_type, size_type &);
  template<typename ElementT, size_type SizeV> 
    bool try_receive(ElementT(&), size_type &);
  template<typename ContainerT> bool try_receive(ContainerT &);

  // friend functions
  friend void swap(reliable_message_queue &, reliable_message_queue &) noexcept;

  // public static functions
  static void remove(object_name const &);
};

Description

The queue implements a reliable one-way channel of passing messages from one or multiple writers to a single reader. The format of the messages is user-defined and must be consistent across all writers and the reader. The queue does not enforce any specific format of the messages, other than they should be supplied as a contiguous array of bytes.

The queue internally uses a process-shared storage identified by an object_name (the queue name). Refer to object_name documentation for details on restrictions imposed on object names.

The queue storage is organized as a fixed number of blocks of a fixed size. The block size must be an integer power of 2 and is expressed in bytes. Each written message, together with some metadata added by the queue, consumes an integer number of blocks. Each read message received by the reader releases the blocks allocated for that message. As such the maximum size of a message is slightly less than block size times capacity of the queue. For efficiency, it is recommended to choose block size large enough to accommodate most of the messages to be passed through the queue.

The queue is considered empty when no messages are enqueued (all blocks are free). The queue is considered full at the point of enqueueing a message when there is not enough free blocks to accommodate the message.

The queue is reliable in that it will not drop successfully sent messages that are not received by the reader, other than the case when a non-empty queue is destroyed by the last user. If a message cannot be enqueued by the writer because the queue is full, the queue can either block the writer or return an error or throw an exception, depending on the policy specified at the queue creation. The policy is object local, i.e. different writers and the reader can have different overflow policies.

If the queue is empty and the reader attempts to dequeue a message, it will block until a message is enqueued by a writer.

A blocked reader or writer can be unblocked by calling stop_local. After this method is called, all threads blocked on this particular object are released and return operation_result::aborted. The other instances of the queue (in the current or other processes) are unaffected. In order to restore the normal functioning of the queue instance after the stop_local call the user has to invoke reset_local.

The queue does not guarantee any particular order of received messages from different writer threads. Messages sent by a particular writer thread will be received in the order of sending.

Methods of this class are not thread-safe, unless otherwise specified.

reliable_message_queue public construct/copy/destruct

  1. reliable_message_queue() noexcept;

    Default constructor. The method constructs an object that is not associated with any message queue.

    Postconditions:

    is_open() == false

  2. reliable_message_queue(open_mode::create_only_tag, object_name const & name, 
                           uint32_t capacity, size_type block_size, 
                           overflow_policy oflow_policy = block_on_overflow, 
                           permissions const & perms = permissions());

    Constructor. The method is used to construct an object and create the associated message queue. The constructed object will be in running state if the message queue is successfully created.

    Parameters:

    block_size

    Size in bytes of allocation block. Must be a power of 2.

    capacity

    Maximum number of allocation blocks the queue can hold.

    name

    Name of the message queue to be associated with.

    oflow_policy

    Queue behavior policy in case of overflow.

    perms

    Access permissions for the associated message queue.

    Postconditions:

    is_open() == true

  3. reliable_message_queue(open_mode::open_or_create_tag, 
                           object_name const & name, uint32_t capacity, 
                           size_type block_size, 
                           overflow_policy oflow_policy = block_on_overflow, 
                           permissions const & perms = permissions());

    Constructor. The method is used to construct an object and create or open the associated message queue. The constructed object will be in running state if the message queue is successfully created or opened. If the message queue that is identified by the name already exists then the other queue parameters are ignored. The actual queue parameters can be obtained with accessors from the constructed object.

    Parameters:

    block_size

    Size in bytes of allocation block. Must be a power of 2.

    capacity

    Maximum number of allocation blocks the queue can hold.

    name

    Name of the message queue to be associated with.

    oflow_policy

    Queue behavior policy in case of overflow.

    perms

    Access permissions for the associated message queue.

    Postconditions:

    is_open() == true

  4. reliable_message_queue(open_mode::open_only_tag, object_name const & name, 
                           overflow_policy oflow_policy = block_on_overflow, 
                           permissions const & perms = permissions());

    Constructor. The method is used to construct an object and open the existing message queue. The constructed object will be in running state if the message queue is successfully opened.

    Parameters:

    name

    Name of the message queue to be associated with.

    oflow_policy

    Queue behavior policy in case of overflow.

    perms

    Access permissions for the associated message queue. The permissions will only be used if the queue implementation has to create system objects while operating. This parameter is currently not used on POSIX systems.

    Postconditions:

    is_open() == true

  5. template<typename... Args> 
      explicit reliable_message_queue(Args const &... args);

    Constructor with named parameters. The method is used to construct an object and create or open the associated message queue. The constructed object will be in running state if the message queue is successfully created.

    The following named parameters are accepted:

    • open_mode - One of the open mode tags: open_mode::create_only, open_mode::open_only or open_mode::open_or_create.

    • name - Name of the message queue to be associated with.

    • capacity - Maximum number of allocation blocks the queue can hold. Used only if the queue is created.

    • block_size - Size in bytes of allocation block. Must be a power of 2. Used only if the queue is created.

    • overflow_policy - Queue behavior policy in case of overflow, see overflow_policy.

    • permissions - Access permissions for the associated message queue.

    Postconditions:

    is_open() == true

  6. reliable_message_queue(reliable_message_queue && that) noexcept;

    Move constructor. The method move-constructs an object from other. After the call, the constructed object becomes other, while other is left in default constructed state.

    Parameters:

    that

    The object to be moved.

  7. reliable_message_queue & operator=(reliable_message_queue && that) noexcept;

    Move assignment operator. If the object is associated with a message queue, close() is first called and the precondition to calling close() applies. After the call, the object becomes that while that is left in default constructed state.

    Parameters:

    that

    The object to be moved.

    Returns:

    A reference to the assigned object.

  8. ~reliable_message_queue();

    Destructor. Calls close().

reliable_message_queue public member functions

  1. void swap(reliable_message_queue & that) noexcept;

    The method swaps the object with that.

    Parameters:

    that

    The other object to swap with.

  2. void create(object_name const & name, uint32_t capacity, size_type block_size, 
                overflow_policy oflow_policy = block_on_overflow, 
                permissions const & perms = permissions());

    The method creates the message queue to be associated with the object. After the call, the object will be in running state if a message queue is successfully created.

    Parameters:

    block_size

    Size in bytes of allocation block. Must be a power of 2.

    capacity

    Maximum number of allocation blocks the queue can hold.

    name

    Name of the message queue to be associated with.

    oflow_policy

    Queue behavior policy in case of overflow.

    perms

    Access permissions for the associated message queue.

    Requires:

    is_open() == false

    Postconditions:

    is_open() == true

  3. void open_or_create(object_name const & name, uint32_t capacity, 
                        size_type block_size, 
                        overflow_policy oflow_policy = block_on_overflow, 
                        permissions const & perms = permissions());

    The method creates or opens the message queue to be associated with the object. After the call, the object will be in running state if a message queue is successfully created or opened. If the message queue that is identified by the name already exists then the other queue parameters are ignored. The actual queue parameters can be obtained with accessors from this object after this method returns.

    Parameters:

    block_size

    Size in bytes of allocation block. Must be a power of 2.

    capacity

    Maximum number of allocation blocks the queue can hold.

    name

    Name of the message queue to be associated with.

    oflow_policy

    Queue behavior policy in case of overflow.

    perms

    Access permissions for the associated message queue.

    Requires:

    is_open() == false

    Postconditions:

    is_open() == true

  4. void open(object_name const & name, 
              overflow_policy oflow_policy = block_on_overflow, 
              permissions const & perms = permissions());

    The method opens the existing message queue to be associated with the object. After the call, the object will be in running state if a message queue is successfully opened.

    Parameters:

    name

    Name of the message queue to be associated with.

    oflow_policy

    Queue behavior policy in case of overflow.

    perms

    Access permissions for the associated message queue. The permissions will only be used if the queue implementation has to create system objects while operating. This parameter is currently not used on POSIX systems.

    Requires:

    is_open() == false

    Postconditions:

    is_open() == true

  5. bool is_open() const noexcept;

    Tests whether the object is associated with any message queue.

    Returns:

    true if the object is associated with a message queue, and false otherwise.

  6. void clear();

    This method empties the associated message queue. Concurrent calls to this method, send(), try_send(), receive(), try_receive(), and stop_local() are allowed.

    Requires:

    is_open() == true

  7. object_name const  & name() const;

    The method returns the name of the associated message queue.

    Requires:

    is_open() == true

    Returns:

    Name of the associated message queue

  8. uint32_t capacity() const;

    The method returns the maximum number of allocation blocks the associated message queue can hold. Note that the returned value may be different from the corresponding value passed to the constructor or open_or_create(), for the message queue may not have been created by this object.

    Requires:

    is_open() == true

    Returns:

    Maximum number of allocation blocks the associated message queue can hold.

  9. size_type block_size() const;

    The method returns the allocation block size, in bytes. Each message in the associated message queue consumes an integer number of allocation blocks. Note that the returned value may be different from the corresponding value passed to the constructor or open_or_create(), for the message queue may not have been created by this object.

    Requires:

    is_open() == true

    Returns:

    Allocation block size, in bytes.

  10. void stop_local();

    The method wakes up all threads that are blocked in calls to send() or receive(). Those calls would then return operation_result::aborted. Note that, the method does not block until the woken-up threads have actually returned from send() or receive(). Other means is needed to ensure that calls to send() or receive() have returned, e.g., joining the threads that might be blocking on the calls.

    The method also puts the object in stopped state. When in stopped state, calls to send() or receive() will return immediately with return value operation_result::aborted when they would otherwise block in running state.

    Concurrent calls to this method, send(), try_send(), receive(), try_receive(), and clear() are allowed.

    Requires:

    is_open() == true

  11. void reset_local();

    The method puts the object in running state where calls to send() or receive() may block. This method is not thread-safe.

    Requires:

    is_open() == true

  12. void close() noexcept;

    The method disassociates the associated message queue, if any. No other threads should be using this object before calling this method. The stop_local() method can be used to have any threads currently blocked in send() or receive() return, and prevent further calls to them from blocking. Typically, before calling this method, one would first call stop_local() and then join all threads that might be blocking on send() or receive() to ensure that they have returned from the calls. The associated message queue is destroyed if the object represents the last outstanding reference to it.

    Postconditions:

    is_open() == false

  13. operation_result send(void const * message_data, size_type message_size);

    The method sends a message to the associated message queue. When the object is in running state and the queue has no free space for the message, the method either blocks or throws an exception, depending on the overflow policy that was specified on the queue opening/creation. If blocking policy is in effect, the blocking can be interrupted by calling stop_local(), in which case the method returns operation_result::aborted. When the object is already in the stopped state, the method does not block but returns immediately with return value operation_result::aborted.

    It is possible to send an empty message by passing 0 to the parameter message_size.

    Concurrent calls to send(), try_send(), receive(), try_receive(), stop_local(), and clear() are allowed.

    Throws: std::logic_error in case if the message size exceeds the queue capacity, system_error in case if a native OS method fails.

    Parameters:

    message_data

    The message data to send. Ignored when message_size is 0.

    message_size

    Size of the message data in bytes. If the size is larger than the associated message queue capacity, an std::logic_error exception is thrown.

    Requires:

    is_open() == true

  14. bool try_send(void const * message_data, size_type message_size);

    The method performs an attempt to send a message to the associated message queue. The method is non-blocking, and always returns immediately. boost::system::system_error is thrown for errors resulting from native operating system calls. Note that it is possible to send an empty message by passing 0 to the parameter message_size. Concurrent calls to send(), try_send(), receive(), try_receive(), stop_local(), and clear() are allowed.

    Throws: std::logic_error in case if the message size exceeds the queue capacity, system_error in case if a native OS method fails.

    Parameters:

    message_data

    The message data to send. Ignored when message_size is 0.

    message_size

    Size of the message data in bytes. If the size is larger than the maximum size allowed by the associated message queue, an std::logic_error exception is thrown.

    Requires:

    is_open() == true

    Returns:

    true if the message is successfully sent, and false otherwise (e.g., when the queue is full).

  15. operation_result 
    receive(void * buffer, size_type buffer_size, size_type & message_size);

    The method takes a message from the associated message queue. When the object is in running state and the queue is empty, the method blocks. The blocking is interrupted when stop_local() is called, in which case the method returns operation_result::aborted. When the object is already in the stopped state and the queue is empty, the method does not block but returns immediately with return value operation_result::aborted.

    Concurrent calls to send(), try_send(), receive(), try_receive(), stop_local(), and clear() are allowed.

    Parameters:

    buffer

    The memory buffer to store the received message in.

    buffer_size

    The size of the buffer, in bytes.

    message_size

    Receives the size of the received message, in bytes.

    Requires:

    is_open() == true

  16. template<typename ElementT, size_type SizeV> 
      operation_result receive(ElementT(&) buffer, size_type & message_size);

    The method takes a message from the associated message queue. When the object is in running state and the queue is empty, the method blocks. The blocking is interrupted when stop_local() is called, in which case the method returns operation_result::aborted. When the object is already in the stopped state and the queue is empty, the method does not block but returns immediately with return value operation_result::aborted.

    Concurrent calls to send(), try_send(), receive(), try_receive(), stop_local(), and clear() are allowed.

    Parameters:

    buffer

    The memory buffer to store the received message in.

    message_size

    Receives the size of the received message, in bytes.

    Requires:

    is_open() == true

  17. template<typename ContainerT> operation_result receive(ContainerT & container);

    The method takes a message from the associated message queue. When the object is in running state and the queue is empty, the method blocks. The blocking is interrupted when stop_local() is called, in which case the method returns operation_result::aborted. When the object is already in the stopped state and the queue is empty, the method does not block but returns immediately with return value operation_result::aborted.

    Concurrent calls to send(), try_send(), receive(), try_receive(), stop_local(), and clear() are allowed.

    Parameters:

    container

    The container to store the received message in. The container should have value type of char, signed char or unsigned char and support inserting elements at the end.

    Requires:

    is_open() == true

  18. bool try_receive(void * buffer, size_type buffer_size, 
                     size_type & message_size);

    The method performs an attempt to take a message from the associated message queue. The method is non-blocking, and always returns immediately.

    Concurrent calls to send(), try_send(), receive(), try_receive(), stop_local(), and clear() are allowed.

    Parameters:

    buffer

    The memory buffer to store the received message in.

    buffer_size

    The size of the buffer, in bytes.

    message_size

    Receives the size of the received message, in bytes.

    Requires:

    is_open() == true

    Returns:

    true if a message is successfully received, and false otherwise (e.g., when the queue is empty).

  19. template<typename ElementT, size_type SizeV> 
      bool try_receive(ElementT(&) buffer, size_type & message_size);

    The method performs an attempt to take a message from the associated message queue. The method is non-blocking, and always returns immediately.

    Concurrent calls to send(), try_send(), receive(), try_receive(), stop_local(), and clear() are allowed.

    Parameters:

    buffer

    The memory buffer to store the received message in.

    message_size

    Receives the size of the received message, in bytes.

    Requires:

    is_open() == true

    Returns:

    true if a message is successfully received, and false otherwise (e.g., when the queue is empty).

  20. template<typename ContainerT> bool try_receive(ContainerT & container);

    The method performs an attempt to take a message from the associated message queue. The method is non-blocking, and always returns immediately.

    Concurrent calls to send(), try_send(), receive(), try_receive(), stop_local(), and clear() are allowed.

    Parameters:

    container

    The container to store the received message in. The container should have value type of char, signed char or unsigned char and support inserting elements at the end.

    Requires:

    is_open() == true

    Returns:

    true if a message is successfully received, and false otherwise (e.g., when the queue is empty).

reliable_message_queue friend functions

  1. friend void swap(reliable_message_queue & a, reliable_message_queue & b) noexcept;
    Swaps the two reliable_message_queue objects.

reliable_message_queue public static functions

  1. static void remove(object_name const & name);

    The method frees system-wide resources, associated with the interprocess queue with the supplied name. The queue referred to by the specified name must not be opened in any process at the point of this call. After this call succeeds a new queue with the specified name can be created.

    This call can be useful to recover from an earlier process misbehavior (e.g. a crash without properly closing the message queue). In this case resources allocated for the interprocess queue may remain allocated after the last process closed the queue, which in turn may prevent creating a new queue with the same name. By calling this method before creating a queue the application can attempt to ensure it starts with a clean slate.

    On some platforms resources associated with the queue are automatically reclaimed by the operating system when the last process using those resources terminates (even if it terminates abnormally). On these platforms this call may be a no-op. However, portable code should still call this method at appropriate places to ensure compatibility with other platforms and future library versions, which may change implementation of the queue.

    Parameters:

    name

    Name of the message queue to be removed.


PrevUpHomeNext