天天看點

記一次查記憶體異常問題(續《記一次Web應用CPU偏高》)

繼上一次查應用的CPU飙高問題(http://www.cnblogs.com/hzmark/p/JVM_CPU.html)過去10天了。上次隻是定位到了是一個第三方包占用了大量的CPU使用,但沒有細緻的去查第三方包為什麼占用了這麼高的CPU,并且記憶體為什麼如此詭異。總的來說上一次排查帶來的收獲是熟悉了JVM的工具使用和大緻定位到了問題。

在上次排查問題之後,應用出現異常的頻率還是較高,終下定決心再查一次,而這次排查的重點落在記憶體方面。因為懷疑CPU偏高是因為記憶體的異常導緻頻繁的GC引起的。

首先是JVM記憶體的狀态:

記一次查記憶體異常問題(續《記一次Web應用CPU偏高》)

S0和S1交替的用滿,不停的在進行YGC。

使用jmap可以看到記憶體中那些對象使用了大量的記憶體:

記一次查記憶體異常問題(續《記一次Web應用CPU偏高》)
下面是列出記憶體占用的方法(下圖不是異常狀态的時候的現象):
記一次查記憶體異常問題(續《記一次Web應用CPU偏高》)

在異常狀态的情況下看到的現象是class name:[B占用好幾個G的記憶體,也就是byte數組占用了很大的記憶體。

結合上一次查CPU問題時定位到的問題代碼:

記一次查記憶體異常問題(續《記一次Web應用CPU偏高》)

仔細看出問題的線程:com.trilead.ssh2.StreamGobbler$GobblerThread.run

到這裡為止都是通過上次排查CPU問題可以推測出來的,其實仔細一點耐心一點順着com.trilead.ssh2.StreamGobbler$GobblerThread.run繼續往下分析能定位到具體問題,但是到這裡之後我才用了另一種方式去定位問題,那就是分析出現問題時的堆記憶體使用情況,下面是我的分析過程。

首先dump記憶體資料:

jmap -dump:format=b,file=xxx.bin 3230

其中xxx.bin是dump出來的檔案,3230是pid

之後使用mat打開該檔案(這邊不介紹MAT的使用,可以自己查找幫助文檔;我使用的是eclipse的mat插件,可以在官網上找到update的位址線上安裝就可以了http://download.eclipse.org/mat/1.3.1/update-site/)

記一次查記憶體異常問題(續《記一次Web應用CPU偏高》)

這是一個概覽資訊,可以看到總共使用了618M記憶體,而有一塊直接占用了576M,那麼問題肯定出現在這塊記憶體中。

點到leak suspects會有一個系統分析的懷疑問題:

記一次查記憶體異常問題(續《記一次Web應用CPU偏高》)

和上一步我們的猜想是一緻的,問題出在最大的這個記憶體這裡。和排查CPU時得出的結論也是一緻的,問題出在第三方的包内。

點選detail内容可以看到具體是哪裡占用了記憶體。

記一次查記憶體異常問題(續《記一次Web應用CPU偏高》)
通過with outgoing references檢視線程引用的對象,如下圖所示:
記一次查記憶體異常問題(續《記一次Web應用CPU偏高》)
在找到使用記憶體最大的對象,查找他的引用,可以看到代碼中是哪裡引用了這塊記憶體導緻無法被回收
記一次查記憶體異常問題(續《記一次Web應用CPU偏高》)

到這裡位置就可以知道是自己起的線程保持了對記憶體的引用導緻無法别回收。查自己寫的代碼已經可以精确的定位到問題。

(其實根據上次使用到一些JVM的工具已經可以發現問題,但是問了跟精确,順便學習記憶體排查,是以有了MAT分析的過程)

下面是定位到問題代碼之後的處理。

檢視具體的代碼:

記一次查記憶體異常問題(續《記一次Web應用CPU偏高》)
記一次查記憶體異常問題(續《記一次Web應用CPU偏高》)

1 package com.trilead.ssh2;
  2 
  3 import java.io.IOException;
  4 import java.io.InputStream;
  5 import java.io.InterruptedIOException;
  6 
  7 /**
  8  * A <code>StreamGobbler</code> is an InputStream that uses an internal worker
  9  * thread to constantly consume input from another InputStream. It uses a buffer
 10  * to store the consumed data. The buffer size is automatically adjusted, if needed.
 11  * <p>
 12  * This class is sometimes very convenient - if you wrap a session's STDOUT and STDERR
 13  * InputStreams with instances of this class, then you don't have to bother about
 14  * the shared window of STDOUT and STDERR in the low level SSH-2 protocol,
 15  * since all arriving data will be immediatelly consumed by the worker threads.
 16  * Also, as a side effect, the streams will be buffered (e.g., single byte
 17  * read() operations are faster).
 18  * <p>
 19  * Other SSH for Java libraries include this functionality by default in
 20  * their STDOUT and STDERR InputStream implementations, however, please be aware
 21  * that this approach has also a downside:
 22  * <p>
 23  * If you do not call the StreamGobbler's <code>read()</code> method often enough
 24  * and the peer is constantly sending huge amounts of data, then you will sooner or later
 25  * encounter a low memory situation due to the aggregated data (well, it also depends on the Java heap size).
 26  * Joe Average will like this class anyway - a paranoid programmer would never use such an approach.
 27  * <p>
 28  * The term "StreamGobbler" was taken from an article called "When Runtime.exec() won't",
 29  * see http://www.javaworld.com/javaworld/jw-12-2000/jw-1229-traps.html.
 30  * 
 31  * @author Christian Plattner, [email protected]
 32  * @version $Id: StreamGobbler.java,v 1.1 2007/10/15 12:49:56 cplattne Exp $
 33  */
 34 
 35 public class StreamGobbler extends InputStream
 36 {
 37     class GobblerThread extends Thread
 38     {
 39         public void run()
 40         {
 41             byte[] buff = new byte[8192];
 42 
 43             while (true)
 44             {
 45                 try
 46                 {
 47                     int avail = is.read(buff);
 48 
 49                     synchronized (synchronizer)
 50                     {
 51                         if (avail <= 0)
 52                         {
 53                             isEOF = true;
 54                             synchronizer.notifyAll();
 55                             break;
 56                         }
 57                         
 58                         int space_available = buffer.length - write_pos;
 59                         
 60                         if (space_available < avail)
 61                         {
 62                             /* compact/resize buffer */
 63 
 64                             int unread_size = write_pos - read_pos;
 65                             int need_space = unread_size + avail;
 66 
 67                             byte[] new_buffer = buffer;
 68 
 69                             if (need_space > buffer.length)
 70                             {
 71                                 int inc = need_space / 3;
 72                                 inc = (inc < 256) ? 256 : inc;
 73                                 inc = (inc > 8192) ? 8192 : inc;
 74                                 new_buffer = new byte[need_space + inc];
 75                             }
 76                             
 77                             if (unread_size > 0)
 78                                 System.arraycopy(buffer, read_pos, new_buffer, 0, unread_size);
 79 
 80                             buffer = new_buffer;
 81                             
 82                             read_pos = 0;
 83                             write_pos = unread_size;
 84                         }
 85                         
 86                         System.arraycopy(buff, 0, buffer, write_pos, avail);
 87                         write_pos += avail;
 88 
 89                         synchronizer.notifyAll();
 90                     }    
 91                 }
 92                 catch (IOException e)
 93                 {
 94                     synchronized (synchronizer)
 95                     {
 96                         exception = e;
 97                         synchronizer.notifyAll();
 98                         break;
 99                     }
100                 }
101             }
102         }
103     }
104 
105     private InputStream is;
106     private final GobblerThread t;
107 
108     private final Object synchronizer = new Object();
109 
110     private boolean isEOF = false;
111     private boolean isClosed = false;
112     private IOException exception = null;
113 
114     private byte[] buffer = new byte[2048];
115     private int read_pos = 0;
116     private int write_pos = 0;
117 
118     public StreamGobbler(InputStream is)
119     {
120         this.is = is;
121         t = new GobblerThread();
122         t.setDaemon(true);
123         t.start();
124     }
125 
126     public int read() throws IOException
127     {
128         synchronized (synchronizer)
129         {
130             if (isClosed)
131                 throw new IOException("This StreamGobbler is closed.");
132 
133             while (read_pos == write_pos)
134             {
135                 if (exception != null)
136                     throw exception;
137 
138                 if (isEOF)
139                     return -1;
140 
141                 try
142                 {
143                     synchronizer.wait();
144                 }
145                 catch (InterruptedException e)
146                 {
147                     throw new InterruptedIOException();
148                 }
149             }
150 
151             int b = buffer[read_pos++] & 0xff;
152 
153             return b;
154         }
155     }
156 
157     public int available() throws IOException
158     {
159         synchronized (synchronizer)
160         {
161             if (isClosed)
162                 throw new IOException("This StreamGobbler is closed.");
163 
164             return write_pos - read_pos;
165         }
166     }
167 
168     public int read(byte[] b) throws IOException
169     {
170         return read(b, 0, b.length);
171     }
172 
173     public void close() throws IOException
174     {
175         synchronized (synchronizer)
176         {
177             if (isClosed)
178                 return;
179             isClosed = true;
180             isEOF = true;
181             synchronizer.notifyAll();
182             is.close();
183         }
184     }
185 
186     public int read(byte[] b, int off, int len) throws IOException
187     {
188         if (b == null)
189             throw new NullPointerException();
190 
191         if ((off < 0) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0) || (off > b.length))
192             throw new IndexOutOfBoundsException();
193 
194         if (len == 0)
195             return 0;
196 
197         synchronized (synchronizer)
198         {
199             if (isClosed)
200                 throw new IOException("This StreamGobbler is closed.");
201 
202             while (read_pos == write_pos)
203             {
204                 if (exception != null)
205                     throw exception;
206 
207                 if (isEOF)
208                     return -1;
209 
210                 try
211                 {
212                     synchronizer.wait();
213                 }
214                 catch (InterruptedException e)
215                 {
216                     throw new InterruptedIOException();
217                 }
218             }
219 
220             int avail = write_pos - read_pos;
221 
222             avail = (avail > len) ? len : avail;
223 
224             System.arraycopy(buffer, read_pos, b, off, avail);
225 
226             read_pos += avail;
227 
228             return avail;
229         }
230     }
231 }      

View Code

如果使用這個類之前知悉讀類的說明資訊是可以看明白這個類存在的問題的。當然也可以從源碼上看出來,我就是從源碼看出問題的,當時花的時間肯定比看說明耗費的時間長。

下面是産生問題的代碼:

記一次查記憶體異常問題(續《記一次Web應用CPU偏高》)
記一次查記憶體異常問題(續《記一次Web應用CPU偏高》)
1 class GobblerThread extends Thread
 2     {
 3         public void run()
 4         {
 5             byte[] buff = new byte[8192];
 6 
 7             while (true)
 8             {
 9                 try
10                 {
11                     int avail = is.read(buff);
12 
13                     synchronized (synchronizer)
14                     {
15                         if (avail <= 0)
16                         {
17                             isEOF = true;
18                             synchronizer.notifyAll();
19                             break;
20                         }
21                         
22                         int space_available = buffer.length - write_pos;
23                         
24                         if (space_available < avail)
25                         {
26                             /* compact/resize buffer */
27 
28                             int unread_size = write_pos - read_pos;
29                             int need_space = unread_size + avail;
30 
31                             byte[] new_buffer = buffer;
32 
33                             if (need_space > buffer.length)
34                             {
35                                 int inc = need_space / 3;
36                                 inc = (inc < 256) ? 256 : inc;
37                                 inc = (inc > 8192) ? 8192 : inc;
38                                 new_buffer = new byte[need_space + inc];
39                             }
40                             
41                             if (unread_size > 0)
42                                 System.arraycopy(buffer, read_pos, new_buffer, 0, unread_size);
43 
44                             buffer = new_buffer;
45                             
46                             read_pos = 0;
47                             write_pos = unread_size;
48                         }
49                         
50                         System.arraycopy(buff, 0, buffer, write_pos, avail);
51                         write_pos += avail;
52 
53                         synchronizer.notifyAll();
54                     }    
55                 }
56                 catch (IOException e)
57                 {
58                     synchronized (synchronizer)
59                     {
60                         exception = e;
61                         synchronizer.notifyAll();
62                         break;
63                     }
64                 }
65             }
66         }
67     }      

StreamGobbler類的buffer屬性是一個隻會不斷增長,且不會清空的byte數組,問題就出在這裡。

當一個Hive查詢的執行時間非常的長(比如出現資料傾斜導緻查詢進行不下去、執行任務很多需要排隊導緻時間邊長),那麼這個線程将持續很長時間,而它将hold這個buffer永遠不會被回收,雖然它裡面的内容再也不會被使用。

那麼這就是不合理的地方。StreamGobbler類的設計就是啟動一個線程将一個輸入流的内容不停的清空,讀到自己的緩存中。這樣的目的是不要造成流的堵塞,但這樣就不适用長任務且日志不停輸出的場景。

至此排查分析結束,原因就是第三方的StreamGobbler并不适合目前的場景。解決的辦法就是自己實作讀取流的線程及時的處理掉流中的内容,和StreamGobbler起一個線程去讀試一緻的,隻是不做緩存處理。

經過這次CPU飙高查到原因,基本已經熟悉了JVM工具的使用,CPU問題及記憶體問題的排查流程,算是有些收獲。看問題,排查定位問題,看源碼分析,最終解決掉問題的過程讓自己非常的“爽”,并且得到成長。

和各位博友一起成長,加油。

如果本文對您有幫助,點一下右下角的“推薦”