/** * Special {@link EventExecutorGroup} which allows registering {@link Channel}s that get * processed for later selection during the event loop. * */ publicinterfaceEventLoopGroupextendsEventExecutorGroup{ @Override EventLoop next();
} // 1 if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); }
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) { boolean success = false; try { children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type thrownew IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); }
for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } }
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override publicvoidoperationComplete(Future<Object> future)throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } };
for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); }
/** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thread, or in the calling * thread, at the discretion of the {@code Executor} implementation. * * @param command the runnable task * @throws RejectedExecutionException if this task cannot be * accepted for execution * @throws NullPointerException if command is null */ voidexecute(Runnable command); }
The Executor implementations provided in this package implement ExecutorService, which is a more extensive interface. The ThreadPoolExecutor class provides an extensible thread pool implementation. The Executors class provides convenient factory methods for these Executors.The Executor implementations provided in this package implement ExecutorService, which is a more extensive interface. The ThreadPoolExecutor class provides an extensible thread pool implementation. The Executors class provides convenient factory methods for these Executors.
/** * A {@link Channel} that accepts an incoming connection attempt and creates * its child {@link Channel}s by accepting them. {@link ServerSocketChannel} is * a good example. */ publicinterfaceServerChannelextendsChannel{ // This is a tag interface. }
/** * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and * {@link Channel}'s. */ public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup){ super.group(parentGroup); if (childGroup == null) { thrownew NullPointerException("childGroup"); } if (this.childGroup != null) { thrownew IllegalStateException("childGroup set already"); } this.childGroup = childGroup; returnthis; }
.channel(NioServerSocketChannel.class) 方法,接受的是一个 class 对象,一般接受 class 对象大多数与反射有关系
/** * The {@link Class} which is used to create {@link Channel} instances from. * You either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your * {@link Channel} implementation has no no-args constructor. */ public B channel(Class<? extends C> channelClass){ if (channelClass == null) { thrownew NullPointerException("channelClass"); } return channelFactory(new ReflectiveChannelFactory<C>(channelClass)); }
进入 channelFactory 方法
/** * {@link io.netty.channel.ChannelFactory} which is used to create {@link Channel} instances from * when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)} * is not working for you because of some more complex needs. If your {@link Channel} implementation * has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for * simplify your code. */ @SuppressWarnings({ "unchecked", "deprecation" }) public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory){ return channelFactory((ChannelFactory<C>) channelFactory); }
如果有无参数的构造方法推荐使用,这样可以简化代码
Q:为什么必须要有无参数构造方法呢?
A : 一般来说,获取一个实例如下生成,所以必须有无参数构造方法
Class class= Class.forName(className); Object object = class.newInstance(); // 只能调用无参构造函数
我们在来看看 NioServerSocketChannel
A {@link io.netty.channel.socket.ServerSocketChannel} implementation which uses NIO selector based implementation to accept new connections.
/** * Set the {@link ChannelHandler} which is used to serve the request for the {@link Channel}'s. */ public ServerBootstrap childHandler(ChannelHandler childHandler){ if (childHandler == null) { thrownew NullPointerException("childHandler"); } this.childHandler = childHandler; returnthis; }
V get()throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException; }
文档:
A Future represents the result of an asynchronous computation. Methods are provided to check if the computation is complete, to wait for its completion, and to retrieve the result of the computation. The result can only be retrieved using method get when the computation has completed, blocking if necessary until it is ready. Cancellation is performed by the cancel method. Additional methods are provided to determine if the task completed normally or was cancelled. Once a computation has completed, the computation cannot be cancelled. If you would like to use a Future for the sake of cancellability but not provide a usable result, you can declare types of the form Future<?> and return null as a result of the underlying task.
voidshowSearch(final String target)throws InterruptedException { Future<String> future = executor.submit(new Callable<String>() { public String call(){ return searcher.search(target); } }); displayOtherThings(); // do other things while searching try { displayText(future.get()); // use future } catch (ExecutionException ex) { cleanup(); return; } } }
JUC.FutureTask
The FutureTask class is an implementation of Future that implements Runnable, and so may be executed by an Executor. For example, the above construction with submit could be replaced by:
FutureTask<String> future = new FutureTask<String>(new Callable<String>() { public String call(){ return searcher.search(target); }}); executor.execute(future);
/** * Adds the specified listener to this future. The * specified listener is notified when this future is * {@linkplain #isDone() done}. If this future is already * completed, the specified listener is notified immediately. */ Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
/** * Adds the specified listeners to this future. The * specified listeners are notified when this future is * {@linkplain #isDone() done}. If this future is already * completed, the specified listeners are notified immediately. */ Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
/** * Removes the first occurrence of the specified listener from this future. * The specified listener is no longer notified when this * future is {@linkplain #isDone() done}. If the specified * listener is not associated with this future, this method * does nothing and returns silently. */ Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
/** * Removes the first occurrence for each of the listeners from this future. * The specified listeners are no longer notified when this * future is {@linkplain #isDone() done}. If the specified * listeners are not associated with this future, this method * does nothing and returns silently. */ Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); // 等待Future完成 Future<V> sync()throws InterruptedException;
io.netty.channel public interface ChannelFuture extends Future
The result of an asynchronous Channel I/O operation.
All I/O operations in Netty are asynchronous. It means any I/O calls will return immediately with no guarantee that the requested I/O operation has been completed at the end of the call. Instead, you will be returned with a ChannelFuture instance which gives you the information about the result or status of the I/O operation.
A ChannelFuture is either uncompleted or completed. When an I/O operation begins, a new future object is created. The new future is uncompleted initially - it is neither succeeded, failed, nor cancelled because the I/O operation is not finished yet. If the I/O operation is finished either successfully, with failure, or by cancellation, the future is marked as completed with more specific information, such as the cause of the failure. Please note that even failure and cancellation belong to the completed state.
Various methods are provided to let you check if the I/O operation has been completed, wait for the completion, and retrieve the result of the I/O operation. It also allows you to add ChannelFutureListeners so you can get notified when the I/O operation is completed.
推荐使用监听器而不是等待的方法
// BAD - NEVER DO THIS @Override publicvoidchannelRead(ChannelHandlerContext ctx, Object msg){ ChannelFuture future = ctx.channel().close(); future.awaitUninterruptibly(); // Perform post-closure operation // ... }
// BAD - NEVER DO THIS Bootstrap b = ...; ChannelFuture f = b.connect(...); f.awaitUninterruptibly(10, TimeUnit.SECONDS); if (f.isCancelled()) { // Connection attempt cancelled by user } elseif (!f.isSuccess()) { // You might get a NullPointerException here because the future // might not be completed yet. f.cause().printStackTrace(); } else { // Connection established successfully }
// GOOD Bootstrap b = ...; // Configure the connect timeout option. b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000); ChannelFuture f = b.connect(...); f.awaitUninterruptibly();
// Now we are sure the future is completed. assert f.isDone();
if (f.isCancelled()) { // Connection attempt cancelled by user } elseif (!f.isSuccess()) { f.cause().printStackTrace(); } else { // Connection established successfully }
private ChannelFuture doBind(final SocketAddress localAddress){ final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; }
if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override publicvoidoperationComplete(ChannelFuture future)throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }