天天看点

Java多线程安全

多线程能够提高CPU的使用效率,为用户提供宏观上同时执行多个动作的效果,强大的功能背后也存在线程安全性问题。多个线程同时操作共享变量导致程序输出结果与设计者的初衷不符,编程过程中可以使用以下几种方式实现线程安全。

锁同步机制

在多线程中很常见的问题就是生产者与消费者问题,生产者负责生产对象并且将对象放到产品队列中,消费者负责从产品队列中取出并消费对象;产品队列是一个容量大小有限制的容器,如果生产出来的产品堆满了产品队列,生产者就需要等待有消费者取出产品,产品队列有空位后再放入新产品;如果产品队列里没有产品对象,消费者同样要等待生产者产生新产品加入队列。

生产者和消费者通常都被抽象成为线程对象,此时产品队列就是一个共享的变量,要保证多个生产者消费者线程能够正确使用产品队列就需要做互斥操作,也就是说在有某个生产者或消费者线程操作产品队列的时候,其他的线程都不能操作产品队列都要等待;同时还要做好产品队列的通知操作,也就是当有生产者放入新产品的时候要通知消费者可以继续取产品,当有消费者取出产品后也要通知生产者产品队列有了空位可以添加新产品。

public class SynchroinzedTest {
	
	public static void main(String[] args) {
		ProductQueue queue = new ProductQueue();
		new ProducerThread(queue).start();
		new ProducerThread(queue).start();
		new ProducerThread(queue).start();

		new ConsumerThread(queue).start();
		new ConsumerThread(queue).start();
		new ConsumerThread(queue).start();
	}
	
	static class ProductQueue {
		LinkedList<Object> list = new LinkedList<>();
		
		public synchronized void add(Object product) {
			while (list.size() >= 10) {
				ThreadUtils.wait(this);
			}
			list.addFirst(product);
			notify();
		}
		
		public synchronized Object remove() {
			while (list.isEmpty()) {
				ThreadUtils.wait(this);
			}
			notify();
			return list.removeLast();
		}
	}
	
	static class ProducerThread extends Thread {
		private ProductQueue queue;
		
		
		public ProducerThread(ProductQueue queue) {
			super();
			this.queue = queue;
		}

		@Override
		public void run() {
			while (true) {
				Object obj = new Object();
				System.out.println(ThreadUtils.getName() + ": Produce one object");
				queue.add(obj);
				ThreadUtils.sleep(500);
			}
		}
	}
	
	static class ConsumerThread extends Thread {
		private ProductQueue queue;
		
		public ConsumerThread(ProductQueue queue) {
			super();
			this.queue = queue;
		}

		@Override
		public void run() {
			while (true) {
				Object obj = queue.remove();
				System.out.println(ThreadUtils.getName() + ": Consume one object");
				ThreadUtils.sleep(2000);
			}
		}
	}
}	
           

ProductQueue就代表了产品队列对象,add()方法会向内部的链表头部添加新产品,remove()方法则负责从链表尾部删除产品并返回。这两个方法都是用了synchronized修饰,它们共同使用当前ProductQueue.this对象作为锁对象,如果有多个生产者或者消费者线程来访问产品队列,由于锁对象的存在每次只有当前获取到this锁的线程可以继续执行,其他的线程需要继续等待锁对象释放在参与锁竞争。

在add()和remove()方法内部还使用while循环来调用wait()方法使当前线程等待,前面介绍Thread接口的时候提到sleep()和wait()方法都可以让当前线程暂停,它们的区别又在哪里呢?sleep()方法会暂停当前线程但是依然会保持获取的锁对象,唤醒之后不需要再重新获取锁对象;而wait()方法在暂停线程后会释放锁对象,等到重新被唤醒再竞争锁对象。在add()方法中由于产品队列已经被填满,需要等待有消费者消费后空出新空间,而消费如果需要消费产品一定要在调用remove()方法时获取锁对象,因此这里就使用wait()方法来暂停线程并释放锁对象,从而让remove()方法有机会获得锁对象。

