Socket-based IPC Enhancement for D2D

Updates

 

        On 4 December 1996, MarySue Schultz, Gerry Murray, Mike Biere, Xiang Bao Jing, Jim Fluke, Herb Grote, and I discussed this design and made excellent progress in improving it. The changes from this meeting are listed with red change-bars.

Overview

 
        • Coexists with DMQ
        • Uses high-level features of RPC and sockets
        • Uses no HP/UX extensions
        • Allows nonblocking sending of data with queuing
        • Supports event dispatching via the select() system call
        • Robust and restartable
 

        We will deploy the socket-based connection enhancement as a separate IPC library with an interface identical to, and compatible with, that of the current IPC library. We'll be able to select which library is used during a build by adjusting the definition of LIB_IPC in src/Makefile.config.

 

        Our IPC library was intelligently designed to support multiple, concurrent connection mechanisms. DMQ has been the sole type for some time, and it's time another one came along to keep it company. More importantly, we don't want to take the risk of replacing DMQ in its entirety all at once. Therefore, both connection types will be available.

        The D2D display software will use the new socket-based connection type because it especially needs the event dispatching features the socket type will provide. The D2D data ingest software will continue to use the DMQ connection type.

 

        If, after implementation, we find that the IPC servers become bogged down forwarding messages to remote IPC servers, we'll modify the servers to start a child process whose sole responsibility is message forwarding. This will let the main servers continue receiving RPCs and sending messages to processes that are ready.

IPC Server

 

        In addition, the IPC server will maintain statistics on its usage and performance, which will be available via an RPC. These statistics will be similar to those kept by the notification server and will help us determine its efficiency and debug problems.
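
        For a rough idea of what the stats RPC might return, here's a sketch of a possible statistics record. The structure name and every field in it are placeholders, not the final design:

        // Hypothetical record returned by the stats RPC; field names are
        // illustrative only.
        struct IPC_ServerStats
            {
            long msgsQueued;        // messages currently waiting in queues
            long msgsDelivered;     // messages successfully sent to local clients
            long msgsForwarded;     // messages forwarded to remote IPC servers
            long sendRetries;       // delayed sends that had to be retried
            long registrations;     // processes currently registered
            long cancellations;     // registrations canceled, explicitly or on error
            };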

 

        It's also possible that the D2D monitor system could watch the execution of rpc.ipcd and use either its log file or the null RPC procedure to determine if it's running and not hung. The monitor can automatically restart the daemon if it dies or becomes unresponsive.

 

        The daemon then enters its main loop. It builds a set of writable file descriptors (an fd_set) containing a descriptor for each process whose queue has messages pending. It passes this set, along with the set of readable file descriptors being used by RPC, to the select() system call with an infinite timeout. If any of the processes' file descriptors are writable, the daemon dequeues a message and sends it. If the RPC file descriptors are readable, they're passed to the RPC service routines.
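
        For illustration, the heart of that loop might look roughly like the fragment below. It leans on ONC RPC's svc_fdset and svc_getreqset for the RPC side; addPendingQueueDescriptors and sendQueuedMessages are hypothetical helpers standing in for the queue-walking code:

        #include <rpc/rpc.h>        // svc_fdset, svc_getreqset
        #include <sys/time.h>       // select(), fd_set

        for (;;)
            {
            fd_set readFds = svc_fdset;     // descriptors RPC is listening on
            fd_set writeFds;
            FD_ZERO(&writeFds);

            // Hypothetical helper: add the receive-socket descriptor of every
            // client process whose queue has messages pending.
            addPendingQueueDescriptors(writeFds);

            // Block until an RPC request arrives or a client can accept a message.
            if (select(FD_SETSIZE, &readFds, &writeFds, 0, 0) < 0)
                continue;                   // e.g., interrupted by a signal

            sendQueuedMessages(writeFds);   // dequeue and send to ready clients
            svc_getreqset(&readFds);        // dispatch incoming RPC requests
            }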

 

        Note that the getMsg and stats RPCs are new.

 

        When a SocketConnection has a DeliveryMode of SYNC and it's being used to send to a remote process, the Message::send method won't return until the message is placed on the queue of the remote IPC server. If DeliveryMode is ASYNC, then Message::send returns as soon as the message is given to the local IPC server.

        When a SocketConnection is being used to send to a local process, SYNC and ASYNC have the same meaning: Message::send will return when the message is delivered to the local IPC server.

        When a process wants to receive a message from a specific IPC target, it uses the getMsg RPC. This call returns a Message object whose source address matches the given IPC target, or a flag indicating that there are no messages in the queue from that target.

 

        Every pass through the IPC server's main loop includes a step to send at least one message to each process that has messages pending. If a process is not ready to accept a message, the IPC server will note the fact and will try again on the next pass through the loop.

        After trying to send a single message 600 times, the IPC server will assume the process is hung or dead and will cancel its registration as if it called the cancel RPC. Since the IPC server waits a minimum of half a second in each pass through its main loop, this gives a stuck process five minutes to unstick itself. A successful send resets a process' retry counter back to zero.

        Again, the retry limit will be a prominently defined constant.

        Note that we can distinguish a send to a nonexistent process from a send to one that's merely busy. Delayed sends increase the retry counter, but sends that fail outright cause the registration to be canceled immediately.
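
        Here's a rough sketch of that retry bookkeeping, with MAX_SEND_RETRIES standing in for the prominently defined constant; the class and method names are my inventions for illustration:

        const int MAX_SEND_RETRIES = 600;   // ~5 minutes at >= 0.5 s per pass

        // Hypothetical per-pass bookkeeping for one client's queue.
        void IPC_Server::trySend(IPC_Queue& queue)
            {
            if (!queue.targetReady())                   // still busy: count a retry
                {
                if (queue.bumpRetryCount() >= MAX_SEND_RETRIES)
                    cancelRegistration(queue);          // assume it's hung or dead
                }
            else if (queue.sendMsg())                   // delivered one message
                queue.resetRetryCount();
            else
                cancelRegistration(queue);              // outright failure: cancel now
            }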

 

        The select() call in rpc.ipcd's main loop also checks to see if any of its client processes' sockets have exception conditions pending. The IPC server uses this information to clean up dead IPC queues.

        When a client process dies, an exception condition will be raised on the IPC server's socket that was connected to that process. The IPC server will treat this condition as if the client had called the cancel RPC.

 

        To find out if the queue has messages that need to be sent, the IPC server will call the pendingDescriptor method. This method will return -1 if there are no messages pending. If there are, it returns the file descriptor of the target's receive socket. The IPC server passes this file descriptor to select().

        If a return from select() indicates that a process is ready to receive a message, the IPC server will call the sendMsg() method, which sends the message. This method returns true if successful, false otherwise (and leaves errno set).
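
        The relevant slice of the IPC_Queue interface would then look something like this (a sketch only; the real class will have more to it):

        class IPC_Queue
            {
            public:
                // Descriptor of the target's receive socket if messages are
                // pending, or -1 if the queue is empty.
                int pendingDescriptor() const;

                // Dequeue and send one message.  Returns true on success;
                // on failure returns false and leaves errno set.
                bool sendMsg();
            };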

 

        When rpc.ipcd enters its main loop, it uses the method pendingDescriptors to get an fd_set of file descriptors on which the IPC_Server wants to write. After select() returns, it passes the modified set of descriptors back to IPC_Server, which calls the sendMsg method of the IPC_Queues that are marked ready in the fd_set.

        The main loop also calls handleExceptions with an fd_set; the IPC_Server singleton maps these to IPC_Queue objects to be destroyed.
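
        In the same spirit, here's a sketch of the IPC_Server methods the main loop relies on. pendingDescriptors and handleExceptions are as described above; sendToReady and the exact signatures are guesses:

        class IPC_Server
            {
            public:
                static IPC_Server& instance();      // the singleton

                // Fill an fd_set with the descriptor of every IPC_Queue that
                // has messages waiting to be written.
                void pendingDescriptors(fd_set& writeFds);

                // Call sendMsg on each IPC_Queue whose descriptor is marked
                // ready in the set returned by select().
                void sendToReady(const fd_set& writeFds);

                // Destroy the IPC_Queue objects whose descriptors show
                // exception conditions (their client processes have died).
                void handleExceptions(const fd_set& exceptFds);
            };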

The Socket Targets

 

        We'll use the FXA config file to hold the socket target information. One of the config file items will be SocketConnection.targets, and its value will be a list of pairs. Each pair lists a named target and a hostname.

 

        Here's an example:

        SocketConnection.targets: \
           "RADAR_SERVER",        "bluebird",           \
           "TEXT_DB1",            "fsldata1",           \
           "TEXT_DB2",            "fsldata1",           \
           "TEXT_NOTIFICATION",   "137.75.43.2",        \
           "NOTIFICATION_SERVER", "yarmouth",           \
           "RADAR_MSG_HANDLER",   "snipe.fsl.noaa.gov", \
           "TABLE_SERVER",        "localhost"

        To be honest, I think this is utterly disgusting. I cannot begin to fathom what the benefit here is versus your typical, whitespace-delimited, Unix-style configuration file that almost everyone is familiar with, and (better yet) which is easily handled and manipulated with tools like grep, awk, the C preprocessor, etc.

Changes to the IPC Library

 

        Class Connection defines two connection types: SYNC and ASYNC. The socket connection type is really another kind of ASYNC type, but because we want only the D2D display software to use sockets, we'll add a third connection type called ASYNC_SOCKET. This will enable the existing D2D data ingest code to continue to use DMQ. I'll modify all the display message classes to use ASYNC_SOCKET in place of the ASYNC they use now.
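
        Concretely, the connection-type enumeration would grow by one value, something like this (the enumeration's actual name is a guess):

        enum ConnectionType     // actual enum name is a guess
            {
            SYNC,               // existing
            ASYNC,              // existing (DMQ)
            ASYNC_SOCKET        // new: asynchronous delivery over sockets
            };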

        The getConnection method checks to see if the message size is under DMQ's limit. Because the socket type doesn't have a limit, the if statement on line 113 of Connection.C needs to be modified:

        if (msgLength > DMQ_Connection::maxMsgLength
            && deliverMode != ASYNC_SOCKET)
 

        Processes that wish to do event dispatching need to register their connections' file descriptors with the EventDispatcher. The socket IPC library will include the following class:

        #include "EventDispatcher.H"
        
        class ConnectionClient : public DescriptorEventClient
            {
            public:
                ConnectionClient(Connection* connection)
                    : _connection(connection) {}
            private:
                virtual int descriptorEventMask()
                    { return DescriptorEventClient::Readable; }
                virtual int descriptor()
                    { return _connection->fd(); }
                virtual void handleEvent()
                    { _connection->waitForAnyMessage(); }
                Connection* _connection;
            };

        The main function of a process using the EventDispatcher needs to create and register an object of this class for each Connection, by iterating through the Connection class' active list. (It'd be nicer if the Connection class could produce such objects, but then we'd force people using the IPC library to link with the EventDispatcher.) The following fragment will become typical:

        for (int i = 0; i < Connection::activeList().length(); ++i)
            getEventDispatcher()
                ->registerDescriptorClient(new
                    ConnectionClient(Connection::activeList()[i]));

        You know, it might be better to make this fragment an inline free function in ConnectionClient.H, so that people linking with libipc don't accidentally use it. You'd get to it only by #including ConnectionClient.H.
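
        Something along these lines, say (a sketch of that suggestion, not committed code):

        // ConnectionClient.H -- only processes that #include this header pick
        // up the EventDispatcher dependency.
        inline void registerConnectionClients()
            {
            for (int i = 0; i < Connection::activeList().length(); ++i)
                getEventDispatcher()
                    ->registerDescriptorClient(new
                        ConnectionClient(Connection::activeList()[i]));
            }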

 

        To wait for a message from a specific IPC target, the SocketConnection class uses the getMsg RPC. We implement some (admittedly primitive) form of polling (unless the timeout is set to NO_WAIT). Here's the algorithm:

        if (timeout == NO_WAIT)
            {
            if (getMsg(target, msg))
                {
                processMsg(msg);
                return IPC_SUCCESS;
                }
            return IPC_TRY_AGAIN_LATER;
            }

        while (true)
            {
            if (getMsg(target, msg))
                {
                processMsg(msg);
                return IPC_SUCCESS;
                }
            if (timeout != WAIT_FOREVER && --timeout <= 0)
                return IPC_TRY_AGAIN_LATER;
            sleep(/* a tenth of a second */);
            }

        In order to wait for messages on both DMQ and socket connections, the waitForMessage method must be drastically rewritten. First, I'll factor out the message processing code and put it in its own static method:

        void Connection::processMsg(byte* msgStream)

        Then, I need to modify waitForMessage to handle the following cases:

        • There is only one Connection in the active list. In this case, I can call that subclass' waitForAnyMessage method directly.
        • There are several Connections in the active list, but they all support event dispatching through file descriptors. In this case, I can use a select to efficiently block, with the given timeout, until one or more of the file descriptors are ready.
        • There are several Connections in the active list, and at least one of them does not support event dispatching through file descriptors. In this case, I have to poll each of the connections for data. This is the really sticky case.

        I've made a skeletal implementation of the above. It probably needs a good review, but keep in mind that I'm leaving out the details (such as the msgStream and msgStreamLength parameters).
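
        As a rough illustration only (not the skeleton itself), the shape might be something like the following; the return type, list type, and the three helpers are placeholders:

        IPC_Status Connection::waitForMessage(int timeout)
            {
            ConnectionList& active = Connection::activeList();

            // Case 1: a single connection -- let it do its own waiting.
            if (active.length() == 1)
                return active[0]->waitForAnyMessage(timeout);

            // Case 2: every connection exposes a file descriptor -- one
            // select() can block efficiently on all of them with the
            // given timeout.
            if (allSupportDescriptors(active))
                return selectOnConnections(active, timeout);

            // Case 3: at least one connection can't be selected on -- poll
            // each connection in turn, sleeping briefly between passes,
            // until a message arrives or the timeout expires.
            return pollConnections(active, timeout);
            }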

        Note: I won't be implementing the feature that allows waiting for a message from a certain IPC target in the first version of this software. I will make sure Connection::waitForMessage with the IPC target argument doesn't choke on socket connections.


Sean Kelly