博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊reactor-netty的PoolResources的两种模式
阅读量:6620 次
发布时间:2019-06-25

本文共 14350 字,大约阅读时间需要 47 分钟。

本文主要研究下reactor-netty的PoolResources的两种模式elastic及fixed。

LoopResources与PoolResources

TcpResources是个工具类,可以用来创建loopResources和poolResources。

loopResources

主要是创建NioEventLoopGroup,以及该group下面的workerCount个NioEventLoop(这里涉及两个参数,一个是worker thread count,一个是selector thread count)

  • DEFAULT_IO_WORKER_COUNT:如果环境变量有设置reactor.ipc.netty.workerCount,则用该值;没有设置则取Math.max(Runtime.getRuntime().availableProcessors(), 4)))
  • DEFAULT_IO_SELECT_COUNT:如果环境变量有设置reactor.ipc.netty.selectCount,则用该值;没有设置则取-1,表示没有selector thread
  • DEFAULT_MAX_PENDING_TASKS: 指定NioEventLoop的taskQueue的大小,Math.max(16,SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE))
  • NioEventLoop继承了SingleThreadEventLoop,而SingleThreadEventLoop则继承SingleThreadEventExecutor,而其代理的executor是ThreadPerTaskExecutor,rejectHandler是RejectedExecutionHandlers.reject(),默认的taskQueue是LinkedBlockingQueue,其大小为Integer.MAX_VALUE

poolResources

主要是创建channelPools,类型是ConcurrentMap<SocketAddress, Pool>,这里主要研究下它的两种模式elastic及fixed

DefaultPoolResources

reactor-netty-0.7.5.RELEASE-sources.jar!/reactor/ipc/netty/resources/DefaultPoolResources.java

它实现了netty-transport-4.1.22.Final-sources.jar!/io/netty/channel/pool/ChannelPool.java的接口,重点看如下的几个方法:

@Override		public Future
acquire() { return acquire(defaultGroup.next().newPromise()); } @Override public Future
acquire(Promise
promise) { return pool.acquire(promise).addListener(this); } @Override public Future
release(Channel channel) { return pool.release(channel); } @Override public Future
release(Channel channel, Promise
promise) { return pool.release(channel, promise); } @Override public void close() { if(compareAndSet(false, true)) { pool.close(); } }复制代码

这里的几个接口基本是委托为具体的pool来进行操作,其实现主要有SimpleChannelPool及FixedChannelPool。

PoolResources.elastic(SimpleChannelPool)

reactor-netty-0.7.5.RELEASE-sources.jar!/reactor/ipc/netty/resources/PoolResources.java

/**	 * Create an uncapped {@link PoolResources} to provide automatically for {@link	 * ChannelPool}.	 * 

An elastic {@link PoolResources} will never wait before opening a new * connection. The reuse window is limited but it cannot starve an undetermined volume * of clients using it. * * @param name the channel pool map name * * @return a new {@link PoolResources} to provide automatically for {@link * ChannelPool} */ static PoolResources elastic(String name) { return new DefaultPoolResources(name, SimpleChannelPool::new); }复制代码

这个是TcpClient.create过程中,默认使用的方法,默认使用的是SimpleChannelPool,创建的是DefaultPoolResources

reactor-netty-0.7.5.RELEASE-sources.jar!/reactor/ipc/netty/tcp/TcpResources.java

static 
T create(T previous, LoopResources loops, PoolResources pools, String name, BiFunction
onNew) { if (previous == null) { loops = loops == null ? LoopResources.create("reactor-" + name) : loops; pools = pools == null ? PoolResources.elastic(name) : pools; } else { loops = loops == null ? previous.defaultLoops : loops; pools = pools == null ? previous.defaultPools : pools; } return onNew.apply(loops, pools); }复制代码

SimpleChannelPool

netty-transport-4.1.22.Final-sources.jar!/io/netty/channel/pool/SimpleChannelPool.java

