JMS

Messaging systems

There are two type of messaging system

Point-to-point messaging

Publish-subscribe

JMS

JMS messages

Locating the queue

Connections

Send/receive

Message sender


/*
 * Copyright (c) 2004 Sun Microsystems, Inc.  All rights reserved.  U.S. 
 * Government Rights - Commercial software.  Government users are subject 
 * to the Sun Microsystems, Inc. standard license agreement and 
 * applicable provisions of the FAR and its supplements.  Use is subject 
 * to license terms.  
 * 
 * This distribution may include materials developed by third parties. 
 * Sun, Sun Microsystems, the Sun logo, Java and J2EE are trademarks 
 * or registered trademarks of Sun Microsystems, Inc. in the U.S. and 
 * other countries.  
 * 
 * Copyright (c) 2004 Sun Microsystems, Inc. Tous droits reserves.
 * 
 * Droits du gouvernement americain, utilisateurs gouvernementaux - logiciel
 * commercial. Les utilisateurs gouvernementaux sont soumis au contrat de 
 * licence standard de Sun Microsystems, Inc., ainsi qu'aux dispositions 
 * en vigueur de la FAR (Federal Acquisition Regulations) et des 
 * supplements a celles-ci.  Distribue par des licences qui en 
 * restreignent l'utilisation.
 * 
 * Cette distribution peut comprendre des composants developpes par des 
 * tierces parties. Sun, Sun Microsystems, le logo Sun, Java et J2EE 
 * sont des marques de fabrique ou des marques deposees de Sun 
 * Microsystems, Inc. aux Etats-Unis et dans d'autres pays.
 */
/**
 * The SimpleProducer class consists only of a main method, 
 * which sends several messages to a queue or topic.
 * 
 * Run this program in conjunction with SimpleSynchConsumer or
 * SimpleAsynchConsumer. Specify a queue or topic name on the 
 * command line when you run the program.  By default, the 
 * program sends one message.  Specify a number after the 
 * destination name to send that number of messages.
 */
import javax.jms.*;
import javax.naming.*;

public class SimpleProducer {

    /**
     * Main method.
     *
     * @param args     the destination used by the example,
     *                 its type, and, optionally, the number of 
     *                 messages to send
     */
    public static void main(String[] args) {
        final int          NUM_MSGS;
        
        if ( (args.length < 2) || (args.length > 3) ) {
            System.out.println("Program takes two or three arguments: " +
                "<dest_name> <queue|topic> " +
                "[<number-of-messages>");
            System.exit(1);
        }
        String destName = new String(args[0]);
        String destType = new String(args[1]);
        System.out.println("Destination name is " + destName +
            ", type is " + destType);
        if (args.length == 3){
            NUM_MSGS = (new Integer(args[2])).intValue();
        } else {
            NUM_MSGS = 1;
        }
        
        /* 
         * Create a JNDI API InitialContext object if none exists
         * yet.
         */
        Context jndiContext = null;
        try {
            jndiContext = new InitialContext();
        } catch (NamingException e) {
            System.out.println("Could not create JNDI API " +
                "context: " + e.toString());
            System.exit(1);
        }
        
        /* 
         * Look up connection factory and destination.  If either
         * does not exist, exit.  If you look up a 
         * TopicConnectionFactory instead of a 
         * QueueConnectionFactory, program behavior is the same.
         */
        ConnectionFactory connectionFactory = null;
        Destination dest = null;
        try {
            connectionFactory = (ConnectionFactory)
                jndiContext.lookup("jms/QueueConnectionFactory");
            if (destType.equals("queue")) {
                dest = (Queue) jndiContext.lookup(destName);
            } else if (destType.equals("topic")) {
                dest = (Topic) jndiContext.lookup(destName);
            } else {
                throw new Exception("Invalid destination type" +
                    "; must be queue or topic");
            }
        } catch (Exception e) {
            System.out.println("JNDI API lookup failed: " + 
                e.toString());
            e.printStackTrace();
            System.exit(1);
        }

        /*
         * Create connection.
         * Create session from connection; false means session is
         * not transacted.
         * Create producer and text message.
         * Send messages, varying text slightly.
         * Send end-of-messages message.
         * Finally, close connection.
         */
        Connection connection = null;
        MessageProducer producer = null;
        try {
            connection = connectionFactory.createConnection();
            Session session = connection.createSession(false, 
                Session.AUTO_ACKNOWLEDGE);
            producer = session.createProducer(dest);
            TextMessage message = session.createTextMessage();
            for (int i = 0; i < NUM_MSGS; i++) {
                message.setText("This is message " + (i + 1));
                System.out.println("Sending message: " + 
                    message.getText());
                producer.send(message);
            }

            /* 
             * Send a non-text control message indicating end of
             * messages.
             */
            producer.send(session.createMessage());
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + 
                e.toString());
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {}
            }
        }
    }
}