wait()/notify()方法都是继承自Object类的方法,也就是说Java中的对象都可以作为锁对象存在,而且能够支持锁同步操作。在调用wait()方法为何外面要用while循环来检测等待条件呢,直接使用if判断来检测可不可以呢?查看Object.wait()方法的注释会发现下面这段话:

As in the one argument version, interrupts and spurious wakeups are possible, 
and this method should always be used in a loop:
synchronized (obj) {
while (condition does not hold)
 obj.wait();
 	// Perform action appropriate to condition
 }
           

这段注释的意思是中断操作和伪唤醒操作都会导致wait()方法返回,而这两种情况下可能等待条件并没有被破坏,因而在wait()返回之后还要重新判断当前的等待条件是否满足。

ProductQueue的add() 和remove()方法最后都会调用notify()方法来唤醒一个线程继续执行,Object中其实还有一个notifyAll()的方法,用来唤醒所有等待在锁对象上的线程。生产者和消费者问题里由于一次只能生产或者消费一个产品,后续只需要唤醒一个生产者或者消费者线程,即使把所有的线程都唤醒也只有一个能够执行,这里就只调用了notify()唤醒一个线程。

锁同步机制能够很好的保证多线程操作的安全性,但是锁机制引入了锁对象,锁竞争机制,线程等待队列等等,这些都需要消耗内存和CPU资源,特别是在切换线程后由于没有获得锁对象浪费了系统分配给线程的时间片资源,因而锁同步机制是一种相对昂贵的线程安全机制。

不可变对象

共享变量在多线程操作会出现语义错误问题很大部分是因为共同修改共享变量导致的,如果共享变量是不可变的,没有线程能够修改它的值,所有的线程都只是读取它的值,那么就不会存在线程不安全问题。在Java中有不少类就被设计为不可变对象,它们可以安全的在对个线程之间共享。比如String类对象就是不可以修改的,只要对String做修改就会产生一个新的String对象。

String text = “abc1234”;

String tmp = text;

text = text.replace(“abc”, “xyz”);

System.out.println(text == tmp); // false

如果能够确定共享变量目前并不需要修改,可以将共享变量做成不可变对象,所有的线程都只是共享的读取它的内容,这样不需要锁同步机制就保证了共享变量的多线程安全。

线程本地量

前面讨论的都是共享变量导致的线程不安全性,如果不共享变量那是不是就不存在线程不安全问题了呢,不共享的变量只会在当前线程里操作,相当于单线程操作,自然也就不存在问题了。Java当中为开发者提供了ThreadLocal线程本地量的机制,当修改线程本地量的时候实际上操作的只是绑定在当前线程的变量,其他线程的变量依然保持不变。

public class ThreadLocalTest {
	private static ThreadLocal<Integer> threadLocal = new ThreadLocal<Integer>() {
		@Override
		protected Integer initialValue() {
			return 0;
		}
	};
	public static void main(String[] args) {
		for (int i = 0; i < 50; i++) {
			new MyThread().start();
		}
	}
	
	static class MyThread extends Thread {
		@Override
		public void run() {
			for (int i = 0; i < 200; i++) {
				int x = threadLocal.get();
				x++;
				threadLocal.set(x);
			}
			
			System.out.println(threadLocal.get());
		}
	}
}
           

上面的代码定义了共享变量threadLocal,它会在多个线程中被获取被执行,但是最终每个线程打印出来的结果都是200,并没有发生多线程语义问题。ThreadLocal内部是如何实现的呢,查看JDK的Thread实现代码会发现它拥有一个ThreadLocal.ThreadMap的属性变量。

public class Thread implements Runnable {
    ThreadLocal.ThreadLocalMap threadLocals = null;
}
           

threadLocals成员并不是在Thread类中被创建的,而是在ThreadLocal类中被创建的,当用户第一次从ThreadLocal对象中获取本地变量的时候会查看当前Thread内部是否已经创建了ThreadLocalMap对象,如果没创建就会new ThreadLocalMap()并赋值给Thread.threadLocals属性。

