Netty源码解析一:客户端的连接源码分析

    科技2025-09-04  24

    本篇博客不去具体分析Netty的使用,而是在能够使用netty的基础上去进一步分析其源码。 本文主要对客户端连接过程进行分析。

    首先,在客户端建立起与服务器端连接的代码如下:

    public static void main(String[] args) { // 创建一个线程组 NioEventLoopGroup eventExecutors = new NioEventLoopGroup(); // 创建一个用来初始化的bs Bootstrap bootstrap = new Bootstrap(); // 设置bs中的参数 bootstrap.group(eventExecutors) // 初始化创建NioSocketChannel的ChannelFactory .channel(NioSocketChannel.class) // 将匿名内部类的第一个Channelhandler添加进管道中 .handler(new ChannelInitializer<SocketChannel>() { // 重写ChannelInitializer中的方法 @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new MyMessageEncoder()); pipeline.addLast(new MyMessageDecoder()); pipeline.addLast(new MyClientHandler()); } }); try { // 真正开始创建相应的对象 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { eventExecutors.shutdownGracefully(); } }

    1、NioEventLoopGroup初始化

    由NioEventLoopGroup eventExecutors = new NioEventLoopGroup();开始,先对与NioEventLoopGroup的初始化过程做源码追溯。

    首先进入其父类MultithreadEventLoopGroup中执行静态代码块。

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class); public static InternalLogger getInstance(Class<?> clazz) { return getInstance(clazz.getName()); } public static InternalLogger getInstance(String name) { return getDefaultFactory().newInstance(name); } // 利用反射调用父类的构造方法 protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { // 线程组中默认线程数的设置DEFAULT_EVENT_LOOP_THREADS为Cpu核心数*2 super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); }

    调用MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, java.lang.Object…)方法

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); } // 继续调用 protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { // 省略检测错误的代码 // 创建执行器的数组,用于存放NioEventLoop,数组大小为上述指定的线程数 children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { // 将初始数组元素的EventExecutor类型转变为NioEventLoop类型,可以继续追溯 children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { // 省略不重要代码 } } // 继续追溯 chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }

    对于上述代码中的 chooser = chooserFactory.newChooser(children);继续追溯DefaultEventExecutorChooserFactory#newChooser() 方法。

    @Override public EventExecutorChooser newChooser(EventExecutor[] executors) { // 如果线程数是2的平方,则使用PowerOfTwoEventExecutorChooser if (isPowerOfTwo(executors.length)) { return new PowerOfTwoEventExecutorChooser(executors); } else { return new GenericEventExecutorChooser(executors); } }

    两个Chooser都重写了next() 方法,用于NioEventLoopGroup使用该对象进行NIOEventLoop的选取。

    2、链式调用

    先对于bs的链式调用参数设置进行源码的追溯。

    参数设置中一共包括三个部分:

    设置EventLoopGroup

    指定要创建Channel的类型

    设置处理数据的handler

    (1) group方法分析

    调用AbstractBootstrap#group( ) public B group(EventLoopGroup group) { // 省略检测空添加的代码 this.group = group;// 将设置的NioEventLoopGroup赋值给创建的Bootstrap对象的中属性 return self(); // 设置完成后返回Bootstrap对象 }

    纯粹的用来将group对象设置到bs的类属性this.group中。

    (2)channel() 方法分析

    调用AbstractBootstrap#channel() public B channel(Class<? extends C> channelClass) { return channelFactory(new ReflectiveChannelFactory<C>( ObjectUtil.checkNotNull(channelClass, "channelClass") )); }

    其中重点查看ReflectiveChannelFactory对象,该对象实现了ChannelFactory接口。

    public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> { private final Constructor<? extends T> constructor; // 反射创建对象 public ReflectiveChannelFactory(Class<? extends T> clazz) { // 非空检查 ObjectUtil.checkNotNull(clazz, "clazz"); try { // 获得类对象的构造器方法 this.constructor = clazz.getConstructor();// 此时 clazz 就是链式调用时传入的NIOSocketChannel的类对象 } catch (NoSuchMethodException e) { // 省略异常处理代码 } } // 实现的ChannelFactory中的方法 @Override public T newChannel() { try { // 使用获取的类对象的构造器方法创建类对象 return constructor.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t); } } }

    而将该实现类对象传入接口的构造方法中,就是返回一个实现了接口的类。

    public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) { return channelFactory((ChannelFactory<C>) channelFactory); } @Deprecated public B channelFactory(ChannelFactory<? extends C> channelFactory) { // 省略检测代码 this.channelFactory = channelFactory; return self(); }

    channel方法主要是 利用传入的类对象 构造创建该类对象的channelFactory的实现类,并将该实现类赋值给bs的类属性变量this.channelFactory。

    此处工厂类中向外提供的方法应为newChannel() 方法。

    (3)handler() 方法分析

    调用AbstractBootstrap#handler(ChannelHandler) public B handler(ChannelHandler handler) { this.handler = ObjectUtil.checkNotNull(handler, "handler"); return self(); }

    将传入的handler赋值给bs中的this.handler属性。

    此时需要注意的是handler方法要求传入的是一个ChannelHandler对象,但是我们传入的匿名内部类却是一个ChannelInitializer类型的对象。

    .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new MyMessageEncoder()); pipeline.addLast(new MyMessageDecoder()); pipeline.addLast(new MyClientHandler()); } });

    那么该类究竟有什么玄机呢?首先,去查看ChannelInitilizer类的定义。

    public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter

    该类是实现了ChannelInboundHandlerAdapter,属于ChannelInitializer类型的对象。其次该类中的方法代码为:

    // 要实现的抽象方法 protected abstract void initChannel(C ch) throws Exception; @Override @SuppressWarnings("unchecked") public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { if (initChannel(ctx)) { ctx.pipeline().fireChannelRegistered(); removeState(ctx); } else { // Called initChannel(...) before which is the expected behavior, so just forward the event. ctx.fireChannelRegistered(); } } /** * 客户端建立连接后第一个调用的方法 */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isRegistered()) { // 调用重写的initChannel() 方法,向管道中添加handler后,移除自身。 if (initChannel(ctx)) { // We are done with init the Channel, removing the initializer now. removeState(ctx); } } } @SuppressWarnings("unchecked") private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.add(ctx)) { // Guard against re-entrance. try { // 调用重写的方法,去向管道中添加handler initChannel((C) ctx.channel()); } catch (Throwable cause) { } finally { ChannelPipeline pipeline = ctx.pipeline(); if (pipeline.context(this) != null) { // 添加完成将自己从调用链中删除 pipeline.remove(this); } } return true; } return false; }

    可以看到在handler中添加一个ChannelInitializer类型的对象的目的就是,将其作为初始handler,用它来添加其他的handler,添加完成后再将这个初始的handler删除。


    以上为客户端bs的链式调用过程分析,这段代码的主要作用即是初始化bs中的各个属性值。

    group -> this.groupchannel -> this.channelFactoryhandler -> this.handler

    3、启动客户端

    bs的初始化过程完成了之后,设置的channel、handler、NioEventGroup都是如何启动的呢?

    这就要关注ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();,所有的对象都是在此处进行启动执行的。

    (1)channel创建分析

    该方法的源码追溯如下:调用Bootstrap#connect(java.lang.String, int)

    public ChannelFuture connect(String inetHost, int inetPort) { return connect(InetSocketAddress.createUnresolved(inetHost, inetPort)); } public ChannelFuture connect(SocketAddress remoteAddress) { // 省略不重要的校验代码 return doResolveAndConnect(remoteAddress, config.localAddress()); }

    注:这里出现我们并不熟悉的类名 config,这也是bs中的一个属性。

    private final BootstrapConfig config = new BootstrapConfig(this);

    该属性主要用来存储链式调用时设置的配置的参数,比如NioEventLoopGroup、handler、channelFactory以及连接地址等参数。

    重点分析doResolveAndConnect() 方法。

    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) { // 需要重点分析的方法,此处可以观察一下ChannelFuture和Channel的区别 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); // 判断线程是否执行完成 if (regFuture.isDone()) { if (!regFuture.isSuccess()) { return regFuture; } // 线程执行完成就去实现真正的连接方法 return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise()); } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); // 没有完全执行完成,添加一个事件监听器,此处就实现了Netty中Future-Listener机制 regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { // Directly obtain the cause and do a null check so we only need one volatile read in case of a // failure. Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doResolveAndConnect0(channel, remoteAddress, localAddress, promise); } } }); return promise; } }

    对于上述代码中的final ChannelFuture regFuture = initAndRegister();进行分析。

    final ChannelFuture initAndRegister() { Channel channel = null; try { // 该方法仍然在Bs中,此时便使用了链式调用时初始化的channelFactory的newChannel() 方法 进行channel的创建 channel = channelFactory.newChannel(); // 初始化创建的channel,该方法此前还未进行追溯,在此处重点分析 init(channel); } catch (Throwable t) { // 省略异常处理代码 } // 该方法需要重点进行追溯,查看NioEventLoopGroup是如何创建ChannelFuture的,注意此处传入了创建后的Channel ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }

    重点分析channel = channelFactory.newChannel();

    @Override public T newChannel() { try { return constructor.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t); } }

    此时调用的即是NIOSocketChannel的构造器方法进行channel的创建。

    可以继续追溯一下NIOSocketChannel的构造原理以便于了解创建channel后,拥有了什么。

    追溯NioSocketChannel的源码。 public NioSocketChannel() { this(DEFAULT_SELECTOR_PROVIDER); } private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); // 被调用 public NioSocketChannel(SelectorProvider provider) { this(newSocket(provider)); } // 继续调用 private static SocketChannel newSocket(SelectorProvider provider) { try { return provider.openSocketChannel(); } catch (IOException e) { throw new ChannelException("Failed to open a socket.", e); } } 此时需对于provider.openSocketChannel()进行追溯,但是SelectorProvider是一个抽象类,openSocketChannel()是一个抽象方法,所以需要去调用其实现类的方法SelectorProviderImpl#openSocketChannel() public SocketChannel openSocketChannel() throws IOException { return new SocketChannelImpl(this); } SocketChannelImpl(SelectorProvider var1) throws IOException { super(var1); this.fd = Net.socket(true); this.fdVal = IOUtil.fdVal(this.fd); this.state = 0; }

    至此,newSocket(provider)方法执行完成,创建了一个SocketChannel对象。然后使用该类继续去调用重载的构造器方法。

    // 由this(newSocket(provider));继续调用 public NioSocketChannel(SocketChannel socket) { this(null, socket); } public NioSocketChannel(Channel parent, SocketChannel socket) { super(parent, socket);// 调用父类中的构造器方法 config = new NioSocketChannelConfig(this, socket.socket()); }

    需要继续调用父类中的构造器方法。io.netty.channel.nio.AbstractNioByteChannel#AbstractNioByteChannel

    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { super(parent, ch, SelectionKey.OP_READ);// 设置监听事件类型为读事件,并继续调用父类中的方法 }

    设置监听事件类型为读事件,并继续调用AbstractNioByteChannel的父类中的方法。AbstractNioChannel#AbstractNioChannel()

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { // 创建的Channel默认设置为不阻塞状态 ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { // 省略异常处理代码 } }

    继续调用AbstractNioChannel的父类中的方法AbstractChannel#AbstractChannel(io.netty.channel.Channel)

    protected AbstractChannel(Channel parent) { this.parent = parent; // 创建channel的id id = newId(); // 创建一个Unsafe对象 unsafe = newUnsafe(); // 创建一个ChannelPipeline对象 pipeline = newChannelPipeline(); }

    现阶段先追溯到此处,到此处可以得知,创建NIOSocketChannel是对于java的SocketChannel的一个封装,同时在NIOSocketChannel对象中还创建了一个Unsafe对象和一个ChannelPipline对象。

    Unsafe对象创建的分析

    Unsafe对象其实是对Java底层Socket操作的封装对象,是沟通Netty和Java的重要桥梁。

    创建NIOSocketChannel时调用的newUnsafe();方法的实际调用为AbstractNioByteChannel中的newUnsafe:

    @Override protected AbstractNioUnsafe newUnsafe() { return new NioSocketChannelUnsafe(); }

    创建的NioSocketChannelUnsafe是实现了Unsafe接口的一个对象,在此将Unsafe接口中的方法列举如下,以便于将其和Socket操作相关联。

    ChannelPipline的创建分析

    ChannelPipline的底层实现其实是一个以ChannelHandlerContext为节点的双向链表,主要用来存放handler,一个pipline与一个Channel相关联。

    在以上创建channel的过程代码中,有创建pipline的方法 pipeline = newChannelPipeline();,对该方法进行追溯。

    protected DefaultChannelPipeline newChannelPipeline() { return new DefaultChannelPipeline(this); // 将创建的channel对象传入 }

    调用DefaultChannelPipeline#DefaultChannelPipeline的构造方法

    protected DefaultChannelPipeline(Channel channel) { // 将创建的channel作为pipline的一个属性进行赋值 this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); // 管道初始被创建时有头结点和尾结点 tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }

    需要对于头结点和尾结点的构造由一个了解。、

    即对于

    tail = new TailContext(this); head = new HeadContext(this);

    进行追溯

    TailContext(DefaultChannelPipeline pipeline) { super(pipeline, null, TAIL_NAME, TailContext.class); setAddComplete(); } HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, HeadContext.class); unsafe = pipeline.channel().unsafe(); setAddComplete(); }

    以上两个handler进行构建时都会去调用父类构造方法。

    // super去调用父类的构造方法 AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, Class<? extends ChannelHandler> handlerClass) { this.name = ObjectUtil.checkNotNull(name, "name"); this.pipeline = pipeline; this.executor = executor; this.executionMask = mask(handlerClass); // 设置处理器的inbound和outbound类型 // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor. ordered = executor == null || executor instanceof OrderedEventExecutor; }

    通过以上的Channel的创建,已经创建了一个包含Unsafe对象和pipline对象的NIOSocketChannel对象。

    重点分析init(channel);方法

    该方法对已经创建的channel对象做进行一步的初始化操作。

    @Override @SuppressWarnings("unchecked") void init(Channel channel) { // 从创建的NioChannel中获取管道,该方法无需追溯 ChannelPipeline p = channel.pipeline(); // 将链式调用时初始化的handler参数添加进行创建的管道中 p.addLast(config.handler()); setChannelOptions(channel, newOptionsArray(), logger); setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_》ARRAY)); }

    追溯一下config.handler(),查看设置好的handler添加操作是如何进行的。

    io.netty.bootstrap.AbstractBootstrapConfig#handler

    public final ChannelHandler handler() { return bootstrap.handler(); } final ChannelHandler handler() { return handler; }

    (2) channel注册

    重点分析config().group().register(channel);

    首先需要知道的是config().group()就是已经创建的NIoEventLoopGroup,因为它已经通过链式调用的方法设置到了sb中所以可以获取到。

    调用MultithreadEventLoopGroup#register(io.netty.channel.Channel)方法: @Override public ChannelFuture register(Channel channel) { // next() 方法用来在NioEventLoopGroup中选取一个NioEventLoop,具体的调用逻辑参考下面的代码 // 重点追溯一下NioEventLoop的register方法 return next().register(channel); } @Override public EventLoop next() { return (EventLoop) super.next(); } @Override public EventExecutor next() { return chooser.next(); } io.netty.channel.SingleThreadEventLoop#register(Channel) @Override public ChannelFuture register(Channel channel) { // 此处将注册的Channel转换成了ChannelPromise return register(new DefaultChannelPromise(channel, this)); } @Override public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); // channel.unsafe()是为了调用Java底层的socket操作 promise.channel().unsafe().register(this, promise); return promise; } io.netty.channel.AbstractChannel.AbstractUnsafe#register @Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { // 省略校验代码 AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { // 传入的是包装了channel的Channelpromise register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { // 省略异常处理代码 } } } private void register0(ChannelPromise promise) { try { // 标识是否为第一次注册 boolean firstRegistration = neverRegistered; // 开始注册 doRegister(); neverRegistered = false; registered = true; // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } @Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { // 将创建的通道注册到NIoEventLoop上的selector上去 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { // 省略异常处理过程 } } }

    config().group().register(channel);方法主要用来将创建好的Channel绑定到创建的NIOEventLoopGroup中的NIOEventLoop中的selector上去。


    (3) 连接分析

    ​ 至此io.netty.bootstrap.Bootstrap#doResolveAndConnect() 方法中的final ChannelFuture regFuture = initAndRegister();代码分析完成,获取到了Channel,并且channel中包含了pipline、unsafe,并将该channel注册到了selector上。接下来需要将客户端的channel连接到服务器端。

    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.isDone()) { if (!regFuture.isSuccess()) { return regFuture; } return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise()); } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { // Directly obtain the cause and do a null check so we only need one volatile read in case of a // failure. Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doResolveAndConnect0(channel, remoteAddress, localAddress, promise); } } }); return promise; } } private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { ...... doConnect(resolveFuture.getNow(), localAddress, promise); ..... return promise; } // Wait until the name resolution is finished. resolveFuture.addListener(new FutureListener<SocketAddress>() { @Override public void operationComplete(Future<SocketAddress> future) throws Exception { if (future.cause() != null) { channel.close(); promise.setFailure(future.cause()); } else { doConnect(future.getNow(), localAddress, promise); } } }); } catch (Throwable cause) { promise.tryFailure(cause); } return promise; } private static void doConnect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. final Channel channel = connectPromise.channel(); channel.eventLoop().execute(new Runnable() { @Override public void run() { if (localAddress == null) { // nio的连接函数 channel.connect(remoteAddress, connectPromise); } else { channel.connect(remoteAddress, localAddress, connectPromise); } connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } }); }

    调用io.netty.channel.AbstractChannel#connect(java.net.SocketAddress, io.netty.channel.ChannelPromise)

    @Override public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { return pipeline.connect(remoteAddress, promise); }

    调用io.netty.channel.DefaultChannelPipeline#connect(java.net.SocketAddress, io.netty.channel.ChannelPromise)方法

    @Override public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { return tail.connect(remoteAddress, promise); }

    调用io.netty.channel.AbstractChannelHandlerContext#connect(java.net.SocketAddress, io.netty.channel.ChannelPromise)

    @Override public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { return connect(remoteAddress, null, promise); } @Override public ChannelFuture connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { // 查找下一个outBound类型的handler final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeConnect(remoteAddress, localAddress, promise); } else { safeExecute(executor, new Runnable() { @Override public void run() { // 重点追溯该方法 next.invokeConnect(remoteAddress, localAddress, promise); } }, promise, null, false); } return promise; }

    io.netty.channel.AbstractChannelHandlerContext#invokeConnect

    private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { if (invokeHandler()) { try { ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } else { connect(remoteAddress, localAddress, promise); } }

    io.netty.channel.DefaultChannelPipeline.HeadContext#connect

    @Override public void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { // 看到使用unsafe进行调用就可以知道调用了底层的javaSocket函数进行连接了 unsafe.connect(remoteAddress, localAddress, promise); }

    io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect()

    @Override public final void connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { try { if (connectPromise != null) { // Already a connect in process. throw new ConnectionPendingException(); } boolean wasActive = isActive(); if (doConnect(remoteAddress, localAddress)) { fulfillConnectPromise(promise, wasActive); } else { connectPromise = promise; requestedRemoteAddress = remoteAddress; }

    io.netty.channel.socket.nio.NioSocketChannel#doConnect

    @Override protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { if (localAddress != null) { doBind0(localAddress); } boolean success = false; try { boolean connected = SocketUtils.connect(javaChannel(), remoteAddress); if (!connected) { selectionKey().interestOps(SelectionKey.OP_CONNECT); } success = true; return connected; } finally { if (!success) { doClose(); } } } private void doBind0(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { SocketUtils.bind(javaChannel(), localAddress); } else { SocketUtils.bind(javaChannel().socket(), localAddress); } }
    Processed: 0.010, SQL: 9