2011년 5월 23일 월요일

Servlet 3.0 Async Processing for Tenfold Increase in Server Throughput

원문: http://www.javacodegeeks.com/2011/03/servlet-30-async-processing-for-tenfold.html

Server 처리량 10배 증가를 위한 Servlet 3.0 Async Processing

Servlet은 자바로 구현된 서버 사이드 로직에서 주요한 컴포넌트입니다. 그리고 새로운 3.0 스펙에세는  비동기 프로세스에 대하여  매우 흥미롭게 소개하고 있습니다.

비동기 프로세스는  Web Application을 매우 확장성 있게 하는데  활용될 수 있고, Web 2.0 사이트와 AJAX Application은 이 기능을 효율적으로 사용할 수 있을 것입니다.

JCG 파트너 중 한명 인 “Tomasz Nurkiewicz”가 최근에 쓴 아주 Nice한 Article  “How to use async Processing in order to increase your server’s throughput”에서 그가 어떻게 했는지 알아보도록 하겠습니다.
(관련 링크: http://nurkiewicz.blogspot.com/2011/03/tenfold-increase-in-server-throughput.html)

자바 Servelt 컨테이너가 특히 대량의 동시사용자 처리에 적합하지 않다는 것은 비밀도 아닙니다.  일반적으로 JVM이 처리할수 있는 동시 실행 Thead 개수 만큼 동시 연결 수를 제한하는 것이 Thead-per-Request 모델입니다.   모든 새 Thread는  상당한 수준의 메모리 footprint와 CUP 사용 증가를 초래하기 때문에 100~200 보다 많은 동시 사용자 처리는 말도 안되는 생각입니다. Servlet 3.0이전 시대에서는 말이지요.

이 문서에서 우리는 설정된 제한 속도내에서 확장성있고  강력한 파일 서버를 작성합니다. 두 번째 버전에서 하으웨어 추가없이 약간의 현명한 디자인적 고려만으로 Servlet 3.0 비동기 프로세스 기능을 활용하여 1/10의 스레드로 더 많은 부하를 처리할수 있게 될 것 입니다.

Token bucket algorithm
파일 서버 만들기, 우리는 의식적으로 자원을 관리하여야 합니다. 특히 네트워크 대역을 말입니다.  우리는 하나의 클라이언트가 전제의 트래픽을 소비하는 것을 원하지 않습니다.  우리는 런타임시, 사용자에 따라, 하루 시간 중 언제라도 동적으로 제한을 조절할 수 있기를 원합니다. 이런 우리의 요구는 “Token Bucket Algorithm”이 해결해 줄 것입니다.(Token Bucket Algoritm에 대하여 Wiki을 참고하십시오.)

Wiki에서 아주 잘 설명되어 있지만 우리는  필요에 의해서 알고리즘을 약간 조정하기 때문에 여기서 간단하게 설명합니다.
먼저 양동이(Bucket)가 있었습니다. 이 양동이에는 동일한 Token이 있었습니다. 각 Token은  원시데이터의 20kiB 에 해당합니다.
클라이언트가 파일을 호출할때마다 서버는 양동이로부터 Token을 하나 받으려고 합니다.  만일 성공한다면, 클라이언트에게 20kiB을 전송합니다.  만일 양동기가 비어 있어 Token을 서버가 획득하지 못한다면? 서버는 대기 합니다.

Token은 어디에서 오는가? 백그라운드 프로세스가 양동이를 수시로 채워줍니다. 만일 백그라운드 프로세스가 100ms마다 100 새로운 Token을 추가한다면 서버는 최대 20MiB/s의 전송 능력을 가지게 됩니다.
(1초에 10회, 1회에 100Token, 1 Token=20kiB, 10x100x20=20000kiB/s=20MiB/s)
모든 클라이언트는 서버를 공유합니다. 물론 양동이가 꽉찬경우(Full 1000 Tokens) 새로운 Token은 무시됩니다.

여기 TokenBucket Interface 코드를 작성하였습니다.
(whole source code is available on GitHub in global-bucket branch):

public interface TokenBucket {

   int TOKEN_PERMIT_SIZE = 1024 * 20;

   void takeBlocking() throws InterruptedException;
   void takeBlocking(int howMany) throws InterruptedException;

   boolean tryTake();
   boolean tryTake(int howMany);

}


takeBlocking()는 Token이활성화 될 때까지 동기 대기 하는 메서드입니다.
tryTake()은 Token이 활성화되어 있는 경우 “true”를 그렇지 않으면 “false”을 리턴합니다.
Bucket은 본질적으로 멀티 스레드이기 때문에  대기관련되어 정교한 메커니즘인 세마포어을 이용합니다.

@Service
@ManagedResource
public class GlobalTokenBucket extends TokenBucketSupport {

   private final Semaphore bucketSize = new Semaphore(0, false);

   private volatile int bucketCapacity = 1000;

   public static final int BUCKET_FILLS_PER_SECOND = 10;

   @Override
   public void takeBlocking(int howMany) throws InterruptedException {
       bucketSize.acquire(howMany);
   }

   @Override
   public boolean tryTake(int howMany) {
       return bucketSize.tryAcquire(howMany);
   }

}


bucketSize는 현재 양동이의 Token량을 나타냅니다.
bucketCapacity는 양동이의 최대크기를 제한합니다.
위 각각은 JMX을 이용해 수정이 가능합니다.

@ManagedAttribute
public int getBucketCapacity() {
   return bucketCapacity;
}

@ManagedAttribute
public void setBucketCapacity(int bucketCapacity) {
   isTrue(bucketCapacity >= 0);
   this.bucketCapacity = bucketCapacity;
}


Spring Framework을 사용하였습니다. Spring이 반드시 필요한것은 아니지만 몇가지 좋은 기능이 있어서 사용하였습니다. 주기적으로 양동이를 가득 채우는 백그라운드 프로세스 구현 예는 다음과 같습니다.

@Scheduled(fixedRate = 1000 / BUCKET_FILLS_PER_SECOND)
public void fillBucket() {
   final int releaseCount =
min(bucketCapacity / BUCKET_FILLS_PER_SECOND,
 bucketCapacity - bucketSize.availablePermits());
   bucketSize.release(releaseCount);
}


이 코드에는 주요한 멀티스레드 버그가 포함되어 있습니다. 그러나 이문서의 목적을 위해서 우리는 그 버그를 무시하도록 하겠습니다.
또한, 아래의 XML은 “applicationContext.xml”의 일부로 @Scheduled Annotation을 위해 설정이 필요한 부분입니다.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:context="http://www.springframework.org/schema/context"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:task="http://www.springframework.org/schema/task"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                  http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
                  http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd">

       <context:component-scan base-package="com.blogspot.nurkiewicz.download" />
       <context:mbean-export/>

       <task:annotation-driven scheduler="bucketFillWorker"/>
       <task:scheduler id="bucketFillWorker" pool-size="1"/>

</beans>


우리는 file을 반환하는 실제 서블릿을 개발할수 있습니다. 저는 항상 대략 200kiB정도의 고정된 크기의 동일한 파일을 반환합니다.)

