
JD Side: Tell us about the implementation principle and usage scenario of CompletableFuture? I was confused

Author: Java coders take you to learn programming
  • 1. Overview
  • 2. Why CompletableFuture was introduced
  • 3. Features
  • 4. Source code tracking
  • 5. Summary

1. Overview

CompletableFuture is an implementation class introduced in JDK 1.8. It implements both Future and CompletionStage, which makes it a Future that can trigger actions when a task completes. Simply put, it lets you implement asynchronous callbacks.


2. Why CompletableFuture was introduced

JDK 1.5's Future can process tasks asynchronously, but the way it obtains the result is inelegant: the caller still has to block (or poll). How can blocking be avoided? By registering a callback.

A common approach in the industry is to implement asynchronous callbacks with the observer pattern, that is, to notify the observers when the task completes. Netty's ChannelFuture, for example, processes results asynchronously through registered listeners.

Netty's ChannelFuture (the excerpt below is from DefaultPromise, which its default implementation builds on)

public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
    checkNotNull(listener, "listener");
    synchronized (this) {
        addListener0(listener);
    }
    if (isDone()) {
        notifyListeners();
    }
    return this;
}
private boolean setValue0(Object objResult) {
    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
        if (checkNotifyWaiters()) {
            notifyListeners();
        }
        return true;
    }
    return false;
}
           

Listeners are registered with addListener. When the task completes, setValue0 invokes notifyListeners; if the task is already done when a listener is added, addListener calls notifyListeners right away.
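To make the observer idea concrete, here is a minimal sketch of a listener-based future in plain Java. It is not Netty's code: the class SimpleListenableFuture and its addListener/setValue methods are hypothetical names for this illustration. It mirrors the two paths in the excerpt: a listener added before completion is stored and notified by the completing thread, while a listener added after completion is notified immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical listener-based future (observer pattern); not Netty's API.
class SimpleListenableFuture<V> {
    private final List<Consumer<V>> listeners = new ArrayList<>();
    private boolean done;
    private V value;

    // Register a listener; if the value is already set, notify it right away.
    void addListener(Consumer<V> listener) {
        V current;
        synchronized (this) {
            if (!done) {
                listeners.add(listener); // will be notified by setValue
                return;
            }
            current = value;
        }
        listener.accept(current); // notify outside the lock
    }

    // Complete once; returns false if a value was already set.
    boolean setValue(V v) {
        List<Consumer<V>> toNotify;
        synchronized (this) {
            if (done) {
                return false;
            }
            done = true;
            value = v;
            toNotify = new ArrayList<>(listeners);
            listeners.clear();
        }
        for (Consumer<V> l : toNotify) {
            l.accept(v); // the completing thread notifies the observers
        }
        return true;
    }

    public static void main(String[] args) {
        SimpleListenableFuture<String> f = new SimpleListenableFuture<>();
        f.addListener(v -> System.out.println("early listener got " + v));
        f.setValue("hello");                                              // prints the early line
        f.addListener(v -> System.out.println("late listener got " + v)); // prints immediately
    }
}
```

Listeners are invoked outside the lock so that a slow callback cannot block other threads that are registering listeners.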

CompletableFuture extends the Future model with functional programming: results are processed through callbacks.


3. Features

CompletableFuture's functionality is mainly reflected in its implementation of CompletionStage.

It supports operations such as the following:

  • thenCompose
  • thenCombine
  • thenAccept
  • thenRun
  • thenApply (consume the result and return a new value)

The difference between consuming (accept) and running (run):

Accept consumes the result of the previous stage, while run simply executes a task without access to that result. Other methods can be looked up as needed.
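The following sketch, which uses nothing beyond the standard java.util.concurrent API, puts the five methods listed above side by side. The pipeline method and its log parameter are hypothetical scaffolding that makes the results easy to check: thenAccept and thenRun append to the log, while thenApply, thenCompose and thenCombine produce new values.

```java
import java.util.concurrent.CompletableFuture;

class StageMethodsDemo {
    // Hypothetical helper: returns {thenApply, thenCompose, thenCombine} results
    // and records what thenAccept/thenRun did in `log`.
    static int[] pipeline(int unitPrice, int qty, StringBuilder log) {
        CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> unitPrice);
        CompletableFuture<Integer> quantity = CompletableFuture.supplyAsync(() -> qty);

        // thenApply: transform the result into a new value
        CompletableFuture<Integer> doubled = price.thenApply(p -> p * 2);
        // thenCompose: chain a step that itself returns a CompletableFuture
        CompletableFuture<Integer> discounted =
                price.thenCompose(p -> CompletableFuture.supplyAsync(() -> p - 10));
        // thenCombine: merge the results of two independent stages
        CompletableFuture<Integer> total = price.thenCombine(quantity, (p, q) -> p * q);
        // thenAccept: consume the result, produce nothing
        CompletableFuture<Void> accepted =
                total.thenAccept(t -> log.append("total=").append(t).append(';'));
        // thenRun: run an action that never sees the result
        CompletableFuture<Void> ran = accepted.thenRun(() -> log.append("finished;"));

        ran.join(); // wait for the accept/run steps before the caller reads the log
        return new int[] {doubled.join(), discounted.join(), total.join()};
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        int[] r = pipeline(100, 3, log);
        System.out.println(r[0] + " " + r[1] + " " + r[2] + " | " + log); // 200 90 300 | total=300;finished;
    }
}
```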

CompletableFuture supports chained calls through the CompletionStage methods, and each step can be executed synchronously or asynchronously (for example, thenRun versus thenRunAsync).

Here's a simple example to show how it works.

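import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThenApplyDemo {

    public static void thenApply() {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
            try {
                // Uncomment to delay the source task; the synchronous thenRun
                // callbacks below would then run on a pool thread instead of main.
                // Thread.sleep(2000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("supplyAsync " + Thread.currentThread().getName());
            return "hello";
        }, executorService).thenApplyAsync(s -> {
            System.out.println(s + "world"); // runs on executorService
            return "hhh";
        }, executorService);
        // Async variant without an executor: runs on the default async pool
        CompletableFuture<Void> r1 = cf.thenRunAsync(() -> {
            System.out.println("ddddd");
        });
        // Sync variants: run on the caller if cf is already complete,
        // otherwise on the thread that completes cf
        CompletableFuture<Void> r2 = cf.thenRun(() -> {
            System.out.println("ddddsd");
        });
        CompletableFuture<Void> r3 = cf.thenRun(() -> {
            System.out.println(Thread.currentThread());
            System.out.println("dddaewdd");
        });
        // Wait for all callbacks, then shut the pool down so the JVM can exit
        CompletableFuture.allOf(r1, r2, r3).join();
        executorService.shutdown();
    }

    public static void main(String[] args) {
        thenApply();
    }
}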
           

Execution results

supplyAsync pool-1-thread-1
helloworld
ddddd
ddddsd
Thread[main,5,main]
dddaewdd
           

In this run, the tasks were executed in the order in which they were registered. That order is not guaranteed in general, however, as the note below explains.

Note:

For the synchronous cf.thenRun, the callback may run on the main thread or on the thread that executed the source task. If the source task has already finished by the time main calls cf.thenRun, the callback is run by the main thread; otherwise it is run later by the thread that completes the source task.
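A small experiment can confirm which thread runs a synchronous thenRun callback. This is a sketch using only public CompletableFuture methods; the class name and the thread name "completer" are arbitrary choices for the illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class ThenRunThreadDemo {
    // Source already complete when thenRun is called: the callback runs on the caller.
    static String callbackThreadWhenAlreadyDone() {
        AtomicReference<String> runner = new AtomicReference<>();
        CompletableFuture<String> done = CompletableFuture.completedFuture("x");
        done.thenRun(() -> runner.set(Thread.currentThread().getName()));
        return runner.get();
    }

    // Source still pending: the callback runs on whichever thread completes it.
    static String callbackThreadWhenCompletedLater() {
        AtomicReference<String> runner = new AtomicReference<>();
        CompletableFuture<String> pending = new CompletableFuture<>();
        pending.thenRun(() -> runner.set(Thread.currentThread().getName()));
        Thread completer = new Thread(() -> pending.complete("x"), "completer");
        completer.start();
        try {
            completer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return runner.get();
    }

    public static void main(String[] args) {
        System.out.println("caller:          " + Thread.currentThread().getName());
        System.out.println("already done:    " + callbackThreadWhenAlreadyDone());    // same as caller
        System.out.println("completed later: " + callbackThreadWhenCompletedLater()); // completer
    }
}
```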

When the same task has multiple dependent tasks:

  • If the dependent tasks are synchronous: when they are executed by the current calling thread (main), they run in registration order; when they are executed by the thread that runs the source task, they run in reverse order, because the internal data structure holding dependent tasks is a LIFO stack.
  • If the dependent tasks are all asynchronous, they are executed by the asynchronous thread pool, and their execution order is not guaranteed.
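The ordering claim for synchronous dependents can be observed with a short sketch. Note that the reverse order is an implementation detail of OpenJDK's stack-based design, not something the CompletionStage contract promises, so production code should never rely on it. Here the main thread plays the role of the thread that completes the source task.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class DependentOrderDemo {
    // Registers three synchronous thenRun callbacks and records the order in which they run.
    // completeFirst = true: the source is done before registration, so each runs on the caller immediately.
    // completeFirst = false: callbacks are pushed on the source's stack and run by the completing thread.
    static List<Integer> order(boolean completeFirst) {
        List<Integer> ran = new CopyOnWriteArrayList<>();
        CompletableFuture<String> source = new CompletableFuture<>();
        if (completeFirst) {
            source.complete("done");
        }
        for (int i = 1; i <= 3; i++) {
            final int id = i;
            source.thenRun(() -> ran.add(id));
        }
        if (!completeFirst) {
            source.complete("done"); // postComplete pops the stack: last registered runs first
        }
        return ran;
    }

    public static void main(String[] args) {
        System.out.println("registered after completion:  " + order(true));  // [1, 2, 3]
        System.out.println("registered before completion: " + order(false)); // [3, 2, 1] on OpenJDK
    }
}
```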

These conclusions come from reading the source code, so let's dive into it.

4. Source code tracking

Creating a CompletableFuture

There are many ways to create one; you can even just call new. Let's look at supplyAsync, which creates one asynchronously.
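For reference, here is a sketch of the common creation paths side by side: the constructor plus complete, completedFuture, supplyAsync with and without an executor, and runAsync. The createAll helper is hypothetical; the CompletableFuture and Executors calls are standard JDK API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CreateDemo {
    static String createAll() {
        // 1. Plain constructor, completed manually
        CompletableFuture<String> manual = new CompletableFuture<>();
        manual.complete("manual");
        // 2. A future that is already complete
        CompletableFuture<String> ready = CompletableFuture.completedFuture("ready");
        // 3. supplyAsync on the default async pool (normally ForkJoinPool.commonPool())
        CompletableFuture<String> supplied = CompletableFuture.supplyAsync(() -> "supplied");

        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // 4. supplyAsync / runAsync with an explicit executor
            CompletableFuture<String> onPool = CompletableFuture.supplyAsync(() -> "onPool", pool);
            CompletableFuture<Void> ran = CompletableFuture.runAsync(
                    () -> System.out.println("runAsync has no result"), pool);
            ran.join();
            return String.join(" ", manual.join(), ready.join(), supplied.join(), onPool.join());
        } finally {
            pool.shutdown(); // let the JVM exit once the pool is idle
        }
    }

    public static void main(String[] args) {
        System.out.println(createAll()); // manual ready supplied onPool
    }
}
```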

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                   Executor executor) {
    return asyncSupplyStage(screenExecutor(executor), supplier);
}
static Executor screenExecutor(Executor e) {
    if (!useCommonPool && e == ForkJoinPool.commonPool())
        return asyncPool;
    if (e == null) throw new NullPointerException();
    return e;
}
           

The parameter is a Supplier, a function that returns a value. For the asynchronous method, if an executor is passed in, that executor runs the task. Otherwise the common ForkJoinPool is used; if the common pool does not support parallelism (its parallelism is 1 or less), a new thread is created for each task instead.

Note that ForkJoinPool runs tasks on daemon threads, so a non-daemon thread (for example, main waiting on the result) must stay alive until the task finishes; otherwise the JVM may exit first.
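The sketch below checks the daemon status of a common-pool worker and shows the usual remedy of blocking a non-daemon thread on join(). The helper name is hypothetical. The probe uses execute plus a CountDownLatch rather than submit(...).join(), because joining a ForkJoinTask from an outside thread may let that thread run the task itself, which would make the check unreliable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicBoolean;

class DaemonDemo {
    // Runs a probe task on the common pool and reports whether its worker thread is a daemon.
    static boolean commonPoolUsesDaemonThreads() {
        AtomicBoolean daemon = new AtomicBoolean();
        CountDownLatch latch = new CountDownLatch(1);
        ForkJoinPool.commonPool().execute(() -> {
            daemon.set(Thread.currentThread().isDaemon());
            latch.countDown();
        });
        try {
            latch.await(); // wait on a latch so the probe cannot run on this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return daemon.get();
    }

    public static void main(String[] args) {
        System.out.println("common pool threads are daemon: " + commonPoolUsesDaemonThreads());
        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(500); // simulate slow work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "result";
        });
        // main is a non-daemon thread: blocking on join() keeps the JVM alive until the
        // task finishes. If main returned without it, a task running on a daemon
        // common-pool thread could be cut off when the JVM exits.
        System.out.println(cf.join());
    }
}
```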

asyncSupplyStage method

static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
                                                 Supplier<U> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<U> d = new CompletableFuture<U>();
    e.execute(new AsyncSupply<U>(d, f));
    return d;
}
           

This creates the CompletableFuture that will be returned.

It then constructs an AsyncSupply, passing the newly created CompletableFuture as a constructor argument, and submits it to the executor.

From here on, the execution of the task relies entirely on AsyncSupply.

AsyncSupply#run

public void run() {
    CompletableFuture<T> d; Supplier<T> f;
    if ((d = dep) != null && (f = fn) != null) {
        dep = null; fn = null;
        if (d.result == null) {
            try {
                d.completeValue(f.get());
            } catch (Throwable ex) {
                d.completeThrowable(ex);
            }
        }
        d.postComplete();
    }
}
           
  1. The method calls the Supplier's get method and stores the result in the CompletableFuture (or, if get throws, stores the exception via completeThrowable). Note that all of this happens on the asynchronous thread.
  2. d.postComplete signals that the task has completed and triggers the execution of the dependent tasks. This is the key to implementing CompletionStage.
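To see the shape of asyncSupplyStage and AsyncSupply#run without the JDK internals, here is a hypothetical re-creation built only on public methods: complete and completeExceptionally play the roles of completeValue/completeThrowable followed by postComplete. One difference to be aware of: the real AsyncSupply wraps the exception in a CompletionException, while this sketch stores it as is.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

class MySupplyAsync {
    // Hypothetical re-creation of asyncSupplyStage + AsyncSupply#run using public API only.
    static <U> CompletableFuture<U> supplyAsync(Supplier<U> f, Executor e) {
        if (f == null || e == null) throw new NullPointerException();
        CompletableFuture<U> d = new CompletableFuture<>(); // returned to the caller immediately
        e.execute(() -> {                                   // the body runs on the executor thread
            try {
                d.complete(f.get());          // ~ completeValue, then postComplete
            } catch (Throwable ex) {
                d.completeExceptionally(ex);  // ~ completeThrowable, then postComplete
            }
        });
        return d;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> ok = supplyAsync(() -> "hello", pool);
            CompletableFuture<String> bad = supplyAsync(() -> { throw new IllegalStateException("boom"); }, pool);
            System.out.println(ok.thenApply(s -> s + " world").join());       // hello world
            System.out.println(bad.handle((v, ex) -> ex.getMessage()).join()); // boom
        } finally {
            pool.shutdown();
        }
    }
}
```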

Before looking at the postComplete method, let's look at the logic for creating dependent tasks.

thenAcceptAsync method

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
    return uniAcceptStage(asyncPool, action);
}
private CompletableFuture<Void> uniAcceptStage(Executor e,
                                               Consumer<? super T> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    if (e != null || !d.uniAccept(this, f, null)) {
        // Mark 1
        UniAccept<T> c = new UniAccept<T>(e, d, this, f);
        push(c);
        c.tryFire(SYNC);
    }
    return d;
}
           

As mentioned above, thenAcceptAsync consumes the result of the CompletableFuture. It delegates to uniAcceptStage.

uniAcceptStage logic:

  1. Construct a new CompletableFuture d, which is returned to support chained calls.
  2. If it is an asynchronous task (e != null), the || short-circuits, so d.uniAccept is not called and execution goes straight into the if block (Mark 1). The action will be run on the executor, either right away (if the source task is already done) or after the source task finishes.
  3. If it is a synchronous task (e == null), d.uniAccept is called. If the source task has already completed, it invokes f and returns true, and d is returned directly. Otherwise execution enters the if block (Mark 1).

Mark 1 logic:

  1. Construct a UniAccept and push it onto the source's stack. The push uses CAS, i.e. optimistic, lock-free concurrency.
  2. Call the c.tryFire method.

UniAccept#tryFire
final CompletableFuture<Void> tryFire(int mode) {
    CompletableFuture<Void> d; CompletableFuture<T> a;
    if ((d = dep) == null ||
        !d.uniAccept(a = src, fn, mode > 0 ? null : this))
        return null;
    dep = null; src = null; fn = null;
    return d.postFire(a, mode);
}
           
  1. d.uniAccept is called. It checks whether the source task has completed: if so, it executes the dependent task; otherwise it returns false, and tryFire returns null.
  2. If the dependent task has been executed, d.postFire handles the follow-up work after firing. Its logic differs depending on the mode.

Briefly, there are three modes: synchronous (SYNC), asynchronous (ASYNC), and nested (NESTED). NESTED is used when postComplete iterates over the dependent tasks, and it avoids unbounded recursion.

Note the third parameter of d.uniAccept.

For an asynchronous call (mode > 0), null is passed in; otherwise this is passed in.

The code below shows the difference: when c is null, c.claim() is not called.

try {
    if (c != null && !c.claim())
        return false;
    @SuppressWarnings("unchecked") S s = (S) r;
    f.accept(s);
    completeNull();
} catch (Throwable ex) {
    completeThrowable(ex);
}

final boolean claim() {
    Executor e = executor;
    if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
        if (e == null)
            return true;
        executor = null; // disable
        e.execute(this);
    }
    return false;
}
           

The logic of claim:

  • It first uses CAS to switch the task's tag from 0 to 1, so that only one thread can claim the dependent task; if another thread has already claimed it, claim returns false.
  • If the executor is null, the task is synchronous, so claim returns true, and the caller (uniAccept) executes f.accept(s) on the current thread.
  • If the executor is not null, claim hands this completion to the executor (e.execute(this)) and returns false, so the current thread does not run the action.

The completion's run method is shown below. It simply calls tryFire(ASYNC) on the executor thread; in ASYNC mode, uniAccept receives null for c, skips claim, and runs the action directly on that asynchronous thread.

public final void run(){ 
   tryFire(ASYNC); 
}
           

With the logic above, we now basically understand how dependent tasks work.

When a dependent task is created, it first checks whether the source task has completed. If it has, the dependent task is executed right away on the appropriate thread (the current thread if it is synchronous, otherwise an asynchronous thread).

If the source task has not completed, the method simply returns, because once the source task completes, it triggers the dependent tasks through postComplete.
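The run-now-or-later logic described above can be condensed into a toy model. ToyStage is a hypothetical class, not the JDK's code: it keeps dependents on a lock-free stack pushed with CAS, fires a dependent immediately if the source is already complete, pops and fires everything on completion (the role of postComplete), and hands a dependent to its executor instead of running it inline (the role of claim). It deliberately omits exceptions, the NESTED mode, and chaining.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical toy model of the dependent-task mechanism; not the JDK implementation.
class ToyStage<T> {
    // One pending dependent task: an immutable node in a Treiber (lock-free) stack.
    private static final class Node<V> {
        final Consumer<? super V> action;
        final Executor executor; // null = synchronous, run on the firing thread
        final Node<V> next;

        Node(Consumer<? super V> action, Executor executor, Node<V> next) {
            this.action = action;
            this.executor = executor;
            this.next = next;
        }
    }

    private final AtomicReference<T> result = new AtomicReference<>();      // null = pending
    private final AtomicReference<Node<T>> stack = new AtomicReference<>(); // pending dependents

    // Register a dependent, like thenAccept (executor == null) or thenAcceptAsync.
    void onComplete(Consumer<? super T> action, Executor executor) {
        if (result.get() != null) {               // source already done: fire right away
            fire(new Node<>(action, executor, null));
            return;
        }
        Node<T> head;
        Node<T> node;
        do {                                       // CAS push, retrying on contention
            head = stack.get();
            node = new Node<>(action, executor, head);
        } while (!stack.compareAndSet(head, node));
        if (result.get() != null) {               // completed while we were pushing
            drain();
        }
    }

    // Set the result once, then fire the dependents (the role of postComplete).
    boolean complete(T value) {
        if (value == null || !result.compareAndSet(null, value)) {
            return false;
        }
        drain();
        return true;
    }

    // Detach the whole stack atomically so each dependent fires exactly once.
    private void drain() {
        for (Node<T> h = stack.getAndSet(null); h != null; h = h.next) {
            fire(h);                               // top of the stack first: LIFO order
        }
    }

    // Run inline if synchronous, otherwise hand off to the executor (the role of claim).
    private void fire(Node<T> n) {
        T value = result.get();
        if (n.executor == null) {
            n.action.accept(value);
        } else {
            n.executor.execute(() -> n.action.accept(value));
        }
    }

    public static void main(String[] args) {
        ToyStage<String> stage = new ToyStage<>();
        stage.onComplete(v -> System.out.println("first  " + v), null);
        stage.onComplete(v -> System.out.println("second " + v), null);
        stage.complete("done");                                         // prints "second", then "first"
        stage.onComplete(v -> System.out.println("late   " + v), null); // fires immediately
    }
}
```

Taking the whole stack with getAndSet is a simplification of the JDK's one-node-at-a-time popping, chosen because it makes the exactly-once guarantee easy to see.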

postComplete method

final void postComplete() {
    /*
     * On each step, variable f holds current dependents to pop
     * and run.  It is extended along only one path at a time,
     * pushing others to avoid unbounded recursion.
     */
    CompletableFuture<?> f = this; Completion h;
    while ((h = f.stack) != null ||
           (f != this && (h = (f = this).stack) != null)) {
        CompletableFuture<?> d; Completion t;
        if (f.casStack(h, t = h.next)) {
            if (t != null) {
                if (f != this) {
                    pushStack(h);
                    continue;
                }
                h.next = null;    // detach
            }
            f = (d = h.tryFire(NESTED)) == null ? this : d;
        }
    }
}
           

postComplete is called after the source task completes.

The logic is simple: it iterates over the dependent tasks on the stack, popping each one and calling h.tryFire(NESTED). NESTED mode exists to avoid unbounded recursion: postFire would normally call postComplete on the dependent future, but in NESTED mode it does not. Instead it returns that future, and the loop processes its dependents iteratively (pushing all but one of them onto the current stack).

The entries on the stack are pushed when dependent tasks are created, as described above.
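One way to see what NESTED mode buys is to build a very long chain of synchronous stages and complete its head. The sketch below assumes OpenJDK's implementation: because postComplete processes the chain iteratively, the completing thread does not need one stack frame per stage, and the result comes back without a StackOverflowError.

```java
import java.util.concurrent.CompletableFuture;

class DeepChainDemo {
    // Builds a chain of `depth` synchronous thenApply stages on a pending future,
    // then completes the head. Each stage is pushed onto its predecessor's stack
    // while building; completing the head runs all of them on the calling thread.
    static int runChain(int depth) {
        CompletableFuture<Integer> head = new CompletableFuture<>();
        CompletableFuture<Integer> tail = head;
        for (int i = 0; i < depth; i++) {
            tail = tail.thenApply(x -> x + 1);
        }
        head.complete(0); // postComplete walks the whole chain in NESTED mode
        return tail.join();
    }

    public static void main(String[] args) {
        System.out.println(runChain(100_000)); // 100000
    }
}
```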

5. Summary

That covers the core logic of the source code.

Since asynchrony makes the flow harder to follow, let's sort it out (assuming all tasks are asynchronous):

  1. After a CompletableFuture is created, its task is executed on an asynchronous thread.
  2. If the CompletableFuture has dependent (asynchronous) tasks, they are pushed onto its stack and saved, so that they can be executed once the source task completes.
Of course, creating a dependent task does more than push it onto the stack. If the source task has already finished when the dependent task is created, the current thread hands the dependent task directly to its asynchronous executor. When that dependent task runs, it also calls postComplete on the source task, so that the other dependent tasks still on the source's stack are triggered.

The code is written with heavy reuse in mind, which makes the logic relatively hard to follow.

postComplete is called by the source task's thread after it finishes executing the source task. It may also be called from a dependent task's thread (via postFire).

Dependent tasks are executed mainly through tryFire. Because this method can be invoked from several kinds of threads (the current dependent task's own thread, the source task's thread, and other dependent tasks' threads), its logic is somewhat convoluted.

  • If it is called on the dependent task's own thread, it executes the dependent task and then notifies the other dependent tasks.
  • If it is called on the source task's thread or another dependent task's thread, the task is handed to the dependent task's own thread for execution, and the other dependent tasks are not notified from there, which avoids unbounded recursion.

I have to say, Doug Lea's code really is a work of art; the way it reuses code is remarkably thorough.

Interviewer: Why are database connections expensive, and where are the resources consumed?

2022-04-11 19:19 · Mini Program builds a station

This article explores the details of connecting to a database. Web applications typically use a database connection pool so that a new connection does not have to be established for every request. The usual explanation is always the same: establishing a database connection is time-consuming. But how much time does it actually take, and where is that time spent?

This article uses a MySQL database as the example: because MySQL is open source and its communication protocol is public, we can analyze the entire connection-establishment process in detail.

The analysis of resource consumption here focuses mainly on the network, although resources also include memory, CPU, and other computing resources. The programming language used is Java; the choice of language may also have some impact.

First, let's take a look at the Java code that connects to the database, as follows:

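import java.sql.Connection;
import java.sql.DriverManager;

public class ConnectDemo {
    public static void main(String[] args) throws Exception {
        Class.forName("com.mysql.jdbc.Driver"); // load the MySQL JDBC driver
        String name = "xttblog2";
        String password = "123456";
        String url = "jdbc:mysql://172.16.100.131:3306/xttblog2";
        Connection conn = DriverManager.getConnection(url, name, password);
        // The program then terminates, and the connection is forcibly closed
    }
}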
           

Next, we capture the entire connection-establishment process with Wireshark:


Wireshark packet capture

The capture above shows that the MySQL communication protocol runs on top of TCP and is a binary protocol, not a text protocol like HTTP. The connection is established as follows:

  • Step 1: Establish a TCP connection through the three-way handshake;
  • Step 2: The server sends the client a handshake message, and the client responds to it;
  • Step 3: The client sends an authentication packet to authenticate the user; after successful authentication, the server returns an OK response, and the connection can then start executing commands;
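Because the protocol is binary, each message is framed by a header rather than delimited by text lines. Per the MySQL client/server protocol documentation, every packet starts with a 4-byte header (a 3-byte little-endian payload length and a 1-byte sequence id), and the server's initial handshake (HandshakeV10) payload begins with the protocol version byte 10 followed by a NUL-terminated server version string. The parser below is a minimal sketch of just those fields; the class name and the sample bytes, including the version string 8.0.28, are made up for illustration.

```java
import java.nio.charset.StandardCharsets;

class MySqlPacketSketch {
    // Payload length: the first 3 header bytes, little-endian.
    static int payloadLength(byte[] packet) {
        return (packet[0] & 0xFF) | ((packet[1] & 0xFF) << 8) | ((packet[2] & 0xFF) << 16);
    }

    // Sequence id: the 4th header byte.
    static int sequenceId(byte[] packet) {
        return packet[3] & 0xFF;
    }

    // For a HandshakeV10 packet: byte 4 is the protocol version (10),
    // followed by the NUL-terminated server version string.
    static String serverVersion(byte[] packet) {
        if ((packet[4] & 0xFF) != 10) {
            throw new IllegalArgumentException("not a HandshakeV10 packet");
        }
        int start = 5;
        int end = start;
        while (packet[end] != 0) {
            end++;
        }
        return new String(packet, start, end - start, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        // Hand-built, truncated greeting: header + protocol version + "8.0.28\0" (other fields omitted)
        byte[] packet = {8, 0, 0, 0, 10, '8', '.', '0', '.', '2', '8', 0};
        System.out.println(payloadLength(packet) + " bytes, seq " + sequenceId(packet)
                + ", server " + serverVersion(packet)); // 8 bytes, seq 0, server 8.0.28
    }
}
```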

After authentication succeeds, some connection variables are set, such as the character set and whether transactions are auto-committed, which involves several more round trips. Only after all of these steps are complete can real data queries and updates be performed.

In this test, only five lines of code were used to establish a connection, and no operation was performed through it. The connection was also never closed with Connection.close(); the process simply terminated when the program finished, so the connection to the database was closed abnormally, which is why a TCP RST packet appears at the end. Because this code sets no additional connection properties, the time spent setting properties is minimal (strictly speaking, even though we set nothing, the driver still sets the character set, transaction autocommit, and so on, depending on the driver implementation), so this connection time can be regarded as the minimum. Even so, the capture shows that, excluding the final TCP RST packet (which requires no response from the server), there are still 7 round trips between the client and the server. In other words, completing a single connection requires data to travel back and forth between the client and the server at least 7 times. In terms of time, from the start of the TCP three-way handshake to the forced disconnection (excluding the final RST packet), the total cost is:

10.416042 s - 10.190799 s = 0.225243 s = 225.243 ms
           

This means that establishing a single database connection takes about 225 ms, which can be regarded as the minimum. The time will of course be affected by network conditions, database server performance, and the efficiency of the application code, but even this simplest example is enough to illustrate the problem.

The program above terminated abnormally. In a normal application, the connection is closed with Connection.close(), as in the following code:

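import java.sql.Connection;
import java.sql.DriverManager;

public class ConnectAndCloseDemo {
    public static void main(String[] args) throws Exception {
        Class.forName("com.mysql.jdbc.Driver"); // load the MySQL JDBC driver
        String name = "shine_user";
        String password = "123";
        String url = "jdbc:mysql://172.16.100.131:3306/clever_mg_test";
        Connection conn = DriverManager.getConnection(url, name, password);
        conn.close(); // close the connection explicitly instead of letting the process die
    }
}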
           

In this case the capture changes, mainly in how the connection is closed, as shown in the following figure:


Packet capture

  • Step 1: At the MySQL protocol level, the client sends a request to close the connection and does not wait for a response from the server;
  • Step 2: The TCP connection is torn down with the four-way handshake (four "waves");

From establishing the database connection to closing it, the whole process took:

747.284311 s - 747.100954 s = 0.183357 s = 183.357 ms
           

This is less than the 225 ms above, possibly because of network conditions, but it is still on the order of 200 ms.

Now imagine a website with 20,000 daily active users, each of whom sends only 5 requests: that is 100,000 requests a day. If every request opens a new database connection, and we conservatively estimate 150 ms per connection, the time spent establishing database connections each day (not counting queries and updates) is:

100000 * 150ms = 15000000ms = 15000s = 250min = 4.17h
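The estimate above is simple arithmetic; the sketch below reproduces it under the article's assumption that every request opens a new connection costing 150 ms. The method name is hypothetical.

```java
import java.util.Locale;

class ConnectionCostEstimate {
    // Time per day spent only on opening connections, assuming every request opens a new one.
    static double dailyConnectHours(long dailyUsers, long requestsPerUser, long connectMillis) {
        long requests = dailyUsers * requestsPerUser;  // 20,000 * 5 = 100,000
        long totalMillis = requests * connectMillis;   // 100,000 * 150 ms = 15,000,000 ms
        return totalMillis / 1000.0 / 3600.0;          // ms -> s -> h
    }

    public static void main(String[] args) {
        double hours = dailyConnectHours(20_000, 5, 150);
        System.out.println(String.format(Locale.ROOT, "%.2f hours per day", hours)); // 4.17 hours per day
    }
}
```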
           

In other words, the time spent establishing database connections reaches over 4 hours every day, so a database connection pool is a must. And as daily activity grows, a connection pool alone cannot guarantee that your service runs normally; other measures need to be considered:

  • cache
  • Precompilation of SQL
  • Load balancing
  • ……
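The core idea of a connection pool is simply to reuse a small number of expensive resources instead of opening a new one for each request. The following is a hypothetical, minimal pool sketch, not a production-grade pool: it lazily creates resources up to a fixed size and hands back idle ones. A plain String stands in for a real JDBC Connection so the example runs without a database.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical minimal pool: creates at most maxSize resources and reuses returned ones.
class SimplePool<R> {
    private final BlockingQueue<R> idle;
    private final Supplier<R> factory;
    private final int maxSize;
    private final AtomicInteger created = new AtomicInteger();

    SimplePool(int maxSize, Supplier<R> factory) {
        this.maxSize = maxSize;
        this.factory = factory;
        this.idle = new ArrayBlockingQueue<>(maxSize);
    }

    // Reuse an idle resource, create a new one if below maxSize, otherwise wait for a release.
    R borrow() throws InterruptedException {
        R r = idle.poll();
        if (r != null) {
            return r;
        }
        while (true) {
            int n = created.get();
            if (n >= maxSize) {
                return idle.take();
            }
            if (created.compareAndSet(n, n + 1)) {
                return factory.get(); // the expensive step a pool tries to avoid repeating
            }
        }
    }

    // Return a resource for reuse; capacity equals maxSize, so pooled resources always fit.
    void release(R resource) {
        idle.offer(resource);
    }

    int createdCount() {
        return created.get();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger opened = new AtomicInteger();
        SimplePool<String> pool = new SimplePool<>(2, () -> "conn-" + opened.incrementAndGet());
        for (int i = 0; i < 1000; i++) {
            String c = pool.borrow(); // stand-in for an expensive DB connection
            pool.release(c);
        }
        System.out.println("requests: 1000, connections opened: " + pool.createdCount()); // 1
    }
}
```

A real pool would also validate connections before reuse, handle factory failures, and enforce timeouts; those concerns are left out to keep the mechanism visible.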

Of course, those measures are not the focus of this article. The one core idea I want to convey is this: database connections really are time-consuming, so don't create them frequently.
