<시작하며..>

apache mina에서 cpu 100% 치는 현상이 발생된 적이 있다. JDK 1.6.0_22 and mina 2.0.2에서는 더 이상 cpu가 치는 현상이 없었지만, old한 jdk 에서 동작하는 apache mina를 위해서 코드를 패치하여 해결했다고 한 상태이다. (2.0.3 패치)

NioProcessor 100% CPU usage on Linux (epoll selector bug)
https://issues.apache.org/jira/browse/DIRMINA-678

이 문제는 Jetty (http://jira.codehaus.org/browse/JETTY-937), jdk(http://bugs.sun.com/view_bug.do?bug_id=6693490, http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933) 와도 밀접한 연관이 있으므로 알아두면 좋을 것 같다.

참고

jdk 1.6.0_22 의 release note(http://www.oracle.com/technetwork/java/javase/6u22releasenotes-176121.html)를 봐도 특별한 내용은 없다.  그러나, 1.6.0_23의 버그 fix (http://www.oracle.com/technetwork/java/javase/2col/6u23bugfixes-191074.html) 의 내용 중 “6728542”은 관련 내용이 있다.  그 내용(http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6728542)은 다음과 같다. 아마도 이 부분을 수정한 것이 아닌가 싶다.

Epoll implementation of java.nio.channels.Selector
(EPollSelectorProvider) doesn't correctly packs
epoll_event struct for all platforms.

 

<내용>

jdk의 버그라 할지라도 문제를 해결했는지에 대한 것은 다른 이슈이다. 수많은 버그를 피해서 만들어둔 코드가 멋지기로 하다. (ibm jike 버그 회피 코드를 잘 짰었다.. ㅋ)

Mina 의 버그질라 및 Jetty, jdk 이슈는 아래 thread dump를 근거로 이야기를 하고 있다. select 하면서 loop에 빠지면서 cpu가 100%가 되는 현상이다. 버그질라에 나온 영어로는 spinning in tight loop 이다.

java.lang.Thread.State: RUNNABLE at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:215)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
- locked <0x00002aaab3dd8760> (a sun.nio.ch.Util$1)
- locked <0x00002aaab3dd83a0> (a java.util.Collections$UnmodifiableSet)
- locked <0x00002aaab3dd82e8> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
 
<자바 및 jvm 소스>
java.net에 있는 jdk 6 update 22 의 linux 소스를 바탕으로 작성한다. 
(다운로드 받을 수 있는 곳 : http://download.java.net/jdk6/6u10/archive/)
java.nio.channels.Selector클래스의 select() 메서드가 호출되는 tree를 쫓아가본다.
public abstract class Selector {
public abstract int select(long timeout)    throws IOException;
}
-> 

abstract class SelectorImpl   extends AbstractSelector {

    public int select(long timeout)  throws IOException
    {
        if (timeout < 0)
            throw new IllegalArgumentException("Negative timeout");
         return lockAndDoSelect((timeout == 0) ? -1 : timeout);
    }

    private int lockAndDoSelect(long timeout) throws IOException {
    synchronized (this) {
        if (!isOpen())
        throw new ClosedSelectorException();
        synchronized (publicKeys) {
        synchronized (publicSelectedKeys) {
            return doSelect(timeout);
        }
        }
        }
    }

   protected abstract int doSelect(long timeout) throws IOException;
}

-> 
class EPollSelectorImpl  extends SelectorImpl {
….

// The poll object
EPollArrayWrapper pollWrapper;


protected int doSelect(long timeout)  throws IOException
    {
        if (closed)
            throw new ClosedSelectorException();
        processDeregisterQueue();
        try {
            begin();
           pollWrapper.poll(timeout);
        } finally {
            end();
        }
        processDeregisterQueue();
        int numKeysUpdated = updateSelectedKeys();
        if (pollWrapper.interrupted()) {
            // Clear the wakeup pipe
            pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
            synchronized (interruptLock) {
                pollWrapper.clearInterrupted();
                IOUtil.drain(fd0);
                interruptTriggered = false;
            }
        }
        return numKeysUpdated;
    }

}

->

/**
* Manipulates a native array of epoll_event structs on Linux:
*
* typedef union epoll_data {
*     void *ptr;
*     int fd;
*     __uint32_t u32;
*     __uint64_t u64;
*  } epoll_data_t;
*
* struct epoll_event {
*     __uint32_t events; 
*     epoll_data_t data;
* };
*/

class EPollArrayWrapper {

int poll(long timeout) throws IOException {
    updateRegistrations();
    updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
    for (int i=0; i<updated; i++) {
        if (getDescriptor(i) == incomingInterruptFD) {
            interruptedIndex = i;
            interrupted = true;
            break;
        }
    }
    return updated;
}

private native int epollWait(long pollAddress, int numfds, long timeout,
                             int epfd) throws IOException;
}

->

EPollArrayWrapper.c


/* epoll_wait(2) man page */

typedef union epoll_data {
    void *ptr;
    int fd;
    __uint32_t u32;
    __uint64_t u64;
} epoll_data_t;

struct epoll_event {
    __uint32_t events;  /* Epoll events */
    epoll_data_t data;  /* User data variable */
} __attribute__ ((__packed__));

JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,
                                            jlong address, jint numfds,
                                            jlong timeout, jint epfd)
{
    struct epoll_event *events = jlong_to_ptr(address);
    int res;

    if (timeout <= 0) {           /* Indefinite or no wait */
        RESTARTABLE((*epoll_wait_func)(epfd, events, numfds, timeout), res);
    } else {                      /* Bounded wait; bounded restarts */
        res = iepoll(epfd, events, numfds, timeout);
    }
   
    if (res < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
    }
    return res;
}

static int iepoll(int epfd, struct epoll_event *events, int numfds, jlong timeout)
{
    jlong start, now;
    int remaining = timeout;
    struct timeval t;
    int diff;

    gettimeofday(&t, NULL);
    start = t.tv_sec * 1000 + t.tv_usec / 1000;

    for (;;) {
        int res = (*epoll_wait_func)(epfd, events, numfds, timeout);
        if (res < 0 && errno == EINTR) {
            if (remaining >= 0) {
                gettimeofday(&t, NULL);
                now = t.tv_sec * 1000 + t.tv_usec / 1000;
                diff = now - start;
                remaining -= diff;
                if (diff < 0 || remaining <= 0) {
                    return 0;
                }
                start = now;
            }
        } else {
            return res;
        }
    }
}



<mina쪽 소스>

mina쪽에서 어떻게 해서 무한 루프가 되는 시나리오는 다음과 같다.

select(timeout)를 호출하지만, 바로 jvm 코드에 의해 selected가 리턴된다. isDisposing() 메서드는 종료여부를 알린다. selector와 바인드된 Channel 이 종료되면 disposing 값은 true가 된다.
jvm 내부의 버그로 종료되지 못한 channel 또는 처리해야 할 channel이 존재함으로서, select()에서는 이벤트가 계속 반복되는 현상이 일어나. 계속 무한 루프가 도는 현상이 발생하게 된다.

 

# Apache Mina 2.0.0-RC1 소스

public abstract class AbstractPollingIoProcessor<T extends AbstractIoSession> implements IoProcessor<T> {

private static final long SELECT_TIMEOUT = 1000L;

private class Processor implements Runnable {
        public void run() {
            int nSessions = 0;
            lastIdleCheckTime = System.currentTimeMillis();

            for (;;) {
                try {
                    int selected = select(SELECT_TIMEOUT);

                    nSessions += handleNewSessions();
                    updateTrafficMask();

                     if (selected > 0) {
                        process();
                    }

                    long currentTime = System.currentTimeMillis();
                    flush(currentTime);
                    nSessions -= removeSessions();
                    notifyIdleSessions(currentTime);

                    if (nSessions == 0) {
                        synchronized (lock) {
                            if (newSessions.isEmpty() && isSelectorEmpty()) {
                                processor = null;
                                break;
                            }
                        }
                    }

                     if (isDisposing()) {
                        for (Iterator<T> i = allSessions(); i.hasNext();) {
                            scheduleRemove(i.next());
                        }
                        wakeup();
                     }
                } catch (Throwable t) {
                   …..

                }
            }

/**
* poll those sessions for the given timeout
* @param timeout milliseconds before the call timeout if no event appear
* @return The number of session ready for read or for write
* @throws Exception if some low level IO error occurs
*/
protected abstract int select(long timeout) throws Exception;

}

 

public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> {
    private final Selector selector;

@Override
protected int select() throws Exception {
    return selector.select();
}


}

 
그래서 2011년 Apache Mina 2.0.3 에서는 old jvm을 위한 패치가 이루어졌다. 
Mina 버그질라 (https://issues.apache.org/jira/browse/DIRMINA-678)의 Attachment에 있는
diff 파일을 참조할 수 있다.
image
핵심 코드만 따로 보면, 아래 소스를 보면 된다. 
Selector 안에 Channel이 문제가 있는 것(connection이 broken)일 때는 정리하면 되고, 
이미 연결되어 있어 있는 Channel에 대해서는 selector를 새거로 바꾸고 예전꺼는 정리하는 구조
(NioProcessor.registerNewSelector())로 바꿔놨다.
특별히 이 버그에 대해서 또다른 내용이 등록되어 있지 않아서 잘 동작되는 것 같다. 
# Apache Mina 2.0.4 (수정된 패치 내역이 있는 부분)

public abstract class AbstractPollingIoProcessor<T extends AbstractIoSession> implements IoProcessor<T> {

 protected AtomicBoolean wakeupCalled = new AtomicBoolean(false);

private class Processor implements Runnable {
        public void run() {
            assert (processorRef.get() == this);

            int nSessions = 0;
            lastIdleCheckTime = System.currentTimeMillis();

            for (;;) {
                try {
                    // This select has a timeout so that we can manage
                    // idle session when we get out of the select every
                    // second. (note : this is a hack to avoid creating
                    // a dedicated thread).
                   long t0 = System.currentTimeMillis();
                    int selected = select(SELECT_TIMEOUT);
                    long t1 = System.currentTimeMillis();
                    long delta = (t1 - t0);

                    if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {
                        // Last chance : the select() may have been
                        // interrupted because we have had an closed channel.
                        if (isBrokenConnection()) {
                            LOG.warn("Broken connection");

                            // we can reselect immediately
                            // set back the flag to false
                            wakeupCalled.getAndSet(false);

                            continue;
                        } else {
                            LOG.warn("Create a new selector. Selected is 0, delta = "
                                            + (t1 - t0));
                            // Ok, we are hit by the nasty epoll
                            // spinning.
                            // Basically, there is a race condition
                            // which causes a closing file descriptor not to be
                            // considered as available as a selected channel, but
                            // it stopped the select. The next time we will
                            // call select(), it will exit immediately for the same
                            // reason, and do so forever, consuming 100%
                            // CPU.
                            // We have to destroy the selector, and
                            // register all the socket on a new one.
                           registerNewSelector();
                        }

                       // Set back the flag to false
                        wakeupCalled.getAndSet(false);

                        // and continue the loop
                        continue;
                    }

                    // Manage newly created session first
                    nSessions += handleNewSessions();

                    updateTrafficMask();

                    // Now, if we have had some incoming or outgoing events,
                    // deal with them
                    if (selected > 0) {
                        //LOG.debug("Processing ..."); // This log hurts one of the MDCFilter test...
                        process();
                    }

                    // Write the pending requests
                    long currentTime = System.currentTimeMillis();
                    flush(currentTime);

                    // And manage removed sessions
                    nSessions -= removeSessions();

                    // Last, not least, send Idle events to the idle sessions
                    notifyIdleSessions(currentTime);

                    // Get a chance to exit the infinite loop if there are no
                    // more sessions on this Processor
                    if (nSessions == 0) {
                        processorRef.set(null);
                       
                        if (newSessions.isEmpty() && isSelectorEmpty()) {
                            // newSessions.add() precedes startupProcessor
                            assert (processorRef.get() != this);
                            break;
                        }
                       
                        assert (processorRef.get() != this);
                       
                        if (!processorRef.compareAndSet(null, this)) {
                            // startupProcessor won race, so must exit processor
                            assert (processorRef.get() != this);
                            break;
                        }
                       
                        assert (processorRef.get() == this);
                    }

                    // Disconnect all sessions immediately if disposal has been
                    // requested so that we exit this loop eventually.
                    if (isDisposing()) {
                        for (Iterator<S> i = allSessions(); i.hasNext();) {
                            scheduleRemove(i.next());
                        }

                        wakeup();
                    }
                } catch (ClosedSelectorException cse) {
                    // If the selector has been closed, we can exit the loop
                    break;
                } catch (Throwable t) {
                    ExceptionMonitor.getInstance().exceptionCaught(t);

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                    }
                }
            }

 

public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> {
    /** The selector associated with this processor */
    private Selector selector;

@Override
protected void registerNewSelector() throws IOException {
    synchronized (selector) {
        Set<SelectionKey> keys = selector.keys();

        // Open a new selector
        Selector newSelector = Selector.open();

        // Loop on all the registered keys, and register them on the new selector
        for (SelectionKey key : keys) {
            SelectableChannel ch = key.channel();

            // Don't forget to attache the session, and back !
            NioSession session = (NioSession)key.attachment();
            SelectionKey newKey = ch.register(newSelector, key.interestOps(), session);
            session.setSelectionKey( newKey );
        }

        // Now we can close the old selector and switch it
        selector.close();
        selector = newSelector;
    }
}

 

 

마치며..

jvm의 버그로 인해서 회피하는 Apache mina 코드와 연관된 jvm 소스를 살펴보았다. 원천적으로 jvm 코드의 수정이 됨으로서 문제가 해결되었지만, jvm 버그가 언제 고쳐질지 모르거나 old jvm을 쓰면서 이슈가 생기는 것을 막기 위해서 회피하는 코드를 만들었다. 책임감이 강한 멋쟁이 mina commitor.. 잘 되기를 바란다.

Posted by '김용환'
,