Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
public final class NIOConnection extends NIOHandlerDelegate {
private static final InetSocketAddress DISCONECTED = InetSocketAddress.createUnresolved("", 0);

private static final long SOFT_LIMIT = 1048576L;

/** Output stream */
protected final Out out;

Expand Down Expand Up @@ -93,6 +95,10 @@ public void init(NIOConnection co, long now, SSLEngine sslEngine) throws IOExcep
handler.init(co, now, sslEngine);
}

public final boolean canWrite() {
return writes.remaining() < SOFT_LIMIT;
}

/**
* add a buffers to the writing queue
*
Expand Down Expand Up @@ -157,9 +163,8 @@ public boolean isClosed() {

protected final void beforeWrite(long now) throws IOException {
ByteBuffer b;
while ((b = pending.poll()) != null)
writes.accept(b);
handler.transformWrite(writes, now);
while (writes.remaining() < SOFT_LIMIT && (b = pending.poll()) != null)
handler.transformWrite(b, writes, now);
}

@Override
Expand All @@ -173,6 +178,9 @@ public final void onWrite(long now) throws IOException {
else if (writeScheduled.compareAndSet(false, true))
key.interestOpsOr(SelectionKey.OP_WRITE);
}
synchronized (out) {
out.notifyAll();
}
handler.onWrite(now);
}

Expand Down Expand Up @@ -251,7 +259,7 @@ public synchronized void write(byte[] b, int off, int len) throws IOException {
if (len < BUF_SIZE)
buf.put(b, off, len);
else
h.write(ByteBuffer.wrap(Arrays.copyOfRange(b, off, off + len)));
tryWrite(ByteBuffer.wrap(Arrays.copyOfRange(b, off, off + len)));
} else if (len == r) {
buf.put(b, off, len);
writeBuffer();
Expand All @@ -269,7 +277,19 @@ public synchronized void write(ByteBuffer b) throws IOException {
if (h == null)
throw new IOException("already closed");
writeBuffer();
h.write(b);
tryWrite(b);
}

private synchronized void tryWrite(ByteBuffer b) throws IOException {
NIOConnection co = h;
try {
while (!co.canWrite()) {
wait();
}
} catch (@SuppressWarnings("unused") InterruptedException e) {
Thread.currentThread().interrupt();
}
co.write(b);
}

@Override
Expand Down Expand Up @@ -299,7 +319,7 @@ public synchronized void flush() throws IOException {
private void writeBuffer() throws IOException {
if (h == null || buf.position() == 0)
return;
h.write(buf.flip());
tryWrite(buf.flip());
buf = ByteBuffer.allocate(BUF_SIZE);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,13 @@ default void onRead(ByteBuffer b, long now) throws IOException { // ok
/**
* called before some buffers are written
*
* @param buffers buffers to be written
* @param in buffer to transform
* @param out buffers to be written
* @param now nanoTime
* @throws IOException on io exception
*/
default void transformWrite(ByteBuffers buffers, long now) throws IOException { // ok
default void transformWrite(ByteBuffer in, ByteBuffers out, long now) throws IOException {
out.accept(in);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ public void onRead(ByteBuffer b, long now) throws IOException {
}

@Override
public void transformWrite(ByteBuffers buffers, long now) throws IOException {
handler.transformWrite(buffers, now);
public void transformWrite(ByteBuffer in, ByteBuffers buffers, long now) throws IOException {
handler.transformWrite(in, buffers, now);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
Expand All @@ -23,7 +22,6 @@ public class NIOSSLHandler extends NIOHandlerDelegate {
private static final ByteBuffer EMPTY = ByteBuffer.allocate(0);

private final SSLContext sslContext;
private final ByteBuffers rawOut;

private NIOConnection co;
private SSLEngine sslEngine;
Expand All @@ -36,7 +34,6 @@ public class NIOSSLHandler extends NIOHandlerDelegate {
public NIOSSLHandler(SSLContext sslContext, NIOConnectionHandler handler) {
super(handler);
this.sslContext = sslContext;
this.rawOut = new ByteBuffers(16);
}

@Override
Expand Down Expand Up @@ -104,35 +101,22 @@ public void onRead(ByteBuffer b, long now) throws IOException {
}

@Override
public void transformWrite(ByteBuffers buffers, long now) throws IOException {
rawOut.accept(buffers);
while (!rawOut.isEmpty()) {
ByteBuffer out = ByteBuffer.allocate(packetBufferSize);
SSLEngineResult r = sslEngine.wrap(rawOut.buf, 0, rawOut.len, out);
public void transformWrite(ByteBuffer in, ByteBuffers writes, long now) throws IOException {
ByteBuffer net = ByteBuffer.allocate(packetBufferSize);
SSLEngineResult r = sslEngine.wrap(in, net);
logger.debug("wrap {}", r);
if (net.position() > 0)
handler.transformWrite(net.flip(), writes, now);
if (r.getStatus() == Status.CLOSED)
return;
while (in.hasRemaining() || r.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) {
net = ByteBuffer.allocate(packetBufferSize);
r = sslEngine.wrap(in, net);
if (net.position() > 0)
handler.transformWrite(net.flip(), writes, now);
logger.debug("wrap {}", r);
buffers.accept(out.flip());
if (r.getStatus() == Status.CLOSED) {
AtomicInteger l = new AtomicInteger(0);
rawOut.drain(b -> l.getAndAdd(b.remaining()));
if (l.get() > 0)
logger.warn("{} remaining data {}", co, l);
break;
}
while (r.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) {
out = ByteBuffer.allocate(packetBufferSize);
r = sslEngine.wrap(rawOut.buf, 0, rawOut.len, out);
buffers.accept(out.flip());
logger.debug("wrap {}", r);
}
checkHandshake(r.getHandshakeStatus(), now);
rawOut.compact();
}
handler.transformWrite(buffers, now);
}

@Override
public boolean hasPendingWrites() {
return !rawOut.isEmpty() || handler.hasPendingWrites();
checkHandshake(r.getHandshakeStatus(), now);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import unknow.server.nio.NIOWorker.WorkerTask;

/**
* nio server listener
* @author unknow
Expand All @@ -31,6 +33,30 @@ public interface NIOServerListener {
*/
void accepted(String name, NIOConnection h);

/**
* a task is accepted
*
* @param name worker name
* @param task the task
*/
void accepted(String name, WorkerTask task);

/**
* a task is done
*
* @param name worker name
* @param task the task
*/
void done(String name, WorkerTask task);

/**
* a connection is closing
*
* @param name worker name
* @param h client handler
*/
void closing(String name, NIOConnection h);

/**
* a connection is closed
*
Expand All @@ -54,18 +80,27 @@ public interface NIOServerListener {
*/
void onSelect(String name, long now);

/**
* do nothing
*/
public static final NIOServerListener NOP = new NIOServerListener() {
public static class NOPListener implements NIOServerListener {
@Override
public void starting(NIOServer server) { // OK
}

@Override
public void accepted(String name, WorkerTask task) { // OK
}

@Override
public void done(String name, WorkerTask task) { // OK
}

@Override
public void accepted(String name, NIOConnection h) { // OK
}

@Override
public void closing(String name, NIOConnection h) { // OK
}

@Override
public void closed(String name, NIOConnection h) { // OK
}
Expand All @@ -77,7 +112,12 @@ public void closing(NIOServer server, Exception e) { // OK
@Override
public void onSelect(String name, long now) { // ok
}
}

/**
* do nothing
*/
public static final NIOServerListener NOP = new NOPListener() {
@Override
public String toString() {
return "NIOServerListener.NOP";
Expand All @@ -87,7 +127,7 @@ public String toString() {
/**
* Listener that log all event
*/
public static final NIOServerListener LOG = new NIOServerListener() {
public static final NIOServerListener LOG = new NOPListener() {
private final Logger logger = LoggerFactory.getLogger(NIOServerListener.class);

private final AtomicInteger c = new AtomicInteger();
Expand All @@ -102,9 +142,14 @@ public void accepted(String name, NIOConnection h) {
logger.info("{} accepted {} ({})", name, h, c.incrementAndGet());
}

@Override
public void closing(String name, NIOConnection h) {
logger.info("{} closing {} ({})", name, h, c.decrementAndGet());
}

@Override
public void closed(String name, NIOConnection h) {
logger.info("{} closed {} ({})", name, h, c.decrementAndGet());
logger.info("{} closed {} ({})", name, h, c.get());
}

@Override
Expand Down Expand Up @@ -154,12 +199,30 @@ public void starting(NIOServer server) {
listeners[i].starting(server);
}

@Override
public void accepted(String name, WorkerTask task) {
for (int i = 0; i < listeners.length; i++)
listeners[i].accepted(name, task);
}

@Override
public void done(String name, WorkerTask task) {
for (int i = 0; i < listeners.length; i++)
listeners[i].done(name, task);
}

@Override
public void accepted(String name, NIOConnection h) {
for (int i = 0; i < listeners.length; i++)
listeners[i].accepted(name, h);
}

@Override
public void closing(String name, NIOConnection h) {
for (int i = 0; i < listeners.length; i++)
listeners[i].closing(name, h);
}

@Override
public void closed(String name, NIOConnection h) {
for (int i = 0; i < listeners.length; i++)
Expand Down
Loading
Loading