There are two types of messaging system
TextMessage
has a text body
BytesMessage
has a byte array body
MapMessage
has a set of string/value pairs, etc
TextMessage
has methods get/setText()
, etc
new InitialContext()
java.naming.factory.initial
which will need to be set to a suitable class for JNDI, System Message Queue, etc
ConnectionFactory
from the lookup service, e.g.
connectionFactory = (ConnectionFactory)
jndiContext.lookup("jms/QueueConnectionFactory");
dest = (Queue) jndiContext.lookup(queueName);
connection = connectionFactory.createConnection()
connection = connectionFactory.createConnection();
Session session = connection.createSession()
producer = session.createProducer(dest)
consumer = session.createConsumer(dest)
producer.send(msg)
msg = consumer.receive()
/*
* Copyright (c) 2004 Sun Microsystems, Inc. All rights reserved. U.S.
* Government Rights - Commercial software. Government users are subject
* to the Sun Microsystems, Inc. standard license agreement and
* applicable provisions of the FAR and its supplements. Use is subject
* to license terms.
*
* This distribution may include materials developed by third parties.
* Sun, Sun Microsystems, the Sun logo, Java and J2EE are trademarks
* or registered trademarks of Sun Microsystems, Inc. in the U.S. and
* other countries.
*
* Copyright (c) 2004 Sun Microsystems, Inc. Tous droits reserves.
*
* Droits du gouvernement americain, utilisateurs gouvernementaux - logiciel
* commercial. Les utilisateurs gouvernementaux sont soumis au contrat de
* licence standard de Sun Microsystems, Inc., ainsi qu'aux dispositions
* en vigueur de la FAR (Federal Acquisition Regulations) et des
* supplements a celles-ci. Distribue par des licences qui en
* restreignent l'utilisation.
*
* Cette distribution peut comprendre des composants developpes par des
* tierces parties. Sun, Sun Microsystems, le logo Sun, Java et J2EE
* sont des marques de fabrique ou des marques deposees de Sun
* Microsystems, Inc. aux Etats-Unis et dans d'autres pays.
*/
/**
* The SimpleProducer class consists only of a main method,
* which sends several messages to a queue or topic.
*
* Run this program in conjunction with SimpleSynchConsumer or
* SimpleAsynchConsumer. Specify a queue or topic name, followed by
* the destination type (queue or topic), on the command line when
* you run the program. By default, the program sends one message.
* Specify a number after the destination type to send that number
* of messages.
*/
import javax.jms.*;
import javax.naming.*;
public class SimpleProducer {
    /** JNDI name under which the JMS connection factory is looked up. */
    private static final String CONNECTION_FACTORY_NAME =
        "jms/QueueConnectionFactory";

