天天看点

java异步转同步_java线程异步转同步。

最近有个项目在压测,TPS有点低。做了一些日志异步批量落地和redis数据预热后,TPS稍微提高了点,但还是没达标。

研究了下项目的系统设计和技术栈。

用的分布式服务架构,其中服务端使用Netty。而客户端为了同步获取响应结果,采用了socket短连接模式。

为了使系统的性能有所提升,决定客户端也改用Netty框架并采用长连接的方式。

Netty是Jboss开源的一款非常优秀的异步通信框架。目前很多主流的开源项目有使用Netty开发的,Dubbo/RocketMQ/Apache Synapse等。

但Netty本身似乎没有提供同步等待响应的接口或方法。

使用过Dubbo的都知道我们平常使用的dubbo发布/订阅端就是同步获取响应报文,虽然它本身是基于Netty开发。

为了知道dubbo怎么实现netty响应结果异步转同步,看了点dubbo的源码。

发现dubbo是使用future+lock+condition实现的。这里就不作展开了,有兴趣可以看一下Dubbo的DefaultFuture这个类。

这里记录一下网上看到的一些异步转同步的方法。

首先定义一个业务操作类,纯粹处理业务。

public class TaskService {

public String getNumber() {

return UUID.randomUUID().toString();

}

}

然后写一个线程执行接口,

public interface Executor {

default void supplyAsync() {}

default void supplyAsync(Callback callback) {}

default void supplySync(){}

@FunctionalInterface

public interface Callback{

void call(Object o);

}

}

接下来,写异步操作。

public class AsyncExecutor implements Executor {

private TaskService service = new TaskService();

public void supplyAsync() {

this.supplyAsync(null);

}

public void supplyAsync(Callback callback) {

// 另起线程异步执行。

new Thread(() -> {

System.out.println("running async task...");

try {

// 子线程睡眠,主线程不休息。

Thread.sleep(2000);

} catch (InterruptedException e) {

}

String num = service.getNumber();

// 如果有回调,则调用回调函数。

if (callback != null){

callback.call(num);

}

}).start();

}

}

测试代码:

public static void main(String[] args) throws Exception {

System.err.println("\n=============== start main thread ===============\n");

Executor executor;

executor = new AsyncExecutor();

executor.supplyAsync((o) -> {

System.out.println(String.format("get number[%s] by async.", o));

});

System.err.println("\n=============== main thread over ===============\n");

}

输出:

Connected to the target VM, address: '127.0.0.1:53015', transport: 'socket'

=============== start main thread ===============

=============== main thread over ===============

running async task...

get number[de720331-2ca3-4b33-8dee-4a7d26ede037] by async.

Disconnected from the target VM, address: '127.0.0.1:53015', transport: 'socket'

我们可以看到,子线程在睡眠了2秒的时候,主线程并没有等待子线程执行完,而是继续往下执行。

接下来,我们要将异步响应改成同步的。

public abstract class AbstractSyncExecutor implements Executor {

// 异步执行器

public AsyncExecutor executor = new AsyncExecutor();

public void supplySync() {

executor.supplyAsync(this::process);

await();

}

protected void await() {}

protected abstract void process(Object o);

}

第一种:Synchronized + wait + notify

public class SynchronizedExecutor extends AbstractSyncExecutor {

@Override

protected void process(Object o) {

System.out.println(String.format("get number[%s] by synchronized.", o));

synchronized (this) {

notify();

}

}

@Override

public void await() {

synchronized (this) {

try {

// 主线程调用wait阻塞等待,直到回调方法调用notify或者notifyAll唤醒。

wait();

} catch (InterruptedException e) {

}

}

}

}

测试代码

public static void main(String[] args) throws Exception {

System.err.println("\n=============== start main thread ===============\n");

Executor executor;

executor = new SynchronizedExecutor();

executor.supplySync();

System.err.println("\n=============== main thread over ===============\n");

}

执行结果

Connected to the target VM, address: '127.0.0.1:53235', transport: 'socket'

=============== start main thread ===============

running async task...

get number[05c5db70-ec3b-4411-b080-21e5b4cddf79] by synchronized.

=============== main thread over ===============

Disconnected from the target VM, address: '127.0.0.1:53235', transport: 'socket'

可以看到内容已经顺序输出了。

第二种:reentrantLock + condition

public class ReentrantLockExecutor extends AbstractSyncExecutor {

private Lock lock = new ReentrantLock();

private Condition condition;

public ReentrantLockExecutor() {

this.condition = lock.newCondition();

}

@Override

protected void process(Object o) {

System.out.println(String.format("get number[%s] by lockAndCondition.", o));

lock.lock();

try {

condition.signal();

} finally {

lock.unlock();

}

}

@Override

protected void await() {

lock.lock();

try {

// 阻塞等待直到回调函数唤醒

condition.await();

} catch (Exception e) {

} finally {

lock.unlock();

}

}

}

测试代码

public static void main(String[] args) throws Exception {

System.err.println("\n=============== start main thread ===============\n");

Executor executor;

executor = new ReentrantLockExecutor();

executor.supplySync();

System.err.println("\n=============== main thread over ===============\n");

}

