2013년 11월 21일 목요일

Durable Subscriptions Topic

JMS Topic 에서는 Subscriber가 Server에 접속한 시점부터 발생한 Message을 받아오게 되어 있다. Subscriber가 종료되어 있는 시점에서 발생한 Message는 다시 받을 수 없게 되어 있다. 이러한 방식의 Nondurable Subscription이라한다. 반면  Durable Subscription의 경우는 Subscriber가 종료되어 있는 동안 발생한 Message을 다음 시작시점에 받아오게 되어 있다. 즉 누락되는 Message없이 Topic을 사용할수 있다는 것이다.

Nondurable Subscribers and Subscriptions


A Durable Subscriber and Subscription

Durable Subscriber Code


package example.durabletopic;

import java.util.Date;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {

private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

public static void main(String[] args) throws JMSException {

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

Topic topic = session.createTopic("example.topic");

MessageProducer producer = session.createProducer(topic);

TextMessage message = session.createTextMessage();

for(int i=0; i<1000; i++) {
message.setStringProperty("date", new Date().toString());
message.setText("HELLO JMS WORLD");
producer.send(message);
System.out.println("Sent message '" + message + "'");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

connection.close();
}
}
Message을 보내는 쪽에서는 Nondurable이든 Durable이든 차이가 없다. 차이는 Message을 받는 쪽에서 생긴다.


package example.durabletopic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumers {
public static void main(String[] args) throws JMSException {
new Consumer("C1").start();
}
}

class Consumer extends Thread {

private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String clientID;
public Consumer(String clientID) {
this.clientID = clientID;
}

@Override
public void run() {
Connection connection = null;
try {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
url);
connection = connectionFactory.createConnection();
connection.setClientID(clientID);
connection.start();

Session session = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);

Topic topic = session.createTopic("example.topic");

TopicSubscriber durable = session.createDurableSubscriber(topic, "test_durable");

MessageListener listner = new MessageListener() {
public void onMessage(Message message) {
try {
String currentDate = message.getStringProperty("date");
System.out.println("Date:" + currentDate);

if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("Received message"
+ textMessage.getText() + "'");
}
} catch (JMSException e) {
System.out.println("Caught:" + e);
e.printStackTrace();
}
}
};
durable.setMessageListener(listner);
System.out.println("waiting...");
while (true) {
try {
this.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

} catch (JMSException e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}

}

}
Client의 Durable 속성은 “clientID”로 구분되어 서버에서 관리되기 때문에 ClientID는 전체에서 유니크 해야 한다.
최초 clientID로 접속한 이후 부터 Durable 속성이 적용된다. 만일 서버에 등록된 ClientID을 삭제하고 싶다면 “session.unsubscribe(name)”을 사용하면되고, 각 JMS 서버에서 관리하는 설정 값을 사용해도 된다.

댓글 없음:

댓글 쓰기

ETL 솔루션 환경

ETL 솔루션 환경 하둡은 대용량 데이터를 값싸고 빠르게 분석할 수 있는 길을 만들어줬다. 통계분석 엔진인 “R”역시 하둡 못지 않게 관심을 받고 있다. 빅데이터 역시 데이터라는 점을 볼때 분산처리와 분석 그 이전에 데이터 품질 등 데이...