    /**
     * Main method. Sends the requested number of text messages to the
     * named destination, followed by one non-text control message that
     * marks the end of the stream.
     * <p>
     * Exits with status 1 if the arguments are invalid, if the JNDI
     * context cannot be created, or if a JNDI lookup fails.
     *
     * @param args the destination name, its type ("queue" or "topic")
     *             and, optionally, the number of messages to send
     *             (one message is sent when the number is omitted)
     */
    public static void main(String[] args) {
        if ((args.length < 2) || (args.length > 3)) {
            System.out.println("Program takes two or three arguments: " +
                "<dest_name> <queue|topic> " +
                "[<number-of-messages>]");
            System.exit(1);
        }

        String destName = args[0];
        String destType = args[1];
        System.out.println("Destination name is " + destName +
            ", type is " + destType);

        /*
         * Validate the destination type before touching JNDI, so that a
         * bad argument is reported as such rather than as a lookup
         * failure.
         */
        boolean useQueue = "queue".equals(destType);
        if (!useQueue && !"topic".equals(destType)) {
            System.out.println("Invalid destination type" +
                "; must be queue or topic");
            System.exit(1);
        }

        /*
         * Parse the optional message count; a malformed number is a
         * usage error, not a crash.
         */
        int numMsgs = 1;
        if (args.length == 3) {
            try {
                numMsgs = Integer.parseInt(args[2]);
            } catch (NumberFormatException e) {
                System.out.println("Invalid number of messages: " +
                    args[2]);
                System.exit(1);
            }
        }

        /*
         * Create a JNDI API InitialContext object.
         */
        Context jndiContext = null;
        try {
            jndiContext = new InitialContext();
        } catch (NamingException e) {
            System.out.println("Could not create JNDI API " +
                "context: " + e.toString());
            System.exit(1);
        }

        /*
         * Look up connection factory and destination. If either
         * does not exist, exit. Exception is caught broadly on
         * purpose: besides NamingException, the casts below fail with
         * ClassCastException when the name is bound to an object of
         * another type.
         */
        ConnectionFactory connectionFactory = null;
        Destination dest = null;
        try {
            connectionFactory = (ConnectionFactory)
                jndiContext.lookup(CONNECTION_FACTORY_NAME);
            if (useQueue) {
                dest = (Queue) jndiContext.lookup(destName);
            } else {
                dest = (Topic) jndiContext.lookup(destName);
            }
        } catch (Exception e) {
            System.out.println("JNDI API lookup failed: " +
                e.toString());
            e.printStackTrace();
            System.exit(1);
        }

        /*
         * Create connection.
         * Create session from connection; false means session is
         * not transacted.
         * Create producer and text message.
         * Send messages, varying text slightly.
         * Send end-of-messages message.
         * Finally, close connection.
         */
        Connection connection = null;
        try {
            connection = connectionFactory.createConnection();
            Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(dest);
            TextMessage message = session.createTextMessage();

            // The same TextMessage object is reused; only its text changes.
            for (int i = 0; i < numMsgs; i++) {
                message.setText("This is message " + (i + 1));
                System.out.println("Sending message: " +
                    message.getText());
                producer.send(message);
            }

            /*
             * Send a non-text control message indicating end of
             * messages.
             */
            producer.send(session.createMessage());
        } catch (JMSException e) {
            System.out.println("Exception occurred: " +
                e.toString());
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    // Nothing more can be done here, but do not hide it.
                    System.out.println("Could not close connection: " +
                        e.toString());
                }
            }
        }
    }
}
Message receiver
/*
* Copyright (c) 2004 Sun Microsystems, Inc. All rights reserved. U.S.
* Government Rights - Commercial software. Government users are subject
* to the Sun Microsystems, Inc. standard license agreement and
* applicable provisions of the FAR and its supplements. Use is subject
* to license terms.
*
* This distribution may include materials developed by third parties.
* Sun, Sun Microsystems, the Sun logo, Java and J2EE are trademarks
* or registered trademarks of Sun Microsystems, Inc. in the U.S. and
* other countries.
*
* Copyright (c) 2004 Sun Microsystems, Inc. Tous droits reserves.
*
* Droits du gouvernement americain, utilisateurs gouvernementaux - logiciel
* commercial. Les utilisateurs gouvernementaux sont soumis au contrat de
* licence standard de Sun Microsystems, Inc., ainsi qu'aux dispositions
* en vigueur de la FAR (Federal Acquisition Regulations) et des
* supplements a celles-ci. Distribue par des licences qui en
* restreignent l'utilisation.
*
* Cette distribution peut comprendre des composants developpes par des
* tierces parties. Sun, Sun Microsystems, le logo Sun, Java et J2EE
* sont des marques de fabrique ou des marques deposees de Sun
* Microsystems, Inc. aux Etats-Unis et dans d'autres pays.
*/
/**
* The SimpleSynchConsumer class consists only of a main method,
* which fetches one or more messages from a queue or topic using
* synchronous message delivery. Run this program in conjunction
* with SimpleProducer. Specify a queue or topic name, followed by
* the destination type (queue or topic), on the command line when
* you run the program.
*/
import javax.jms.*;
import javax.naming.*;
public class SimpleSynchConsumer {
    /** JNDI name under which the JMS connection factory is looked up. */
    private static final String CONNECTION_FACTORY_NAME =
        "jms/QueueConnectionFactory";

