2013년 11월 11일 월요일

Starting Java Message Service - Apache ActiveMQ


최근버전: ActiveMQ 5.9.0(2013.11.07 현재)

Features
  • Supports a variety of Cross Language Clients and Protocols from Java, C, C++, C#, Ruby, Perl, Python, PHP.
  • Full support for the Enterprise Integration Patterns both in the JMS client and the Message Broker.
  • Fully supports JMS 1.1 and J2EE 1.4 with support for transient, persistent, transactional and XA messaging.
  • ActiveMQ can be easily embedded into Spring applications and configured using Spring's XML configuration mechanism.
  • Tested inside popular J2EE servers such as TomEE, Geronimo, JBoss, GlassFish and WebLogic.
  • Supports very fast persistence using JDBC along with a high performance journal.
  • Designed for high performance clustering, client-server, peer based communication.
  • REST API to provide technology agnostic and language neutral web based API to messaging.
  • Ajax to support web streaming to web browsers using pure DHTML, allowing web browsers to be part of the messaging fabric.
  • CXF and Axis Support so that ActiveMQ can be easily dropped into either of these web service stacks to provide reliable messaging.
  • Can be used as an in memory JMS provider, ideal for unit testing JMS.
실행방법
{install_dir}/bin/activemq

모니터링
http://localhost:8161 또는 “jvisualvm”과 같은 JMX를 지원하는 툴, 관리자 계정은 admin/admin

Maven
<dependency>
 <groupId>org.apache.activemq</groupId>
 <artifactId>activemq-all</artifactId>
 <version>5.9.0</version>
</dependency>


Example Code - Topic
Topic은 Message를 구독할 Client가 먼저 MQ에 접속하고 Producer가 Message를 전송합니다.
Client가 접속하기 이전에 Producer가 전송한 Message는 받을 수 없습니다.

example.topic.Consumers.java
package example.topic;


import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;


import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;


/**
 * Entry point that launches the topic subscribers.
 *
 * <p>Creates three {@link Consumer} threads and starts each of them.
 */
public class Consumers {

    /** Number of subscriber threads to launch. */
    private static final int SUBSCRIBER_COUNT = 3;

    /**
     * Starts {@code SUBSCRIBER_COUNT} consumer threads, one after another.
     *
     * @param args command-line arguments (unused)
     * @throws JMSException declared for interface compatibility
     */
    public static void main(String[] args) throws JMSException {
        for (int started = 0; started < SUBSCRIBER_COUNT; started++) {
            Thread subscriber = new Consumer();
            subscriber.start();
        }
    }
}


class Consumer extends Thread {


private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;


public Consumer() {


}


@Override
public void run() {
Connection connection = null;
try {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
url);
connection = connectionFactory.createConnection();
connection.start();


Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);


Topic topic = session.createTopic("example.topic");


MessageConsumer consumer = session.createConsumer(topic);


MessageListener listner = new MessageListener() {
public void onMessage(Message message) {
try {
String currentDate = message.getStringProperty("date");
System.out.println("Date:" + currentDate);


if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("Received message"
+ textMessage.getText() + "'");
}
} catch (JMSException e) {
System.out.println("Caught:" + e);
e.printStackTrace();
}
}
};
consumer.setMessageListener(listner);
System.out.println("waiting...");
while (true) {
try {
this.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}


} catch (JMSException e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}


}


}

example.topic.Producer.java
package example.topic;


import java.util.Date;


import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;


import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;


/**
 * Publishes a single text message to the {@code example.topic} topic.
 */
public class Producer {

    /** Broker URL; ActiveMQ's default broker URL constant. */
    private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    /**
     * Connects to the broker, sends one {@link TextMessage} carrying a
     * {@code date} string property, and closes the connection.
     *
     * @param args command-line arguments (unused)
     * @throws JMSException if connecting, creating JMS objects, sending or closing fails
     */
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
        Connection connection = connectionFactory.createConnection();
        // try/finally guarantees the connection is released even when a JMS call fails.
        try {
            connection.start();

            // JMS messages are sent and received using a Session. We create a
            // non-transactional session here; to use transactions, set the
            // first parameter to 'true'.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Topic topic = session.createTopic("example.topic");
            MessageProducer producer = session.createProducer(topic);

            // Build a small text message tagged with the current date.
            TextMessage message = session.createTextMessage();
            message.setStringProperty("date", new Date().toString());
            message.setText("HELLO JMS WORLD");

            // Here we are sending the message!
            producer.send(message);
            System.out.println("Sent message '" + message + "'");
        } finally {
            connection.close();
        }
    }
}

Example Code - Queue
Queue는 Topic과 달리 Client 접속 이전에 기 전송된 Message에 대하여 받을 수 있으나, 동일한 Queue에 접속한 여러 Client 중 최초 접속 Client에게 배송된다.

example.queue.Consumers.java
package example.queue;


import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;


import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;


/**
 * Entry point that launches the queue consumers.
 *
 * <p>Creates one named {@link Consumer} thread per entry in the name list and
 * starts them in order.
 */
public class Consumers {

