Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions unknow-server-nio/src/main/java/unknow/server/nio/NIOServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,20 @@ protected void onStartup() {
@SuppressWarnings("resource")
@Override
protected void selected(long now, SelectionKey key) throws IOException, InterruptedException {
ConnectionFactory factory = (ConnectionFactory) key.attachment();
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel socket = null;
try {
ConnectionFactory factory = (ConnectionFactory) key.attachment();
socket = ((ServerSocketChannel) key.channel()).accept();
workers.register(socket, factory);
} catch (IOException e) {
if (socket != null) {
while ((socket = ssc.accept()) != null) {
try {
workers.register(socket, factory);
} catch (IOException e) {
try {
socket.close();
} catch (IOException ex) {
e.addSuppressed(ex);
}
logger.warn("Failed to accept", e);
}
logger.warn("Failed to accept", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import unknow.server.nio.NIOWorkers.RoundRobin;
import unknow.server.nio.worker.RoundRobin;

/** builder for the an NIOServer */
public class NIOServerBuilder {
Expand Down Expand Up @@ -180,14 +180,15 @@ public final NIOServer build(String... arg) throws Exception {
return server;
}

@SuppressWarnings("resource")
private NIOWorkers createWorkers(int i, long selectTime, long closingTime, NIOServerListener l) throws IOException {
ExecutorService executor = getExecutor();
if (i == 1)
return new NIOWorker(0, executor, l, selectTime, closingTime);
NIOWorker[] w = new NIOWorker[i];
while (i > 0)
w[--i] = new NIOWorker(i, executor, l, selectTime, closingTime);
return new RoundRobin(w);
return RoundRobin.create(w);
}

protected ExecutorService getExecutor() {
Expand Down
18 changes: 16 additions & 2 deletions unknow-server-nio/src/main/java/unknow/server/nio/NIOWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ protected final void selected(long now, SelectionKey key) throws IOException {
try {
doWrite(co, now);
} catch (Exception e) {
if (co.next != co)
if (co.next != co && !isConnectionIssue(e))
logger.error("failed to write {}", co, e);
startClose(co, now);
key.cancel();
Expand All @@ -180,7 +180,7 @@ protected final void selected(long now, SelectionKey key) throws IOException {
try {
doRead(co, now);
} catch (Exception e) {
if (co.next != co)
if (co.next != co && !isConnectionIssue(e))
logger.error("failed to read {}", co, e);
startClose(co, now);
} finally {
Expand All @@ -189,6 +189,20 @@ protected final void selected(long now, SelectionKey key) throws IOException {
}
}

private static boolean isConnectionIssue(Exception e) {
if (e instanceof java.net.SocketException)
return true;
if (e instanceof java.nio.channels.ClosedChannelException)
return true;

String msg = e.getMessage();
if (msg == null)
return false;

msg = msg.toLowerCase();
return msg.contains("connection reset") || msg.contains("broken pipe") || msg.contains("forcibly closed");
}

private void doRead(NIOConnection co, long now) throws IOException {
int i = 0;
while (i++ < MAX_READS) {
Expand Down
47 changes: 0 additions & 47 deletions unknow-server-nio/src/main/java/unknow/server/nio/NIOWorkers.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Collection;

import unknow.server.nio.NIOServer.ConnectionFactory;
Expand Down Expand Up @@ -44,50 +43,4 @@ public interface NIOWorkers {
* @return the workers
*/
Collection<NIOWorker> workers();

/**
* socket will register between workers in round robin
*
* @author unknow
*/
public static class RoundRobin implements NIOWorkers {
private final NIOWorker[] w;
private int o;

/** @param workers the workers */
public RoundRobin(NIOWorker[] workers) {
this.w = workers;
this.o = 0;
}

@Override
public synchronized void register(SocketChannel socket, ConnectionFactory factory) throws IOException {
w[o++].register(socket, factory);
if (o == w.length)
o = 0;
}

@Override
public void start() {
for (int i = 0; i < w.length; i++)
w[i].start();
}

@Override
public void stop() {
for (int i = 0; i < w.length; i++)
w[i].stop();
}

@Override
public void await() {
for (int i = 0; i < w.length; i++)
w[i].await();
}

@Override
public Collection<NIOWorker> workers() {
return Arrays.asList(w);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package unknow.server.nio.worker;

import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Collection;

import unknow.server.nio.NIOServer.ConnectionFactory;
import unknow.server.nio.NIOWorker;
import unknow.server.nio.NIOWorkers;

/**
* socket will register between workers in round robin
*
* @author unknow
*/
public abstract class AbstractNIOWorkers implements NIOWorkers {
protected final NIOWorker[] w;

/** @param workers the workers */
protected AbstractNIOWorkers(NIOWorker[] workers) {
this.w = workers;
}

protected abstract NIOWorker next();

@Override
public synchronized void register(SocketChannel socket, ConnectionFactory factory) throws IOException {
next().register(socket, factory);
}

@Override
public void start() {
for (int i = 0; i < w.length; i++)
w[i].start();
}

@Override
public void stop() {
for (int i = 0; i < w.length; i++)
w[i].stop();
}

@Override
public void await() {
for (int i = 0; i < w.length; i++)
w[i].await();
}

@Override
public Collection<NIOWorker> workers() {
return Arrays.asList(w);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package unknow.server.nio.worker;

import java.util.concurrent.atomic.AtomicInteger;

import unknow.server.nio.NIOWorker;
import unknow.server.nio.NIOWorkers;

/**
* socket will register between workers in round robin
*
* @author unknow
*/

public final class RoundRobin extends AbstractNIOWorkers implements NIOWorkers {
private final AtomicInteger o;

/** @param workers the workers */
private RoundRobin(NIOWorker[] workers) {
super(workers);
this.o = new AtomicInteger(0);
}

public static NIOWorkers create(NIOWorker[] w) {
if ((w.length & (w.length - 1)) == 0)
return new RoundRobinPow(w);
return new RoundRobin(w);
}

@Override
protected NIOWorker next() {
return w[o.getAndIncrement() % w.length];
}

public static final class RoundRobinPow extends AbstractNIOWorkers implements NIOWorkers {
private final AtomicInteger o;
private final int m;

/** @param workers the workers */
private RoundRobinPow(NIOWorker[] workers) {
super(workers);
this.o = new AtomicInteger(0);
this.m = workers.length - 1;
}

@Override
protected NIOWorker next() {
return w[o.getAndIncrement() & m];
}
}
}
Loading