This package contains an implementation of a simple filesystem-based message queue system.

Introduction

The model of the message queueing system is as follows:

Example

Here's an example of how a map-reduce {@link org.apache.hadoop.mapred.Mapper} implementation could use message queues for inter-task communication, specifically to gracefully stop, pause, or resume processing:
/**
 * Example mapper that registers a listener on the control topic of a
 * message queue and stops, pauses or resumes its processing depending on
 * the subject ("STOP", "PAUSE", "START") of the messages added to that topic.
 */
public class MQAwareJob implements Mapper {
	/** Normal processing. */
	static final int JOB_RUN   = 0;
	/** Processing is suspended until the state changes. */
	static final int JOB_PAUSE = 1;
	/** All further input is discarded. */
	static final int JOB_STOP  = 2;
	...
	private MsgQueue mq;
	// Written from the listener callback and polled by map(); volatile so the
	// polling loop is guaranteed to observe the update.
	// NOTE(review): presumably listeners are invoked on a different thread
	// than map() - confirm against the MsgQueue implementation.
	private volatile int runState;
	...

	/**
	 * Translates messages added to the control topic into run-state changes
	 * of the owning job.
	 */
	private static class CtrlListener implements MsgQueueListener {
		private final MQAwareJob job;

		public CtrlListener(MQAwareJob job) {
			this.job = job;
		}

		/**
		 * Reacts to newly added messages only; the message subject selects
		 * the new run state. Unknown subjects are ignored.
		 *
		 * @param topic topic on which the event occurred
		 * @param evt   event describing what happened on the topic
		 */
		public void processEvent(MsgTopic topic, MsgTopicEvent evt) throws Exception {
			if (evt.getType() != evt.TYPE_MSG_ADDED) {
				return;
			}
			Msg m = topic.getMsg(evt.getMsgId());
			if (m == null) {
				// the message is no longer available - nothing to act on
				return;
			}
			String subject = m.getSubject();
			// constant-first comparison tolerates a null subject
			if ("STOP".equals(subject)) {
				job.runState = JOB_STOP;
			} else if ("PAUSE".equals(subject)) {
				job.runState = JOB_PAUSE;
			} else if ("START".equals(subject)) {
				// "START" (re)starts processing, i.e. returns to the running state
				job.runState = JOB_RUN;
			}
		}
	}

	/**
	 * Puts the job in the running state and subscribes to the control topic
	 * of this job's message queue.
	 */
	public void configure(JobConf conf) {
		runState = JOB_RUN;
		CtrlListener ctrlListener = new CtrlListener(this);
		// create or connect to a message queue for this job
		mq = new MsgQueue(conf);
		mq.getMsgTopic(MsgTopic.TOPIC_CONTROL).addMsgQueueListener(ctrlListener);
		// register any other queue or topic listeners, as needed
		...
	}

	/**
	 * Checks the current run state before processing a record: discards the
	 * record when stopped, waits (reporting the elapsed time) while paused,
	 * and otherwise proceeds with the regular processing.
	 */
	public void map( ... ) {
		switch (runState) {
		case JOB_STOP:
			// discard all further input
			return;
		case JOB_PAUSE:
			int cnt = 0;
			while (runState == JOB_PAUSE) {
				try {
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					// keep the interrupt visible to the caller and stop
					// waiting, instead of silently swallowing it
					Thread.currentThread().interrupt();
					break;
				}
				reporter.setStatus("paused " + cnt++ + "s");
			}
			if (runState == JOB_STOP) {
				// stopped while paused: discard this record as well
				return;
			}
			break;
		default:
			// just run
			break;
		}
		...
	}
}