This document describes a design for an enhancement to the IPC library used by D2D. The enhancement adds a socket connection type to the IPC library and enables clients of the library to queue and receive messages over sockets as an alternative to the current connection type, which uses DEC Message Queues (DMQ).
Related documents:
On 4 December 1996, MarySue Schultz, Gerry Murray, Mike Biere, Xiang Bao Jing, Jim Fluke, Herb Grote, and I discussed this design and made excellent progress improving it. The changes from this meeting are marked with red change-bars.
Although DMQ has operated sufficiently to support local and Denver operations of D2D, the time has finally come to address its shortcomings and create an additional IPC transport mechanism built solely from facilities provided by nearly any Unix (and some non-Unix) operating system.
Specifically, I intend to create an IPC transport type that

- is built solely from standard facilities (sockets and RPC) available on nearly every Unix platform;
- preserves DMQ's ``send and forget'' style of message delivery;
- gives each process a socket descriptor it can block on, eliminating polling; and
- is robust, and can be restarted without restarting the processes that depend on it.
Also, there is an opportunity to clean up certain aspects of the IPC library which I intend to do in tandem with the socket enhancements.
We will deploy the socket-based connection enhancement as a separate IPC library with an interface identical to, and compatible with, that of the current IPC library. We'll be able to select which library is used during a build by adjusting the definition of LIB_IPC in src/Makefile.config.
I've listed the goals of the socket connection type enhancement, but let me describe each of them in a little more detail in this section. This will help motivate the design overview.
Sockets are well-understood, easy to use, and fast. They'll be used to receive messages by processes using the socket-based connection type. To send messages, processes will make remote procedure calls using the RPC library to a server process. Using the tool rpcgen, we can automate the creation of much of the client- and server-side code. RPC is available on nearly all Unix platforms, and several non-Unix ones, too.
Simple enough: since we've already talked about a low-cost PC version of D2D for Taiwan's Central Weather Bureau, we won't use any value-added HP/UX extensions.
One of DMQ's best features is that a process can ``send and forget.'' DMQ lets a process continue with its computation and not have to worry about retrying or rescheduling messages it needs to send.
As I hope this socket connection type will eventually supersede DMQ, I'll make sure it provides a similar feature. It won't have all the bells and whistles---quotas on queues, for example---but it'll have enough that we should at least experiment with running a complete data ingest installation with the socket IPC transport.
Because it'll use a socket to receive messages, it'll have a socket descriptor on which it can block---or not. This'll make the D2D displays run more efficiently because we'll have zero polling processes instead of 20 or so per workstation.
Too often has DMQ gotten into a strange state where it needed to be restarted. Unfortunately, restarting DMQ means that all processes needing IPC have to restart as well.
To avoid this, I'll keep the socket connection type design fairly simple so we can verify that it'll work robustly. But I'll also make it so that the IPC server can be restarted without restarting processes that depend on it. If time permits, I'll add a feature so that queued messages can be dumped to disk on exit and restored when the server restarts.
The socket connection type relies on two things:
Because processes receive messages solely through the IPC server running on the local host, they will create local (also known as Unix-domain) sockets in the directory /tmp/fxaipc. Local sockets are faster than UDP or TCP sockets. If we need to migrate the socket connection type to a non-Unix host, we merely need to change the socket type to TCP, which requires only small changes to the code.
One IPC server will run on each host. IPC servers will pass IPC messages from host to host, and from server to client. Client processes will never talk directly to another process, but instead to an IPC server. This figure depicts the client/server architecture:
In this figure, the blue line shows process 1 sending a message to process 2. It does so by queuing the message with the IPC server running on host A. The red line shows process 2 sending a message to process 3 running on a remote host. It sends the message to its local IPC server, which forwards it to the IPC server running on host B, which queues it until process 3 is ready. The black arrows show the socket connections between processes.
If, after implementation, we find that the IPC servers become bogged down forwarding messages to remote IPC servers, we'll modify the servers to start a child process whose sole responsibility will be message forwarding. This will enable the main servers to continue receiving RPCs and sending messages to processes that are ready.
In many places in the IPC library there is unnecessary use of pointers and dynamic memory, and commented-out code. Because I'll be locking nearly all of the IPC library files, I'll also take an opportunity to clean up and streamline the code.
Also, clients of the IPC library have misused some of the code, particularly the IPC target class. I'll fix these problems throughout the tree.
The IPC server is the central element of the socket-based connection IPC. It has the following responsibilities:
In addition, the IPC server will maintain statistics on its usage and performance, which will be available via an RPC. These statistics will be similar to those kept by the notification server and will help us determine its efficiency and debug problems.
The server is a daemon process using RPC, called rpc.ipcd (by convention, RPC-based programs start with "rpc." and daemons end in "d"). This process should always be running. I'll modify the start-d2d script to check for and start rpc.ipcd if it's ever not running.
It's also possible that the D2D monitor system could watch the execution of rpc.ipcd and use either its log file or the null RPC procedure to determine if it's running and not hung. The monitor can automatically restart the daemon if it dies or becomes unresponsive.
The server constantly listens for RPCs, executes them, and tries to send IPC messages to processes or forward them to remote rpc.ipcd's. It maintains a mapping from each IPC target to a queue of messages for that target. The RPCs tell the server to register an IPC target, cancel a registration, or send a message.
To start rpc.ipcd, just type rpc.ipcd. It supports two command-line arguments.
The daemon then enters its main loop. It builds a set of writable file descriptors (an fd_set), one for each process whose queue has messages pending. It passes this set, along with the set of readable file descriptors being used by RPC, to the select() system call, with an infinite timeout. If any of the processes' file descriptors are writable, it then dequeues a message and sends it. If the RPC file descriptors are readable, they're passed to the RPC service routines.
See a skeletal main() for the server in main.C.
The clients of the IPC server are the processes using the socket connection type. The socket connection code hides this fact from developers, though. Developers use the same IPC interface to which they're already accustomed.
The IPC server accepts the following RPCs:
RPC | Arguments | Description
---|---|---
register | IPC target, socket address, reregister flag | Registers a process's target and creates a queue for it
cancel | IPC target | Cancels a registration; destroys the queue and any messages not yet received
sendMsg | IPC target, Message | Queues a message for a local target, or forwards it to the remote rpc.ipcd
getMsg | IPC target | Returns a queued Message from the given source target, or a flag indicating none is queued
stats | (none) | Returns the server's usage and performance statistics
null | (none) | Standard null procedure; useful for checking that the server is alive

