diff --git a/unknow-server-nio/src/main/java/unknow/server/nio/ConnectionStats.java b/unknow-server-nio/src/main/java/unknow/server/nio/ConnectionStats.java deleted file mode 100644 index fa2a7b6d..00000000 --- a/unknow-server-nio/src/main/java/unknow/server/nio/ConnectionStats.java +++ /dev/null @@ -1,47 +0,0 @@ -package unknow.server.nio; - -/** - * statistics of a nio connection - */ -public class ConnectionStats { - private final boolean hasPengingWrite; - private final boolean closing; - private final long lastCheck; - private final long lastAction; - - public ConnectionStats(boolean hasPengingWrite, boolean closing, long lastCheck, long lastAction) { - this.hasPengingWrite = hasPengingWrite; - this.closing = closing; - this.lastCheck = lastCheck; - this.lastAction = lastAction; - } - - /** - * @return true if the connection has pending write - */ - public boolean hasPengingWrite() { - return hasPengingWrite; - } - - /** - * @return true if the connection is closing - */ - public boolean isClosing() { - return closing; - } - - /** - * @return nanotime of last connection check - */ - public long lastCheck() { - return lastCheck; - } - - /** - * @return nano time of last action on the connection - */ - public long lastAction() { - return lastAction; - } - -} diff --git a/unknow-server-nio/src/main/java/unknow/server/nio/NIOConnection.java b/unknow-server-nio/src/main/java/unknow/server/nio/NIOConnection.java index a1b422c0..0c01a2b6 100644 --- a/unknow-server-nio/src/main/java/unknow/server/nio/NIOConnection.java +++ b/unknow-server-nio/src/main/java/unknow/server/nio/NIOConnection.java @@ -28,6 +28,8 @@ public final class NIOConnection extends NIOHandlerDelegate { private static final InetSocketAddress DISCONECTED = InetSocketAddress.createUnresolved("", 0); + private static final long SOFT_LIMIT = 1048576L; + /** Output stream */ protected final Out out; @@ -93,6 +95,10 @@ public void init(NIOConnection co, long now, SSLEngine sslEngine) throws IOExcep handler.init(co, now, sslEngine); } + public final boolean canWrite() { + return writes.remaining() < SOFT_LIMIT; + } + /** * add a buffers to the writing queue * @@ -157,9 +163,8 @@ public boolean isClosed() { protected final void beforeWrite(long now) throws IOException { ByteBuffer b; - while ((b = pending.poll()) != null) - writes.accept(b); - handler.transformWrite(writes, now); + while (writes.remaining() < SOFT_LIMIT && (b = pending.poll()) != null) + handler.transformWrite(b, writes, now); } @Override @@ -173,6 +178,9 @@ public final void onWrite(long now) throws IOException { else if (writeScheduled.compareAndSet(false, true)) key.interestOpsOr(SelectionKey.OP_WRITE); } + synchronized (out) { + out.notifyAll(); + } handler.onWrite(now); } @@ -251,7 +259,7 @@ public synchronized void write(byte[] b, int off, int len) throws IOException { if (len < BUF_SIZE) buf.put(b, off, len); else - h.write(ByteBuffer.wrap(Arrays.copyOfRange(b, off, off + len))); + tryWrite(ByteBuffer.wrap(Arrays.copyOfRange(b, off, off + len))); } else if (len == r) { buf.put(b, off, len); writeBuffer(); @@ -269,7 +277,19 @@ public synchronized void write(ByteBuffer b) throws IOException { if (h == null) throw new IOException("already closed"); writeBuffer(); - h.write(b); + tryWrite(b); + } + + private synchronized void tryWrite(ByteBuffer b) throws IOException { + NIOConnection co = h; + try { + while (!co.canWrite()) { + wait(); + } + } catch (@SuppressWarnings("unused") InterruptedException e) { + Thread.currentThread().interrupt(); + } + co.write(b); } @Override @@ -299,7 +319,7 @@ public synchronized void flush() throws IOException { private void writeBuffer() throws IOException { if (h == null || buf.position() == 0) return; - h.write(buf.flip()); + tryWrite(buf.flip()); buf = ByteBuffer.allocate(BUF_SIZE); } } diff --git a/unknow-server-nio/src/main/java/unknow/server/nio/NIOConnectionHandler.java b/unknow-server-nio/src/main/java/unknow/server/nio/NIOConnectionHandler.java index bb48b935..2a15c724 100644 --- a/unknow-server-nio/src/main/java/unknow/server/nio/NIOConnectionHandler.java +++ b/unknow-server-nio/src/main/java/unknow/server/nio/NIOConnectionHandler.java @@ -55,11 +55,13 @@ default void onRead(ByteBuffer b, long now) throws IOException { // ok /** * called before some buffers are written * - * @param buffers buffers to be written + * @param in buffer to transform + * @param out buffers to be written * @param now nanoTime * @throws IOException on io exception */ - default void transformWrite(ByteBuffers buffers, long now) throws IOException { // ok + default void transformWrite(ByteBuffer in, ByteBuffers out, long now) throws IOException { + out.accept(in); } /** diff --git a/unknow-server-nio/src/main/java/unknow/server/nio/NIOHandlerDelegate.java b/unknow-server-nio/src/main/java/unknow/server/nio/NIOHandlerDelegate.java index 7331d0a9..0e0c4171 100644 --- a/unknow-server-nio/src/main/java/unknow/server/nio/NIOHandlerDelegate.java +++ b/unknow-server-nio/src/main/java/unknow/server/nio/NIOHandlerDelegate.java @@ -35,8 +35,8 @@ public void onRead(ByteBuffer b, long now) throws IOException { } @Override - public void transformWrite(ByteBuffers buffers, long now) throws IOException { - handler.transformWrite(buffers, now); + public void transformWrite(ByteBuffer in, ByteBuffers buffers, long now) throws IOException { + handler.transformWrite(in, buffers, now); } @Override diff --git a/unknow-server-nio/src/main/java/unknow/server/nio/NIOSSLHandler.java b/unknow-server-nio/src/main/java/unknow/server/nio/NIOSSLHandler.java index 672cad8e..6c5285cf 100644 --- a/unknow-server-nio/src/main/java/unknow/server/nio/NIOSSLHandler.java +++ b/unknow-server-nio/src/main/java/unknow/server/nio/NIOSSLHandler.java @@ -3,7 +3,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicInteger; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; @@ -23,7 +22,6 @@ public class NIOSSLHandler extends NIOHandlerDelegate { private static final ByteBuffer EMPTY = ByteBuffer.allocate(0); private final SSLContext sslContext; - private final ByteBuffers rawOut; private NIOConnection co; private SSLEngine sslEngine; @@ -36,7 +34,6 @@ public class NIOSSLHandler extends NIOHandlerDelegate { public NIOSSLHandler(SSLContext sslContext, NIOConnectionHandler handler) { super(handler); this.sslContext = sslContext; - this.rawOut = new ByteBuffers(16); } @Override @@ -104,35 +101,22 @@ public void onRead(ByteBuffer b, long now) throws IOException { } @Override - public void transformWrite(ByteBuffers buffers, long now) throws IOException { - rawOut.accept(buffers); - while (!rawOut.isEmpty()) { - ByteBuffer out = ByteBuffer.allocate(packetBufferSize); - SSLEngineResult r = sslEngine.wrap(rawOut.buf, 0, rawOut.len, out); + public void transformWrite(ByteBuffer in, ByteBuffers writes, long now) throws IOException { + ByteBuffer net = ByteBuffer.allocate(packetBufferSize); + SSLEngineResult r = sslEngine.wrap(in, net); + logger.debug("wrap {}", r); + if (net.position() > 0) + handler.transformWrite(net.flip(), writes, now); + if (r.getStatus() == Status.CLOSED) + return; + while (in.hasRemaining() || r.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) { + net = ByteBuffer.allocate(packetBufferSize); + r = sslEngine.wrap(in, net); + if (net.position() > 0) + handler.transformWrite(net.flip(), writes, now); logger.debug("wrap {}", r); - buffers.accept(out.flip()); - if (r.getStatus() == Status.CLOSED) { - AtomicInteger l = new AtomicInteger(0); - rawOut.drain(b -> l.getAndAdd(b.remaining())); - if (l.get() > 0) - logger.warn("{} remaining data {}", co, l); - break; - } - while (r.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) { - out = ByteBuffer.allocate(packetBufferSize); - r = sslEngine.wrap(rawOut.buf, 0, rawOut.len, out); - buffers.accept(out.flip()); - logger.debug("wrap {}", r); - } - checkHandshake(r.getHandshakeStatus(), now); - rawOut.compact(); } - handler.transformWrite(buffers, now); - } - - @Override - public boolean hasPendingWrites() { - return !rawOut.isEmpty() || handler.hasPendingWrites(); + checkHandshake(r.getHandshakeStatus(), now); } @Override diff --git a/unknow-server-nio/src/main/java/unknow/server/nio/NIOServerListener.java b/unknow-server-nio/src/main/java/unknow/server/nio/NIOServerListener.java index 154d425b..71125f47 100644 --- a/unknow-server-nio/src/main/java/unknow/server/nio/NIOServerListener.java +++ b/unknow-server-nio/src/main/java/unknow/server/nio/NIOServerListener.java @@ -10,6 +10,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import unknow.server.nio.NIOWorker.WorkerTask; + /** * nio server listener * @author unknow @@ -31,6 +33,30 @@ public interface NIOServerListener { */ void accepted(String name, NIOConnection h); + /** + * a task is accepted + * + * @param name worker name + * @param task the task + */ + void accepted(String name, WorkerTask task); + + /** + * a task is done + * + * @param name worker name + * @param task the task + */ + void done(String name, WorkerTask task); + + /** + * a connection is closing + * + * @param name worker name + * @param h client handler + */ + void closing(String name, NIOConnection h); + /** * a connection is closed * @@ -54,18 +80,27 @@ public interface NIOServerListener { */ void onSelect(String name, long now); - /** - * do nothing - */ - public static final NIOServerListener NOP = new NIOServerListener() { + public static class NOPListener implements NIOServerListener { @Override public void starting(NIOServer server) { // OK } + @Override + public void accepted(String name, WorkerTask task) { // OK + } + + @Override + public void done(String name, WorkerTask task) { // OK + } + @Override public void accepted(String name, NIOConnection h) { // OK } + @Override + public void closing(String name, NIOConnection h) { // OK + } + @Override public void closed(String name, NIOConnection h) { // OK } @@ -77,7 +112,12 @@ public void closing(NIOServer server, Exception e) { // OK @Override public void onSelect(String name, long now) { // ok } + } + /** + * do nothing + */ + public static final NIOServerListener NOP = new NOPListener() { @Override public String toString() { return "NIOServerListener.NOP"; @@ -87,7 +127,7 @@ public String toString() { /** * Listener that log all event */ - public static final NIOServerListener LOG = new NIOServerListener() { + public static final NIOServerListener LOG = new NOPListener() { private final Logger logger = LoggerFactory.getLogger(NIOServerListener.class); private final AtomicInteger c = new AtomicInteger(); @@ -102,9 +142,14 @@ public void accepted(String name, NIOConnection h) { logger.info("{} accepted {} ({})", name, h, c.incrementAndGet()); } + @Override + public void closing(String name, NIOConnection h) { + logger.info("{} closing {} ({})", name, h, c.decrementAndGet()); + } + @Override public void closed(String name, NIOConnection h) { - logger.info("{} closed {} ({})", name, h, c.decrementAndGet()); + logger.info("{} closed {} ({})", name, h, c.get()); } @Override @@ -154,12 +199,30 @@ public void starting(NIOServer server) { listeners[i].starting(server); } + @Override + public void accepted(String name, WorkerTask task) { + for (int i = 0; i < listeners.length; i++) + listeners[i].accepted(name, task); + } + + @Override + public void done(String name, WorkerTask task) { + for (int i = 0; i < listeners.length; i++) + listeners[i].done(name, task); + } + @Override public void accepted(String name, NIOConnection h) { for (int i = 0; i < listeners.length; i++) listeners[i].accepted(name, h); } + @Override + public void closing(String name, NIOConnection h) { + for (int i = 0; i < listeners.length; i++) + listeners[i].closing(name, h); + } + @Override public void closed(String name, NIOConnection h) { for (int i = 0; i < listeners.length; i++) diff --git a/unknow-server-nio/src/main/java/unknow/server/nio/NIOWorker.java b/unknow-server-nio/src/main/java/unknow/server/nio/NIOWorker.java index 11e5c9e0..944bc6a6 100644 --- a/unknow-server-nio/src/main/java/unknow/server/nio/NIOWorker.java +++ b/unknow-server-nio/src/main/java/unknow/server/nio/NIOWorker.java @@ -8,14 +8,12 @@ import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Queue; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -34,6 +32,7 @@ public final class NIOWorker extends NIOLoop implements NIOWorkers { private static final Logger logger = LoggerFactory.getLogger(NIOWorker.class); private static final int BUF_LEN = 64 * 1024; + private static final int MAX_READS = 8; /** executor for delegating task */ private final ExecutorService executor; @@ -156,12 +155,6 @@ private boolean closingTimeout(NIOConnection co, long now) { return false; } - public Future> connectionStats() { - StatCollector statCollector = new StatCollector(); - execute(statCollector); - return statCollector; - } - @Override protected final void selected(long now, SelectionKey key) throws IOException { NIOConnection co = (NIOConnection) key.attachment(); @@ -194,7 +187,8 @@ protected final void selected(long now, SelectionKey key) throws IOException { } private void doRead(NIOConnection co, long now) throws IOException { - while (true) { + int i = 0; + while (i++ < MAX_READS) { int l = co.channel.read(buf); if (l == -1) { co.key.interestOpsAnd(~SelectionKey.OP_READ); @@ -212,11 +206,11 @@ private void doRead(NIOConnection co, long now) throws IOException { } private void doWrite(NIOConnection co, long now) throws IOException { - if (co.writes.isEmpty()) - co.beforeWrite(now); + co.beforeWrite(now); long l = co.channel.write(co.writes.buf, 0, co.writes.len); logger.trace("{} writen {} {}", co, l, co.key); if (l > 0) { + co.writes.remaining -= l; co.onWrite(now); if (co.isClosed()) startClose(co, now); @@ -346,20 +340,4 @@ public void run() { } } } - - private class StatCollector extends CompletableFuture> implements WorkerTask { - @Override - public void run(long now) { - List list = new ArrayList<>(); - NIOConnection co = head; - while (co != null) { - list.add(new ConnectionStats(co.hasPendingWrites(), false, co.lastCheck, co.lastAction)); - co = co.next; - } - for (NIOConnection c : closing) - list.add(new ConnectionStats(c.hasPendingWrites(), true, c.lastCheck, c.lastAction)); - complete(list); - } - - } } \ No newline at end of file diff --git a/unknow-server-nio/src/main/java/unknow/server/nio/NIOWorkers.java b/unknow-server-nio/src/main/java/unknow/server/nio/NIOWorkers.java index 444ab3f7..6eb1540c 100644 --- a/unknow-server-nio/src/main/java/unknow/server/nio/NIOWorkers.java +++ b/unknow-server-nio/src/main/java/unknow/server/nio/NIOWorkers.java @@ -41,6 +41,7 @@ public interface NIOWorkers { /** * list all workers + * @return the workers */ Collection workers(); diff --git a/unknow-server-nio/src/main/java/unknow/server/nio/PrometheusListener.java b/unknow-server-nio/src/main/java/unknow/server/nio/PrometheusListener.java index 190cd56d..e9d16fb5 100644 --- a/unknow-server-nio/src/main/java/unknow/server/nio/PrometheusListener.java +++ b/unknow-server-nio/src/main/java/unknow/server/nio/PrometheusListener.java @@ -1,94 +1,49 @@ package unknow.server.nio; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ExecutionException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.prometheus.client.Collector; import io.prometheus.client.Counter; -import io.prometheus.client.GaugeMetricFamily; import io.prometheus.client.Histogram; +import unknow.server.nio.NIOServerListener.NOPListener; +import unknow.server.nio.NIOWorker.WorkerTask; -public class PrometheusListener implements NIOServerListener { - private static final Logger logger = LoggerFactory.getLogger(PrometheusListener.class); +public class PrometheusListener extends NOPListener { private static final Counter ACCEPTED = Counter.build("nio_connection_accepted", "Number of connection accepted").labelNames("name").register(); + private static final Counter CLOSING = Counter.build("nio_connection_closing", "Number of connection in closing state").labelNames("name").register(); private static final Counter CLOSED = Counter.build("nio_connection_closed", "Number of connection closed").labelNames("name").register(); private static final Histogram SELECT = Histogram.build("nio_select_time", "Duration of the selection process").labelNames("name").buckets(.0001, .0005, .001, .005, .01) .register(); + private static final Counter TASKS_ACCEPTED = Counter.build("nio_tasks_accepted", "Number of tasks accepted").labelNames("name").register(); + private static final Counter TASKS_DONE = Counter.build("nio_tasks_done", "Number of tasks done").labelNames("name").register(); - private static final Collection WORKERS = new ArrayList<>(); + public static final NIOServerListener INSTANCE = new PrometheusListener(); - static { - new WorkerCollector().register(); + @Override + public void accepted(String name, NIOConnection h) { + ACCEPTED.labels(name).inc(); } - public static final NIOServerListener INSTANCE = new PrometheusListener(); - @Override - public void starting(NIOServer server) { - synchronized (WORKERS) { - WORKERS.addAll(server.workers()); - } + public void accepted(String name, WorkerTask task) { + TASKS_ACCEPTED.labels(name).inc(); } @Override - public void accepted(String name, NIOConnection h) { - ACCEPTED.labels(name).inc(); + public void done(String name, WorkerTask task) { + TASKS_DONE.labels(name).inc(); } @Override - public void closed(String name, NIOConnection h) { - CLOSED.labels(name).inc(); + public void closing(String name, NIOConnection h) { + CLOSING.labels(name).inc(); } @Override - public void closing(NIOServer server, Exception e) { - synchronized (WORKERS) { - WORKERS.removeAll(server.workers()); - } + public void closed(String name, NIOConnection h) { + CLOSED.labels(name).inc(); } @Override public void onSelect(String name, long now) { SELECT.labels(name).observe((System.nanoTime() - now) / 1000_000_000.); } - - private static class WorkerCollector extends Collector { - - @Override - public List collect() { - List labels = Arrays.asList("name"); - GaugeMetricFamily closing = new GaugeMetricFamily("nio_worker_connection_closing", "Number of connection in closing state", labels); - GaugeMetricFamily writes = new GaugeMetricFamily("nio_worker_connection_writes", "Number of tasks on the worker", labels); - GaugeMetricFamily tasks = new GaugeMetricFamily("nio_worker_tasks", "Number of tasks on the worker", labels); - synchronized (WORKERS) { - for (NIOWorker w : WORKERS) { - labels = Arrays.asList(w.name); - closing.addMetric(labels, w.nbClosing()); - tasks.addMetric(labels, w.nbTask()); - try { - int i = 0; - for (ConnectionStats s : w.connectionStats().get()) { - if (s.hasPengingWrite()) - i++; - } - writes.addMetric(labels, i); - } catch (@SuppressWarnings("unused") InterruptedException e) { - Thread.currentThread().interrupt(); - return Collections.emptyList(); - } catch (ExecutionException e) { - logger.warn("Failed to get connection stats", e); - } - } - } - return Arrays.asList(closing, tasks, writes); - } - } } diff --git a/unknow-server-servlet/src/main/java/unknow/server/servlet/http2/Http2Processor.java b/unknow-server-servlet/src/main/java/unknow/server/servlet/http2/Http2Processor.java index fe5e7a02..485ce576 100644 --- a/unknow-server-servlet/src/main/java/unknow/server/servlet/http2/Http2Processor.java +++ b/unknow-server-servlet/src/main/java/unknow/server/servlet/http2/Http2Processor.java @@ -264,13 +264,10 @@ public void sendFrame(int type, int flags, int id, ByteBuffer data) throws IOExc byte[] b = data.array(); formatFrame(b, data.position(), size, type, flags, id); - synchronized (co) { - co.write(data); - } + co.write(data); if (logger.isTraceEnabled()) logger.trace(String.format("%s send frame: %02x, size: %s, flags: %02x, id: %s", co, type, size, flags, id)); - co.flush(); } @SuppressWarnings("resource") @@ -316,7 +313,6 @@ public void sendData(int id, ByteBuffer data, boolean done) throws IOException { sendFrame(0, 0, id, data); } sendFrame(0, done ? 0x1 : 0, id, data); - co.flush(); } public void sendWindowUpdate(int id, int window) throws IOException { diff --git a/unknow-server-util/src/main/java/unknow/server/util/io/ByteBuffers.java b/unknow-server-util/src/main/java/unknow/server/util/io/ByteBuffers.java index 6ee966a4..4f94142e 100644 --- a/unknow-server-util/src/main/java/unknow/server/util/io/ByteBuffers.java +++ b/unknow-server-util/src/main/java/unknow/server/util/io/ByteBuffers.java @@ -9,6 +9,7 @@ public class ByteBuffers implements Consumer { public ByteBuffer[] buf; public int len; + public long remaining; public ByteBuffers(int l) { buf = new ByteBuffer[l]; @@ -24,6 +25,7 @@ private void ensureCapacity(int l) { public void accept(ByteBuffer b) { ensureCapacity(len + 1); buf[len++] = b; + remaining += b.remaining(); } public void accept(ByteBuffers buffers) { @@ -32,6 +34,7 @@ public void accept(ByteBuffers buffers) { ensureCapacity(len + buffers.len); System.arraycopy(buffers.buf, 0, buf, len, buffers.len); len += buffers.len; + remaining += buffers.remaining; buffers.clear(); } @@ -43,13 +46,11 @@ public void clear() { for (int i = 0; i < len; i++) buf[i] = null; len = 0; + remaining = 0; } - public int remaining() { - int r = 0; - for (int i = 0; i < len; i++) - r += buf[i].remaining(); - return r; + public long remaining() { + return remaining; } /** @@ -84,6 +85,7 @@ public void drain(ConsumerWithException c) buf[i] = null; } len = 0; + remaining = 0; } /**