// ThreadLocal的源代码
public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t); // 获取当前Thread.threadLocals属性对象
     if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
         if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
   }
   return setInitialValue(); // 如果Thread.threadLocals属性为空,初始化
}

private T setInitialValue() {
    T value = initialValue();
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t); //  获取当前Thread.threadLocals属性对象
    if (map != null)
        map.set(this, value);
    else
       createMap(t, value); // 创建ThreadLocallMap对象
    return value;
}

void createMap(Thread t, T firstValue) {
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}
           

ThreadLocalMap类是一个类似于HashMap的数据结构,但是它支持的操作比HashMap要少,可以认为是一个精简版的HashMap。ThreadLocalMap内部会包含一个ThreadLocalMap.Entry类型的数组,ThreadLocalMap.Entry类包含两个属性,ThreadLocal类型的key键属性,Object类型的value值属性。这样当ThreadLocal线程本地量对象获取属性的时候就从当前所在线程Thread.threadLocals的Map对象中拿出键为ThreadLocal对象的对应的值。

线程封闭

共享对象由于在多个线程之间操作容易出现语义问题,如果共享变量只在一个线程之中操作,其他线程想要操作共享变量都委托给这个单独的线程来执行,这种处理方式就称作线程封闭。在Android系统中最常见的线程封闭实例就是主线程UI操作,Android规定所有的UI界面更新操作必须要全部都要在主线程中进行,在子线程中操作UI对象会抛出异常,属于非法操作。

其实仔细考虑就会发现UI对象也是一种共享变量,它里面的内容和普通的变量一样都是可以被修改的,如果多个线程同时修改UI对象的内容,为了避免前面出现的语义性错误必须要使用锁同步机制确保正确的更新操作;但是Android系统规定了所有的UI操作都必须在UI线程中执行,其他的线程想访问UI对象都会委托UI线程来执行,这就是Android UI对象的线程封闭。

在Android系统中有一个封装了Looper的线程类HandlerThread,它在启动的时候会在内部开启一个消息循环队列,其他线程向消息队列投递消息,所有的消息处理都可以在在HandlerThread线程里执行,现在就用该类负责共享数据的递增操作,不需要任何锁同步机制就能保证线程安全性。

public class IncrementThread extends HandlerThread {
    private static final String TAG = "IncrementThread";

    public IncrementThread() {
        super("IncrementThread");
    }

    private int count = 0;

    public void increment() {
        for (int i = 0; i < 200; i++) {
            count++;
        }
        print();
    }

    public void print() {
        Log.e(TAG, "count = " + count);
    }
}
           

上面定义了IncrementThread继承自HandlerThread,它的内部有一个count变量,这个变量只有IncrementThread变量可以访问修改,其他线程如果想要修改它必须向IncrementThread线程发起修改请求。Android中的Handler可以绑定不同的线程的Looper对象,这样Handler就能够向不同的线程发送消息,并且Handler.handleMessage()方法会在开启Looper对象的线程执行。

public class MainActivity extends AppCompatActivity {
    private static final String TAG = "MainActivity";
    private IncrementThread mIncrementThread;
    private Handler mIncrementHandler;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        mIncrementThread = new IncrementThread();
        mIncrementThread.start();
        mIncrementHandler = new Handler(mIncrementThread.getLooper()) {
            @Override
            public void handleMessage(Message msg) {
                if (msg.what == 0) {
                    mIncrementThread.increment();
                }
            }
        };
    }

    class UserThread extends Thread {
        @Override
        public void run() {
            mIncrementHandler.sendEmptyMessage(0);
        }
    }

    public void startCount(View view) {
        for (int i = 0; i < 100; i++) {
            new UserThread().start();
        }
    }
}
           

MainActivity 当中会有startCount()方法,用户点击按钮后会触发,在这里会创建100个UserThread线程,每个UserThread都会想IncrementThread发起一次增加count变量的请求,多次测试会发现count每次增长都是20000,没有出现增长值丢失的情况。

继续阅读