Note that the getMsg and stats RPCs are new.
When clients first create their SocketConnection object, they also create a local socket in /tmp/fxaipc based on their process ID, removing any that might already exist. They then call the register RPC, passing their IPC target (named or anonymous) and the socket address (which, for local sockets, is its pathname). If they're registering in response to a restart, they'll set the reregister flag to true.
In response, rpc.ipcd creates a queue for the process. Any messages destined for the process wait in its queue.
When the SocketConnection is destroyed, it calls the cancel RPC, passing its IPC target. In response, rpc.ipcd destroys the queue and any messages not yet received. Any new messages destined for that process result in an error returned to the sending process.
When a process wishes to send a message to another process, it creates the Message object and passes it along with the IPC_Target object to the sendMsg RPC. In response, rpc.ipcd checks the IPC_Target to see if the target process is local or remote.
For a local process, rpc.ipcd queues the message in the target process' queue. For a remote process, rpc.ipcd itself calls the sendMsg RPC of the remote rpc.ipcd, thereby forwarding the message to the correct system. This way, the sending process needs to maintain a connection to just one rpc.ipcd, rather than deal with possibly slow networks while contacting remote rpc.ipcd's.
When a SocketConnection has a DeliveryMode of SYNC and it's being used to send to a remote process, the Message::send method won't return until the message is placed on the queue of the remote IPC server. If DeliveryMode is ASYNC, then Message::send returns as soon as the message is given to the local IPC server. When a SocketConnection is being used to send to a local process, SYNC and ASYNC have the same meaning: Message::send will return when the message is delivered to the local IPC server.

