Ten days have passed since I last investigated this application's CPU spike (http://www.cnblogs.com/hzmark/p/JVM_CPU.html). Last time I only pinned down that a third-party package was consuming a large share of the CPU; I never dug into why it used so much, nor why the memory behavior was so strange. All in all, what the previous investigation gave me was familiarity with the JVM tooling and a rough location for the problem.
After that investigation the application kept failing fairly often, so I finally decided to dig in again, this time focusing on memory, because I suspected the high CPU was caused by frequent GC triggered by abnormal memory usage.
First, the state of the JVM memory:

S0 and S1 were filling up alternately, and Young GC was running constantly.
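(The original screenshots are not reproduced here.) For reference, a typical way to watch this state live is jstat; the sampling interval below is just an example, and 3230 is the pid used later in this post:

jstat -gcutil 3230 1000

In the output, the S0 and S1 columns flip between roughly 0% and 100% from one sample to the next while the YGC count climbs steadily, which is exactly the survivor-space ping-pong described above.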
Using jmap we can see which objects are taking up large amounts of memory:
In the abnormal state, the histogram showed the class name [B occupying several gigabytes; in other words, byte arrays were using an enormous amount of memory.
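For reference, the histogram comes from jmap as well; a typical invocation (same pid as above, truncated to the top entries) looks like:

jmap -histo:live 3230 | head -20

Each row lists the instance count and total bytes per class, and in the abnormal state [B (byte[]) sat right at the top.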
Combining this with the problem code located during the previous CPU investigation:
Look closely at the problematic thread: com.trilead.ssh2.StreamGobbler$GobblerThread.run
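For reference, the way a hot thread like this gets tied to a stack trace (the approach from the CPU post) is roughly the following, where <tid> is a placeholder for the busiest thread id reported by top:

top -Hp 3230
printf '%x\n' <tid>
jstack 3230 | grep -A 30 'nid=0x<hex-tid>'

top -Hp lists per-thread CPU usage, the decimal thread id is converted to hex, and that hex value matches the nid field in the jstack output, which is where com.trilead.ssh2.StreamGobbler$GobblerThread.run showed up.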
Everything up to this point could already be inferred from the previous CPU investigation. In fact, with a bit of care and patience, following com.trilead.ssh2.StreamGobbler$GobblerThread.run further down would have located the specific problem. But from here on I used a different approach: analyzing the heap usage at the time of the failure. Here is my analysis process.
First, dump the heap:
jmap -dump:format=b,file=xxx.bin 3230
where xxx.bin is the resulting dump file and 3230 is the pid.
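(A side note, in case the dump file is inconveniently large: the stock jmap also accepts a live suboption, jmap -dump:live,format=b,file=xxx.bin 3230, which runs a full GC first and dumps only reachable objects. For leak hunting either form works here, since the leaked buffers are still strongly reachable.)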
Then open the file with MAT (I won't cover MAT usage here; see its documentation. I used the Eclipse MAT plugin, which can be installed online from the update site at http://download.eclipse.org/mat/1.3.1/update-site/).
This is the overview: 618 MB of memory in use in total, with a single block accounting for 576 MB, so the problem must lie in that block.
Clicking through to Leak Suspects shows MAT's automated analysis of suspected problems:
This agrees with our guess from the previous step: the problem lies in this largest block of memory. It also matches the conclusion from the CPU investigation, namely that the problem is inside the third-party package.
Clicking the Details link shows exactly what is occupying the memory.
At this point it is clear that a thread started by our own code was holding a reference to the memory, preventing it from being reclaimed. Checking our own code now pinpoints the problem precisely.
(The JVM tools from last time were actually enough to surface the problem, but to be more precise, and to learn heap analysis along the way, I went through the MAT process.)
Next is what I did after locating the problem code.
Looking at the code in question:
package com.trilead.ssh2;

import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;

/**
 * A <code>StreamGobbler</code> is an InputStream that uses an internal worker
 * thread to constantly consume input from another InputStream. It uses a buffer
 * to store the consumed data. The buffer size is automatically adjusted, if needed.
 * <p>
 * This class is sometimes very convenient - if you wrap a session's STDOUT and STDERR
 * InputStreams with instances of this class, then you don't have to bother about
 * the shared window of STDOUT and STDERR in the low level SSH-2 protocol,
 * since all arriving data will be immediatelly consumed by the worker threads.
 * Also, as a side effect, the streams will be buffered (e.g., single byte
 * read() operations are faster).
 * <p>
 * Other SSH for Java libraries include this functionality by default in
 * their STDOUT and STDERR InputStream implementations, however, please be aware
 * that this approach has also a downside:
 * <p>
 * If you do not call the StreamGobbler's <code>read()</code> method often enough
 * and the peer is constantly sending huge amounts of data, then you will sooner or later
 * encounter a low memory situation due to the aggregated data (well, it also depends on the Java heap size).
 * Joe Average will like this class anyway - a paranoid programmer would never use such an approach.
 * <p>
 * The term "StreamGobbler" was taken from an article called "When Runtime.exec() won't",
 * see http://www.javaworld.com/javaworld/jw-12-2000/jw-1229-traps.html.
 *
 * @author Christian Plattner, [email protected]
 * @version $Id: StreamGobbler.java,v 1.1 2007/10/15 12:49:56 cplattne Exp $
 */

public class StreamGobbler extends InputStream
{
    class GobblerThread extends Thread
    {
        public void run()
        {
            byte[] buff = new byte[8192];

            while (true)
            {
                try
                {
                    int avail = is.read(buff);

                    synchronized (synchronizer)
                    {
                        if (avail <= 0)
                        {
                            isEOF = true;
                            synchronizer.notifyAll();
                            break;
                        }

                        int space_available = buffer.length - write_pos;

                        if (space_available < avail)
                        {
                            /* compact/resize buffer */

                            int unread_size = write_pos - read_pos;
                            int need_space = unread_size + avail;

                            byte[] new_buffer = buffer;

                            if (need_space > buffer.length)
                            {
                                int inc = need_space / 3;
                                inc = (inc < 256) ? 256 : inc;
                                inc = (inc > 8192) ? 8192 : inc;
                                new_buffer = new byte[need_space + inc];
                            }

                            if (unread_size > 0)
                                System.arraycopy(buffer, read_pos, new_buffer, 0, unread_size);

                            buffer = new_buffer;

                            read_pos = 0;
                            write_pos = unread_size;
                        }

                        System.arraycopy(buff, 0, buffer, write_pos, avail);
                        write_pos += avail;

                        synchronizer.notifyAll();
                    }
                }
                catch (IOException e)
                {
                    synchronized (synchronizer)
                    {
                        exception = e;
                        synchronizer.notifyAll();
                        break;
                    }
                }
            }
        }
    }

    private InputStream is;
    private final GobblerThread t;

    private final Object synchronizer = new Object();

    private boolean isEOF = false;
    private boolean isClosed = false;
    private IOException exception = null;

    private byte[] buffer = new byte[2048];
    private int read_pos = 0;
    private int write_pos = 0;

    public StreamGobbler(InputStream is)
    {
        this.is = is;
        t = new GobblerThread();
        t.setDaemon(true);
        t.start();
    }

    public int read() throws IOException
    {
        synchronized (synchronizer)
        {
            if (isClosed)
                throw new IOException("This StreamGobbler is closed.");

            while (read_pos == write_pos)
            {
                if (exception != null)
                    throw exception;

                if (isEOF)
                    return -1;

                try
                {
                    synchronizer.wait();
                }
                catch (InterruptedException e)
                {
                    throw new InterruptedIOException();
                }
            }

            int b = buffer[read_pos++] & 0xff;

            return b;
        }
    }

    public int available() throws IOException
    {
        synchronized (synchronizer)
        {
            if (isClosed)
                throw new IOException("This StreamGobbler is closed.");

            return write_pos - read_pos;
        }
    }

    public int read(byte[] b) throws IOException
    {
        return read(b, 0, b.length);
    }

    public void close() throws IOException
    {
        synchronized (synchronizer)
        {
            if (isClosed)
                return;
            isClosed = true;
            isEOF = true;
            synchronizer.notifyAll();
            is.close();
        }
    }

    public int read(byte[] b, int off, int len) throws IOException
    {
        if (b == null)
            throw new NullPointerException();

        if ((off < 0) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0) || (off > b.length))
            throw new IndexOutOfBoundsException();

        if (len == 0)
            return 0;

        synchronized (synchronizer)
        {
            if (isClosed)
                throw new IOException("This StreamGobbler is closed.");

            while (read_pos == write_pos)
            {
                if (exception != null)
                    throw exception;

                if (isEOF)
                    return -1;

                try
                {
                    synchronizer.wait();
                }
                catch (InterruptedException e)
                {
                    throw new InterruptedIOException();
                }
            }

            int avail = write_pos - read_pos;

            avail = (avail > len) ? len : avail;

            System.arraycopy(buffer, read_pos, b, off, avail);

            read_pos += avail;

            return avail;
        }
    }
}
Anyone who reads the class's documentation before using it can see the problem it carries. Of course you can also spot it in the source; that is how I found it myself, and it certainly took longer than reading the Javadoc would have.
Here is the code that causes the problem:
class GobblerThread extends Thread
{
    public void run()
    {
        byte[] buff = new byte[8192];

        while (true)
        {
            try
            {
                int avail = is.read(buff);

                synchronized (synchronizer)
                {
                    if (avail <= 0)
                    {
                        isEOF = true;
                        synchronizer.notifyAll();
                        break;
                    }

                    int space_available = buffer.length - write_pos;

                    if (space_available < avail)
                    {
                        /* compact/resize buffer */

                        int unread_size = write_pos - read_pos;
                        int need_space = unread_size + avail;

                        byte[] new_buffer = buffer;

                        if (need_space > buffer.length)
                        {
                            int inc = need_space / 3;
                            inc = (inc < 256) ? 256 : inc;
                            inc = (inc > 8192) ? 8192 : inc;
                            new_buffer = new byte[need_space + inc];
                        }

                        if (unread_size > 0)
                            System.arraycopy(buffer, read_pos, new_buffer, 0, unread_size);

                        buffer = new_buffer;

                        read_pos = 0;
                        write_pos = unread_size;
                    }

                    System.arraycopy(buff, 0, buffer, write_pos, avail);
                    write_pos += avail;

                    synchronizer.notifyAll();
                }
            }
            catch (IOException e)
            {
                synchronized (synchronizer)
                {
                    exception = e;
                    synchronizer.notifyAll();
                    break;
                }
            }
        }
    }
}
The problem is StreamGobbler's buffer field: a byte array that only ever grows and is never cleared.
When a Hive query runs for a very long time (say, data skew brings the query to a crawl, or heavy queueing stretches the execution time), this thread keeps running the whole while, holding on to the buffer so it can never be reclaimed, even though its contents will never be read again.
That is the unreasonable part for our scenario. StreamGobbler is designed to start a thread that constantly drains an input stream into its own buffer so the stream never blocks. That design simply does not fit long-running tasks that keep producing log output.
That concludes the investigation: the root cause is that the third-party StreamGobbler does not fit this scenario. The fix is to implement our own stream-reading thread that deals with the stream's contents promptly, the same idea as StreamGobbler's dedicated reader thread, just without the unbounded buffering; see the sketch below.
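Here is a minimal sketch of that replacement. All names are illustrative (this is not the actual production code), and the handler is whatever you want done with each chunk, for example writing it to a log file or discarding it:

import java.io.IOException;
import java.io.InputStream;

/* Drains an InputStream on a daemon thread so the SSH channel never
 * blocks, but hands each chunk to a handler immediately instead of
 * accumulating it in an ever-growing internal buffer. */
public class DrainingStreamReader extends Thread
{
    public interface ChunkHandler
    {
        void handle(byte[] data, int len);
    }

    private final InputStream in;
    private final ChunkHandler handler;

    public DrainingStreamReader(InputStream in, ChunkHandler handler)
    {
        this.in = in;
        this.handler = handler;
        setDaemon(true);
    }

    @Override
    public void run()
    {
        byte[] buff = new byte[8192]; // fixed-size chunk buffer; never grows

        try
        {
            int n;
            while ((n = in.read(buff)) > 0)
                handler.handle(buff, n); // consume right away; nothing is retained
        }
        catch (IOException ignored)
        {
            // stream closed or connection dropped; let the thread exit
        }
    }
}

Wrapping the session's STDOUT and STDERR each with one of these keeps the SSH window drained just as StreamGobbler does, while memory stays bounded at the chunk size no matter how long the Hive query runs.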
Having traced this CPU spike to its cause, I am now fairly comfortable with the JVM tools and with the workflow for investigating both CPU and memory problems, which counts as a real gain. Going from seeing the problem, to tracking it down, to reading the source, to finally fixing it felt great, and I grew from it.
Let's keep growing together, fellow bloggers.