2014年2月14日 星期五

JMS (Java Message Service) Example , 使用JBoss as 7

JBoss設定的部份:

1.修改standalone.xml

1.1 在 <extensions> 中增加extension module
<extension module="org.jboss.as.messaging"/>

1.2 在 <profile>中增加subsystem
(可以從
standalone-full.xml中copy出來,<jms-destinations>的部份可以自行增加)


        
            
                true
                102400
                2
                
                    
                    
                        
                    
                    
                
                
                    
                    
                        
                        
                    
                    
                
                
                    
                        
                        
                        
                        
                    
                
                
                    
                    
                        jms.queue.DLQ
                        jms.queue.ExpiryQueue
                        0
                        10485760
                        BLOCK
                        10
                    
                
                
                    
                        
                            
                        
                        
                            
                        
                    
                    
                        
                            
                        
                        
                            
                            
                        
                    
                    
                        
                        
                            
                        
                        
                            
                        
                    
                
                
                    
                        
                        
                    
                    
                        
                        
                    
                
            
        


1.3 在<socket-binding-group>中增加
<socket-binding name="messaging" port="5445"/>
<socket-binding name="messaging-throughput" port="5455"/>

1.4 在<subsystem xmlns="urn:jboss:domain:ejb3:1.2">中增加
<mdb>
<resource-adapter-ref resource-adapter-name="hornetq-ra"/>
<bean-instance-pool-ref pool-name="mdb-strict-max-pool"/>
</mdb>

2.建立MessageProducer,範例使用EJB,
注意QUEUE_LOOKUP的值必須是在<jms-destinations>中有設定過的.


package demo.mkn.jsm;

import java.io.Serializable;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.ejb.Stateless;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.jboss.logging.Logger;

import demo.mkn.vo.Job;

@Stateless
public class JmsHelper {

 private Logger logger = Logger.getLogger(this.getClass());

 public JmsHelper() {
 }

 private QueueSession session;
 private Context context;
 private QueueConnection connection;

 @PostConstruct
 private void init() {
  try {
   String CONNECTION_FACTORY = "ConnectionFactory";
   context = new InitialContext();
   QueueConnectionFactory factory = (QueueConnectionFactory) context.lookup(CONNECTION_FACTORY);
   connection = factory.createQueueConnection();
   session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
  } catch (NamingException ex) {
   logger.info("NamingException : " + ex.getMessage());
  } catch (JMSException ex) {
   logger.info("JMSException : " + ex.getMessage());
  }
 }

 @PreDestroy
 private void end() {
  try {
   connection.close();
   session.close();
  } catch (JMSException ex) {
   logger.info("JMSException : " + ex.getMessage());
  }
 }

 public void pushToTestQueue(Job job) {
  logger.info("pushToTestQueue... " + job);
  push("queue/test", job);

 }

 public void push(String QUEUE_LOOKUP, Serializable req) {
  try {
   Destination queue = (Destination) context.lookup(QUEUE_LOOKUP);

   MessageProducer producer = session.createProducer(queue);
   ObjectMessage message = session.createObjectMessage(req);
   producer.send(message);

  } catch (JMSException ex) {
   logger.info("JMSException : " + ex.getMessage());
  } catch (NamingException e) {
   logger.info("NamingException : " + e.getMessage());
   e.printStackTrace();
  }
 }

}



3.建立MessageListener,
其中@ActivationConfigProperty的設定裡
"destination"指定這個
Listener聽的是那個queue
"
maxSession"指定這個Listener最多可以有幾個,設為1即是singleton
package demo.mkn.jsm;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

import org.jboss.logging.Logger;

import demo.mkn.vo.Job;

@MessageDriven(activationConfig = {
  @ActivationConfigProperty(propertyName = "maxSession", propertyValue = "1"),
  @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
  @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
  @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/test") })
public class Listener implements MessageListener {

 private Logger logger = Logger.getLogger(this.getClass());

 public void onMessage(Message msg) {

  ObjectMessage omsg = (ObjectMessage) msg;
  try {
   Job job = (Job) omsg.getObject();
   logger.info("receive " + job);
   for (int i = 1; i <= 3; i++) {
    logger.info("do " + job + " ... " + i);
    Thread.sleep(1000);
   }
   logger.info("do " + job + " ... done");
  } catch (InterruptedException e) {
   e.printStackTrace();
  } catch (JMSException e) {
   e.printStackTrace();
  }
 }

}


4. 建立要送到queue的Object,這個Object要implements Serializable 
package demo.mkn.vo;

import java.io.Serializable;

public class Job implements Serializable {

 public Job() {
 }

 public Job(String name) {
  this.name = name;
 }

 private String name;

 @Override
 public String toString() {
  return "job " + name ;
 }

 public String getName() {
  return name;
 }

 public void setName(String name) {
  this.name = name;
 }

}



5. 用REST API做測試,連續的Request進來時singleton的Listener會照順序進行處理
package demo.mkn.rest;

import java.util.Random;

import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import org.jboss.logging.Logger;

import demo.mkn.jsm.JmsHelper;
import demo.mkn.vo.Job;

@Stateless
@Path("demo")
public class DemoRest {
 private Logger logger = Logger.getLogger(this.getClass());

 public DemoRest() {
 }

 @EJB
 JmsHelper jmsHelper;

 @GET
 @Path("/go")
 @Produces(MediaType.APPLICATION_JSON)
 public Response demo() {
  Random rand = new Random();
  int n = rand.nextInt(1000) + 1;
  Job job = new Job("no." + n);
  jmsHelper.pushToTestQueue(job);
  return Response.ok().build();
 }
}


6. 附上參考用的web.xml與pom.xml
web.xml

 JMS Demo
 
  javax.ws.rs.core.Application
  /rest/*
 




pom.xml

 4.0.0
 demo.mkn
 JMSDemo
 war
 1.0-SNAPSHOT
 JMSDemo Maven Webapp
 http://maven.apache.org
 
  
   junit
   junit
   3.8.1
   test
  

  
   org.jboss.spec
   jboss-javaee-6.0
   1.0.0.Final
   pom
   provided
  

  
   javax
   javaee-web-api
   6.0
   provided
  

  
   org.jboss.resteasy
   resteasy-jaxrs
   3.0.4.Final
   provided
  

  
   org.jboss.logging
   jboss-logging
   3.1.3.GA
  

 
 
  JMSDemo
 











2 則留言:

  1. 謝謝你係文章!! 我在網上找了兩天也找不到JBoss 7的JMS example. 真謝謝你的分享!! 支持!

    回覆刪除
  2. 謝謝你係文章!! 我在網上找了兩天也找不到JBoss 7的JMS example. 真謝謝你的分享!! 支持!

    回覆刪除

Related Posts Plugin for WordPress, Blogger...