Message receiver


/*
 * Copyright (c) 2004 Sun Microsystems, Inc.  All rights reserved.  U.S. 
 * Government Rights - Commercial software.  Government users are subject 
 * to the Sun Microsystems, Inc. standard license agreement and 
 * applicable provisions of the FAR and its supplements.  Use is subject 
 * to license terms.  
 * 
 * This distribution may include materials developed by third parties. 
 * Sun, Sun Microsystems, the Sun logo, Java and J2EE are trademarks 
 * or registered trademarks of Sun Microsystems, Inc. in the U.S. and 
 * other countries.  
 * 
 * Copyright (c) 2004 Sun Microsystems, Inc. Tous droits reserves.
 * 
 * Droits du gouvernement americain, utilisateurs gouvernementaux - logiciel
 * commercial. Les utilisateurs gouvernementaux sont soumis au contrat de 
 * licence standard de Sun Microsystems, Inc., ainsi qu'aux dispositions 
 * en vigueur de la FAR (Federal Acquisition Regulations) et des 
 * supplements a celles-ci.  Distribue par des licences qui en 
 * restreignent l'utilisation.
 * 
 * Cette distribution peut comprendre des composants developpes par des 
 * tierces parties. Sun, Sun Microsystems, le logo Sun, Java et J2EE 
 * sont des marques de fabrique ou des marques deposees de Sun 
 * Microsystems, Inc. aux Etats-Unis et dans d'autres pays.
 */
/**
 * The SimpleSynchConsumer class consists only of a main method, 
 * which fetches one or more messages from a queue or topic using
 * synchronous message delivery.  Run this program in conjunction
 * with SimpleProducer.  Specify a queue or topic name on the 
 * command line when you run the program.
 */
import javax.jms.*;
import javax.naming.*;

public class SimpleSynchConsumer {

    /**
     * Main method.
     *
     * @param args     the destination name and type used by the 
     *                 example
     */
    public static void main(String[] args) {
        String             destName = null;
        String             destType = null;
        Context            jndiContext = null;
        ConnectionFactory  connectionFactory = null;
        Connection         connection = null;
        Session            session = null;
        Destination        dest = null;
        MessageConsumer    consumer = null;
        TextMessage        message = null;
                
        if (args.length != 2) {
            System.out.println("Program takes two arguments: " +
                "<dest_name> <queue|topic>");
            System.exit(1);
        }
        destName = new String(args[0]);
        destType = new String(args[1]);
        System.out.println("Destination name is " + destName +
            ", type is " + destType);
        
        /* 
         * Create a JNDI API InitialContext object if none exists
         * yet.
         */
        try {
            jndiContext = new InitialContext();
        } catch (NamingException e) {
            System.out.println("Could not create JNDI API " +
                "context: " + e.toString());
            System.exit(1);
        }
        
        /* 
         * Look up connection factory and destination.  If either
         * does not exist, exit.  If you look up a 
         * TopicConnectionFactory instead of a 
         * QueueConnectionFactory, program behavior is the same.
         */
        try {
            connectionFactory = (ConnectionFactory)
                jndiContext.lookup("jms/QueueConnectionFactory");
            if (destType.equals("queue")) {
                dest = (Queue) jndiContext.lookup(destName);
            } else if (destType.equals("topic")) {
                dest = (Topic) jndiContext.lookup(destName);
            } else {
                throw new Exception("Invalid destination type" +
                    "; must be queue or topic");
            }
        } catch (Exception e) {
            System.out.println("JNDI API lookup failed: " +
                e.toString());
            System.exit(1);
        }

        /*
         * Create connection.
         * Create session from connection; false means session is
         * not transacted.
         * Create consumer, then start message delivery.
         * Receive all text messages from destination until
         * a non-text message is received indicating end of
         * message stream.
         * Close connection.
         */
        try {
            connection = connectionFactory.createConnection();
            session = connection.createSession(false, 
                Session.AUTO_ACKNOWLEDGE);
            consumer = session.createConsumer(dest);
            connection.start();
            while (true) {
                Message m = consumer.receive(1);
                if (m != null) {
                    if (m instanceof TextMessage) {
                        message = (TextMessage) m;
                        System.out.println("Reading message: " +
                            message.getText());
                    } else {
                        break;
                    }
                }
            }
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + 
                e.toString());
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {}
            }
        }
    }
}