/** * Simple {@link ChannelPool} implementation which will create new {@link Channel}s if someone tries to acquire * a {@link Channel} but none is in the pool atm. No limit on the maximal concurrent {@link Channel}s is enforced. * * This implementation uses LIFO order for {@link Channel}s in the {@link ChannelPool}. * */public class SimpleChannelPool implements ChannelPool {    @Override    public final Future
acquire() { return acquire(bootstrap.config().group().next().
newPromise()); } @Override public Future
acquire(final Promise
promise) { checkNotNull(promise, "promise"); return acquireHealthyFromPoolOrNew(promise); } /** * Tries to retrieve healthy channel from the pool if any or creates a new channel otherwise. * @param promise the promise to provide acquire result. * @return future for acquiring a channel. */ private Future
acquireHealthyFromPoolOrNew(final Promise
promise) { try { final Channel ch = pollChannel(); if (ch == null) { // No Channel left in the pool bootstrap a new Channel Bootstrap bs = bootstrap.clone(); bs.attr(POOL_KEY, this); ChannelFuture f = connectChannel(bs); if (f.isDone()) { notifyConnect(f, promise); } else { f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { notifyConnect(future, promise); } }); } return promise; } EventLoop loop = ch.eventLoop(); if (loop.inEventLoop()) { doHealthCheck(ch, promise); } else { loop.execute(new Runnable() { @Override public void run() { doHealthCheck(ch, promise); } }); } } catch (Throwable cause) { promise.tryFailure(cause); } return promise; } @Override public final Future
release(Channel channel) { return release(channel, channel.eventLoop().
newPromise()); } @Override public Future
release(final Channel channel, final Promise
promise) { checkNotNull(channel, "channel"); checkNotNull(promise, "promise"); try { EventLoop loop = channel.eventLoop(); if (loop.inEventLoop()) { doReleaseChannel(channel, promise); } else { loop.execute(new Runnable() { @Override public void run() { doReleaseChannel(channel, promise); } }); } } catch (Throwable cause) { closeAndFail(channel, cause, promise); } return promise; } @Override public void close() { for (;;) { Channel channel = pollChannel(); if (channel == null) { break; } channel.close(); } } //......} 复制代码

这个连接池的实现如果没有连接则会创建一个(没有限制),取出连接(连接池使用一个LIFO的Deque来维护Channel)的时候会检测连接的有效性。

PoolResources.fixed(FixedChannelPool)

reactor-netty-0.7.5.RELEASE-sources.jar!/reactor/ipc/netty/resources/PoolResources.java

/**	 * Default max connection, if -1 will never wait to acquire before opening new	 * connection in an unbounded fashion. Fallback to	 * available number of processors.	 */	int DEFAULT_POOL_MAX_CONNECTION =			Integer.parseInt(System.getProperty("reactor.ipc.netty.pool.maxConnections",			"" + Math.max(Runtime.getRuntime()			            .availableProcessors(), 8) * 2));	/**	 * Default acquisition timeout before error. If -1 will never wait to	 * acquire before opening new	 * connection in an unbounded fashion. Fallback to	 * available number of processors.	 */	long DEFAULT_POOL_ACQUIRE_TIMEOUT = Long.parseLong(System.getProperty(			"reactor.ipc.netty.pool.acquireTimeout",			"" + 45000));	/**	 * Create a capped {@link PoolResources} to provide automatically for {@link	 * ChannelPool}.	 * 

A Fixed {@link PoolResources} will open up to the given max number of * processors observed by this jvm (minimum 4). * Further connections will be pending acquisition indefinitely. * * @param name the channel pool map name * * @return a new {@link PoolResources} to provide automatically for {@link * ChannelPool} */ static PoolResources fixed(String name) { return fixed(name, DEFAULT_POOL_MAX_CONNECTION); } /** * Create a capped {@link PoolResources} to provide automatically for {@link * ChannelPool}. *

A Fixed {@link PoolResources} will open up to the given max connection value. * Further connections will be pending acquisition indefinitely. * * @param name the channel pool map name * @param maxConnections the maximum number of connections before starting pending * acquisition on existing ones * * @return a new {@link PoolResources} to provide automatically for {@link * ChannelPool} */ static PoolResources fixed(String name, int maxConnections) { return fixed(name, maxConnections, DEFAULT_POOL_ACQUIRE_TIMEOUT); } /** * Create a capped {@link PoolResources} to provide automatically for {@link * ChannelPool}. *

A Fixed {@link PoolResources} will open up to the given max connection value. * Further connections will be pending acquisition indefinitely. * * @param name the channel pool map name * @param maxConnections the maximum number of connections before starting pending * @param acquireTimeout the maximum time in millis to wait for aquiring * * @return a new {@link PoolResources} to provide automatically for {@link * ChannelPool} */ static PoolResources fixed(String name, int maxConnections, long acquireTimeout) { if (maxConnections == -1) { return elastic(name); } if (maxConnections <= 0) { throw new IllegalArgumentException("Max Connections value must be strictly " + "positive"); } if (acquireTimeout != -1L && acquireTimeout < 0) { throw new IllegalArgumentException("Acquire Timeout value must " + "be " + "positive"); } return new DefaultPoolResources(name, (bootstrap, handler, checker) -> new FixedChannelPool(bootstrap, handler, checker, FixedChannelPool.AcquireTimeoutAction.FAIL, acquireTimeout, maxConnections, Integer.MAX_VALUE )); }复制代码

最后调用的fixed方法有三个参数,一个是name,一个是maxConnections,一个是acquireTimeout。可以看到这里创建的是FixedChannelPool。

FixedChannelPool

netty-transport-4.1.22.Final-sources.jar!/io/netty/channel/pool/FixedChannelPool.java

/** * {@link ChannelPool} implementation that takes another {@link ChannelPool} implementation and enforce a maximum * number of concurrent connections. */public class FixedChannelPool extends SimpleChannelPool {    @Override    public Future
acquire(final Promise
promise) { try { if (executor.inEventLoop()) { acquire0(promise); } else { executor.execute(new Runnable() { @Override public void run() { acquire0(promise); } }); } } catch (Throwable cause) { promise.setFailure(cause); } return promise; } private void acquire0(final Promise
promise) { assert executor.inEventLoop(); if (closed) { promise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION); return; } if (acquiredChannelCount < maxConnections) { assert acquiredChannelCount >= 0; // We need to create a new promise as we need to ensure the AcquireListener runs in the correct // EventLoop Promise
p = executor.newPromise(); AcquireListener l = new AcquireListener(promise); l.acquired(); p.addListener(l); super.acquire(p); } else { if (pendingAcquireCount >= maxPendingAcquires) { promise.setFailure(FULL_EXCEPTION); } else { AcquireTask task = new AcquireTask(promise); if (pendingAcquireQueue.offer(task)) { ++pendingAcquireCount; if (timeoutTask != null) { task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos, TimeUnit.NANOSECONDS); } } else { promise.setFailure(FULL_EXCEPTION); } } assert pendingAcquireCount > 0; } } @Override public Future
release(final Channel channel, final Promise
promise) { ObjectUtil.checkNotNull(promise, "promise"); final Promise
p = executor.newPromise(); super.release(channel, p.addListener(new FutureListener
() { @Override public void operationComplete(Future
future) throws Exception { assert executor.inEventLoop(); if (closed) { // Since the pool is closed, we have no choice but to close the channel channel.close(); promise.setFailure(POOL_CLOSED_ON_RELEASE_EXCEPTION); return; } if (future.isSuccess()) { decrementAndRunTaskQueue(); promise.setSuccess(null); } else { Throwable cause = future.cause(); // Check if the exception was not because of we passed the Channel to the wrong pool. if (!(cause instanceof IllegalArgumentException)) { decrementAndRunTaskQueue(); } promise.setFailure(future.cause()); } } })); return promise; } @Override public void close() { executor.execute(new Runnable() { @Override public void run() { if (!closed) { closed = true; for (;;) { AcquireTask task = pendingAcquireQueue.poll(); if (task == null) { break; } ScheduledFuture
f = task.timeoutFuture; if (f != null) { f.cancel(false); } task.promise.setFailure(new ClosedChannelException()); } acquiredChannelCount = 0; pendingAcquireCount = 0; FixedChannelPool.super.close(); } } }); } //......}复制代码

这里的acquire,如果当前线程不是在eventLoop中,则放入队列中等待执行acquire0,这里可能撑爆eventLoop的taskQueue,不过其队列大小的值取Math.max(16,SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE)),默认是Integer.MAX_VALUE。

FixedChannelPool继承了SimpleChannelPool,并重写了acquire、release、close方法。它对获取连接进行了限制,主要有如下几个参数:

  • maxConnections 该值先从系统变量reactor.ipc.netty.pool.maxConnections取(如果设置为-1,表示无限制,回到elastic模式),如果没有设置,则取Math.max(Runtime.getRuntime().availableProcessors(), 8) * 2,即核数与8的最大值的2倍。

  • acquireTimeout 该值先从系统变量reactor.ipc.netty.pool.acquireTimeout取(如果设置为-1,表示立即执行不等待),如果没有设置,则为45000毫秒

  • maxPendingAcquires 这里设置的是Integer.MAX_VALUE

  • AcquireTimeoutAction 这里设置为FixedChannelPool.AcquireTimeoutAction.FAIL,即timeoutTask为

timeoutTask = new TimeoutTask() {                    @Override                    public void onTimeout(AcquireTask task) {                        // Fail the promise as we timed out.                        task.promise.setFailure(TIMEOUT_EXCEPTION);                    }                };复制代码

如果当前连接超过maxConnections,则进入pendingAcquireQueue等待获取连接,而在进入pendingAcquireQueue之前,如果当前等待数量超过了maxPendingAcquires,则返回FULL_EXCEPTION(Too many outstanding acquire operations),这里设置的是Integer.MAX_VALUE,所以不会有这个异常。进入pendingAcquireQueue之后,还有一个acquireTimeout参数,即进入pendingAcquireQueue等待acquireTimeout时间,如果还没有获取到连接则返回TIMEOUT_EXCEPTION(Acquire operation took longer then configured maximum time)。

小结

默认TcpClient创建的PoolResources使用的是elastic模式,即连接池的实现是SimpleChannelPool,默认使用一个LIFO的Deque来维护Channel,如果从连接池取不到连接则会创建新的连接,上限应该是系统设置的能够打开的文件资源数量,超过则报SocketException: Too many open files。PoolResources还提供了FixedChannelPool实现,使用的是fixed模式,即限定了连接池最大连接数及最大等待超时,避免连接创建数量过多撑爆内存或者报SocketException: Too many open files异常。

注意,对于fixed模式,如果reactor.ipc.netty.pool.maxConnections设置为-1,则回退到elastic模式。

doc

转载地址:http://viupo.baihongyu.com/

你可能感兴趣的文章
初学者学习Linux之NFS
查看>>
Rabbitmq学习(一) Rabbitmq初探
查看>>
8月第一周B2B类网站排名:阿里巴巴持续领先
查看>>
IDC评述网:12月下旬国内域名注册商净增量Top10
查看>>
5月第一周全球域名解析商Top15:万网升至第7名
查看>>
架构优化 - 应用,MQ Broker,业务处理分层
查看>>
3月第3周网络安全报告:被篡改.COM网站占74.3%
查看>>
Spring Security之用户名+密码登录
查看>>
java JSplitPane设置比例
查看>>
批量操作Windows域用户
查看>>
shell脚本 接受用户参数 记录一下
查看>>
健脾祛湿的中成药有哪些?
查看>>
IIS下支持下载.exe文件
查看>>
CXF WebService Hello World
查看>>
市场调研报告:企业级信息防泄漏大趋势
查看>>
济南企业短信平台的价格如何?
查看>>
requirejs
查看>>
php printf() 输出格式化的字符串
查看>>
VS2013下的64位与32位程序配置
查看>>
浅谈C中的指针和数组(二)
查看>>