When a process wants to receive a message from a specific IPC target, it uses the getMsg RPC. This call returns a Message object whose source address matches the given IPC target, or a flag indicating that there are no messages in the queue from that target.
The utility program rpcgen reads an RPC protocol description file and produces a number of files as output. These output files are C code that can be compiled and linked into programs. This will help speed development of the socket connection type enhancement.
For more information, see the protocol description file for the IPC server, ipc.x.
The IPC server will implement the following measures to prevent it from using excessive resources and to protect against hung or dead processes.
The IPC server will limit the length of the queue for a target process to 100 messages. Note that the limit is on message count, not message size. A process could have 100 tiny or 100 elephantine messages pending.
The queue length limit will be a prominently defined constant to facilitate adjustment.
The select() call in rpc.ipcd's main loop also checks to see if any of its client processes' sockets have exception conditions pending. The IPC server uses this information to clean up dead IPC queues. When a client process dies, an exception condition will be raised on the IPC server's socket that was connected to that process. The IPC server will treat this condition as if the client had called the cancel RPC.
Using an atexit handler, the IPC server will save its state each time it exits, cleanly or not. This saved state will include the current IPC target-to-socket address mappings and queued messages. The next time the IPC server starts, it reads this information to provide as seamless operation as possible. If desired, rpc.ipcd can be started with the -norestore option, which prevents the server from restoring its saved state.
It's entirely possible that the saved state information could be out-of-date: for example, the IPC server was shut down for a software upgrade, and the administrator didn't use the -norestore option. To guard against this case, the IPC server asks each process to reregister itself by sending a special message.
Valid processes will receive this special message through a special Receiver object that's always registered with the IPC Dispatcher. These processes will then call the register RPC. For processes that no longer exist, the send will fail, and the IPC server will cancel the registration.
This special reregistration request message will have the highest priority and so will jump to the front of a process' queue and will be sent before any other queued messages.
To implement the IPC server, I'll create two classes: one to represent a queue, and the other to represent the IPC server.
Class IPC_Queue represents a queue of messages for a process. It contains two queues for messages, plus a flag noting if the reregister message needs to be sent. (Using multiple queues and a flag means we don't have to develop a PriorityQueue foundation class, although one would be nice.)
The class declaration is in the file IPC_Queue.H.
The constructor attempts to open the local socket specified by the socketPath argument in nonblocking mode. If the open fails, the isValid method returns false, and the IPC server will destroy the queue. It saves the file descriptor of the socket in the _fd private member.
To queue a message, the IPC server calls the queueMsg method. This method returns true if the queueing was successful, false otherwise. The only reason it would return false is if the queue length is maxed out (100 messages).
To find out if the queue has messages that need to be sent, the IPC server will call the pendingDescriptor method. This method will return -1 if there are no messages pending. If there are, it returns the file descriptor of the target's receive socket. The IPC server passes this file descriptor to select(). If a return from select() indicates that a process is ready to receive a message, the IPC server will call the sendMsg() method, which sends the message. This method returns true if successful, false otherwise (and leaves errno set).
When the IPC server wishes to save its state information, it opens a file for writing and iterates over each IPC_Queue, passing the open file to the saveState method. This method appends the saved messages to the file. The restoreState method does the inverse.
A singleton object, the IPC_Server maintains registrations of processes by using a dictionary to map from IPC_Targets to IPC_Queues. The RPCs call methods in this singleton. The return values from these methods determine the return values of the RPCs.
The class declaration is in the file IPC_Server.H.
To register a process, the RPC server function calls the register method in this class. It returns OK if all is well. It returns IN_USE if the IPC_Target is already in the _queues dictionary. It returns WRONG_HOST if the IPC_Target is a named target and the process requesting the registration is not running on the correct host. It returns UNKNOWN_TARGET if the target isn't specified in the socket target file. See the next section for more information.
When rpc.ipcd enters its main loop, it uses the method pendingDescriptors to get an fd_set of file descriptors on which the IPC_Server wants to write. After the select returns, it passes a modified set of descriptors back to IPC_Server, which calls the sendMsg method of the IPC_Queues that are marked ready in the fd_set. The main loop also calls handleExceptions with an fd_set; the IPC_Server singleton maps these to IPC_Queue objects to be destroyed.
The fxaTargets file maps named IPC targets to specific DMQ addresses. We'll need a similar way to map named targets to socket-based addresses.
We'll use the FXA config file to hold the socket target information. One of the config file items will be SocketConnection.targets, and its value will be a list of pairs. Each pair lists a named target and a hostname.
Here's an example:

    SocketConnection.targets: \
        "RADAR_SERVER",        "bluebird", \
        "TEXT_DB1",            "fsldata1", \
        "TEXT_DB2",            "fsldata1", \
        "TEXT_NOTIFICATION",   "137.75.43.2", \
        "NOTIFICATION_SERVER", "yarmouth", \
        "RADAR_MSG_HANDLER",   "snipe.fsl.noaa.gov", \
        "TABLE_SERVER",        "localhost"

To be honest, I think this is utterly disgusting. I cannot begin to fathom what the benefit here is versus a typical, whitespace-delimited, Unix-style configuration file that almost everyone is familiar with and, better yet, that is easily handled and manipulated with tools like grep, awk, the C preprocessor, etc.
The host can be specified using a hostname, fully qualified hostname, or IP address in dot notation. Note also that the hostname "localhost" can be used to specify that the process using the named target runs on the local system.
The socket-based connection type uses the socket targets in two ways:
Naturally, a number of changes have to be made to the IPC library to add the socket connection type. Thankfully, most of the changes take the form of new files; but some changes do have to be made to the current files as well, especially to support my proposed cleanup.
This section details the changes required.
Class Connection represents an abstract connection in the IPC library and also is the entry point for clients of the library to wait for messages.
Also, we can delete lines 120-121 because a process' IPC_Target will no longer be dynamically allocated (see Class Cleanup).
Then, we'll modify line 143 to support returning the appropriate connection type:
    Connection* newConnection;
    newConnection = new SocketConnection(deliveryMethod, deliveryMode,
                                         msgLength);
The msgLength parameter for Socket connections can really be anything, since the socket connection type can support messages of any size (limited by virtual memory). As a special case, a msgLength of zero will mean infinite. I'll modify Connection::matches, which attempts to match a connection given connection parameters, to act as follows:
    bool Connection::matches(DeliveryMethod method, DeliveryMode mode,
                             long msgLength)
    {
        if (_deliveryMethod == method && _deliveryMode == mode &&
            (_msgLength == 0 || _msgLength >= msgLength))
            return true;
        else
            return false;
    }
Processes that wish to do event dispatching need to register these file descriptors with the EventDispatcher. The socket IPC library will include the following class:

    #include "EventDispatcher.H"

    class ConnectionClient : public DescriptorEventClient {
      public:
        ConnectionClient(Connection* connection) : _connection(connection) {}
      private:
        virtual int descriptorEventMask()
            { return DescriptorEventClient::Readable; }
        virtual int descriptor()   { return _connection->fd(); }
        virtual void handleEvent() { _connection->waitForAnyMessage(); }

        Connection* _connection;
    };

The main function of processes using the EventDispatcher needs to create and register an object of this class for each entry in the Connection class' active list. (It'd be nicer if the Connection class could produce such objects, but then we'd force everyone using the IPC library to link with the EventDispatcher.) The following fragment will become typical:

    for (int i = 0; i < Connection::activeList().length(); ++i)
        getEventDispatcher()->registerDescriptorClient(
            new ConnectionClient(Connection::activeList()[i]));

It might be better to make this fragment an inline free function in ConnectionClient.H, so that people linking with libipc don't accidentally use it; you'd get it only by #including ConnectionClient.H.
To receive an IPC message, a client of the IPC library calls Connection::waitForMessage. Waiting for a message does several things: depending on a timeout parameter, it waits for a message, receives it, processes its header, and dispatches it to the correct Receiver object.
To wait for a message from a specific IPC target, the SocketConnection class uses the getMsg RPC. We implement some (admittedly primitive) form of polling (unless the timeout is set to NO_WAIT). Here's the algorithm:

    if (timeout == NO_WAIT)
        if (getMsg(target, msg))
            processMsg(msg);
            return IPC_SUCCESS;
        else
            return IPC_TRY_AGAIN_LATER;
    else
        while (true)
            if (getMsg(target, msg))
                processMsg(msg);
                return IPC_SUCCESS;
            if (timeout != WAIT_FOREVER)
                --timeout;
                if (timeout <= 0)
                    return IPC_TRY_AGAIN_LATER;
            sleep(10th of second);
Because IPC_Targets are null targets on creation, I'll change the pointer static member _myTarget to an actual object of class IPC_Target, which will be a null target. This'll help avoid overusing the memory allocator.
I'll correct the spelling of the ConnectionMode PERSISTANT to PERSISTENT.
Sharing all the attributes of class Connection, the class SocketConnection represents a socket connection.
The class declaration is in the file SocketConnection.H.
The constructor creates a local socket in nonblocking mode. It then takes the process' IPC_Target and passes it, along with the socket path, to the register RPC. The constructor also creates an RPC client handle.
The destructor calls the cancel RPC and then deletes the local socket. It also destroys the RPC client handle.
The send method calls the sendMsg RPC to send the message to the IPC server.
The waitForAnyMessage method uses select() to apply a timeout while waiting for any data on its socket. When data arrives, it uses read to read the data into the given buffer, and sets the status parameter to IPC_SUCCESS. If it times out, it sets the status to IPC_TRY_AGAIN_LATER. If an error occurs, it sets it to IPC_HOPELESS.
The class IPC_Target represents the ``address'' at which a process can be found. Currently, the IPC_Target contains a DMQ address and nothing else. In order to use the class with the socket connection type, an IPC target needs to contain the IP address of the host on which a process is running and its process ID.
With these two additional pieces of information, the socket connection type can send a message to any process in the world---so long as an IPC server is running.
A reworked class declaration is in the file IPC_Target.H.
There are now several constructors, each taking none or some of the possible parameters for a targeting scheme. The most interesting constructor is the one which takes a TextString. Normally, this string is a named IPC target which is looked up in both the fxaTargets and the socketTargets files. But it can also be an IPC target produced from the textString (or related) methods. This enables processes to pass IPC targets as strings in a uniform way. For example:
    // In the parent process:
    argv[0] = "someProcess";
    argv[1] = myIPCtarget.textString().stringPtr();
    argv[2] = 0;
    execvp("someProcess", argv);

    // In the child process:
    int main(int argc, char** argv)
    {
        IPC_Target parentTarget(argv[1]);
        ...
In addition to adding support addresses for the socket connection type, I'll clean up current usage of the IPC target class that assumes DMQ is the sole transport.
For example, the IGC process and the extension framework interpret a command line argument as a numeric DMQ address to construct the IPC targets of their parent processes. This won't support the IP address and process ID requirements for the socket connection type. So, I'll make the IPC target class itself responsible for rendering itself as a string and back again. (Actually, the IPC target class supports these methods, it's just that they're not used by anyone---and the decode method is private and therefore useless.)