@WebServlet(urlPatterns = "/*", name="downloadServletHandler")
public class DownloadServlet extends HttpRequestHandlerServlet {}

@Service
public class DownloadServletHandler implements HttpRequestHandler {

   private static final Logger log =
       LoggerFactory.getLogger(DownloadServletHandler.class);

   @Resource
   private TokenBucket tokenBucket;

   @Override
   public void handleRequest(HttpServletRequest request,
      HttpServletResponse response) throws ServletException, IOException {

       final File file = new File("/home/dev/tmp/ehcache-1.6.2.jar");
       final BufferedInputStream input =
            new BufferedInputStream(new FileInputStream(file));
       try {
           response.setContentLength((int) file.length());
           sendFile(request, response, input);
       } catch (InterruptedException e) {
           log.error("Download interrupted", e);
       } finally {
           input.close();
       }
   }

   private void sendFile(HttpServletRequest request,
     HttpServletResponse response, BufferedInputStream input)
     throws IOException, InterruptedException {
       byte[] buffer = new byte[TokenBucket.TOKEN_PERMIT_SIZE];
       final ServletOutputStream outputStream = response.getOutputStream();
       for (int count = input.read(buffer); count > 0; count = input.read(buffer)) {
           tokenBucket.takeBlocking();
           outputStream.write(buffer, 0, count);
       }
   }
}