执行结果

Connected to the target VM, address: '127.0.0.1:53254', transport: 'socket'

=============== start main thread ===============

running async task...

get number[6ad17c2c-a3ce-4fc6-a03a-fdb7d84acf7e] by lockAndCondition.

=============== main thread over ===============

Disconnected from the target VM, address: '127.0.0.1:53254', transport: 'socket'

第三种:countDownLatch

public class CountDownLatchExecutor extends AbstractSyncExecutor {

// 假设每笔调用都创建一个CountDownLatchExecutor,那么从发起到响应只算一次操作,这里设置为1就可以了。

private CountDownLatch latch = new CountDownLatch(1);

@Override

public void process(Object o) {

System.out.println(String.format("get number[%s] by countDownLatch.", o));

// latch count - 1 变成0, 主线程继续执行

latch.countDown();

}

@Override

protected void await() {

try {

// 阻塞直到latch count=0

latch.await();

} catch (InterruptedException e) {

latch.countDown();

}

}

}

测试代码

public static void main(String[] args) throws Exception {

System.err.println("\n=============== start main thread ===============\n");

Executor executor;

executor = new CountDownLatchExecutor();

executor.supplySync();

System.err.println("\n=============== main thread over ===============\n");

}

执行结果

Connected to the target VM, address: '127.0.0.1:53325', transport: 'socket'

=============== start main thread ===============

running async task...

get number[aa51089e-6f74-4ae7-b463-6a04ca73adcf] by countDownLatch.

=============== main thread over ===============

Disconnected from the target VM, address: '127.0.0.1:53325', transport: 'socket'

第四种:CyclicBarrier

public class CyclicBarrierExecutor extends AbstractSyncExecutor {

// 假设每笔调用都创建一个CountDownLatchExecutor

CyclicBarrier barrier = new CyclicBarrier(2);

@Override

protected void process(Object o) {

try {

System.out.println(String.format("get number[%s] by cyclicBarrier.", o));

// await线程数量=2,当前线程被唤醒

barrier.await();

} catch (Exception e) {

e.printStackTrace();

}

}

@Override

protected void await() {

try {

// await线程数为1,等待直至所有线程都到达

barrier.await();

} catch (Exception e) {

}

}

}

与CountDownLatch相反。CyclicBarrier是做加操作。当await线程达到初始parties数时,当前线程就被唤醒。我们需要在主线程await一次,回调线程await一次,然后主线程唤醒。即:CyclicBarrier的栅栏数parties设置为2。

测试代码

public static void main(String[] args) throws Exception {

System.err.println("\n=============== start main thread ===============\n");

Executor executor;

executor = new CyclicBarrierExecutor();

executor.supplySync();

System.err.println("\n=============== main thread over ===============\n");

}

执行结果

Connected to the target VM, address: '127.0.0.1:53527', transport: 'socket'

=============== start main thread ===============

running async task...

get number[2409d6c4-7bb2-4838-a6ef-3ce42e4a18c7] by cyclicBarrier.

=============== main thread over ===============

Disconnected from the target VM, address: '127.0.0.1:53527', transport: 'socket'

第五种:Future  + countDownLatch。

public class SyncFuture implements Future {

private CountDownLatch latch = new CountDownLatch(1);

private T resp;

@Override

public boolean cancel(boolean mayInterruptIfRunning) {

return false;

}

@Override

public boolean isCancelled() {

return false;

}

@Override

public boolean isDone() {

if (this.resp != null) {

return true;

}

return false;

}

@Override

public T get() throws InterruptedException, ExecutionException {

latch.await();

return this.resp;

}

@Override

public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {

if (latch.await(timeout, unit)) {

return this.resp;

}

return null;

}

public void set(T resp) {

this.resp = resp;

latch.countDown();

}

}

public class FutureExecutor extends AbstractSyncExecutor {

private SyncFuture future;

public FutureExecutor(SyncFuture future) {

this.future = future;

}

@Override

public void process(Object o) {

future.set(o);

}

}

测试代码

public static void main(String[] args) throws Exception {

System.err.println("\n=============== start main thread ===============\n");

SyncFuture future = new SyncFuture<>();

new FutureExecutor(future).supplySync();

// Object resp = futureExecutor.get();

// Object resp = futureExecutor.get(1, TimeUnit.SECONDS);

Object resp = future.get(3, TimeUnit.SECONDS);

System.out.println(String.format("get number[%s] by futureAndCountDownLatch.", resp));

System.err.println("\n=============== main thread over ===============\n");

}

执行结果

Connected to the target VM, address: '127.0.0.1:53590', transport: 'socket'

=============== start main thread ===============

running async task...

get number[942e6c15-1eb2-4572-9a1f-dcb7d262387b] by futureAndCountDownLatch.

=============== main thread over ===============

Disconnected from the target VM, address: '127.0.0.1:53590', transport: 'socket'

前面四种方式都大同小异。主线程阻塞等待,子线程的回调函数里面唤醒主线程。

只有最后一种不太一样。而Dubbo就是使用最后一种方式,只是把其中的countDownLatch换成了condition+lock。