Variations

Utility class

The programmatic interface to the lookup service depends on the type of lookup service. Sun supply a "portability" class to handle JDNI and IMQ lokup


/*
 * @(#)SampleUtilities.java	1.4 02/05/02
 * 
 * Copyright (c) 2000-2002 Sun Microsystems, Inc. All Rights Reserved.
 * 
 * Sun grants you ("Licensee") a non-exclusive, royalty free, license to use,
 * modify and redistribute this software in source and binary code form,
 * provided that i) this copyright notice and license appear on all copies of
 * the software; and ii) Licensee does not utilize the software in a manner
 * which is disparaging to Sun.
 *
 * This software is provided "AS IS," without a warranty of any kind. ALL
 * EXPRESS OR IMPLIED CONDITIONS, REPRESENTATIONS AND WARRANTIES, INCLUDING ANY
 * IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE OR
 * NON-INFRINGEMENT, ARE HEREBY EXCLUDED. SUN AND ITS LICENSORS SHALL NOT BE
 * LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING
 * OR DISTRIBUTING THE SOFTWARE OR ITS DERIVATIVES. IN NO EVENT WILL SUN OR ITS
 * LICENSORS BE LIABLE FOR ANY LOST REVENUE, PROFIT OR DATA, OR FOR DIRECT,
 * INDIRECT, SPECIAL, CONSEQUENTIAL, INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER
 * CAUSED AND REGARDLESS OF THE THEORY OF LIABILITY, ARISING OUT OF THE USE OF
 * OR INABILITY TO USE SOFTWARE, EVEN IF SUN HAS BEEN ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGES.
 *
 * This software is not designed or intended for use in on-line control of
 * aircraft, air traffic, aircraft navigation or aircraft communications; or in
 * the design, construction, operation or maintenance of any nuclear
 * facility. Licensee represents and warrants that it will not use or
 * redistribute the Software for such purposes.
 */
import javax.naming.*;
import javax.jms.*;

/**
 * Utility class for JMS sample programs.
 * <p>
 * Set the <code>USE_JNDI</code> variable to true or false depending on whether
 * your provider uses JNDI. 
 * <p>
 * Contains the following methods: 
 * <ul> 
 *   <li> getQueueConnectionFactory
 *   <li> getTopicConnectionFactory
 *   <li> getQueue
 *   <li> getTopic
 *   <li> jndiLookup
 *   <li> exit
 *   <li> receiveSynchronizeMessages
 *   <li> sendSynchronizeMessages
 * </ul>
 *
 * Also contains the class DoneLatch, which contains the following methods:
 * <ul> 
 *   <li> waitTillDone
 *   <li> allDone
 * </ul>
 *
 * @author Kim Haase
 * @author Joseph Fialli
 * @version 1.7, 08/18/00
 */
public class SampleUtilities {
    /*
     Define the System Property "USE_JNDI" true to use JNDI lookup()
     e.g. $JAVA -DUSE_JNDI=true ....
    */
    public static final boolean USE_JNDI = Boolean.getBoolean("USE_JNDI");
    /*
     The prefix to use for JNDI lookup names
    */
    public static final String jndiNamePrefix = "cn=";

    public static final String  QUEUECONFAC = "QueueConnectionFactory";
    public static final String  TOPICCONFAC = "TopicConnectionFactory";
    private static Context      jndiContext = null;
    
