最近有个项目在压测,TPS有点低。做了一些日志异步批量落地和redis数据预热后,TPS稍微提高了点,但还是没达标。
研究了下项目的系统设计和技术栈。
用的分布式服务架构,其中服务端使用Netty。而客户端为了同步获取响应结果,采用了socket短连接模式。
为了使系统的性能有所提升,决定客户端也改用Netty框架并采用长连接的方式。
Netty是Jboss开源的一款非常优秀的异步通信框架。目前很多主流的开源项目有使用Netty开发的,Dubbo/RocketMQ/Apache Synapse等。
但Netty本身似乎没有提供同步等待响应的接口或方法。
使用过Dubbo的都知道我们平常使用的dubbo发布/订阅端就是同步获取响应报文,虽然它本身是基于Netty开发。
为了知道dubbo怎么实现netty响应结果异步转同步,看了点dubbo的源码。
发现dubbo是使用future+lock+condition实现的。这里就不作展开了,有兴趣可以看一下Dubbo的DefaultFuture这个类。
这里记录一下网上看到的一些异步转同步的方法。
首先定义一个业务操作类,纯粹处理业务。
public class TaskService {
public String getNumber() {
return UUID.randomUUID().toString();
}
}
然后写一个线程执行接口,
public interface Executor {
default void supplyAsync() {}
default void supplyAsync(Callback callback) {}
default void supplySync(){}
@FunctionalInterface
public interface Callback{
void call(Object o);
}
}
接下来,写异步操作。
public class AsyncExecutor implements Executor {
private TaskService service = new TaskService();
public void supplyAsync() {
this.supplyAsync(null);
}
public void supplyAsync(Callback callback) {
// 另起线程异步执行。
new Thread(() -> {
System.out.println("running async task...");
try {
// 子线程睡眠,主线程不休息。
Thread.sleep(2000);
} catch (InterruptedException e) {
}
String num = service.getNumber();
// 如果有回调,则调用回调函数。
if (callback != null){
callback.call(num);
}
}).start();
}
}
测试代码:
public static void main(String[] args) throws Exception {
System.err.println("\n=============== start main thread ===============\n");
Executor executor;
executor = new AsyncExecutor();
executor.supplyAsync((o) -> {
System.out.println(String.format("get number[%s] by async.", o));
});
System.err.println("\n=============== main thread over ===============\n");
}
输出:
Connected to the target VM, address: '127.0.0.1:53015', transport: 'socket'
=============== start main thread ===============
=============== main thread over ===============
running async task...
get number[de720331-2ca3-4b33-8dee-4a7d26ede037] by async.
Disconnected from the target VM, address: '127.0.0.1:53015', transport: 'socket'
我们可以看到,子线程在睡眠了2秒的时候,主线程并没有等待子线程执行完,而是继续往下执行。
接下来,我们要将异步响应改成同步的。
public abstract class AbstractSyncExecutor implements Executor {
// 异步执行器
public AsyncExecutor executor = new AsyncExecutor();
public void supplySync() {
executor.supplyAsync(this::process);
await();
}
protected void await() {}
protected abstract void process(Object o);
}
第一种:Synchronized + wait + notify
public class SynchronizedExecutor extends AbstractSyncExecutor {
@Override
protected void process(Object o) {
System.out.println(String.format("get number[%s] by synchronized.", o));
synchronized (this) {
notify();
}
}
@Override
public void await() {
synchronized (this) {
try {
// 主线程调用wait阻塞等待,直到回调方法调用notify或者notifyAll唤醒。
wait();
} catch (InterruptedException e) {
}
}
}
}
测试代码
public static void main(String[] args) throws Exception {
System.err.println("\n=============== start main thread ===============\n");
Executor executor;
executor = new SynchronizedExecutor();
executor.supplySync();
System.err.println("\n=============== main thread over ===============\n");
}
执行结果
Connected to the target VM, address: '127.0.0.1:53235', transport: 'socket'
=============== start main thread ===============
running async task...
get number[05c5db70-ec3b-4411-b080-21e5b4cddf79] by synchronized.
=============== main thread over ===============
Disconnected from the target VM, address: '127.0.0.1:53235', transport: 'socket'
可以看到内容已经顺序输出了。
第二种:reentrantLock + condition
public class ReentrantLockExecutor extends AbstractSyncExecutor {
private Lock lock = new ReentrantLock();
private Condition condition;
public ReentrantLockExecutor() {
this.condition = lock.newCondition();
}
@Override
protected void process(Object o) {
System.out.println(String.format("get number[%s] by lockAndCondition.", o));
lock.lock();
try {
condition.signal();
} finally {
lock.unlock();
}
}
@Override
protected void await() {
lock.lock();
try {
// 阻塞等待直到回调函数唤醒
condition.await();
} catch (Exception e) {
} finally {
lock.unlock();
}
}
}
测试代码
public static void main(String[] args) throws Exception {
System.err.println("\n=============== start main thread ===============\n");
Executor executor;
executor = new ReentrantLockExecutor();
executor.supplySync();
System.err.println("\n=============== main thread over ===============\n");
}
执行结果
Connected to the target VM, address: '127.0.0.1:53254', transport: 'socket'
=============== start main thread ===============
running async task...
get number[6ad17c2c-a3ce-4fc6-a03a-fdb7d84acf7e] by lockAndCondition.
=============== main thread over ===============
Disconnected from the target VM, address: '127.0.0.1:53254', transport: 'socket'
第三种:countDownLatch
public class CountDownLatchExecutor extends AbstractSyncExecutor {
// 假设每笔调用都创建一个CountDownLatchExecutor,那么从发起到响应只算一次操作,这里设置为1就可以了。
private CountDownLatch latch = new CountDownLatch(1);
@Override
public void process(Object o) {
System.out.println(String.format("get number[%s] by countDownLatch.", o));
// latch count - 1 变成0, 主线程继续执行
latch.countDown();
}
@Override
protected void await() {
try {
// 阻塞直到latch count=0
latch.await();
} catch (InterruptedException e) {
latch.countDown();
}
}
}
测试代码
public static void main(String[] args) throws Exception {
System.err.println("\n=============== start main thread ===============\n");
Executor executor;
executor = new CountDownLatchExecutor();
executor.supplySync();
System.err.println("\n=============== main thread over ===============\n");
}
执行结果
Connected to the target VM, address: '127.0.0.1:53325', transport: 'socket'
=============== start main thread ===============
running async task...
get number[aa51089e-6f74-4ae7-b463-6a04ca73adcf] by countDownLatch.
=============== main thread over ===============
Disconnected from the target VM, address: '127.0.0.1:53325', transport: 'socket'
第四种:CyclicBarrier
public class CyclicBarrierExecutor extends AbstractSyncExecutor {
// 假设每笔调用都创建一个CountDownLatchExecutor
CyclicBarrier barrier = new CyclicBarrier(2);
@Override
protected void process(Object o) {
try {
System.out.println(String.format("get number[%s] by cyclicBarrier.", o));
// await线程数量=2,当前线程被唤醒
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
protected void await() {
try {
// await线程数为1,等待直至所有线程都到达
barrier.await();
} catch (Exception e) {
}
}
}
与CountDownLatch相反。CyclicBarrier是做加操作。当await线程达到初始parties数时,当前线程就被唤醒。我们需要在主线程await一次,回调线程await一次,然后主线程唤醒。即:CyclicBarrier的栅栏数parties设置为2。
测试代码
public static void main(String[] args) throws Exception {
System.err.println("\n=============== start main thread ===============\n");
Executor executor;
executor = new CyclicBarrierExecutor();
executor.supplySync();
System.err.println("\n=============== main thread over ===============\n");
}
执行结果
Connected to the target VM, address: '127.0.0.1:53527', transport: 'socket'
=============== start main thread ===============
running async task...
get number[2409d6c4-7bb2-4838-a6ef-3ce42e4a18c7] by cyclicBarrier.
=============== main thread over ===============
Disconnected from the target VM, address: '127.0.0.1:53527', transport: 'socket'
第五种:Future + countDownLatch。
public class SyncFuture implements Future {
private CountDownLatch latch = new CountDownLatch(1);
private T resp;
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
if (this.resp != null) {
return true;
}
return false;
}
@Override
public T get() throws InterruptedException, ExecutionException {
latch.await();
return this.resp;
}
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (latch.await(timeout, unit)) {
return this.resp;
}
return null;
}
public void set(T resp) {
this.resp = resp;
latch.countDown();
}
}
public class FutureExecutor extends AbstractSyncExecutor {
private SyncFuture future;
public FutureExecutor(SyncFuture future) {
this.future = future;
}
@Override
public void process(Object o) {
future.set(o);
}
}
测试代码
public static void main(String[] args) throws Exception {
System.err.println("\n=============== start main thread ===============\n");
SyncFuture future = new SyncFuture<>();
new FutureExecutor(future).supplySync();
// Object resp = futureExecutor.get();
// Object resp = futureExecutor.get(1, TimeUnit.SECONDS);
Object resp = future.get(3, TimeUnit.SECONDS);
System.out.println(String.format("get number[%s] by futureAndCountDownLatch.", resp));
System.err.println("\n=============== main thread over ===============\n");
}
执行结果
Connected to the target VM, address: '127.0.0.1:53590', transport: 'socket'
=============== start main thread ===============
running async task...
get number[942e6c15-1eb2-4572-9a1f-dcb7d262387b] by futureAndCountDownLatch.
=============== main thread over ===============
Disconnected from the target VM, address: '127.0.0.1:53590', transport: 'socket'
前面四种方式都大同小异。主线程阻塞等待,子线程的回调函数里面唤醒主线程。
只有最后一种不太一样。而Dubbo就是使用最后一种方式,只是把其中的countDownLatch换成了condition+lock。