HttpRequestHandlerServlet이 사용되었습니다. 파일중 20kiB을 읽고, 양동이에서 Token을 가져오고, 클라이언트에게 chuck을 보내고, 파일끝 까지 반복하는 것을 쉽게 할수 있게 해주기 때문입니다.
믿거나 말거나, 이것은 실제 작동합니다! 아무 문제 없습니다 얼마나 많은 클라이언트가 이 서버에 동시 접속하여도, 총 대역폭은 20MiB을 초과하지 않습니다.

So what if we had a separate bucket for each client? Instead of one semaphore – a map? Each client has a separate independent bandwidth limit, so there is no risk of starvation. But there is even more:
some clients might be more privileged, having bigger or no limit at all,
some might be black listed, resulting in connection rejection or very low throughput
banning IPs, requiring authentication, cookie/user agent verification, etc.
we might try to correlate concurrent requests coming from the same client and use the same bucket for all of them to avoid cheating by opening several connections. We might also reject subsequent connections
and much more...

Our bucket interface grows allowing the implementation to take advantage of the new possibilities (see branch per-request-synch):

public interface TokenBucket {

   void takeBlocking(ServletRequest req) throws InterruptedException;
   void takeBlocking(ServletRequest req, int howMany) throws InterruptedException;

   boolean tryTake(ServletRequest req);
   boolean tryTake(ServletRequest req, int howMany);

   void completed(ServletRequest req);
}


public class PerRequestTokenBucket extends TokenBucketSupport {

   private final ConcurrentMap<Long, Semaphore> bucketSizeByRequestNo = new ConcurrentHashMap<Long, Semaphore>();

   @Override
   public void takeBlocking(ServletRequest req, int howMany) throws InterruptedException {
       getCount(req).acquire(howMany);
   }

   @Override
   public boolean tryTake(ServletRequest req, int howMany) {
       return getCount(req).tryAcquire(howMany);
   }

   @Override
   public void completed(ServletRequest req) {
       bucketSizeByRequestNo.remove(getRequestNo(req));
   }

   private Semaphore getCount(ServletRequest req) {
       final Semaphore semaphore = bucketSizeByRequestNo.get(getRequestNo(req));
       if (semaphore == null) {
           final Semaphore newSemaphore = new Semaphore(0, false);
           bucketSizeByRequestNo.putIfAbsent(getRequestNo(req), newSemaphore);
           return newSemaphore;
       } else {
           return semaphore;
       }
   }

   private Long getRequestNo(ServletRequest req) {
       final Long reqNo = (Long) req.getAttribute(REQUEST_NO);
       if (reqNo == null) {
           throw new IllegalAccessError("Request # not found in: " + req);
       }
       return reqNo;
   }
}


구현코드는 세마포어가 Map으로 대체된 것을 제외하고는 매우 유사합니다(full class here) .
Completed() 메서드를 호출하는 것은 매우 중요합니다, 그렇지 않으면 Map은 지속적으로 자라서 메모리 leak을 야기할수 있습니다. 토큰 버킷 구현 또한 다운로드 서블릿은 거의 동일 합니다. 많이 변경되지 않습니다. 이제 Stress 테스트할 준비가 되었습니다.

Throughput Testing

JMeter을 사용하였습니다. 20분 동안 테스트가 진행되었고,  6초에 하나씩 10분에 100개의 스레드를 생성하였습니다. 이후 10분동안 100개의 동시접속을 유지하였습니다.


Important note: I artificially lowered the number of HTTP worker threads to 10 in Tomcat (7.0.10 tested). This is a far from real configuration, but I wanted to emphasize some phenomena that occur with high load compared to server capabilities. With default pool size I would need several client machines running distributed JMeter session to generate enough traffic. If you have a server farm or couple of servers in the cloud (as opposed to my 3-year-old laptop), I would be delighted to see the results in more realistic environment.

