天天看點

hive2solr問題小結

  搞了一段時間,hive2solr的job終于可以穩定的跑了,實作使用hive向solr插資料,主要是實作RecordWriter接口,重寫write方法和close方法。下面對遇到的問題一一列出:

1.資料覆寫問題,使用原子更新

參考:http://caiguangguang.blog.51cto.com/1652935/1599137

2.重複建構solrserver和solrtable對象問題,使用static在初始化的時候建構,後面直接調用

建構:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

<code>        </code><code>public</code> <code>static</code> <code>Map&lt;Integer,SolrServer&gt; solrServers = </code><code>new</code> <code>HashMap&lt;Integer,SolrServer&gt;();</code>

<code>        </code><code>public</code> <code>static</code> <code>Map&lt;Integer,SolrTable&gt; solrTables = </code><code>new</code> <code>HashMap&lt;Integer,SolrTable&gt;();</code>

<code>        </code><code>public</code> <code>static</code> <code>String[] iparray;</code>

<code>        </code><code>public</code> <code>static</code> <code>String ipstring;</code>

<code>        </code><code>public</code> <code>static</code> <code>String collec;</code>

<code>        </code><code>static</code> <code>{</code>

<code>               </code><code>LOG .warn(</code><code>"in SolrServerCustom start initialize ip maps"</code> <code>);</code>

<code>               </code><code>ipstring = </code><code>"xxxx,xxxxxx"</code><code>;</code>

<code>               </code><code>collec = </code><code>"userinfo"</code> <code>;</code>

<code>               </code><code>LOG .warn(</code><code>"in SolrServerCustom  ipstring and collec: "</code> <code>+ ipstring + </code><code>","</code> <code>+ collec );</code>

<code>               </code><code>iparray = ipstring .split(</code><code>","</code> <code>);</code>

<code>              </code><code>Arrays. sort( iparray);</code>

<code>               </code><code>for</code> <code>(</code><code>int</code> <code>i=</code><code>0</code><code>;i&lt; iparray. length;i++){</code>

<code>                     </code><code>String urlx = </code><code>"http://"</code> <code>+iparray [i]+</code><code>"/solr/"</code> <code>+ collec;</code>

<code>                      </code><code>solrServers.put(i, </code><code>new</code> <code>HttpSolrServer(urlx));</code>

<code>                      </code><code>solrTables.put(i, </code><code>new</code> <code>SolrTable(String.valueOf(i)));</code>

<code>              </code><code>}</code>

<code>               </code><code>LOG .warn(</code><code>"in SolrServerCustom end initialize ip maps,maps size "</code> <code>+ solrServers .size());</code>

<code>               </code><code>LOG .warn(</code><code>"in SolrServerCustom end initialize ip mapsx,mapsx size "</code> <code>+ solrTables .size()); </code>

<code>       </code><code>}</code>

引用:

21

22

23

24

25

26

27

28

29

30

31

