package com.ipiel.jmsrequest; // Next listing: Requestor.java
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.ejb.TransactionManagement;
import javax.ejb.TransactionManagementType;
import javax.jms.Connection;
import javax.jms.QueueConnectionFactory;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.transaction.UserTransaction;
/**
 * Startup singleton that performs one JMS request/reply round trip when the
 * bean is constructed.
 *
 * <p>The bean uses bean-managed transactions: the send and the receive each
 * run inside their own {@link UserTransaction}. All failures are reported on
 * standard error and never propagate out of {@link #init()}.
 */
@Startup
@Singleton
@TransactionManagement(TransactionManagementType.BEAN)
public class StartupListener {

    /** Pause between sending the request and polling for the reply, in milliseconds. */
    private static final long REPLY_PAUSE_MILLIS = 2000L;

    private Connection conn = null;
    private Requestor requestor = null;

    @Resource
    private UserTransaction utx;

    /**
     * Looks up the connection factory, sends a single request, pauses briefly,
     * then polls for the reply. The JMS connection is always closed on exit,
     * including when any step fails.
     */
    @PostConstruct
    void init() {
        try {
            Context jndi = new InitialContext();
            QueueConnectionFactory factory = (QueueConnectionFactory) jndi
                    .lookup("jms/sidoQueueFactory");
            conn = factory.createConnection();
            requestor = Requestor.newRequestor(conn, "jms/testRequestQueue",
                    "jms/testReplyQueue", "jms/testInvalidQueue");
            send();
            pauseForReply();
            readReply();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Previously the connection was only closed on the success path.
            closeConnection();
        }
    }

    /**
     * Sends the request inside its own user transaction, rolling the
     * transaction back if the send fails.
     *
     * @throws Exception if the transaction or the send fails
     */
    private void send() throws Exception {
        utx.begin();
        try {
            requestor.send();
        } catch (Exception e) {
            rollbackAfter(e);
            throw e;
        }
        utx.commit();
    }

    /**
     * Polls for the reply inside its own user transaction, rolling the
     * transaction back if the receive fails.
     *
     * @throws Exception if the transaction or the receive fails
     */
    private void readReply() throws Exception {
        utx.begin();
        try {
            requestor.receiveSync();
        } catch (Exception e) {
            rollbackAfter(e);
            throw e;
        }
        utx.commit();
    }