    /**
     * Main method. Receives text messages from the named destination
     * and prints each one, stopping at the first non-text message.
     * <p>
     * Exits with status 1 if the arguments are invalid, if the JNDI
     * context cannot be created, or if a JNDI lookup fails.
     *
     * @param args the destination name and its type ("queue" or
     *             "topic")
     */
    public static void main(String[] args) {
        if (args.length != 2) {
            System.out.println("Program takes two arguments: " +
                "<dest_name> <queue|topic>");
            System.exit(1);
        }

        String destName = args[0];
        String destType = args[1];
        System.out.println("Destination name is " + destName +
            ", type is " + destType);

        /*
         * Validate the destination type before touching JNDI, so that a
         * bad argument is reported as such rather than as a lookup
         * failure.
         */
        boolean useQueue = "queue".equals(destType);
        if (!useQueue && !"topic".equals(destType)) {
            System.out.println("Invalid destination type" +
                "; must be queue or topic");
            System.exit(1);
        }

        /*
         * Create a JNDI API InitialContext object.
         */
        Context jndiContext = null;
        try {
            jndiContext = new InitialContext();
        } catch (NamingException e) {
            System.out.println("Could not create JNDI API " +
                "context: " + e.toString());
            System.exit(1);
        }

        /*
         * Look up connection factory and destination. If either
         * does not exist, exit. Exception is caught broadly on
         * purpose: besides NamingException, the casts below fail with
         * ClassCastException when the name is bound to an object of
         * another type.
         */
        ConnectionFactory connectionFactory = null;
        Destination dest = null;
        try {
            connectionFactory = (ConnectionFactory)
                jndiContext.lookup(CONNECTION_FACTORY_NAME);
            if (useQueue) {
                dest = (Queue) jndiContext.lookup(destName);
            } else {
                dest = (Topic) jndiContext.lookup(destName);
            }
        } catch (Exception e) {
            System.out.println("JNDI API lookup failed: " +
                e.toString());
            System.exit(1);
        }

        /*
         * Create connection.
         * Create session from connection; false means session is
         * not transacted.
         * Create consumer, then start message delivery.
         * Receive all text messages from destination until
         * a non-text message is received indicating end of
         * message stream.
         * Close connection.
         */
        Connection connection = null;
        try {
            connection = connectionFactory.createConnection();
            Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(dest);
            connection.start();

            while (true) {
                // Timed receive: a null result means nothing arrived
                // within the timeout, so simply poll again.
                Message m = consumer.receive(1);
                if (m == null) {
                    continue;
                }
                if (!(m instanceof TextMessage)) {
                    // Non-text message marks the end of the stream.
                    break;
                }
                TextMessage message = (TextMessage) m;
                System.out.println("Reading message: " +
                    message.getText());
            }
        } catch (JMSException e) {
            System.out.println("Exception occurred: " +
                e.toString());
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    // Nothing more can be done here, but do not hide it.
                    System.out.println("Could not close connection: " +
                        e.toString());
                }
            }
        }
    }
}
Variations
The programmatic interface to the lookup service depends on the type of lookup service. Sun supplies a "portability" class to handle JNDI and IMQ lookup.
/*
* @(#)SampleUtilities.java 1.4 02/05/02
*
* Copyright (c) 2000-2002 Sun Microsystems, Inc. All Rights Reserved.
*
* Sun grants you ("Licensee") a non-exclusive, royalty free, license to use,
* modify and redistribute this software in source and binary code form,
* provided that i) this copyright notice and license appear on all copies of
* the software; and ii) Licensee does not utilize the software in a manner
* which is disparaging to Sun.
*
* This software is provided "AS IS," without a warranty of any kind. ALL
* EXPRESS OR IMPLIED CONDITIONS, REPRESENTATIONS AND WARRANTIES, INCLUDING ANY
* IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE OR
* NON-INFRINGEMENT, ARE HEREBY EXCLUDED. SUN AND ITS LICENSORS SHALL NOT BE
* LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING
* OR DISTRIBUTING THE SOFTWARE OR ITS DERIVATIVES. IN NO EVENT WILL SUN OR ITS
* LICENSORS BE LIABLE FOR ANY LOST REVENUE, PROFIT OR DATA, OR FOR DIRECT,
* INDIRECT, SPECIAL, CONSEQUENTIAL, INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER
* CAUSED AND REGARDLESS OF THE THEORY OF LIABILITY, ARISING OUT OF THE USE OF
* OR INABILITY TO USE SOFTWARE, EVEN IF SUN HAS BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGES.
*
* This software is not designed or intended for use in on-line control of
* aircraft, air traffic, aircraft navigation or aircraft communications; or in
* the design, construction, operation or maintenance of any nuclear
* facility. Licensee represents and warrants that it will not use or
* redistribute the Software for such purposes.
*/
import javax.naming.*;
import javax.jms.*;
/**
* Utility class for JMS sample programs.
* <p>
* Set the <code>USE_JNDI</code> variable to true or false depending on whether
* your provider uses JNDI.
* <p>
* Contains the following methods:
* <ul>
* <li> getQueueConnectionFactory
* <li> getTopicConnectionFactory
* <li> getQueue
* <li> getTopic
* <li> jndiLookup
* <li> exit
* <li> receiveSynchronizeMessages
* <li> sendSynchronizeMessage
* </ul>
*
* Also contains the class DoneLatch, which contains the following methods:
* <ul>
* <li> waitTillDone
* <li> allDone
* </ul>
*
* @author Kim Haase
* @author Joseph Fialli
* @version 1.7, 08/18/00
*/
public class SampleUtilities {
    /*
     * Define the System Property "USE_JNDI" true to use JNDI lookup(),
     * e.g. $JAVA -DUSE_JNDI=true ....
     */
    public static final boolean USE_JNDI = Boolean.getBoolean("USE_JNDI");

    /*
     * The prefix to use for JNDI lookup names.
     */
    public static final String jndiNamePrefix = "cn=";
    public static final String QUEUECONFAC = "QueueConnectionFactory";
    public static final String TOPICCONFAC = "TopicConnectionFactory";

    /* Created on first use by jndiLookup and reused afterwards. */
    private static Context jndiContext = null;

    /**
     * Returns a QueueConnectionFactory object.
     * If provider uses JNDI, serves as a wrapper around jndiLookup method.
     * If provider does not use JNDI, substitute provider-specific code here.
     *
     * @return a QueueConnectionFactory object
     * @throws Exception javax.naming.NamingException (or other exception)
     *                   if name cannot be found
     */
    public static javax.jms.QueueConnectionFactory getQueueConnectionFactory()
            throws Exception {
        if (USE_JNDI) {
            return (javax.jms.QueueConnectionFactory) jndiLookup(QUEUECONFAC);
        } else {
            // return new provider-specific QueueConnectionFactory
            return new com.sun.messaging.QueueConnectionFactory();
        }
    }

    /**
     * Returns a TopicConnectionFactory object.
     * If provider uses JNDI, serves as a wrapper around jndiLookup method.
     * If provider does not use JNDI, substitute provider-specific code here.
     *
     * @return a TopicConnectionFactory object
     * @throws Exception javax.naming.NamingException (or other exception)
     *                   if name cannot be found
     */
    public static javax.jms.TopicConnectionFactory getTopicConnectionFactory()
            throws Exception {
        if (USE_JNDI) {
            return (javax.jms.TopicConnectionFactory) jndiLookup(TOPICCONFAC);
        } else {
            // return new provider-specific TopicConnectionFactory
            return new com.sun.messaging.TopicConnectionFactory();
        }
    }

    /**
     * Returns a Queue object.
     * If provider uses JNDI, serves as a wrapper around jndiLookup method;
     * otherwise the queue is created from the given session.
     *
     * @param name String specifying queue name
     * @param session a QueueSession object; only used when JNDI is not
     *
     * @return a Queue object
     * @throws Exception javax.naming.NamingException (or other exception)
     *                   if name cannot be found
     */
    public static javax.jms.Queue getQueue(String name,
                                           javax.jms.QueueSession session)
            throws Exception {
        if (USE_JNDI) {
            return (javax.jms.Queue) jndiLookup(name);
        } else {
            return session.createQueue(name);
        }
    }

    /**
     * Returns a Topic object.
     * If provider uses JNDI, serves as a wrapper around jndiLookup method;
     * otherwise the topic is created from the given session.
     *
     * @param name String specifying topic name
     * @param session a TopicSession object; only used when JNDI is not
     *
     * @return a Topic object
     * @throws Exception javax.naming.NamingException (or other exception)
     *                   if name cannot be found
     */
    public static javax.jms.Topic getTopic(String name,
                                           javax.jms.TopicSession session)
            throws Exception {
        if (USE_JNDI) {
            return (javax.jms.Topic) jndiLookup(name);
        } else {
            return session.createTopic(name);
        }
    }

    /**
     * Creates a JNDI InitialContext object if none exists yet. Then looks up
     * the string argument, prefixed with <code>jndiNamePrefix</code>, and
     * returns the associated object.
     *
     * @param name the name of the object to be looked up
     *
     * @return the object bound to <code>name</code>
     * @throws javax.naming.NamingException if the context cannot be created
     *         or name cannot be found
     */
    public static Object jndiLookup(String name) throws NamingException {
        if (jndiContext == null) {
            try {
                jndiContext = new InitialContext();
            } catch (NamingException e) {
                System.out.println("Could not create JNDI context: " +
                    e.toString());
                throw e;
            }
        }
        try {
            return jndiContext.lookup(jndiNamePrefix + name);
        } catch (NamingException e) {
            System.out.println("JNDI lookup failed for:" + name + ": " + e.toString());
            throw e;
        }
    }

    /**
     * Calls System.exit().
     *
     * @param result The exit result; 0 indicates no errors
     */
    public static void exit(int result) {
        System.exit(result);
    }