Remembering how many HTTP worker threads are available in Tomcat, response times over time are far from satisfactory:


Please note the plateau at the beginning of the test: after about a minute (hint: when the number of concurrent connections exceeds 10) response times are skyrocketing to stabilize at around 10 seconds after 10 minutes (number of concurrent connections reaches one hundred). Once again: the same behavior would occur with 100 worker threads and 1000 concurrent connections – it's just a matter of scale. The response latencies graph (time between sending request and receiving first lines of response) clears any doubts:


Below the magical number of 10 threads our application responds almost instantly. This is really important for clients as receiving only headers (especially Content-Type and Content-Length) allows them to more accurately inform the user what is going on. So what is the reason of Tomcat waiting with the response? No magic here really. We have only 10 threads and each connection requires one thread, so Tomcat (and any other pre-Servlet 3.0 container) handles 10 clients while the remaining 90 are... queued. The moment one of the 10 lucky ones is done, one connection from the queue is taken. This explains average 9 second latency whilst the servlet needs only 1 second to serve the request (200 kiB with 20 kiB/s limit). If you are still not convinced, Tomcat provides nice JMX indicators showing how many threads are occupied and how many requests are queued:


With traditional servlets there is nothing we can do. Throughput is horrible but increasing the total number of threads is not an option (think: from 100 to 1000). But you don't actually need a profiler to discover that threads aren't the true bottleneck here. Look carefully at DownloadServletHandler, where do you think most of the time is spent? Reading a file? Sending data back to the client? No, the servlet waits... And then waits even more. Non-productively hanging on semaphore – thankfully CPU is not harmed, but what if it was implemented using busy waiting? Luckily Tomcat 7 finally supports...

Servlet 3.0 asynchronous processing

We are this close to increase our server capacity by an order of magnitude, but some non-trivial changes are required (see master branch). First, download servlet needs to be marked as asynchronous (OK, this is still trivial):

@WebServlet(urlPatterns = "/*", name="downloadServletHandler", asyncSupported = true)
public class DownloadServlet extends HttpRequestHandlerServlet {}


The main change occurs in download handler. Instead of sending the whole file in a loop with lots of waiting (takeBlocking()) involved, we are splitting the loop into separate iterations, each wrapped inside Callable. Now we will utilize a small thread pool that will be shared by all awaiting connections. Each task in the pool is very simple: instead of waiting for a token, it asks for it in a non-blocking fashion (tryTake()). If the token is available, piece of the file is sent to the client (sendChunkWorthOneToken()). If the token is not available (bucket is empty), nothing happens. No matter whether the token was available or not, the task resubmits itself to the queue for further processing (this is essentially very fancy, multi-threaded loop). Because there is only one pool, the task lands at the end of the queue allowing other connections to be served.

@Service
public class DownloadServletHandler implements HttpRequestHandler {

   @Resource
   private TokenBucket tokenBucket;

   @Resource
   private ThreadPoolTaskExecutor downloadWorkersPool;

   @Override
   public void handleRequest(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
       final File file = new File("/home/dev/tmp/ehcache-1.6.2.jar");
       response.setContentLength((int) file.length());
       final BufferedInputStream input = new BufferedInputStream(new FileInputStream(file));
       final AsyncContext asyncContext = request.startAsync(request, response);
       downloadWorkersPool.submit(new DownloadChunkTask(asyncContext, input));
   }

   private class DownloadChunkTask implements Callable<Void> {

       private final BufferedInputStream fileInputStream;
       private final byte[] buffer = new byte[TokenBucket.TOKEN_PERMIT_SIZE];
       private final AsyncContext ctx;

       public DownloadChunkTask(AsyncContext ctx, BufferedInputStream fileInputStream) throws IOException {
           this.ctx = ctx;
           this.fileInputStream = fileInputStream;
       }

       @Override
       public Void call() throws Exception {
           try {
               if (tokenBucket.tryTake(ctx.getRequest())) {
                   sendChunkWorthOneToken();
               } else
                   downloadWorkersPool.submit(this);
           } catch (Exception e) {
               log.error("", e);
               done();
           }
           return null;
       }

