Author: JD Health Zhang Na
1. The significance and challenges of concurrent programming
The point of concurrent programming is to make full use of every processor core and thereby maximize throughput, so that programs run faster. To raise computing speed, hardware and compilers also apply a series of optimizations, for example:
1. Hardware upgrade: CPU registers are orders of magnitude faster than main memory. To bridge this gap and improve overall performance, hardware places multiple levels of cache between the CPU and main memory. As a result, the same data can exist in a cache and in main memory at the same time, so cache coherence has to be maintained.
2. Processor optimization: this mainly covers compiler reordering, instruction-level reordering, and memory-system reordering. The compiler rearranges statements without changing single-threaded semantics, the processor overlaps instructions through instruction-level parallelism, and load/store buffers make memory operations appear to execute out of order. Together these shorten instruction execution time and improve overall speed. The catch is that in a multi-threaded environment neither the compiler nor the CPU can see data dependencies between threads, so reordering can change the program's result.
The benefits of concurrent programming are enormous, but writing code that is both thread-safe and efficient means managing access to shared mutable state while accounting for memory consistency, processor optimization, and instruction reordering. For example, when multiple threads modify the value of the same object without coordination, updates can be lost and the final value may differ greatly from the expected one; such an object is not thread-safe. Conversely, an object is thread-safe if, when multiple threads access it, its logic always behaves correctly regardless of how the runtime schedules the threads or how their execution interleaves. Thread safety is therefore an easily overlooked problem and a real challenge in concurrent programming.
To understand why thread-safety problems arise, we first need to understand two key issues:
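The lost-update effect described above can be observed with a small experiment. The following is a minimal sketch, not code from the case in this article: the class name `LostUpdateDemo` and its fields are made up for illustration. It increments a plain `int` and an `AtomicInteger` from several threads; only the atomic counter is guaranteed to reach the expected total.

```java
import java.util.concurrent.atomic.AtomicInteger;

class LostUpdateDemo {
    // Plain field: ++ is a read-modify-write sequence, so concurrent updates can be lost.
    static int unsafeCounter = 0;
    // AtomicInteger performs the increment as one atomic step.
    static final AtomicInteger safeCounter = new AtomicInteger();

    /** Returns {unsafe total, safe total} after all worker threads have finished. */
    static int[] run(int threads, int incrementsPerThread) {
        unsafeCounter = 0;
        safeCounter.set(0);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    unsafeCounter++;
                    safeCounter.incrementAndGet();
                }
            });
            workers[i].start();
        }
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {unsafeCounter, safeCounter.get()};
    }

    public static void main(String[] args) {
        int[] totals = run(4, 100_000);
        System.out.println("unsafe=" + totals[0] + ", safe=" + totals[1] + ", expected=400000");
    }
}
```

On a multi-core machine the unsafe total is usually below the expected value, but the exact number varies from run to run, which is part of what makes such bugs hard to reproduce.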
1. How threads communicate with each other, that is, what mechanism is used by threads to exchange information.
2. How the threads are synchronized, that is, how the program controls the order of occurrence between different threads.
2. Java concurrent programming
Java concurrency uses a shared-memory model. Communication between Java threads is always implicit: threads exchange information by reading and writing shared state in memory, and the whole communication process is transparent to the programmer.
2.1 Java Memory Model
To balance what programmers want (strong memory-visibility guarantees, which means more constraints on the compiler and processor) against performance (as few constraints on the compiler and processor as possible), Java defines the Java Memory Model (JMM). It stipulates that the compiler and processor may apply any optimization as long as the execution result of the program (a single-threaded program or a correctly synchronized multi-threaded program) does not change. The main problem the JMM solves is therefore providing memory-visibility guarantees by specifying how threads communicate.
The JMM structure is shown in the following figure:
For shared variables, JMM specifies how and when a thread can see the value of a shared variable modified by other threads, and how to access the shared variable synchronously when necessary.
To govern the interaction between a thread's working memory (also called local memory) and main memory, the JMM defines the following rules:
• All variables are stored in main memory.
• Each thread has a private local memory, which holds copies of the shared variables the thread reads and writes.
• A thread must perform all operations on a variable in its local memory; it cannot read or write main memory directly.
• Threads cannot directly access variables in each other's local memory.
Eight operations are defined for this interaction:
1. lock: acts on a main-memory variable; marks the variable as exclusively owned by one thread.
2. unlock: acts on a main-memory variable; releases the exclusive state so that other threads can lock the variable.
3. read: acts on a main-memory variable; transfers the variable's value from main memory to the thread's working memory for the subsequent load.
4. load: acts on a working-memory variable; puts the value obtained by read into the working-memory copy of the variable.
5. use: acts on a working-memory variable; passes the variable's value in working memory to the execution engine.
6. assign: acts on a working-memory variable; assigns a value received from the execution engine to the variable in working memory.
7. store: acts on a working-memory variable; transfers the variable's value from working memory to main memory for the subsequent write.
8. write: acts on a main-memory variable; puts the value transferred by store into the variable in main memory.
These operations must obey the following rules:
• read and load, and likewise store and write, must occur in pairs; neither operation of a pair may appear on its own.
• Before a thread performs unlock on a variable, it must first synchronize the variable back to main memory (store and write).
2.2 Concurrency Keywords in Java
Based on the rules above, Java provides keywords such as volatile and synchronized to ensure thread safety. The basic principle is to restrict processor optimizations and insert memory barriers. At the variable level, volatile can be applied to a variable of any type; a read or write of a single volatile variable is atomic, just like a read or write of a primitive or reference variable (and, unlike for plain fields, this also holds for long and double). If the application needs atomicity over a larger scope, a synchronized block is required. The Java memory model provides the lock and unlock operations for this purpose, and the virtual machine exposes them implicitly through the bytecode instructions monitorenter and monitorexit, which appear in Java code as the synchronized keyword.
What the two keywords guarantee: volatile only guarantees that a read or write of a single volatile variable is atomic (a compound action such as count++ is not), whereas the mutually exclusive execution of a lock makes the entire critical section atomic. Functionally, locks are more powerful than volatile; volatile has the advantage in scalability and execution performance.
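As an illustration of the visibility guarantee that volatile provides, here is a minimal sketch of a stop flag. The class `VolatileFlagDemo` and its members are hypothetical names chosen for this example. The worker thread spins until another thread clears the flag; because the flag is volatile, the write is guaranteed to become visible to the worker.

```java
class VolatileFlagDemo {
    // volatile: the write in stop() is guaranteed to become visible to the worker's reads.
    private volatile boolean running = true;
    // Written only by the worker thread in this sketch.
    private long iterations;

    void stop() {
        running = false;
    }

    /** Starts a spinning worker, clears the flag, and reports whether the worker terminated. */
    static boolean runAndStop() {
        VolatileFlagDemo demo = new VolatileFlagDemo();
        Thread worker = new Thread(() -> {
            while (demo.running) {      // volatile read on every iteration
                demo.iterations++;
            }
        });
        worker.setDaemon(true);         // never keep the JVM alive because of this demo thread
        worker.start();
        try {
            Thread.sleep(50);           // let the worker spin for a moment
            demo.stop();                // volatile write
            worker.join(2_000);         // wait up to two seconds for the worker to notice
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("worker stopped: " + runAndStop());
    }
}
```

Without volatile, the JIT compiler would be allowed to hoist the read of `running` out of the loop, and the worker might never observe the change.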
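To show the difference in scope, here is a sketch of an invariant that spans two variables, which volatile alone cannot protect. The class `AccountPair` is a made-up example, not code from the case discussed later. Each transfer updates two fields inside one synchronized block, so no thread can ever observe a state in which only one of them has changed.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class AccountPair {
    private final Object lock = new Object();
    private long a = 1_000;
    private long b = 1_000;

    /** Moves amount from a to b; both updates happen inside one critical section. */
    void transfer(long amount) {
        synchronized (lock) {
            a -= amount;
            b += amount;
        }
    }

    /** Reads both fields under the same lock, so the sum is always consistent. */
    long total() {
        synchronized (lock) {
            return a + b;
        }
    }

    /** Runs concurrent transfers and reports whether the total stayed at 2000 throughout. */
    static boolean invariantHolds() {
        AccountPair pair = new AccountPair();
        AtomicBoolean ok = new AtomicBoolean(true);
        Thread[] movers = new Thread[4];
        for (int i = 0; i < movers.length; i++) {
            final long amount = (i % 2 == 0) ? 1 : -1;
            movers[i] = new Thread(() -> {
                for (int j = 0; j < 50_000; j++) {
                    pair.transfer(amount);
                    if (pair.total() != 2_000) {
                        ok.set(false);
                    }
                }
            });
            movers[i].start();
        }
        try {
            for (Thread m : movers) {
                m.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ok.get() && pair.total() == 2_000;
    }

    public static void main(String[] args) {
        System.out.println("invariant held: " + invariantHolds());
    }
}
```

Declaring `a` and `b` volatile instead would make each individual write visible, but a reader could still see the state between the two writes.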
2.3 Concurrent Containers and Utility Classes in Java
2.3.1 CopyOnWriteArrayList
CopyOnWriteArrayList acquires a reentrant lock for every mutation, which makes writes thread-safe, while reads such as get take no lock and work on the current array. The cost is that every add or remove copies the entire underlying array, which wastes memory and time when writes are frequent. The excerpt below matches the JDK 8 source.
public E get(int index) {
    return get(getArray(), index);
}
public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}
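A consequence of the copy-on-write design is that an iterator works on the array snapshot that existed when it was created. The following sketch (the class name `CowSnapshotDemo` is invented for illustration) adds elements while iterating, something that would throw ConcurrentModificationException with a plain ArrayList.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class CowSnapshotDemo {
    /** Returns {elements seen by the iteration, final list size}. */
    static int[] iterateWhileAdding() {
        List<String> list = new CopyOnWriteArrayList<>(Arrays.asList("a", "b", "c"));
        List<String> seen = new ArrayList<>();
        for (String s : list) {      // the iterator holds the 3-element snapshot
            seen.add(s);
            list.add(s + "'");       // each add copies the array; the snapshot is unaffected
        }
        return new int[] {seen.size(), list.size()};
    }

    public static void main(String[] args) {
        int[] result = iterateWhileAdding();
        System.out.println("seen=" + result[0] + ", finalSize=" + result[1]); // seen=3, finalSize=6
    }
}
```

This makes the class a good fit for read-mostly data such as listener lists, and a poor fit for lists that are written frequently.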
2.3.2 Collections.synchronizedList(new ArrayList<>())
This wrapper puts a synchronized block around each operation of the underlying list. Note that iteration is not covered: when traversing the list, you must synchronize on the returned list yourself for the whole traversal.
public void add(int index, E element) {
    // SynchronizedList simply wraps each List operation in a synchronized block
    synchronized (mutex) {list.add(index, element);}
}
public E remove(int index) {
    synchronized (mutex) {return list.remove(index);}
}
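The manual synchronization needed for traversal can be sketched as follows. The class `SyncListIterationDemo` is a hypothetical example; it relies on the documented behavior that the list returned by the one-argument Collections.synchronizedList uses itself as the mutex.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class SyncListIterationDemo {
    /** Returns {sum observed during traversal, final list size}. */
    static int[] sumWhileWriting() {
        List<Integer> list = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < 1_000; i++) {
            list.add(1);
        }
        Thread writer = new Thread(() -> {
            for (int i = 0; i < 1_000; i++) {
                list.add(1);            // each add takes the wrapper's lock
            }
        });
        writer.start();
        int sum = 0;
        synchronized (list) {           // hold the same lock for the whole traversal
            for (int value : list) {
                sum += value;
            }
        }
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {sum, list.size()};
    }

    public static void main(String[] args) {
        int[] result = sumWhileWriting();
        System.out.println("sum=" + result[0] + ", finalSize=" + result[1]);
    }
}
```

The observed sum depends on how far the writer has progressed when the traversal starts, but the traversal itself never fails. Without the synchronized block around the loop, the iteration could throw ConcurrentModificationException.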
2.3.3 ConcurrentLinkedQueue
offer appends a node to the tail of the queue without blocking: it retries CAS operations in a loop until one succeeds.
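public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) {
            // p is the last node: CAS p.next from null to newNode.
            if (p.casNext(null, newNode)) {
                if (p != t)
                    // Point tail at the real last node; a failed CAS here is acceptable.
                    casTail(t, newNode);
                return true;
            }
            // Lost the CAS race to another thread; loop and re-read next.
        }
        else if (p == q)
            // p is self-linked (p.next == p), so it has already been removed from the list.
            // Jump to the new tail if tail has changed, otherwise restart from head.
            p = (t != (t = tail)) ? t : head;
        else
            // Move toward the last node, re-reading tail if it has changed.
            p = (p != t && t != (t = tail)) ? t : q;
    }
}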
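From the caller's point of view, the CAS loop means that offer can be called from many threads without any external locking. A minimal usage sketch, with the class name `ClqDemo` invented for illustration:

```java
import java.util.concurrent.ConcurrentLinkedQueue;

class ClqDemo {
    /** Lets several producer threads offer elements and returns the final queue size. */
    static int offerFromManyThreads(int producers, int perProducer) {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        Thread[] threads = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            final int base = p * perProducer;
            threads[p] = new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    queue.offer(base + i);   // lock-free; always succeeds for this unbounded queue
                }
            });
            threads[p].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return queue.size();                 // size() traverses the queue, so it is O(n)
    }

    public static void main(String[] args) {
        System.out.println(offerFromManyThreads(4, 10_000)); // 40000
    }
}
```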
3. Online cases
3.1 Problem Discovery
On the doctor side of the Internet hospital, a doctor opens the IM chat page to conduct a consultation, and the page has to load dozens of function buttons. During the epidemic-response period in December 2022, QPS stayed very high throughout the day, peaking at 12 times the normal level. Under this load, alarms reported that some buttons were occasionally missing from the page.
3.2 The detailed process of troubleshooting
Loading the IM page for a doctor's consultation is part of the business's golden (critical) flow, and every button on it is the entry point of a business line. The core logic therefore uses custom alarms with no convergence (suppression) configured: any anomaly, including a wrong number of buttons, raises an alarm immediately.
1. The alarm information revealed the following:
(1) No exception logs: tracing the request by the logId from the alarm showed no exception anywhere in the flow, yet buttons were inexplicably missing.
(2) Not reproducible: in the pre-release environment, the same input parameters produced a normal response, so the problem could not be reproduced.
2. Code analysis to narrow down the scope of the problem:
The IM buttons for doctor consultations are processed in groups:
// Result list shared by multiple threads
List<DoctorDiagImButtonInfoDTO> multiButtonList = new ArrayList<>();
// Process the button groups in parallel
Future<List<DoctorDiagImButtonInfoDTO>> multiButtonFuture = joyThreadPoolTaskExecutor.submit(() -> {
    List<DoctorDiagImButtonInfoDTO> multiButtonListTemp = new ArrayList<>();
    buttonTypes.forEach(buttonType -> {
        multiButtonListTemp.add(appButtonInfoMap.get(buttonType));
    });
    // Several pool threads may call addAll on the same ArrayList at the same time
    multiButtonList.addAll(multiButtonListTemp);
    return multiButtonListTemp;
});
3. Add logs and observe online
Because sub-threads are prone to failing in concurrent scenarios, we added logs at the key points of each sub-thread branch and observed the following:
(1) While an abnormal request was being processed, every sub-thread completed normally.
(2) The number of missing buttons always equaled the number of buttons processed by one of the sub-threads, and which sub-thread it was varied at random.
(3) Preliminary conclusion: the concurrent addAll calls on the ArrayList were misbehaving.
4. Simulated reproduction
Use the ArrayList source code to simulate the reproduction of the problem:
(1) ArrayList source code analysis:
public boolean addAll(Collection<? extends E> c) {
    Object[] a = c.toArray();
    int numNew = a.length;
    ensureCapacityInternal(size + numNew);  // Increments modCount
    // Append the new elements to the array, starting at the current size
    System.arraycopy(a, 0, elementData, size, numNew);
    // Update the shared field size; this and the previous step are not atomic,
    // which is the root cause of the concurrency problem
    size += numNew;
    return numNew != 0;
}
private void ensureCapacityInternal(int minCapacity) {
    if (elementData == DEFAULTCAPACITY_EMPTY_ELEMENTDATA) {
        minCapacity = Math.max(DEFAULT_CAPACITY, minCapacity);
    }
    ensureExplicitCapacity(minCapacity);
}
private void ensureExplicitCapacity(int minCapacity) {
    modCount++;
    // overflow-conscious code
    if (minCapacity - elementData.length > 0)
        grow(minCapacity);
}
private void grow(int minCapacity) {
    // overflow-conscious code
    int oldCapacity = elementData.length;
    int newCapacity = oldCapacity + (oldCapacity >> 1);
    if (newCapacity - minCapacity < 0)
        newCapacity = minCapacity;
    if (newCapacity - MAX_ARRAY_SIZE > 0)
        newCapacity = hugeCapacity(minCapacity);
    // minCapacity is usually close to size, so this is a win:
    elementData = Arrays.copyOf(elementData, newCapacity);
}
(2) Theoretical analysis
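In ArrayList.addAll, copying the new elements into the array and updating size are two separate steps, not one atomic operation. If two threads interleave between these steps, both can copy their elements to the same starting index, or one thread's update of size can overwrite the other's, so an entire batch of elements is lost.
(3) Reproducing the problem
Copy the source code into a custom class and add pauses between the steps to make the concurrency problem easy to reproduce: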
// timeout1, timeout2 and timeout3 are per-instance pause lengths in seconds
public boolean addAll(Collection<? extends E> c) {
    Object[] a = c.toArray();
    int numNew = a.length;
    // Pause 1: before the current size is read
    try {
        Thread.sleep(1000*timeout1);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    ensureCapacityInternal(size + numNew);  // Increments modCount
    // Pause 2: before the copy
    try {
        Thread.sleep(1000*timeout2);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.arraycopy(a, 0, elementData, size, numNew);
    // Pause 3: before size += numNew
    try {
        Thread.sleep(1000*timeout3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    size += numNew;
    return numNew != 0;
}
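The same interleaving can also be forced without timing-dependent sleeps. The sketch below is a simplified stand-in, not the author's test class: `TinyList` is a hypothetical fixed-capacity list with no modCount or growth logic, and a CyclicBarrier makes both threads read size before either one writes it. Reading size once into a local variable models one legal interleaving of the real addAll, in which both threads perform all their reads of size before either update.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class RacyAddAllDemo {
    /** Hypothetical, stripped-down list used only for this demonstration; not the JDK class. */
    static class TinyList {
        final Object[] elementData = new Object[16];
        int size;
        private final CyclicBarrier bothHaveReadSize = new CyclicBarrier(2);

        boolean addAll(Object[] a) {
            int start = size;                  // step 1: both threads read size == 0
            awaitQuietly(bothHaveReadSize);    // wait until the other thread has read it too
            System.arraycopy(a, 0, elementData, start, a.length); // step 2: both copy to index 0
            size = start + a.length;           // step 3: both store 0 + 3, so one batch is lost
            return a.length != 0;
        }
    }

    static void awaitQuietly(CyclicBarrier barrier) {
        try {
            barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Two threads add three elements each; returns the resulting size. */
    static int sizeAfterTwoConcurrentAddAlls() {
        TinyList list = new TinyList();
        Thread t1 = new Thread(() -> list.addAll(new Object[] {"a1", "a2", "a3"}));
        Thread t2 = new Thread(() -> list.addAll(new Object[] {"b1", "b2", "b3"}));
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return list.size;
    }

    public static void main(String[] args) {
        System.out.println("size=" + sizeAfterTwoConcurrentAddAlls() + ", expected 6"); // size=3
    }
}
```

The result mirrors the production symptom: no exception is thrown, and the number of missing elements equals the size of one thread's batch.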
3.3 Problem Solving
Create the ArrayList through the thread-safe wrapper Collections.synchronizedList:
List<DoctorDiagImButtonInfoDTO> multiButtonList = Collections.synchronizedList(new ArrayList<>());
After the fix went live, online monitoring showed normal behavior.
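The pattern of the fix can be checked in isolation. The following sketch uses placeholder names (`SafeCollectDemo`, string "buttons", a plain fixed thread pool) rather than the production classes; each task builds a private temporary list and then merges it into the shared synchronized list with a single addAll.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SafeCollectDemo {
    /** Runs the given number of tasks and returns how many items ended up in the shared list. */
    static int collect(int tasks, int itemsPerTask) {
        List<String> shared = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int t = 0; t < tasks; t++) {
                final int group = t;
                futures.add(pool.submit(() -> {
                    List<String> temp = new ArrayList<>();   // confined to this task
                    for (int i = 0; i < itemsPerTask; i++) {
                        temp.add("button-" + group + "-" + i);
                    }
                    shared.addAll(temp);                     // guarded by the wrapper's mutex
                }));
            }
            for (Future<?> f : futures) {
                f.get();                                     // wait for every task to finish
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return shared.size();
    }

    public static void main(String[] args) {
        System.out.println(collect(50, 20)); // 1000
    }
}
```

An alternative that avoids shared mutable state entirely is to let each task return its temporary list and merge the Future results in the calling thread.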
3.4 Summarizing and Reflecting
Multithreading is now commonplace, but any object shared by multiple threads must be a thread-safe class or be guarded by explicit synchronization.
In addition, a few fundamental points are worth understanding clearly:
(1) The essence of the JMM: the happens-before principle
(2) The essence of the concurrency utilities: reads/writes of volatile variables and CAS
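The combination of a volatile read and CAS typically takes the form of a retry loop. The sketch below is an illustrative example with invented names (`CasMaxDemo`, `updateMax`): it maintains a running maximum with AtomicInteger.compareAndSet. The final read is safe because Thread.join establishes a happens-before edge between each worker's actions and the code after the join.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasMaxDemo {
    /** Raises max to candidate if candidate is larger, retrying when another thread interferes. */
    static void updateMax(AtomicInteger max, int candidate) {
        while (true) {
            int current = max.get();                    // volatile read
            if (candidate <= current) {
                return;                                 // nothing to do
            }
            if (max.compareAndSet(current, candidate)) {
                return;                                 // CAS succeeded
            }
            // CAS failed: another thread changed max in between, so re-read and retry
        }
    }

    /** Each thread offers a distinct range of values; returns the overall maximum. */
    static int maxOf(int threads, int perThread) {
        AtomicInteger max = new AtomicInteger(Integer.MIN_VALUE);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int base = t * perThread;
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    updateMax(max, base + i);
                }
            });
            workers[t].start();
        }
        try {
            for (Thread w : workers) {
                w.join();                               // join: worker actions happen-before this point
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println(maxOf(4, 1_000)); // 3999
    }
}
```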