int zmq_proxy_steerable (const void *frontend, const void *backend, const void *capture, const void *control);
The zmq_proxy_steerable() function starts the built-in 0MQ proxy in the current application thread, as zmq_proxy() does. Refer to zmq_proxy(3) for the general description and usage. This page describes only the additional control flow provided by the socket passed as the fourth argument, "control".
If the control socket is not NULL, the proxy supports control flow. If PAUSE is received on this socket, the proxy suspends its activities. If RESUME is received, it resumes. If TERMINATE is received, it terminates smoothly. If STATISTICS is received, the proxy replies on the control socket with a multipart message of 8 frames, each containing an unsigned 64-bit integer. The frames provide, in the following order:
- number of messages received by the frontend socket
- number of bytes received by the frontend socket
- number of messages sent by the frontend socket
- number of bytes sent by the frontend socket
- number of messages received by the backend socket
- number of bytes received by the backend socket
- number of messages sent by the backend socket
- number of bytes sent by the backend socket
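The frame order listed above can be captured in a small helper that fills a structure one frame at a time. The following is a minimal sketch, not part of the 0MQ API: the type proxy_stats_t and the function proxy_stats_set_frame() are hypothetical names, and the code assumes each counter arrives in the host's native byte order, which this page does not state.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define PROXY_STATS_FRAMES 8

/* Hypothetical container for the eight counters, in reply order. */
typedef struct
{
    uint64_t frontend_msg_in;    /* messages received by the frontend */
    uint64_t frontend_bytes_in;  /* bytes received by the frontend */
    uint64_t frontend_msg_out;   /* messages sent by the frontend */
    uint64_t frontend_bytes_out; /* bytes sent by the frontend */
    uint64_t backend_msg_in;     /* messages received by the backend */
    uint64_t backend_bytes_in;   /* bytes received by the backend */
    uint64_t backend_msg_out;    /* messages sent by the backend */
    uint64_t backend_bytes_out;  /* bytes sent by the backend */
} proxy_stats_t;

/* Store the payload of reply frame `index` (0..7) in `stats`.
   Returns 0 on success, or -1 if the index is out of range or the
   frame is not exactly 8 bytes long. Assumes native byte order. */
int proxy_stats_set_frame (proxy_stats_t *stats, size_t index,
                           const void *data, size_t size)
{
    assert (stats != NULL && data != NULL);

    uint64_t *slots[PROXY_STATS_FRAMES] = {
        &stats->frontend_msg_in,  &stats->frontend_bytes_in,
        &stats->frontend_msg_out, &stats->frontend_bytes_out,
        &stats->backend_msg_in,   &stats->backend_bytes_in,
        &stats->backend_msg_out,  &stats->backend_bytes_out
    };
    uint64_t value;

    if (index >= PROXY_STATS_FRAMES || size != sizeof value)
        return -1;

    /* memcpy avoids unaligned reads from the message buffer */
    memcpy (&value, data, sizeof value);
    *slots[index] = value;
    return 0;
}
```

A caller would typically invoke the helper once per received frame, passing the results of zmq_msg_data() and zmq_msg_size() together with a running frame counter, and treat a -1 return as a malformed reply.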
At start, the proxy runs normally, as if zmq_proxy() had been used.
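The control flow described above can be pictured as a three-state machine that starts in the active state. The sketch below is an illustrative model only, not libzmq's implementation: proxy_state_t and proxy_apply_command() are hypothetical names, and rejecting unknown commands with -1 is an assumption, since this page does not say how the proxy treats them. It also shows that commands are matched by exact length, without a terminating NUL byte, as in the examples on this page.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical model of the proxy's control state. */
typedef enum
{
    PROXY_ACTIVE,    /* initial state: messages are forwarded */
    PROXY_PAUSED,    /* after PAUSE: forwarding is suspended */
    PROXY_TERMINATED /* after TERMINATE: the proxy returns */
} proxy_state_t;

/* Apply one control command of `size` bytes to `state`.
   Returns 0 for a recognised command and -1 otherwise (assumption).
   STATISTICS only triggers a reply, so it leaves the state unchanged. */
int proxy_apply_command (proxy_state_t *state, const void *cmd, size_t size)
{
    assert (state != NULL && cmd != NULL);

    if (size == 5 && memcmp (cmd, "PAUSE", 5) == 0) {
        *state = PROXY_PAUSED;
    } else if (size == 6 && memcmp (cmd, "RESUME", 6) == 0) {
        *state = PROXY_ACTIVE;
    } else if (size == 9 && memcmp (cmd, "TERMINATE", 9) == 0) {
        *state = PROXY_TERMINATED;
    } else if (size == 10 && memcmp (cmd, "STATISTICS", 10) == 0) {
        /* state is unchanged; the real proxy would send its counters */
    } else {
        return -1;
    }
    return 0;
}
```

In this model a command sent with its trailing NUL byte, for example "PAUSE" with a length of 6, is not recognised, which is why the examples on this page pass the exact string length to zmq_send().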
If the control socket is NULL, the function behaves exactly as if zmq_proxy(3) had been called.
Refer to zmq_socket(3) for a description of the available socket types. Refer to zmq_proxy(3) for a description of zmq_proxy().
The zmq_proxy_steerable() function returns 0 if TERMINATE is sent to its control socket. Otherwise, it returns -1 and sets errno to ETERM or EINTR (the 0MQ context associated with either of the specified sockets was terminated) or EFAULT (the provided frontend or backend was invalid).
Creating a shared queue proxy.
    // Create frontend, backend and control sockets
    void *frontend = zmq_socket (context, ZMQ_ROUTER);
    assert (frontend);
    void *backend = zmq_socket (context, ZMQ_DEALER);
    assert (backend);
    void *control = zmq_socket (context, ZMQ_SUB);
    assert (control);

    // Bind the frontend and backend sockets to TCP ports
    assert (zmq_bind (frontend, "tcp://*:5555") == 0);
    assert (zmq_bind (backend, "tcp://*:5556") == 0);
    // Connect the control socket to the controller, which binds port 5557.
    // zmq_connect() needs a concrete peer address; "localhost" is an
    // assumption here, substitute the controller's host.
    assert (zmq_connect (control, "tcp://localhost:5557") == 0);

    // Subscribe to all messages on the control socket since we have chosen SUB here
    assert (zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0) == 0);

    // Start the queue proxy, which runs until ETERM or "TERMINATE"
    // is received on the control socket
    zmq_proxy_steerable (frontend, backend, NULL, control);
Setting up a controller in another node or process.
    void *control = zmq_socket (context, ZMQ_PUB);
    assert (control);
    assert (zmq_bind (control, "tcp://*:5557") == 0);

    // zmq_send() returns the number of bytes sent on success
    // pause the proxy
    assert (zmq_send (control, "PAUSE", 5, 0) == 5);
    // resume the proxy
    assert (zmq_send (control, "RESUME", 6, 0) == 6);

    // check statistics (before terminating, while the proxy still runs)
    // NOTE: reading the reply requires a control socket that can receive.
    // A PUB socket cannot, so this part assumes a bidirectional control
    // connection instead of the PUB/SUB pair used above.
    assert (zmq_send (control, "STATISTICS", 10, 0) == 10);
    zmq_msg_t stats_msg;
    while (1) {
        assert (zmq_msg_init (&stats_msg) == 0);
        assert (zmq_msg_recv (&stats_msg, control, 0) == sizeof (uint64_t));
        printf ("Stat: %llu\n",
                (unsigned long long) *(uint64_t *) zmq_msg_data (&stats_msg));
        if (!zmq_msg_get (&stats_msg, ZMQ_MORE))
            break;
        assert (zmq_msg_close (&stats_msg) == 0);
    }
    assert (zmq_msg_close (&stats_msg) == 0);

    // terminate the proxy
    assert (zmq_send (control, "TERMINATE", 9, 0) == 9);

SEE ALSO
zmq_proxy(3) zmq_bind(3) zmq_connect(3) zmq_socket(3) zmq(7)
This page was written by the 0MQ community. To make a change please read the 0MQ Contribution Policy at http://www.zeromq.org/docs:contributing.