       private void sendChunkWorthOneToken() throws IOException {
           final int bytesCount = fileInputStream.read(buffer);
           ctx.getResponse().getOutputStream().write(buffer, 0, bytesCount);
           if (bytesCount < buffer.length)
               done();
           else
               downloadWorkersPool.submit(this);
       }

       private void done() throws IOException {
           fileInputStream.close();
           tokenBucket.completed(ctx.getRequest());
           ctx.complete();
       }
   }

}


I am leaving the details of Servlet 3.0 API, there are plenty of less sophisticated examples throughout the Internet. Just remember to call startAsync() and work with returned AsyncContext instead of plain request and response.

BTW creating a thread pool using Spring is childishly easy (and we get nice thread names as opposed to Executors and ExecutorService):

That's right, one thread is enough to serve one hundred concurrent clients. See for yourself (the amount of HTTP worker threads is still 10 and yes, the scale is in milliseconds).

Response Times over Time

Response Latencies over Time

As you can see, response times when one hundred clients are downloading a file concurrently are only about 5% higher compared to the system with almost no load. Also response latencies aren't particularly harmed by increasing load. I can't push the server even further due to my limited hardware resources, but I have reasons to believe that this simple application would handle even twice as more connection: both HTTP threads and download worker thread weren't fully utilized during the whole test. This also means that we have increased our server capacity 10 times without even using all the threads!

Hope you enjoyed this article. Of course not every use case can be scaled so easily, but next time you'll notice your servlet is mainly waiting – don't waste HTTP threads and consider servlet 3.0 asynchronous processing. And test, measure and compare! The complete application source codes are available (look at different branches), including JMeter test plan.

Areas of improvement

There are still several places that require attention and improvement. If you want to, don't hesitate, fork, modify and test:
  • While profiling I discovered that in more than 80% of executions DownloadChunkTask does not acquire a token and only reschedules itself. This is an awful waste of CPU time that can be fixed quite easily (how?)
  • Consider opening a file and sending content length in a worker thread rather than in an HTTP thread (before starting asynchronous context)
  • How can one implement global limit on top of bandwidth limits per request? You have at least couple of choice: either limit the size of download workers pool queue and reject executions or wrap PerRequestTokenBucket with reimplemented GlobalTokenBucket (decorator pattern)
  • TokenBucket.tryTake() method does clearly violate Command-query separation principle. Could you suggest how it should look like to follow it? Why it is so hard?
  • I am aware that my test constantly reads the same small file, so the I/O performance impact is minimal. But in real life scenario some caching layer would have certainly be applied on top of disk storage. So the difference is not that big (now the application uses very small amount of memory, lots of place for cache).


Lessons Learned
  • Loopback interface is not infinitely fast. In fact on my machine localhost was incapable of processing more than 80 MiB/s.
  • I don't use plain request object as a key in bucketSizeByRequestNo. First of all, there are no guarantees on equals() and hashCode() for this interface. And more importantly – read the next point...
  • With servlets 3.0 when processing the request you have to call completed() explicitly to flush and close the connection. Obviously after calling this method request and response objects are useless. What wasn't obvious (and I learned that the hard why) is that Tomcat reuses request objects (pooling) and some of their contents for subsequent connections. This means that the following code is incorrect and dangerous, possibly resulting in accessing/corrupting other requests' attributes or even session (?!?)


ctx.complete();
ctx.getRequest().getAttribute("SOME_KEY")


That's it. A very nice tutorial on increasing a server's throughput by using Servlet 3.0 async processing by Tomasz Nurkiewicz, one of our JCG partners. Don't forget to share!

댓글 없음:

댓글 쓰기

블록체인 개요 및 오픈소스 동향

블록체인(block chain) 블록체인은 공공 거래장부이며 가상 화폐로 거래할때 발생할때 발생할 수 있는 해킹을 막는 기술. 분산 데이터베이스의 한 형태로, 지속적으로 성장하는 데이터 기록 리스트로서 분산 노드의 운영자에 의한 임의 조작이 불가...