<code> </code><code>public</code> <code>void</code> <code>write(Writable w) </code><code>throws</code> <code>IOException {</code>

<code>          </code><code>MapWritable map = (MapWritable) w;</code>

<code>          </code><code>SolrInputDocument doc = </code><code>new</code> <code>SolrInputDocument();</code>

<code>          </code><code>String key;</code>

<code>          </code><code>String value;</code>

<code>          </code><code>String newkey;</code>

<code>          </code><code>int</code> <code>idx;</code>

<code>          </code><code>for</code> <code>(</code><code>final</code> <code>Map.Entry&lt;Writable, Writable&gt; entry : map.entrySet()) {</code>

<code>               </code><code>key = entry.getKey().toString();</code>

<code>               </code><code>newkey = </code><code>this</code><code>.tableName + </code><code>"."</code> <code>+ entry.getKey().toString();</code>

<code>               </code><code>value = entry.getValue().toString();</code>

<code>               </code><code>if</code><code>(key.equals(</code><code>"id"</code><code>)){</code>

<code>                    </code><code>idx = SolrUtil.getIntServer(value,SolrServerCustom.solrServers); </code><code>//引用靜态屬性SolrServerCustom.solrServers</code>

<code>                    </code><code>table = SolrServerCustom.solrTables.get(idx); </code><code>//引用靜态屬性SolrServerCustom.solrTables</code>

<code>                    </code><code>table.setNumInputBufferRows(</code><code>this</code><code>.numInputBufferRows);</code>

<code>               </code><code>}</code>

<code>                    </code><code>doc.addField(</code><code>"id"</code><code>,Integer.valueOf(value));</code>

<code>               </code><code>}</code><code>else</code><code>{</code>

<code>                    </code><code>if</code> <code>(value.equals(</code><code>"(null)"</code><code>)){</code>

<code>                         </code><code>value = </code><code>""</code><code>;</code>

<code>                    </code><code>}</code>

<code>                    </code><code>setOper = </code><code>new</code> <code>LinkedHashMap&lt;String,Object&gt;();</code>

<code>                    </code><code>setOper.put(</code><code>"set"</code><code>,value);</code>

<code>                    </code><code>if</code><code>(!doc.keySet().contains(newkey)){</code>

<code>                         </code><code>doc.addField(newkey, setOper);</code>

<code>                    </code><code>}    </code>

<code>          </code><code>}</code>

<code>          </code><code>table.save(doc);</code>

<code>     </code><code>}</code>

3.代碼存在記憶體洩露問題

1)對象的聲明,放在循環外,并調整outbuffer的大小

現象:yarn map/reduce  java heap滿導緻job hang

錯誤日志:

<code>2015-01-26 14:01:10,000 FATAL [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.OutOfMemoryError: GC overhead limit exceeded</code>

<code>        </code><code>at java.lang.AbstractStringBuilder.&lt;init&gt;(AbstractStringBuilder.java:45)</code>

<code>        </code><code>at java.lang.StringBuilder.&lt;init&gt;(StringBuilder.java:68)</code>

<code>        </code><code>at com.chimpler.hive.solr.SolrWriter.write(SolrWriter.java:71)</code>

<code>        </code><code>at org.apache.hadoop.hive.ql.</code><code>exec</code><code>.FileSinkOperator.processOp(FileSinkOperator.java:621)</code>

<code>        </code><code>at org.apache.hadoop.hive.ql.</code><code>exec</code><code>.Operator.forward(Operator.java:793)</code>

<code>        </code><code>at org.apache.hadoop.hive.ql.</code><code>exec</code><code>.SelectOperator.processOp(SelectOperator.java:87)</code>

<code>        </code><code>at org.apache.hadoop.hive.ql.</code><code>exec</code><code>.TableScanOperator.processOp(TableScanOperator.java:92)</code>

<code>        </code><code>at org.apache.hadoop.hive.ql.</code><code>exec</code><code>.MapOperator.process(MapOperator.java:540)</code>

<code>        </code><code>at org.apache.hadoop.hive.ql.</code><code>exec</code><code>.mr.ExecMapper.map(ExecMapper.java:177)</code>

<code>        </code><code>at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)</code>

<code>        </code><code>at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:428)</code>

<code>        </code><code>at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)</code>

<code>        </code><code>at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:160)</code>

<code>        </code><code>at java.security.AccessController.doPrivileged(Native Method)</code>

<code>        </code><code>at javax.security.auth.Subject.doAs(Subject.java:396)</code>

<code>        </code><code>at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)</code>

<code>              </code><code>at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:155)</code>

2)try...catch....finally的使用(在finally中 clear buffer)

一開始沒有增加finally,導緻在異常發生時buffer會大于設定,最終導緻job記憶體用滿,hang住。

4.異常的處理

要求一個solrserver出錯,或者solr暫時不響應時程式不能退出,預設情況下異常向上抛出,最終導緻job失敗

比如:

<code>Caused by: org.apache.solr.client.solrj.impl.HttpSolrServer$RemoteSolrException: Expected content </code><code>type</code> <code>application</code><code>/octet-stream</code> <code>but got text</code><code>/html</code><code>. &lt;html&gt;</code>