    /**
     * Wait for 'count' messages on controlQueue before continuing. Called by
     * a publisher to make sure that subscribers have started before it begins
     * publishing messages.
     * <p>
     * If controlQueue doesn't exist, the method throws an exception.
     * The connection opened by this method is closed before it returns,
     * whether it succeeds or fails.
     *
     * @param prefix prefix (publisher or subscriber) to be displayed
     * @param controlQueueName name of control queue
     * @param count number of messages to receive
     * @throws Exception if the connection cannot be set up or a message
     *                   cannot be received
     */
    public static void receiveSynchronizeMessages(String prefix,
                                                  String controlQueueName,
                                                  int count)
            throws Exception {
        QueueConnection queueConnection = null;
        QueueSession queueSession = null;
        Queue controlQueue = null;

        try {
            QueueConnectionFactory queueConnectionFactory =
                SampleUtilities.getQueueConnectionFactory();
            queueConnection = queueConnectionFactory.createQueueConnection();
            queueSession = queueConnection.createQueueSession(false,
                Session.AUTO_ACKNOWLEDGE);
            controlQueue = getQueue(controlQueueName, queueSession);
            queueConnection.start();
        } catch (Exception e) {
            System.out.println("Connection problem: " + e.toString());
            closeConnection(queueConnection);
            throw e;
        }

        try {
            System.out.println(prefix + "Receiving synchronize messages from "
                + controlQueueName + "; count = " + count);
            QueueReceiver queueReceiver =
                queueSession.createReceiver(controlQueue);
            int remaining = count;
            while (remaining > 0) {
                // Only the arrival matters; the message content is ignored.
                queueReceiver.receive();
                remaining--;
                System.out.println(prefix
                    + "Received synchronize message; expect "
                    + remaining + " more");
            }
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
            throw e;
        } finally {
            closeConnection(queueConnection);
        }
    }

    /**
     * Send a message to controlQueue. Called by a subscriber to notify a
     * publisher that it is ready to receive messages.
     * <p>
     * If controlQueue doesn't exist, the method throws an exception.
     * The connection opened by this method is closed before it returns,
     * whether it succeeds or fails.
     *
     * @param prefix prefix (publisher or subscriber) to be displayed
     * @param controlQueueName name of control queue
     * @throws Exception if the connection cannot be set up or the message
     *                   cannot be sent
     */
    public static void sendSynchronizeMessage(String prefix,
                                              String controlQueueName)
            throws Exception {
        QueueConnection queueConnection = null;
        QueueSession queueSession = null;
        Queue controlQueue = null;

        try {
            QueueConnectionFactory queueConnectionFactory =
                SampleUtilities.getQueueConnectionFactory();
            queueConnection = queueConnectionFactory.createQueueConnection();
            queueSession = queueConnection.createQueueSession(false,
                Session.AUTO_ACKNOWLEDGE);
            controlQueue = getQueue(controlQueueName, queueSession);
        } catch (Exception e) {
            System.out.println("Connection problem: " + e.toString());
            closeConnection(queueConnection);
            throw e;
        }

        try {
            QueueSender queueSender = queueSession.createSender(controlQueue);
            TextMessage message = queueSession.createTextMessage();
            message.setText("synchronize");
            System.out.println(prefix + "Sending synchronize message to "
                + controlQueueName);
            queueSender.send(message);
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
            throw e;
        } finally {
            closeConnection(queueConnection);
        }
    }

    /**
     * Closes the given connection if it is not null. A failure to close is
     * reported on standard output but not propagated, so that it cannot
     * mask the exception that caused the cleanup.
     *
     * @param connection the connection to close; may be null
     */
    private static void closeConnection(QueueConnection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            System.out.println("Could not close connection: " + e.toString());
        }
    }

    /**
     * Monitor class for asynchronous examples. Producer signals end of
     * message stream; listener calls allDone() to notify consumer that the
     * signal has arrived, while consumer calls waitTillDone() to wait for this
     * notification.
     *
     * @author Joseph Fialli
     * @version 1.7, 08/18/00
     */
    public static class DoneLatch {
        /* Guarded by this object's monitor. */
        boolean done = false;

        /**
         * Waits until done is set to true. An interrupt does not end the
         * wait, but the calling thread's interrupt status is restored
         * before this method returns so the caller can still observe it.
         */
        public void waitTillDone() {
            boolean interrupted = false;
            synchronized (this) {
                while (!done) {
                    try {
                        this.wait();
                    } catch (InterruptedException ie) {
                        // Re-interrupting here would make wait() throw
                        // again at once; remember it and restore it later.
                        interrupted = true;
                    }
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Sets done to true and wakes every thread blocked in
         * waitTillDone().
         */
        public void allDone() {
            synchronized (this) {
                done = true;
                // notifyAll, not notify: several threads may be waiting.
                this.notifyAll();
            }
        }
    }
}