    /**
     * Returns a QueueConnectionFactory object.
     * If provider uses JNDI, serves as a wrapper around jndiLookup method. 
     * If provider does not use JNDI, substitute provider-specific code here.
     *
     * @return		a QueueConnectionFactory object
     * @throws		javax.naming.NamingException (or other exception)
     *                   if name cannot be found
     */
    public static javax.jms.QueueConnectionFactory getQueueConnectionFactory() 
      throws Exception {
        if (USE_JNDI) {
            return (javax.jms.QueueConnectionFactory) jndiLookup(QUEUECONFAC);
        } else {
            // return new provider-specific QueueConnectionFactory
            return new com.sun.messaging.QueueConnectionFactory();
        }
    }
    
    /**
     * Returns a TopicConnectionFactory object.
     * If provider uses JNDI, serves as a wrapper around jndiLookup method.
     * If provider does not use JNDI, substitute provider-specific code here.
     *
     * @return		a TopicConnectionFactory object
     * @throws		javax.naming.NamingException (or other exception)
     *                   if name cannot be found
     */
    public static javax.jms.TopicConnectionFactory getTopicConnectionFactory() 
      throws Exception {
        if (USE_JNDI) {
            return (javax.jms.TopicConnectionFactory) jndiLookup(TOPICCONFAC);
        } else {
            // return new provider-specific TopicConnectionFactory
            return new com.sun.messaging.TopicConnectionFactory();
        }
    }
    
    /**
     * Returns a Queue object.
     * If provider uses JNDI, serves as a wrapper around jndiLookup method.
     * If provider does not use JNDI, substitute provider-specific code here.
     *
     * @param name      String specifying queue name
     * @param session   a QueueSession object
     *
     * @return		a Queue object
     * @throws		javax.naming.NamingException (or other exception)
     *                   if name cannot be found
     */
    public static javax.jms.Queue getQueue(String name, 
                                           javax.jms.QueueSession session) 
      throws Exception {
        if (USE_JNDI) {
            return (javax.jms.Queue) jndiLookup(name);
        } else {
            return session.createQueue(name);
        }
    }
    
    /**
     * Returns a Topic object.
     * If provider uses JNDI, serves as a wrapper around jndiLookup method.
     * If provider does not use JNDI, substitute provider-specific code here.
     *
     * @param name      String specifying topic name
     * @param session   a TopicSession object
     *
     * @return		a Topic object
     * @throws		javax.naming.NamingException (or other exception)
     *                   if name cannot be found
     */
    public static javax.jms.Topic getTopic(String name, 
                                           javax.jms.TopicSession session) 
      throws Exception {
        if (USE_JNDI) {
            return (javax.jms.Topic) jndiLookup(name);
        } else {
            return session.createTopic(name);
        }
    }
    
    /**
     * Creates a JNDI InitialContext object if none exists yet.  Then looks up 
     * the string argument and returns the associated object.
     *
     * @param name	the name of the object to be looked up
     *
     * @return		the object bound to <code>name</code>
     * @throws		javax.naming.NamingException if name cannot be found
     */
    public static Object jndiLookup(String name) throws NamingException {
        Object    obj = null;

        if (jndiContext == null) {
            try {
                jndiContext = new InitialContext();
            } catch (NamingException e) {
                System.out.println("Could not create JNDI context: " + 
                    e.toString());
                throw e;
            }
        }
        try {
           obj = jndiContext.lookup(jndiNamePrefix + name);
        } catch (NamingException e) {
            System.out.println("JNDI lookup failed for:" + name + ": " + e.toString());
            throw e;
        }
        return obj;
    }
   
    /**
     * Calls System.exit().
     * 
     * @param result	The exit result; 0 indicates no errors
     */
    public static void exit(int result) {
        System.exit(result);
    }
   
