2013년 11월 11일 월요일

Starting Java Message System - Apache ActiveMQ


최근버전: ActiveMQ 5.9.0(2013.11.07 현재)

Features
  • Supports a variety of Cross Language Clients nd Protocols from Java, C, C++, C#, Ruby, Perl, Python, PHP..
  • full support for the Enterprise Integration Patterns bosth in the JMS client and the Message Broker.
  • Fully supports JMS 1.1 and J2EE 1.4 with support for transient, persistent, transactional and XA messaging.
  • ActiveMQ can be easily embedded into Spring applications and configure using Spring’XML Configuration mecanism.
  • Tested inside popular J2EE servers such as TomEE, Geronimo, JBoss, GlassFish and WebLogic.
  • Supports very fast persistence using JDBC along with a high performance journal.
  • Designed for high performance clustering, client-server, perer based communication.
  • REST API to provide technology agnostic and language neutral web based API to messaging.
  • Ajax to support web streaming support to web browsers using pure DHTML, allowing web browsers to be part of the messaging fabric.
  • CXF and Axis Support so that ActiveMQ can be easily dropped into either of these web service stacks to provide reliable messaging.
  • Can be used as an in memory JMS provider, ideal for unit test JMS
실행방법
{install_dir}/bin/activemq

모니터링
http://localhost:8161 또는 “jvisualvm”와 같은 JMX을 지원하는 툴, 관리자 계정은 admin/admin

Maven
<dependency>
 <groupId>org.apache.activemq</groupId>
 <artifactId>activemq-all</artifactId>
 <version>5.9.0</version>
</dependency>


Example Code - Topic
Topic은 Message을 구독할 Client가 먼저 MQ에 접속하고  Produce가 Message을 전송합니다.
Client가 접속하기 이전에 Produce가 전송한 Message는 받을수 없습니다.

example.topic.Consumers.java
package example.topic;


import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;


import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;


public class Consumers {


public static void main(String[] args) throws JMSException {
new Consumer().start();
new Consumer().start();
new Consumer().start();
}
}


class Consumer extends Thread {


private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;


public Consumer() {


}


@Override
public void run() {
Connection connection = null;
try {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
url);
connection = connectionFactory.createConnection();
connection.start();


Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);


Topic topic = session.createTopic("example.topic");


MessageConsumer consumer = session.createConsumer(topic);


MessageListener listner = new MessageListener() {
public void onMessage(Message message) {
try {
String currentDate = message.getStringProperty("date");
System.out.println("Date:" + currentDate);


if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("Received message"
+ textMessage.getText() + "'");
}
} catch (JMSException e) {
System.out.println("Caught:" + e);
e.printStackTrace();
}
}
};
consumer.setMessageListener(listner);
System.out.println("waiting...");
while (true) {
try {
this.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}


} catch (JMSException e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}


}


}

example.topic.Producer.java
package example.topic;


import java.util.Date;


import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;


import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;


public class Producer {


private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;


public static void main(String[] args) throws JMSException {


ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start();


// JMS messages are sent and received using a Session. We will
// create here a non-transactional session object. If you want
// to use transactions you should set the first parameter to 'true'
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);


Topic topic = session.createTopic("example.topic");


MessageProducer producer = session.createProducer(topic);


// We will send a small text message saying 'Hello'


TextMessage message = session.createTextMessage();


message.setStringProperty("date", new Date().toString());
message.setText("HELLO JMS WORLD");
// Here we are sending the message!
producer.send(message);
System.out.println("Sent message '" + message + "'");


connection.close();
}
}

Example Code - Queue
Queue는 Topic과 달리 Client 접속 이전에 기 전송된 Message에 대하여 받을 수 있으나, 동일한 Queue에 접속한 여러 Client 중 최초 접속 Client에게 배송된다.

example.queue.Consumers.java
package example.queue;


import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;


import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;


public class Consumers {


public static void main(String[] args) throws JMSException {
new Consumer("consumer1").start();
new Consumer("consumer2").start();
new Consumer("consumer3").start();
}
}


class Consumer extends Thread {


private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;


private String consumerName;


public Consumer(String consumerName) {
this.consumerName = consumerName;
}


@Override
public void run() {
Connection connection = null;
try {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
url);
connection = connectionFactory.createConnection();
connection.start();


Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);


Queue queue = session.createQueue("example.queue");


MessageConsumer consumer = session.createConsumer(queue);


MessageListener listner = new MessageListener() {
public void onMessage(Message message) {
try {
String currentDate = message.getStringProperty("date");
System.out.println(consumerName + ", Date:"
+ currentDate);


if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println(consumerName
+ ", Received message"
+ textMessage.getText() + "'");
}
} catch (JMSException e) {
System.out.println("Caught:" + e);
e.printStackTrace();
}
}
};
consumer.setMessageListener(listner);
System.out.println("waiting...");
while (true) {
try {
this.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}


} catch (JMSException e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}


}


}

example.queue.Producer.java
package example.queue;


import java.util.Date;


import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;


import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;


public class Producer {


private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;


public static void main(String[] args) throws JMSException {


ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start();


Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);


Queue queue = session.createQueue("example.queue");


MessageProducer producer = session.createProducer(queue);


TextMessage message = session.createTextMessage();


message.setStringProperty("date", new Date().toString());
message.setText("HELLO JMS WORLD, JMS type QUEUE");
// Here we are sending the message!
producer.send(message);
System.out.println("Sent message '" + message + "'");


connection.close();
}
}

Example Code - Embedded ActiveMQ
“activemq-all-xxx.jar” 라이브러리 만으로도 Embedded 하여 사용할 수 있다.
package example.broker;


import java.net.URI;


import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;


public class EmbeddedBroker extends Thread {


public static void main(String[] args) {
new EmbeddedBroker().start();
}


private BrokerService broker;


@Override
public void run() {
try {
startBroker();


while (true) {
sleep(1000);
if (broker != null) {
ActiveMQDestination[] destinations = broker
.getDestinations();
if (destinations != null && destinations.length > 0) {
for (ActiveMQDestination dest : destinations) {
System.out.println(dest.getQualifiedName());
}
}
}
}
} catch (Exception e) {


e.printStackTrace();
}
}


public void startBroker() throws Exception {
broker = new BrokerService();
broker.setBrokerName("hurukku");
TransportConnector connector = new TransportConnector();
connector.setUri(new URI("tcp://localhost:61616"));
broker.addConnector(connector);
broker.start();


}
}

댓글 없음:

댓글 쓰기

ETL 솔루션 환경

ETL 솔루션 환경 하둡은 대용량 데이터를 값싸고 빠르게 분석할 수 있는 길을 만들어줬다. 통계분석 엔진인 “R”역시 하둡 못지 않게 관심을 받고 있다. 빅데이터 역시 데이터라는 점을 볼때 분산처리와 분석 그 이전에 데이터 품질 등 데이...