<code>&lt;</code><code>head</code><code>&gt;&lt;title&gt;504 Gateway Time-out&lt;</code><code>/title</code><code>&gt;&lt;</code><code>/head</code><code>&gt;</code>

<code>&lt;body bgcolor=</code><code>"white"</code><code>&gt;</code>

<code>&lt;center&gt;&lt;h1&gt;504 Gateway Time-out&lt;</code><code>/h1</code><code>&gt;&lt;</code><code>/center</code><code>&gt;</code>

<code>&lt;hr&gt;&lt;center&gt;nginx</code><code>/1</code><code>.6.2&lt;</code><code>/center</code><code>&gt;</code>

<code>&lt;</code><code>/body</code><code>&gt;</code>

<code>&lt;</code><code>/html</code><code>&gt;</code>

 防止異常的抛出會造成runtime error導緻job失敗,catch異常後不做處理

<code>     </code><code>public void flush(){</code>

<code>          </code><code>try {</code>

<code>               </code><code>if</code> <code>(!outputBuffer.isEmpty()) {</code>

<code>                    </code><code>server.add(outputBuffer);</code>

<code>          </code><code>} catch(Exception e){</code>

<code>               </code><code>LOG.warn(</code><code>"solrtable add error,Exception log is "</code> <code>+ e);</code>

<code>          </code><code>}finally{</code>

<code>               </code><code>outputBuffer.</code><code>clear</code><code>(); </code><code>//</code><code>在finally中清除buffer,否則會導緻buffer在異常抛出時一直遞增導緻jvm oom的問題</code>

5.commit問題,調用close方法時,隻有最後一個solrtable會close,開始時使用每插入一行就commit的方式,但是這種性能很差(大約50%的降低),後來在solrserver端控制commit

solrconfig.xml:

<code>     </code><code>&lt;autoCommit&gt;</code>

<code>       </code><code>&lt;!--&lt;maxTime&gt;${solr.autoCommit.maxTime:15000}&lt;</code><code>/maxTime</code><code>&gt;--&gt;</code>

<code>         </code><code>&lt;maxDocs&gt;15000&lt;</code><code>/maxDocs</code><code>&gt; </code><code>//</code><code>當記憶體索引數量達到指定值的時候,将記憶體的索引DUMP到硬碟中,并通知searcher類加載新的索引</code>

<code>        </code><code>&lt;maxTime&gt;1000&lt;</code><code>/maxTime</code><code>&gt; </code><code>//</code><code>每隔指定的時間段,自動的COMMIT記憶體中的索引資料,并通知Searcher類加載新的索引,以最先達到條件執行為準</code>

<code>       </code><code>&lt;openSearcher&gt;</code><code>true</code><code>&lt;</code><code>/openSearcher</code><code>&gt;  </code><code>//</code><code>設定為</code><code>false</code><code>時,雖然commit會導緻index的變更flush到磁盤上,但是用戶端不會看到更新</code>

<code>     </code><code>&lt;</code><code>/autoCommit</code><code>&gt;</code>

<code>   </code> 

<code>     </code><code>&lt;autoSoftCommit&gt;</code>

<code>       </code><code>&lt;maxTime&gt;${solr.autoSoftCommit.maxTime:10000}&lt;</code><code>/maxTime</code><code>&gt;</code>

<code>     </code><code>&lt;</code><code>/autoSoftCommit</code><code>&gt;</code>

這裡autoCommit是指hard commit,如果不使用autoCommit也可以在add document時帶上commitWithin的參數autoSoftCommit和autoCommit類似,但是它是一個solf類型的commit,可以確定資料可見但是沒有把資料flush到磁盤,機器crash會導緻資料丢失。

save也導緻性能損耗,save會消耗6ms左右的時間,需要放到一個list中進行save操作(batch操作)

6.outbuffer的問題

初始的代碼,因為對用solrtable來說隻有一個入口(solrcloud時也一樣),這樣solrtable隻有一個執行個體,這裡用到了靜态變量,每個solrtable不能按自己的buffer進行操作

改成非靜态變量,并且使用靜态代碼塊初始化table和server,放到一個hashmap中,用的時候去取,保證隻有幾個的執行個體。否則如果在使用時進行執行個體化,每次的對象都不同,導緻buffer一直為1。

7.close的問題

如果設定了buffer,可能會導緻不能flush

<code>public</code> <code>void</code> <code>save(SolrInputDocument doc) {</code>

<code>     </code><code>outputBuffer.add(doc); </code><code>//使用save放到buffer list中</code>

<code>     </code><code>if</code> <code>(outputBuffer.size() &gt;= numOutputBufferRows) { </code><code>//隻有list的大小&gt;=設定的buffer大小時才會觸發flush的操作</code>

<code>         </code><code>flush();</code>

<code>}</code>

而flush中會調用server.add(outputBuffer)操作。filesink關閉時調用SolrWriter.close

調用SolrTable的commit(commit中調用flush和server.commit),發現隻有最後一個table執行個體會調用commit.

解決方法,在SolrWriter.close中循環調用SolrTable.commit方法:

<code>public</code> <code>void</code> <code>close(</code><code>boolean</code> <code>abort) </code><code>throws</code> <code>IOException {</code>

<code>     </code><code>if</code> <code>(!abort) {</code>

<code>         </code><code>Map&lt;Integer,SolrTable&gt; maps = </code><code>new</code> <code>SolrServerCustom().solrTable;</code>

<code>         </code><code>for</code><code>(Map.Entry&lt;Integer, SolrTable&gt; entry:maps.entrySet()){</code>

<code>             </code><code>entry.getValue().commit();</code>

<code>         </code><code>}</code>

<code>     </code><code>} </code><code>else</code> <code>{</code>

<code>         </code><code>table.rollback();</code>

8.鎖的問題,從nginx端看到大量的302 ,solr日志看到有鎖的問題,調整參數,在solr啟動時釋放鎖

solr端日志:

<code>userinfo: org.apache.solr.common.SolrException:org.apache.solr.common.SolrException: Index locked </code><code>for</code> <code>write </code><code>for</code> <code>core userinfo</code>

解決:solrconfig.xml中設定

<code>&lt;unlockOnStartup&gt;</code><code>true</code><code>&lt;</code><code>/unlockOnStartup</code><code>&gt;</code>

原因:

org.apache.solr.core.SolrCore初始化時使用IndexWriter.isLocked(dir)判斷是否加鎖,如果已經加了鎖,則分為兩種情況,一種是在solrconfig.xml中配置了unlockOnStartup,會嘗試unlock,如果沒有配置unlockStartup,則會抛出Index locked for write for core異常

根據堆棧可以看對應代碼:

org.apache.solr.core.SolrCore 構造函數中會調用initIndex方法:

32

33

34

35

36

37

38

39

40

41

42

43

<code>  </code><code>void</code> <code>initIndex(</code><code>boolean</code> <code>reload) </code><code>throws</code> <code>IOException {</code>

<code>      </code><code>String indexDir = getNewIndexDir();</code>

<code>      </code><code>boolean</code> <code>indexExists = getDirectoryFactory().exists(indexDir);</code>

<code>      </code><code>boolean</code> <code>firstTime;</code>

<code>      </code><code>synchronized</code> <code>(SolrCore.</code><code>class</code><code>) {</code>

<code>        </code><code>firstTime = dirs.add(getDirectoryFactory().normalize(indexDir));</code>

<code>      </code><code>}</code>

<code>      </code><code>boolean</code> <code>removeLocks = solrConfig.unlockOnStartup; </code><code>// unlockOnStartup = getBool(indexConfigPrefix+"/unlockOnStartup", false); 預設為false</code>

<code>      </code><code>initIndexReaderFactory();</code>

<code>      </code><code>if</code> <code>(indexExists &amp;&amp; firstTime &amp;&amp; !reload) {</code>

<code>       </code> 

<code>        </code><code>Directory dir = directoryFactory.get(indexDir, DirContext.DEFAULT,</code>

<code>            </code><code>getSolrConfig().indexConfig.lockType);</code>

<code>        </code><code>try</code> <code>{</code>

<code>          </code><code>if</code> <code>(IndexWriter.isLocked(dir)) {</code>

<code>            </code><code>if</code> <code>(removeLocks) {</code>

<code>              </code><code>log.warn(</code>

<code>                  </code><code>logid</code>

<code>                      </code><code>+ </code><code>"WARNING: Solr index directory '{}' is locked.  Unlocking..."</code><code>,</code>

<code>                  </code><code>indexDir);</code>

<code>              </code><code>IndexWriter.unlock(dir); </code><code>//解鎖</code>

<code>            </code><code>} </code><code>else</code> <code>{</code>

<code>              </code><code>log.error(logid</code>

<code>                  </code><code>+ </code><code>"Solr index directory '{}' is locked.  Throwing exception"</code><code>,</code>

<code>              </code><code>throw</code> <code>new</code> <code>LockObtainFailedException(</code>

<code>                  </code><code>"Index locked for write for core "</code> <code>+ name);</code>

<code>            </code><code>}</code>

<code>           </code> 

<code>        </code><code>} </code><code>finally</code> <code>{</code>

<code>          </code><code>directoryFactory.release(dir);</code>

<code>        </code><code>}</code>

<code>      </code><code>// Create the index if it doesn't exist.</code>

<code>      </code><code>if</code><code>(!indexExists) {</code>

<code>        </code><code>log.warn(logid+</code><code>"Solr index directory '"</code> <code>+ </code><code>new</code> <code>File(indexDir) + </code><code>"' doesn't exist."</code>

<code>                </code><code>+ </code><code>" Creating new index..."</code><code>);</code>

<code>        </code><code>SolrIndexWriter writer = SolrIndexWriter.create(</code><code>"SolrCore.initIndex"</code><code>, indexDir, getDirectoryFactory(), </code><code>true</code><code>,</code>

<code>                                                        </code><code>getLatestSchema(), solrConfig.indexConfig, solrDelPolicy, codec);</code>

<code>        </code><code>writer.close();</code>

<code>  </code><code>}</code>

9.tomcat的配置導緻的問題,每台機器兩個solr執行個體,其中一個一直不能啟動(在執行個體化core時會嘗試擷取鎖,這裡擷取鎖失敗,可以手動删除write.lock)

最終發現是兩個tomcat寫到了一個solr目錄裡面

<code>Caused by: org.apache.lucene.store.LockObtainFailedException: Lock obtain timed out: NativeFSLock@</code><code>/apps/dat/web/working/solr/cloud/storage/data/userinfo/data/index/write</code><code>.lock</code>

<code>     </code><code>at org.apache.lucene.store.Lock.obtain(Lock.java:89)</code>

<code>     </code><code>at org.apache.lucene.index.IndexWriter.&lt;init&gt;(IndexWriter.java:710)</code>

<code>     </code><code>at org.apache.solr.update.SolrIndexWriter.&lt;init&gt;(SolrIndexWriter.java:77)</code>

<code>     </code><code>at org.apache.solr.update.SolrIndexWriter.create(SolrIndexWriter.java:64)</code>

<code>     </code><code>at org.apache.solr.update.DefaultSolrCoreState.createMainIndexWriter(DefaultSolrCoreState.java:267)</code>

<code>     </code><code>at org.apache.solr.update.DefaultSolrCoreState.getIndexWriter(DefaultSolrCoreState.java:110)</code>

<code>     </code><code>at org.apache.solr.core.SolrCore.openNewSearcher(SolrCore.java:1513)</code>

<code>     </code><code>... 12 </code><code>more</code>

10.部分job運作緩慢,其中一個job運作了11個小時。。

資料寫入時發生在mapoperator或者reduceoperator中,多少個map或者reduce就是多少個并發線程寫入。job隻有一個reduce,導緻寫入緩慢,調整reduce的數量到100(set mapreduce.job.reduces=100)後,性能大幅度提升,3kw資料導入時間由40916s下降到993s。

本文轉自菜菜光 51CTO部落格,原文連結:http://blog.51cto.com/caiguangguang/1612601,如需轉載請自行聯系原作者