    /**
     * Wait for 'count' messages on controlQueue before continuing.  Called by
     * a publisher to make sure that subscribers have started before it begins
     * publishing messages.
     * <p>
     * If controlQueue doesn't exist, the method throws an exception.
     *
     * @param prefix	prefix (publisher or subscriber) to be displayed
     * @param controlQueueName	name of control queue 
     * @param count	number of messages to receive
     */
    public static void receiveSynchronizeMessages(String prefix,
                                                  String controlQueueName, 
                                                  int count) 
      throws Exception {
        QueueConnectionFactory  queueConnectionFactory = null;
        QueueConnection         queueConnection = null;
        QueueSession            queueSession = null;
        Queue                   controlQueue = null;
        QueueReceiver           queueReceiver = null;

        try {
            queueConnectionFactory = 
                SampleUtilities.getQueueConnectionFactory();
            queueConnection = queueConnectionFactory.createQueueConnection();
            queueSession = queueConnection.createQueueSession(false, 
                                                 Session.AUTO_ACKNOWLEDGE);
            controlQueue = getQueue(controlQueueName, queueSession);
            queueConnection.start();
        } catch (Exception e) {
            System.out.println("Connection problem: " + e.toString());
            if (queueConnection != null) {
                try {
                    queueConnection.close();
                } catch (JMSException ee) {}
            }
            throw e;
        } 

        try {
            System.out.println(prefix + "Receiving synchronize messages from "
                               + controlQueueName + "; count = " + count);
            queueReceiver = queueSession.createReceiver(controlQueue);
            while (count > 0) {
                queueReceiver.receive();
                count--;
                System.out.println(prefix 
                                   + "Received synchronize message; expect " 
                                   + count + " more");
            }
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
            throw e;
        } finally {
            if (queueConnection != null) {
                try {
                    queueConnection.close();
                } catch (JMSException e) {}
            }
        }
    }

    /**
     * Send a message to controlQueue.  Called by a subscriber to notify a
     * publisher that it is ready to receive messages.
     * <p>
     * If controlQueue doesn't exist, the method throws an exception.
     *
     * @param prefix	prefix (publisher or subscriber) to be displayed
     * @param controlQueueName	name of control queue
     */
    public static void sendSynchronizeMessage(String prefix,
                                              String controlQueueName) 
      throws Exception {
        QueueConnectionFactory  queueConnectionFactory = null;
        QueueConnection         queueConnection = null;
        QueueSession            queueSession = null;
        Queue                   controlQueue = null;
        QueueSender             queueSender = null;
        TextMessage             message = null;

        try {
            queueConnectionFactory = 
                SampleUtilities.getQueueConnectionFactory();
            queueConnection = queueConnectionFactory.createQueueConnection();
            queueSession = queueConnection.createQueueSession(false,
                                                 Session.AUTO_ACKNOWLEDGE);
            controlQueue = getQueue(controlQueueName, queueSession);
        } catch (Exception e) {
            System.out.println("Connection problem: " + e.toString());
            if (queueConnection != null) {
                try {
                    queueConnection.close();
                } catch (JMSException ee) {}
            }
            throw e;
        } 

        try {
            queueSender = queueSession.createSender(controlQueue);
            message = queueSession.createTextMessage();
            message.setText("synchronize");
            System.out.println(prefix + "Sending synchronize message to " 
                               + controlQueueName);
            queueSender.send(message);
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
            throw e;
        } finally {
            if (queueConnection != null) {
                try {
                    queueConnection.close();
                } catch (JMSException e) {}
            }
        }
    }

    /**
     * Monitor class for asynchronous examples.  Producer signals end of
     * message stream; listener calls allDone() to notify consumer that the 
     * signal has arrived, while consumer calls waitTillDone() to wait for this 
     * notification.
     *
     * @author Joseph Fialli
     * @version 1.7, 08/18/00
     */
    static public class DoneLatch {
        boolean  done = false;

        /**
         * Waits until done is set to true.
         */
        public void waitTillDone() {
            synchronized (this) {
                while (! done) {
                    try {
                        this.wait();
                    } catch (InterruptedException ie) {}
                }
            }
        }
        
        /**
         * Sets done to true.
         */
        public void allDone() {
            synchronized (this) {
                done = true;
                this.notify();
            }
        }
    }
}

References


Jan Newmarch (http://jan.newmarch.name)
jan@newmarch.name
Last modified: Fri Oct 8 11:50:15 EST 2004
Copyright ©Jan Newmarch
Copyright © Jan Newmarch, Monash University, 2007
Creative Commons License This work is licensed under a Creative Commons License
The moral right of Jan Newmarch to be identified as the author of this page has been asserted.