Wednesday, November 20, 2013

Starting Java Message System - RabbitMQ

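RabbitMQ is a very different platform from ActiveMQ and JMS, which have been covered so far.

RabbitMQ is written in Erlang, so Erlang must be installed before RabbitMQ itself. It is an open-source message broker that follows the AMQP (Advanced Message Queuing Protocol) standard. Middleware products that follow AMQP can exchange messages between heterogeneous systems that run on different platforms and are written in different languages.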

Feature Highlights
Reliability
Several mechanisms (persistence, delivery acknowledgements, publisher confirms, high availability) let you trade performance for reliability.

Flexible Routing, Clustering, Federation, Highly Available Queues

Multi-protocol
Supports AMQP 0-9-1; STOMP, MQTT, AMQP 1.0, and HTTP are supported through plugins.

Many Clients
Clients are available for Java, Ruby, Python, .NET, PHP, Perl, Erlang, Node.js, C/C++, and others.
For Java in particular, Spring Framework support is available. A JMS client is also offered, but it is a commercial product.

Management UI, Tracing, Plugin System, and more.

Installation
The distribution is split into a server and clients; install each one according to your environment.

Management Plugin
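RabbitMQ provides its administration features as a plugin. The plugin has to be enabled separately, as follows:
{RabbitMQ_HOME}/sbin/rabbitmq-plugins enable rabbitmq_management
Once the plugin is enabled, restart the server.
The management UI is then available at http://localhost:15672, and the default account is guest/guest.
The management features are also exposed through an HTTP API; http://localhost:15672/api shows the list of available API endpoints.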
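
As a rough illustration of what a call to the HTTP API needs, the sketch below only builds the two pieces of a request: the endpoint URI and an HTTP Basic Authorization header. It assumes the default guest/guest account and the /api/overview resource of the management plugin. The class and method names (ManagementApiSketch, endpoint, basicAuth) are made up for this example, and nothing is actually sent to a broker.

```java
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper, not part of the RabbitMQ client library.
final class ManagementApiSketch {

    // Builds the URI of a management API resource, e.g. "overview".
    static URI endpoint(String host, int port, String resource) {
        return URI.create("http://" + host + ":" + port + "/api/" + resource);
    }

    // Builds the value of an HTTP Basic "Authorization" header.
    static String basicAuth(String user, String password) {
        String credentials = user + ":" + password;
        return "Basic " + Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // Assumed defaults: local broker, management port 15672, guest/guest.
        System.out.println("GET " + endpoint("localhost", 15672, "overview"));
        System.out.println("Authorization: " + basicAuth("guest", "guest"));
    }
}
```

Any HTTP client can then send a GET request to that URI with the header attached.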

Tutorials
P: stands for producing; a producer is a program that sends messages.
C: stands for consuming; a consumer is a program that receives messages.
The red, battery-shaped box in the diagrams represents a queue.

To develop a Java client, download "rabbitmq-java-client" (http://www.rabbitmq.com/java-client.html).

Example 1: Hello world

Code: P
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;


public class Send {
       
 private final static String QUEUE_NAME = "hello";


 public static void main(String[] argv) throws Exception {
              
   ConnectionFactory factory = new ConnectionFactory();
   factory.setHost("localhost");
   Connection connection = factory.newConnection();
   Channel channel = connection.createChannel();


   channel.queueDeclare(QUEUE_NAME, false, false, false, null);
   String message = "Hello World!";
   channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
   System.out.println(" [x] Sent '" + message + "'");
   
   channel.close();
   connection.close();
 }
}

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
ConnectionFactory lets you set the host (IP), port, user name, password, and so on.

AMQP.Queue.DeclareOk queueDeclare(java.lang.String queue,
                                 boolean durable,
                                 boolean exclusive,
                                 boolean autoDelete,
                                 java.util.Map<java.lang.String,java.lang.Object> arguments)
                                 throws java.io.IOException
Declare a queue
Parameters:
queue - the name of the queue
durable - true if we are declaring a durable queue (the queue will survive a server restart)
exclusive - true if we are declaring an exclusive queue (restricted to this connection)
autoDelete - true if we are declaring an autodelete queue (server will delete it when no longer in use)
arguments - other properties (construction arguments) for the queue
Returns:
a declaration-confirm method to indicate the queue was successfully declared
Throws:
java.io.IOException - if an error is encountered


Code: C
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;


public class Recv {
       
   private final static String QUEUE_NAME = "hello";


   public static void main(String[] argv) throws Exception {


   ConnectionFactory factory = new ConnectionFactory();
   factory.setHost("localhost");
   Connection connection = factory.newConnection();
   Channel channel = connection.createChannel();


   channel.queueDeclare(QUEUE_NAME, false, false, false, null);
   System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
   
   QueueingConsumer consumer = new QueueingConsumer(channel);
   channel.basicConsume(QUEUE_NAME, true, consumer);
   
   while (true) {
     // Blocks until another message has been delivered from the server.
     QueueingConsumer.Delivery delivery = consumer.nextDelivery();
     String message = new String(delivery.getBody());
     System.out.println(" [x] Received '" + message + "'");
   }
 }
}


java.lang.String basicConsume(java.lang.String queue,
                             boolean autoAck,
                             Consumer callback)
                             throws java.io.IOException
Start a non-nolocal, non-exclusive consumer, with a server-generated consumerTag.
Parameters:
queue - the name of the queue
autoAck - true if the server should consider messages acknowledged once delivered; false if the server should expect explicit acknowledgements
callback - an interface to the consumer object
Returns:
the consumerTag generated by the server
Throws:
java.io.IOException - if an error is encountered

Example 2: Work queues
A work queue (also known as a task queue) distributes messages among multiple workers.
C1 and C2 receive messages of the same form, and each may process them differently. By default, messages are distributed round-robin.
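
To make the round-robin distribution concrete, here is a small in-memory sketch. It does not use the RabbitMQ client at all: the class RoundRobinDispatch and its dispatch method are hypothetical and only model how a sequence of messages would be spread over the workers in turn. Note that the worker code in this example also calls basicQos(1) with manual acknowledgements, which makes the real broker hand a message to whichever worker is free rather than follow a strict rotation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of round-robin dispatch; not RabbitMQ code.
final class RoundRobinDispatch {

    // Assigns message i to worker (i mod number of workers).
    static Map<String, List<String>> dispatch(List<String> workers, List<String> messages) {
        Map<String, List<String>> assigned = new LinkedHashMap<>();
        for (String worker : workers) {
            assigned.put(worker, new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            String worker = workers.get(i % workers.size());
            assigned.get(worker).add(messages.get(i));
        }
        return assigned;
    }

    public static void main(String[] args) {
        List<String> workers = Arrays.asList("C1", "C2");
        List<String> messages = Arrays.asList("task1", "task2", "task3", "task4", "task5");
        // prints {C1=[task1, task3, task5], C2=[task2, task4]}
        System.out.println(dispatch(workers, messages));
    }
}
```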


Code: P
package example.workqueues;


import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;


public class NewTask {
    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // durable = true: the queue definition survives a server restart.
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        String message = getMessage(argv);

        // PERSISTENT_TEXT_PLAIN marks the message itself as persistent.
        channel.basicPublish("", TASK_QUEUE_NAME,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }

    // Uses the command-line arguments as the message, or a default if none are given.
    private static String getMessage(String[] strings) {
        if (strings.length < 1)
            return "Hello World!";
        return joinStrings(strings, " ");
    }

    private static String joinStrings(String[] strings, String delimiter) {
        int length = strings.length;
        if (length == 0) return "";
        StringBuilder words = new StringBuilder(strings[0]);
        for (int i = 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}

Code: C1, C2
package example.workqueues;


import java.io.IOException;


import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;


public class Workers implements Runnable {

    private static final String TASK_QUEUE_NAME = "task_queue";
    private String workerName;

    public static void main(String[] argv) throws Exception {
        // Start two workers (C1 and C2) that consume from the same queue.
        Thread thread1 = new Thread(new Workers("C1"));
        thread1.start();
        Thread thread2 = new Thread(new Workers("C2"));
        thread2.start();
    }

    // Simulates work: sleeps one second for every '.' in the message.
    private void doWork(String task) throws InterruptedException {
        for (char ch : task.toCharArray()) {
            if (ch == '.')
                Thread.sleep(1000);
        }
    }

    public Workers(String name) {
        this.workerName = name;
    }

    @Override
    public void run() {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            // Same durable declaration as in NewTask.
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
            System.out.println(" [*]" + workerName + ", Waiting for messages. To exit press CTRL+C");

            // Deliver at most one unacknowledged message to this worker at a time.
            channel.basicQos(1);
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // autoAck = false: the worker acknowledges explicitly after finishing.
            channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [x]" + workerName + ", Received '" + message + "'");
                doWork(message);
                System.out.println(" [x]" + workerName + ", Done");

                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        } catch (ShutdownSignalException e) {
            e.printStackTrace();
        } catch (ConsumerCancelledException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

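If a worker consumes with autoAck = true, the server deletes each message as soon as it has been delivered. If an error then occurs while the message is being processed, that message cannot be recovered. To avoid this, set autoAck = false and have the worker send an ack once it has finished the task.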
QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck = false;
channel.basicConsume("hello", autoAck, consumer);
while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    //...
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
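
The difference between the two acknowledgement modes can be modelled without a broker. The sketch below is a deliberately simplified in-memory model, not RabbitMQ code: the class AckQueueModel and its methods are invented for illustration. It tracks delivered-but-unacknowledged messages and puts them back on the queue when the consumer is lost, which is the behaviour the paragraph above relies on.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of broker-side acknowledgement bookkeeping.
final class AckQueueModel {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<Long, String> unacked = new LinkedHashMap<>();
    private long nextTag = 1;

    void publish(String body) {
        ready.addLast(body);
    }

    // Delivers the next message and returns its delivery tag, or -1 if none is ready.
    // With autoAck the message is forgotten immediately; otherwise it waits for an ack.
    long deliver(boolean autoAck) {
        String body = ready.pollFirst();
        if (body == null) return -1;
        long tag = nextTag++;
        if (!autoAck) unacked.put(tag, body);
        return tag;
    }

    void ack(long tag) {
        unacked.remove(tag);
    }

    // The consumer died: every unacknowledged message goes back to the front, in order.
    void consumerLost() {
        List<String> pending = new ArrayList<>(unacked.values());
        Collections.reverse(pending);
        for (String body : pending) ready.addFirst(body);
        unacked.clear();
    }

    int readyCount() {
        return ready.size();
    }

    public static void main(String[] args) {
        AckQueueModel queue = new AckQueueModel();
        queue.publish("job");
        queue.deliver(false);   // manual ack mode, but the worker never acks
        queue.consumerLost();
        System.out.println("messages available again: " + queue.readyCount());
    }
}
```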

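So far we have seen how to keep messages from being lost even if a worker terminates abnormally. To keep messages stored on the server from being lost when the server itself stops, set the durable attribute to true when calling Channel.queueDeclare. The messages must also be published as persistent, as NewTask does with MessageProperties.PERSISTENT_TEXT_PLAIN. RabbitMQ does not allow an existing queue to be redeclared with different parameters, so if "hello" already exists as a non-durable queue, declare a queue with a different name instead (the work queue example uses task_queue).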
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);


Example 3: Publish/Subscribe
Sending messages to many consumers at once.

X: Exchange
An exchange routes messages to queues and comes in four types:
direct, topic, headers, and fanout.
Every message in RabbitMQ is delivered through an exchange. The earlier examples simply used the default exchange. In the list of exchanges below, the first entry has no name and is of type direct; this is the default exchange.

$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
                direct
amq.direct      direct
amq.fanout      fanout
amq.headers     headers
amq.match       headers
amq.rabbitmq.log        topic
amq.rabbitmq.trace      topic
amq.topic       topic
logs    fanout
...done.

channel.exchangeDeclare("logs", "fanout");
Declares a fanout exchange, the publish/subscribe type that broadcasts every message to all queues bound to it.

channel.basicPublish( "logs", "", null, message.getBytes());
Publishes the message through the fanout exchange declared above.

Temporary queues
String queueName = channel.queueDeclare().getQueue();
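Each client creates and connects to its own queue with a server-generated name. Temporary queues are useful when a client does not need older messages from the server, only those sent after it connects, and when the queue should be deleted automatically once the connection is closed.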

Example 4: Routing
Receiving messages selectively
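A client (worker) registers the routing keys it wants to listen to when it binds its queue (queueBind), so that it receives only the matching messages among those the server delivers. A queue can be bound with more than one routing key.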
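
The selection rule of a direct exchange is an exact match between the message's routing key and the keys a queue was bound with. The sketch below models only that rule in memory. DirectExchangeModel, bind, and route are hypothetical names and do not belong to the RabbitMQ client. The bindings mirror the c1 and c2 consumers used in the code that follows.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of direct-exchange routing; not RabbitMQ code.
final class DirectExchangeModel {
    // routing key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();

    void bind(String queue, String routingKey) {
        bindings.computeIfAbsent(routingKey, k -> new LinkedHashSet<>()).add(queue);
    }

    // Returns the queues that would receive a message published with this key.
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.<String>emptySet());
    }

    public static void main(String[] args) {
        DirectExchangeModel exchange = new DirectExchangeModel();
        exchange.bind("c1", "error");
        for (String key : new String[] {"info", "error", "warning"}) {
            exchange.bind("c2", key);
        }
        System.out.println("info    -> " + exchange.route("info"));
        System.out.println("error   -> " + exchange.route("error"));
        System.out.println("warning -> " + exchange.route("warning"));
    }
}
```

A message whose routing key has no binding is routed to no queue at all in this model.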

Code: P
package example.routing;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String message = "Hello RabbitMQ!";

        // Publish the same message once per routing key (severity).
        channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes());
        System.out.println(" [x] Sent 'info':'" + message + "'");

        channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes());
        System.out.println(" [x] Sent 'error':'" + message + "'");

        channel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes());
        System.out.println(" [x] Sent 'warning':'" + message + "'");

        channel.close();
        connection.close();
    }
}

Code: C1, C2
package example.routing;


import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;


public class ReceiveLogsDirect implements Runnable {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        // c1 listens only for errors; c2 listens for every severity.
        String[] c1Keys = {"error"};
        Thread c1 = new Thread(new ReceiveLogsDirect("c1", c1Keys));
        c1.start();
        String[] c2Keys = {"info", "error", "warning"};
        Thread c2 = new Thread(new ReceiveLogsDirect("c2", c2Keys));
        c2.start();
    }

    private String threadName;
    private String[] routingKeys;

    public ReceiveLogsDirect(String name, String[] routingKeys) {
        this.threadName = name;
        this.routingKeys = routingKeys;
    }

    @Override
    public void run() {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            // Temporary queue with a server-generated name.
            String queueName = channel.queueDeclare().getQueue();

            // One binding per routing key this consumer is interested in.
            for (String severity : routingKeys) {
                channel.queueBind(queueName, EXCHANGE_NAME, severity);
            }

            System.out.println(" [*] " + threadName + ", Waiting for messages. To exit press CTRL+C");

            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(queueName, true, consumer);

            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                String routingKey = delivery.getEnvelope().getRoutingKey();
                System.out.println(" [x] " + threadName + ", Received '" + routingKey + "':'" + message + "'");
            }
        } catch (ShutdownSignalException e) {
            e.printStackTrace();
        } catch (ConsumerCancelledException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


Console log…
[*] c1, Waiting for messages. To exit press CTRL+C
[*] c2, Waiting for messages. To exit press CTRL+C
[x] c1, Received 'error':'Hello RabbitMQ!'
[x] c2, Received 'info':'Hello RabbitMQ!'
[x] c2, Received 'error':'Hello RabbitMQ!'
[x] c2, Received 'warning':'Hello RabbitMQ!'

Example 5: Topics
Receiving messages based on a pattern.
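The exchange is declared with the type "topic".
Messages passing through the exchange are matched against the pattern in each binding key, and each worker receives only the messages whose routing key matches. Routing keys are lists of words separated by dots.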
  • * (star) can substitute for exactly one word.
  • # (hash) can substitute for zero or more words.
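
The two wildcard rules above can be checked with a small matcher. This is an illustrative re-implementation written for this post, not the algorithm RabbitMQ itself uses; TopicMatcher and matches are hypothetical names. Keys and patterns are split on dots, `*` consumes exactly one word, and `#` is tried against zero or more words.

```java
// Hypothetical topic-pattern matcher for illustration; not RabbitMQ code.
final class TopicMatcher {

    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;          // pattern used up: key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible remainder.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) return true;
            }
            return false;
        }
        if (j == k.length) {
            return false;                  // pattern still needs a word, key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String[] keys = {"quick.orange.rabbit", "lazy.brown.fox", "quick.orange.male.rabbit"};
        String[] patterns = {"*.orange.*", "*.*.rabbit", "lazy.#", "#"};
        for (String key : keys) {
            for (String pattern : patterns) {
                System.out.println(key + " ~ " + pattern + " : " + matches(pattern, key));
            }
        }
    }
}
```

With this matcher, a four-word key such as quick.orange.male.rabbit matches neither `*.orange.*` nor `*.*.rabbit`, because each `*` stands for exactly one word; only a pattern containing `#` can absorb the extra word.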
Code: P
package example.topic;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


public class EmitLogTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String message = "Hello RabbitMQ Topic!";

        // Publish the same message under several dot-separated routing keys.
        channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", null, message.getBytes());
        System.out.println(" [x] Sent 'quick.orange.rabbit':'" + message + "'");

        channel.basicPublish(EXCHANGE_NAME, "lazy.orange.elephant", null, message.getBytes());
        System.out.println(" [x] Sent 'lazy.orange.elephant':'" + message + "'");

        channel.basicPublish(EXCHANGE_NAME, "quick.orange.fox", null, message.getBytes());
        System.out.println(" [x] Sent 'quick.orange.fox':'" + message + "'");

        channel.basicPublish(EXCHANGE_NAME, "lazy.brown.fox", null, message.getBytes());
        System.out.println(" [x] Sent 'lazy.brown.fox':'" + message + "'");

        channel.basicPublish(EXCHANGE_NAME, "lazy.pink.rabbit", null, message.getBytes());
        System.out.println(" [x] Sent 'lazy.pink.rabbit':'" + message + "'");

        channel.basicPublish(EXCHANGE_NAME, "quick.brown.fox", null, message.getBytes());
        System.out.println(" [x] Sent 'quick.brown.fox':'" + message + "'");

        channel.basicPublish(EXCHANGE_NAME, "quick.orange.male.rabbit", null, message.getBytes());
        System.out.println(" [x] Sent 'quick.orange.male.rabbit':'" + message + "'");

        channel.basicPublish(EXCHANGE_NAME, "lazy.orange.male.rabbit", null, message.getBytes());
        System.out.println(" [x] Sent 'lazy.orange.male.rabbit':'" + message + "'");

        channel.close();
        connection.close();
    }
}

Code: C1, C2 and C_all
package example.topic;


import java.io.IOException;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;


public class ReceiveLogsTopic implements Runnable {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        String[] c1Keys = {"*.orange.*", "*.*.rabbit"};
        Thread c1 = new Thread(new ReceiveLogsTopic("c1", c1Keys));
        c1.start();
        String[] c2Keys = {"*.*.rabbit", "lazy.#"};
        Thread c2 = new Thread(new ReceiveLogsTopic("c2", c2Keys));
        c2.start();
        // "#" alone matches every routing key.
        String[] c3Keys = {"#"};
        Thread c3 = new Thread(new ReceiveLogsTopic("c3_all", c3Keys));
        c3.start();
    }

    private String threadName;
    private String[] routingKeys;

    public ReceiveLogsTopic(String name, String[] routingKeys) {
        this.threadName = name;
        this.routingKeys = routingKeys;
    }

    @Override
    public void run() {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            // Temporary queue with a server-generated name.
            String queueName = channel.queueDeclare().getQueue();

            // One binding per pattern this consumer is interested in.
            for (String bindingKey : routingKeys) {
                channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
            }

            System.out.println(" [*] " + threadName + ", Waiting for messages. To exit press CTRL+C");

            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(queueName, true, consumer);

            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                String routingKey = delivery.getEnvelope().getRoutingKey();
                System.out.println(" [x] " + threadName + ", Received '" + routingKey + "':'" + message + "'");
            }
        } catch (ShutdownSignalException e) {
            e.printStackTrace();
        } catch (ConsumerCancelledException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


Console Log..
[*] c1, Waiting for messages. To exit press CTRL+C
[*] c3_all, Waiting for messages. To exit press CTRL+C
[*] c2, Waiting for messages. To exit press CTRL+C
[x] c2, Received 'quick.orange.rabbit':'Hello RabbitMQ Topic!'
[x] c3_all, Received 'quick.orange.rabbit':'Hello RabbitMQ Topic!'
[x] c3_all, Received 'lazy.orange.elephant':'Hello RabbitMQ Topic!'
[x] c3_all, Received 'quick.orange.fox':'Hello RabbitMQ Topic!'
[x] c2, Received 'lazy.orange.elephant':'Hello RabbitMQ Topic!'
[x] c3_all, Received 'lazy.brown.fox':'Hello RabbitMQ Topic!'
[x] c3_all, Received 'lazy.pink.rabbit':'Hello RabbitMQ Topic!'
[x] c3_all, Received 'quick.brown.fox':'Hello RabbitMQ Topic!'
[x] c3_all, Received 'quick.orange.male.rabbit':'Hello RabbitMQ Topic!'
[x] c2, Received 'lazy.brown.fox':'Hello RabbitMQ Topic!'
[x] c1, Received 'quick.orange.rabbit':'Hello RabbitMQ Topic!'
[x] c3_all, Received 'lazy.orange.male.rabbit':'Hello RabbitMQ Topic!'
[x] c1, Received 'lazy.orange.elephant':'Hello RabbitMQ Topic!'
[x] c2, Received 'lazy.pink.rabbit':'Hello RabbitMQ Topic!'
[x] c1, Received 'quick.orange.fox':'Hello RabbitMQ Topic!'
[x] c1, Received 'lazy.pink.rabbit':'Hello RabbitMQ Topic!'
[x] c2, Received 'lazy.orange.male.rabbit':'Hello RabbitMQ Topic!'
