`

activemq学习(1)

阅读更多

 

一:jms介绍
         jms说白了就是java message service,是J2EE规范的一部分,跟jdbc差不多,sun只提供了接口,由各个厂商(provider)来进行具体的实现,然后使用者使用他们的jar包进行开发使用即可。
         另外在jms的API中,jms传递消息有两种方式,一种是点对点的Queue,还有一个是发布订阅的Topic方式。区别在于:
        对于Queue模式,一个发布者发布消息,下面的接收者按队列顺序接收,比如发布了10个消息,两个接收者A,B那就是A,B总共会收到10条消息,不重复。
         对于Topic模式,一个发布者发布消息,有两个接收者A,B来订阅,那么发布了10条消息,A,B各收到10条消息。
       关于api的简单基础可以看下:http://www.javaeye.com/topic/64707,简单的参考!
二:ActiveMQ介绍
         activeMQ是apache下的一个开源jms产品,具体参见apache官方网站;
         Apache ActiveMQ is fast, supports many Cross Language Clients and Protocols, comes with easy to use Enterprise Integration Patterns and many advanced features while fully supporting JMS 1.1 and J2EE 1.4. Apache ActiveMQ is released under the Apache 2.0 License
 三:开始实现代码
       1: 使用activeMQ来完成jms的发送,必须要下载activeMQ,然后再本机安装,并且启动activeMQ的服务才行。在官网下载完成之后,运行bin目录下面的activemq.bat,将activeMQ成功启动。
        启动成功之后可以运行:http://localhost:8161/admin/index.jsp  查看一下。
       2:发送端,sender
   
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
   
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
   
public class Sender {
    private static final int SEND_NUMBER = 5;
   
    public static void main(String[] args) {
        // ConnectionFactory :连接工厂,JMS 用它创建连接
        ConnectionFactory connectionFactory;
        // Connection :JMS 客户端到JMS Provider 的连接
        Connection connection = null;
        // Session: 一个发送或接收消息的线程
        Session session;
        // Destination :消息的目的地;消息发送给谁.
        Destination destination;
        // MessageProducer:消息发送者
        MessageProducer producer;
        // TextMessage message;
        // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
   
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
        try {
            // 构造从工厂得到连接对象
            connection = connectionFactory.createConnection();
            // 启动
            connection.start();
            // 获取操作连接
            session = connection.createSession(Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);
            // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
            destination = session.createQueue("test-queue");
            // 得到消息生成者【发送者】
            producer = session.createProducer(destination);
            // 设置不持久化,可以更改
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // 构造消息
            sendMessage(session, producer);
            session.commit();
   
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
   
    }
    public static void sendMessage(Session session, MessageProducer producer)
            throws Exception {
        for (int i = 1; i <=SEND_NUMBER; i++) {
            TextMessage message = session
                    .createTextMessage("ActiveMq 发送的消息" + i);
            // 发送消息到目的地方
            System.out.println("发送消息:" + i);
            producer.send(message);
        }
    }
}
   
         3:接收端,receive
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
   
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
   
public class Receiver {
    public static void main(String[] args) {
   
        // ConnectionFactory :连接工厂,JMS 用它创建连接
        ConnectionFactory connectionFactory;
        // Connection :JMS 客户端到JMS Provider 的连接
        Connection connection = null;
        // Session: 一个发送或接收消息的线程
        Session session;
        // Destination :消息的目的地;消息发送给谁.
        Destination destination;
        // 消费者,消息接收者
        MessageConsumer consumer;
   
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
        try {
            // 构造从工厂得到连接对象
            connection = connectionFactory.createConnection();
            // 启动
            connection.start();
            // 获取操作连接
            session = connection.createSession(Boolean.FALSE,
                    Session.AUTO_ACKNOWLEDGE);
            //test-queue跟sender的保持一致,一个创建一个来接收
            destination = session.createQueue("test-queue");
            consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message arg0) {
                    System.out.println("==================");
                    try {
                        System.out.println("RECEIVE1第一个获得者:"
                                + ((TextMessage) arg0).getText());
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
   
                }
            });
   
            MessageConsumer consumer1 = session.createConsumer(destination);
            consumer1.setMessageListener(new MessageListener() {
                public void onMessage(Message arg0) {
                    System.out.println("+++++++++++++++++++");
                    try {
                        System.out.println("RECEIVE1第二个获得者:"
                                + ((TextMessage) arg0).getText());
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
   
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
        //在eclipse里运行的时候,这里不要关闭,这样就可以一直等待服务器发送了,不然就直接结束了。
        // } finally {
        // try {
        // if (null != connection)
        // connection.close();
        // } catch (Throwable ignore) {
        // }
        // }
   
    }
}
   
         4:发送端,sender
         上面的是用Queue的方式来创建,下面再用topic的方式实现同样的功能。
   
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
   
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
   
public class TopicTest {
    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
                "tcp://localhost:61616");
   
        Connection connection = factory.createConnection();
        connection.start();
   
        // 创建一个Topic
        Topic topic = new ActiveMQTopic("testTopic");
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
   
        // 注册消费者1
        MessageConsumer comsumer1 = session.createConsumer(topic);
        comsumer1.setMessageListener(new MessageListener() {
            public void onMessage(Message m) {
                try {
                    System.out.println("Consumer1 get "
                            + ((TextMessage) m).getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
   
        // 注册消费者2
        MessageConsumer comsumer2 = session.createConsumer(topic);
        comsumer2.setMessageListener(new MessageListener() {
            public void onMessage(Message m) {
                try {
                    System.out.println("Consumer2 get "
                            + ((TextMessage) m).getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
   
        });
   
        // 创建一个生产者,然后发送多个消息。
        MessageProducer producer = session.createProducer(topic);
        for (int i = 0; i < 10; i++) {
            System.out.println("producer begin produce=======");
            producer.send(session.createTextMessage("Message:" + i));
        }
    }
   
}

以上引自: http://www.dev26.com/bbs/topic/92

 

 

 

消息发布者
package com.googlecode.garbagecan.jmsstudy.activemq.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicPublisher {
	public static void main(String[] args) throws JMSException {
		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
		Connection connection = factory.createConnection();
		connection.start();
		
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Topic topic = session.createTopic("myTopic.messages");

		MessageProducer producer = session.createProducer(topic);
		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

		while(true) {
			TextMessage message = session.createTextMessage();
			message.setText("message_" + System.currentTimeMillis());
			producer.send(message);
			System.out.println("Sent message: " + message.getText());

			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}

//		session.close();
//		connection.stop();
//		connection.close();
	}
}



消息订阅者(消息消费者)
package com.googlecode.garbagecan.jmsstudy.activemq.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicSubscriber {
	public static void main(String[] args) throws JMSException {
		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
		Connection connection = factory.createConnection();
		connection.start();
		
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Topic topic = session.createTopic("myTopic.messages");

		MessageConsumer consumer = session.createConsumer(topic);
		consumer.setMessageListener(new MessageListener() {
			public void onMessage(Message message) {
				TextMessage tm = (TextMessage) message;
				try {
					System.out.println("Received message: " + tm.getText());
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		});
//		session.close();
//		connection.stop();
//		connection.close();
	}
}

 以上引自:http://www.open-open.com/lib/view/open1328079945062.html

 

此步骤主要是利用两个实例对jms及activemq有个大体了解,按照实例写了一个测试DEMO,关于queue和topic的两个。

 

分享到:
评论
1 楼 ssy341 2012-04-20  
下载来试试,学习学习~~~

相关推荐

Global site tag (gtag.js) - Google Analytics