    /**
     * Starts the three named consumer threads.
     *
     * @param args command-line arguments (unused)
     * @throws JMSException declared for interface compatibility
     */
    public static void main(String[] args) throws JMSException {
        String[] consumerNames = {"consumer1", "consumer2", "consumer3"};
        for (String consumerName : consumerNames) {
            Thread worker = new Consumer(consumerName);
            worker.start();
        }
    }
}


class Consumer extends Thread {


private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;


private String consumerName;


public Consumer(String consumerName) {
this.consumerName = consumerName;
}


@Override
public void run() {
Connection connection = null;
try {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
url);
connection = connectionFactory.createConnection();
connection.start();


Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);


Queue queue = session.createQueue("example.queue");


MessageConsumer consumer = session.createConsumer(queue);


MessageListener listner = new MessageListener() {
public void onMessage(Message message) {
try {
String currentDate = message.getStringProperty("date");
System.out.println(consumerName + ", Date:"
+ currentDate);


if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println(consumerName
+ ", Received message"
+ textMessage.getText() + "'");
}
} catch (JMSException e) {
System.out.println("Caught:" + e);
e.printStackTrace();
}
}
};
consumer.setMessageListener(listner);
System.out.println("waiting...");
while (true) {
try {
this.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}


} catch (JMSException e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}


}


}

example.queue.Producer.java
package example.queue;


import java.util.Date;


import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;


import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;


/**
 * Sends a single text message to the {@code example.queue} queue.
 */
public class Producer {

    /** Broker URL; ActiveMQ's default broker URL constant. */
    private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    /**
     * Connects to the broker, sends one {@link TextMessage} carrying a
     * {@code date} string property, and closes the connection.
     *
     * @param args command-line arguments (unused)
     * @throws JMSException if connecting, creating JMS objects, sending or closing fails
     */
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
        Connection connection = connectionFactory.createConnection();
        // try/finally guarantees the connection is released even when a JMS call fails.
        try {
            connection.start();

            // Non-transacted session with automatic acknowledgement.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Queue queue = session.createQueue("example.queue");
            MessageProducer producer = session.createProducer(queue);

            // Build a small text message tagged with the current date.
            TextMessage message = session.createTextMessage();
            message.setStringProperty("date", new Date().toString());
            message.setText("HELLO JMS WORLD, JMS type QUEUE");

            // Here we are sending the message!
            producer.send(message);
            System.out.println("Sent message '" + message + "'");
        } finally {
            connection.close();
        }
    }
}

Example Code - Embedded ActiveMQ
“activemq-all-xxx.jar” 라이브러리 만으로도 Embedded 하여 사용할 수 있다.
package example.broker;


import java.net.URI;


import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;


/**
 * Runs an ActiveMQ broker embedded in this JVM on its own thread.
 *
 * <p>The thread starts the broker, then once per second prints the qualified
 * names of the destinations reported by {@link BrokerService#getDestinations()}.
 * When the thread is interrupted or an error occurs, the broker is stopped.
 */
public class EmbeddedBroker extends Thread {

    /** Pause between two destination listings, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 1000L;

    /**
     * Starts the embedded broker thread.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        new EmbeddedBroker().start();
    }

    /** The embedded broker; assigned by {@link #startBroker()}. */
    private BrokerService broker;

    /**
     * Starts the broker and periodically lists its destinations until the
     * thread is interrupted; always attempts to stop the broker on exit.
     */
    @Override
    public void run() {
        try {
            startBroker();

            while (!Thread.currentThread().isInterrupted()) {
                Thread.sleep(POLL_INTERVAL_MILLIS);
                if (broker != null) {
                    // NOTE(review): BrokerService.getDestinations() may only return the
                    // destinations configured for creation at startup rather than the
                    // live ones — confirm against the ActiveMQ API docs.
                    ActiveMQDestination[] destinations = broker.getDestinations();
                    if (destinations != null && destinations.length > 0) {
                        for (ActiveMQDestination dest : destinations) {
                            System.out.println(dest.getQualifiedName());
                        }
                    }
                }
            }
        } catch (InterruptedException e) {
            // Interruption is the shutdown signal: restore the flag and fall through to cleanup.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            stopBroker();
        }
    }

    /**
     * Creates the broker named {@code hurukku}, attaches a TCP connector on
     * {@code tcp://localhost:61616} and starts it.
     *
     * @throws Exception if the connector URI is invalid or the broker fails to start
     */
    public void startBroker() throws Exception {
        broker = new BrokerService();
        broker.setBrokerName("hurukku");
        TransportConnector connector = new TransportConnector();
        connector.setUri(new URI("tcp://localhost:61616"));
        broker.addConnector(connector);
        broker.start();
    }

    /** Stops the broker if one was created, reporting (not propagating) any failure. */
    private void stopBroker() {
        if (broker == null) {
            return;
        }
        try {
            broker.stop();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

댓글 없음:

댓글 쓰기

시스템 부팅시 도커 컨테이너 자동 실행

Docker 컨테이너를 운용중인 시스템이 Reboot 되버리면 컨테이너가 자동으로 올라오지 않아 불편해서 시스템 리붓시 컨테이너를 자동으로 시작되게 init 데몬에 등록하기로 했습니다. 서버는 Ubuntu 17.10 Docker는 17.0...