2013년 11월 21일 목요일

Durable Subscriptions Topic

JMS Topic 에서는 Subscriber가 Server에 접속한 시점부터 발생한 Message을 받아오게 되어 있다. Subscriber가 종료되어 있는 시점에서 발생한 Message는 다시 받을 수 없게 되어 있다. 이러한 방식의 Nondurable Subscription이라한다. 반면  Durable Subscription의 경우는 Subscriber가 종료되어 있는 동안 발생한 Message을 다음 시작시점에 받아오게 되어 있다. 즉 누락되는 Message없이 Topic을 사용할수 있다는 것이다.

Nondurable Subscribers and Subscriptions


A Durable Subscriber and Subscription

Durable Subscriber Code


package example.durabletopic;

import java.util.Date;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {

private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

public static void main(String[] args) throws JMSException {

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

Topic topic = session.createTopic("example.topic");

MessageProducer producer = session.createProducer(topic);

TextMessage message = session.createTextMessage();

for(int i=0; i<1000; i++) {
message.setStringProperty("date", new Date().toString());
message.setText("HELLO JMS WORLD");
producer.send(message);
System.out.println("Sent message '" + message + "'");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

connection.close();
}
}
Message을 보내는 쪽에서는 Nondurable이든 Durable이든 차이가 없다. 차이는 Message을 받는 쪽에서 생긴다.


package example.durabletopic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumers {
public static void main(String[] args) throws JMSException {
new Consumer("C1").start();
}
}

class Consumer extends Thread {

private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String clientID;
public Consumer(String clientID) {
this.clientID = clientID;
}

@Override
public void run() {
Connection connection = null;
try {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
url);
connection = connectionFactory.createConnection();
connection.setClientID(clientID);
connection.start();

Session session = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);

Topic topic = session.createTopic("example.topic");

TopicSubscriber durable = session.createDurableSubscriber(topic, "test_durable");

MessageListener listner = new MessageListener() {
public void onMessage(Message message) {
try {
String currentDate = message.getStringProperty("date");
System.out.println("Date:" + currentDate);

if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("Received message"
+ textMessage.getText() + "'");
}
} catch (JMSException e) {
System.out.println("Caught:" + e);
e.printStackTrace();
}
}
};
durable.setMessageListener(listner);
System.out.println("waiting...");
while (true) {
try {
this.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

} catch (JMSException e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}

}

}
Client의 Durable 속성은 “clientID”로 구분되어 서버에서 관리되기 때문에 ClientID는 전체에서 유니크 해야 한다.
최초 clientID로 접속한 이후 부터 Durable 속성이 적용된다. 만일 서버에 등록된 ClientID을 삭제하고 싶다면 “session.unsubscribe(name)”을 사용하면되고, 각 JMS 서버에서 관리하는 설정 값을 사용해도 된다.

댓글 없음:

댓글 쓰기

블록체인 개요 및 오픈소스 동향

블록체인(block chain) 블록체인은 공공 거래장부이며 가상 화폐로 거래할때 발생할때 발생할 수 있는 해킹을 막는 기술. 분산 데이터베이스의 한 형태로, 지속적으로 성장하는 데이터 기록 리스트로서 분산 노드의 운영자에 의한 임의 조작이 불가...