    /**
     * Sleeps for {@link #REPLY_PAUSE_MILLIS} before the reply is polled. If the
     * thread is interrupted the interrupt flag is restored rather than lost,
     * and the caller carries on with the receive as before.
     */
    private void pauseForReply() {
        try {
            Thread.sleep(REPLY_PAUSE_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Rolls back the active user transaction; a rollback failure is attached
     * to {@code cause} as a suppressed exception so the original error stays
     * the primary one.
     */
    private void rollbackAfter(Exception cause) {
        try {
            utx.rollback();
        } catch (Exception rollbackFailure) {
            cause.addSuppressed(rollbackFailure);
        }
    }

    /** Closes the JMS connection if one was opened, reporting any failure. */
    private void closeConnection() {
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            conn = null;
        }
    }
}
package com.ipiel.jmsrequest; // Next listing: JndiUtil.java
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.NamingException;
/**
 * Single-use JMS requestor: sends one text request whose {@code JMSReplyTo}
 * is a temporary queue, then polls that queue once for the reply.
 *
 * <p>{@link #send()} closes the request producer and {@link #receiveSync()}
 * closes the reply consumer, so each method can be used only once per
 * instance.
 */
public class Requestor {

    /** How long {@link #receiveSync()} waits for a reply, in milliseconds. */
    private static final long REPLY_TIMEOUT_MILLIS = 5000L;

    private Session session;
    private Destination replyQueue;
    private MessageProducer requestProducer;
    private MessageConsumer replyConsumer;
    // Created during initialization but not used by any method of this class.
    private MessageProducer invalidProducer;

    protected Requestor() {
        super();
    }

    /**
     * Creates and initializes a requestor on the given connection.
     *
     * @param connection       JMS connection used to create the session; it is started here
     * @param requestQueueName JNDI name of the queue requests are sent to
     * @param replyQueueName   accepted for interface compatibility; replies actually
     *                         arrive on a temporary queue created by the session
     * @param invalidQueueName JNDI name of the queue for invalid messages
     * @return the initialized requestor
     * @throws JMSException    if a JMS object cannot be created
     * @throws NamingException if a queue name cannot be resolved
     */
    public static Requestor newRequestor(Connection connection,
            String requestQueueName, String replyQueueName,
            String invalidQueueName) throws JMSException, NamingException {
        Requestor requestor = new Requestor();
        requestor.initialize(connection, requestQueueName, replyQueueName,
                invalidQueueName);
        return requestor;
    }

    /**
     * Creates the session, the temporary reply queue, and the producers and
     * consumer, then starts the connection so replies can be delivered.
     *
     * @throws NamingException if a queue name cannot be resolved
     * @throws JMSException    if a JMS object cannot be created
     */
    protected void initialize(Connection connection, String requestQueueName,
            String replyQueueName, String invalidQueueName)
            throws NamingException, JMSException {
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination requestQueue = JndiUtil.getDestination(requestQueueName);
        replyQueue = session.createTemporaryQueue();
        Destination invalidQueue = JndiUtil.getDestination(invalidQueueName);
        requestProducer = session.createProducer(requestQueue);
        replyConsumer = session.createConsumer(replyQueue);
        invalidProducer = session.createProducer(invalidQueue);
        connection.start();
    }

    /**
     * Sends a "Hello world." text message that names the temporary queue as
     * its reply destination. The request producer is closed afterwards, even
     * when the send fails.
     *
     * @throws Exception if the message cannot be created or sent
     */
    public void send() throws Exception {
        try {
            TextMessage requestMessage = session.createTextMessage();
            requestMessage.setText("Hello world.");
            requestMessage.setJMSReplyTo(replyQueue);
            requestProducer.send(requestMessage);
        } finally {
            requestProducer.close();
        }
    }

    /**
     * Waits up to {@link #REPLY_TIMEOUT_MILLIS} for one reply and prints it.
     * A timeout is reported separately from a reply of the wrong type. The
     * reply consumer is closed afterwards, even when the receive fails.
     *
     * @throws Exception if the receive or the close fails
     */
    public void receiveSync() throws Exception {
        System.out.println("Listening to reply " + replyConsumer);
        try {
            Message msg = replyConsumer.receive(REPLY_TIMEOUT_MILLIS);
            System.out.println("reply: " + msg);
            if (msg == null) {
                // receive(timeout) returns null on expiry; that is not an invalid message.
                System.out.println("No reply received within "
                        + REPLY_TIMEOUT_MILLIS + " ms");
            } else if (!(msg instanceof TextMessage)) {
                System.out.println("Invalid message detected");
            }
        } finally {
            replyConsumer.close();
        }
    }
}
package com.ipiel.jmsrequest; // Note that the request starts from the StartupListener; this bean starts automatically after deployment. The jmsreply project (which contains a single MDB) follows: Replier.java
import javax.jms.Destination;
import javax.jms.Queue;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
/** JNDI lookup helper for JMS destinations. */
public class JndiUtil {

    /**
     * Resolves a JMS destination from JNDI.
     *
     * <p>The result is cast to {@link Destination}, matching the declared
     * return type, so any destination bound under the name is accepted and
     * not only a queue. The naming context is closed once the lookup is done.
     *
     * @param requestQueueName JNDI name of the destination to look up
     * @return the destination bound under that name
     * @throws NamingException if the context cannot be created, the name
     *                         cannot be resolved, or the context cannot be closed
     */
    public static Destination getDestination(String requestQueueName) throws NamingException {
        Context jndi = new InitialContext();
        try {
            return (Destination) jndi.lookup(requestQueueName);
        } finally {
            jndi.close();
        }
    }
}
package com.ipiel.jmsreply; // Things you should be extra careful about:
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
/**
 * Message-driven bean on {@code jms/testRequestQueue} that echoes every text
 * request back to the destination named in the request's {@code JMSReplyTo}
 * header.
 */
@MessageDriven(mappedName = "jms/testRequestQueue", activationConfig = {
        @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue") })
public class Replier implements MessageListener {

    /**
     * Creates a replier and runs {@link #initialize}.
     *
     * @param connection       passed through to {@link #initialize}
     * @param requestQueueName passed through to {@link #initialize}
     * @param invalidQueueName passed through to {@link #initialize}
     * @return the new replier
     * @throws JMSException    declared for interface compatibility
     * @throws NamingException declared for interface compatibility
     */
    public static Replier newReplier(Connection connection,
            String requestQueueName, String invalidQueueName)
            throws JMSException, NamingException {
        Replier replier = new Replier();
        replier.initialize(connection, requestQueueName, invalidQueueName);
        return replier;
    }

    /**
     * Only logs that the replier was initialized; the arguments are not used.
     *
     * @throws NamingException declared for interface compatibility
     * @throws JMSException    declared for interface compatibility
     */
    public void initialize(Connection connection, String requestQueueName,
            String invalidQueueName) throws NamingException, JMSException {
        System.out.println("Init replier.");
    }

    /**
     * Handles one request. A text message with a reply destination is echoed
     * back; anything else is reported as invalid. Failures are printed rather
     * than silently discarded, and are still not propagated to the container.
     *
     * @param message the incoming request
     */
    public void onMessage(Message message) {
        try {
            if (!(message instanceof TextMessage)
                    || message.getJMSReplyTo() == null) {
                System.out.println("Invalid message detected");
                return;
            }
            System.out.println("Received request");
            sendReply((TextMessage) message);
        } catch (Exception e) {
            // Best effort, as before, but the failure is no longer swallowed.
            e.printStackTrace();
        }
    }

    /**
     * Sends the request text back to the request's reply destination, using
     * the request's message ID as the reply's correlation ID. The connection
     * opened for the reply is always closed, which also closes the session
     * and producer created from it.
     */
    private void sendReply(TextMessage requestMessage)
            throws JMSException, NamingException {
        Context jndi = new InitialContext();
        QueueConnectionFactory factory = (QueueConnectionFactory) jndi
                .lookup("jms/sidoQueueFactory");
        Connection conn = factory.createConnection();
        try {
            Session session = conn.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            String contents = requestMessage.getText();
            Destination replyDestination = requestMessage.getJMSReplyTo();
            MessageProducer replyProducer = session
                    .createProducer(replyDestination);
            TextMessage replyMessage = session.createTextMessage();
            replyMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());
            replyMessage.setText(contents);
            replyProducer.send(replyMessage);
            System.out.println("Sent reply");
        } finally {
            // Previously every handled message leaked one connection.
            conn.close();
        }
    }
}