搞了一段時間,hive2solr的job終于可以穩定的跑了,實作使用hive向solr插資料,主要是實作RecordWriter接口,重寫write方法和close方法。下面對遇到的問題一一列出:
1.資料覆寫問題,使用原子更新
參考:http://caiguangguang.blog.51cto.com/1652935/1599137
2.重複建構solrserver和solrtable對象問題,使用static在初始化的時候建構,後面直接調用
建構:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<code> </code><code>public</code> <code>static</code> <code>Map<Integer,SolrServer> solrServers = </code><code>new</code> <code>HashMap<Integer,SolrServer>();</code>
<code> </code><code>public</code> <code>static</code> <code>Map<Integer,SolrTable> solrTables = </code><code>new</code> <code>HashMap<Integer,SolrTable>();</code>
<code> </code><code>public</code> <code>static</code> <code>String[] iparray;</code>
<code> </code><code>public</code> <code>static</code> <code>String ipstring;</code>
<code> </code><code>public</code> <code>static</code> <code>String collec;</code>
<code> </code><code>static</code> <code>{</code>
<code> </code><code>LOG .warn(</code><code>"in SolrServerCustom start initialize ip maps"</code> <code>);</code>
<code> </code><code>ipstring = </code><code>"xxxx,xxxxxx"</code><code>;</code>
<code> </code><code>collec = </code><code>"userinfo"</code> <code>;</code>
<code> </code><code>LOG .warn(</code><code>"in SolrServerCustom ipstring and collec: "</code> <code>+ ipstring + </code><code>","</code> <code>+ collec );</code>
<code> </code><code>iparray = ipstring .split(</code><code>","</code> <code>);</code>
<code> </code><code>Arrays. sort( iparray);</code>
<code> </code><code>for</code> <code>(</code><code>int</code> <code>i=</code><code>0</code><code>;i< iparray. length;i++){</code>
<code> </code><code>String urlx = </code><code>"http://"</code> <code>+iparray [i]+</code><code>"/solr/"</code> <code>+ collec;</code>
<code> </code><code>solrServers.put(i, </code><code>new</code> <code>HttpSolrServer(urlx));</code>
<code> </code><code>solrTables.put(i, </code><code>new</code> <code>SolrTable(String.valueOf(i)));</code>
<code> </code><code>}</code>
<code> </code><code>LOG .warn(</code><code>"in SolrServerCustom end initialize ip maps,maps size "</code> <code>+ solrServers .size());</code>
<code> </code><code>LOG .warn(</code><code>"in SolrServerCustom end initialize ip mapsx,mapsx size "</code> <code>+ solrTables .size()); </code>
<code> </code><code>}</code>
引用:
21
22
23
24
25
26
27
28
29
30
31
<code> </code><code>public</code> <code>void</code> <code>write(Writable w) </code><code>throws</code> <code>IOException {</code>
<code> </code><code>MapWritable map = (MapWritable) w;</code>
<code> </code><code>SolrInputDocument doc = </code><code>new</code> <code>SolrInputDocument();</code>
<code> </code><code>String key;</code>
<code> </code><code>String value;</code>
<code> </code><code>String newkey;</code>
<code> </code><code>int</code> <code>idx;</code>
<code> </code><code>for</code> <code>(</code><code>final</code> <code>Map.Entry<Writable, Writable> entry : map.entrySet()) {</code>
<code> </code><code>key = entry.getKey().toString();</code>
<code> </code><code>newkey = </code><code>this</code><code>.tableName + </code><code>"."</code> <code>+ entry.getKey().toString();</code>
<code> </code><code>value = entry.getValue().toString();</code>
<code> </code><code>if</code><code>(key.equals(</code><code>"id"</code><code>)){</code>
<code> </code><code>idx = SolrUtil.getIntServer(value,SolrServerCustom.solrServers); </code><code>//引用靜态屬性SolrServerCustom.solrServers</code>
<code> </code><code>table = SolrServerCustom.solrTables.get(idx); </code><code>//引用靜态屬性SolrServerCustom.solrTables</code>
<code> </code><code>table.setNumInputBufferRows(</code><code>this</code><code>.numInputBufferRows);</code>
<code> </code><code>}</code>
<code> </code><code>doc.addField(</code><code>"id"</code><code>,Integer.valueOf(value));</code>
<code> </code><code>}</code><code>else</code><code>{</code>
<code> </code><code>if</code> <code>(value.equals(</code><code>"(null)"</code><code>)){</code>
<code> </code><code>value = </code><code>""</code><code>;</code>
<code> </code><code>}</code>
<code> </code><code>setOper = </code><code>new</code> <code>LinkedHashMap<String,Object>();</code>
<code> </code><code>setOper.put(</code><code>"set"</code><code>,value);</code>
<code> </code><code>if</code><code>(!doc.keySet().contains(newkey)){</code>
<code> </code><code>doc.addField(newkey, setOper);</code>
<code> </code><code>} </code>
<code> </code><code>}</code>
<code> </code><code>table.save(doc);</code>
<code> </code><code>}</code>
3.代碼存在記憶體洩露問題
1)對象的聲明,放在循環外,并調整outbuffer的大小
現象:yarn map/reduce java heap滿導緻job hang
錯誤日志:
<code>2015-01-26 14:01:10,000 FATAL [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.OutOfMemoryError: GC overhead limit exceeded</code>
<code> </code><code>at java.lang.AbstractStringBuilder.<init>(AbstractStringBuilder.java:45)</code>
<code> </code><code>at java.lang.StringBuilder.<init>(StringBuilder.java:68)</code>
<code> </code><code>at com.chimpler.hive.solr.SolrWriter.write(SolrWriter.java:71)</code>
<code> </code><code>at org.apache.hadoop.hive.ql.</code><code>exec</code><code>.FileSinkOperator.processOp(FileSinkOperator.java:621)</code>
<code> </code><code>at org.apache.hadoop.hive.ql.</code><code>exec</code><code>.Operator.forward(Operator.java:793)</code>
<code> </code><code>at org.apache.hadoop.hive.ql.</code><code>exec</code><code>.SelectOperator.processOp(SelectOperator.java:87)</code>
<code> </code><code>at org.apache.hadoop.hive.ql.</code><code>exec</code><code>.TableScanOperator.processOp(TableScanOperator.java:92)</code>
<code> </code><code>at org.apache.hadoop.hive.ql.</code><code>exec</code><code>.MapOperator.process(MapOperator.java:540)</code>
<code> </code><code>at org.apache.hadoop.hive.ql.</code><code>exec</code><code>.mr.ExecMapper.map(ExecMapper.java:177)</code>
<code> </code><code>at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)</code>
<code> </code><code>at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:428)</code>
<code> </code><code>at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)</code>
<code> </code><code>at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:160)</code>
<code> </code><code>at java.security.AccessController.doPrivileged(Native Method)</code>
<code> </code><code>at javax.security.auth.Subject.doAs(Subject.java:396)</code>
<code> </code><code>at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)</code>
<code> </code><code>at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:155)</code>
2)try...catch....finally的使用(在finally中 clear buffer)
一開始沒有增加finally,導緻在異常發生時buffer會大于設定,最終導緻job記憶體用滿,hang住。
4.異常的處理
要求一個solrserver出錯,或者solr暫時不響應時程式不能退出,預設情況下異常向上抛出,最終導緻job失敗
比如:
<code>Caused by: org.apache.solr.client.solrj.impl.HttpSolrServer$RemoteSolrException: Expected content </code><code>type</code> <code>application</code><code>/octet-stream</code> <code>but got text</code><code>/html</code><code>. <html></code>
<code><</code><code>head</code><code>><title>504 Gateway Time-out<</code><code>/title</code><code>><</code><code>/head</code><code>></code>
<code><body bgcolor=</code><code>"white"</code><code>></code>
<code><center><h1>504 Gateway Time-out<</code><code>/h1</code><code>><</code><code>/center</code><code>></code>
<code><hr><center>nginx</code><code>/1</code><code>.6.2<</code><code>/center</code><code>></code>
<code><</code><code>/body</code><code>></code>
<code><</code><code>/html</code><code>></code>
防止異常的抛出會造成runtime error導緻job失敗,catch異常後不做處理
<code> </code><code>public void flush(){</code>
<code> </code><code>try {</code>
<code> </code><code>if</code> <code>(!outputBuffer.isEmpty()) {</code>
<code> </code><code>server.add(outputBuffer);</code>
<code> </code><code>} catch(Exception e){</code>
<code> </code><code>LOG.warn(</code><code>"solrtable add error,Exception log is "</code> <code>+ e);</code>
<code> </code><code>}finally{</code>
<code> </code><code>outputBuffer.</code><code>clear</code><code>(); </code><code>//</code><code>在finally中清除buffer,否則會導緻buffer在異常抛出時一直遞增導緻jvm oom的問題</code>
5.commit問題,調用close方法時,隻有最後一個solrtable會close,開始時使用每插入一行就commit的方式,但是這種性能很差(大約50%的降低),後來在solrserver端控制commit
solrconfig.xml:
<code> </code><code><autoCommit></code>
<code> </code><code><!--<maxTime>${solr.autoCommit.maxTime:15000}<</code><code>/maxTime</code><code>>--></code>
<code> </code><code><maxDocs>15000<</code><code>/maxDocs</code><code>> </code><code>//</code><code>當記憶體索引數量達到指定值的時候,将記憶體的索引DUMP到硬碟中,并通知searcher類加載新的索引</code>
<code> </code><code><maxTime>1000<</code><code>/maxTime</code><code>> </code><code>//</code><code>每隔指定的時間段,自動的COMMIT記憶體中的索引資料,并通知Searcher類加載新的索引,以最先達到條件執行為準</code>
<code> </code><code><openSearcher></code><code>true</code><code><</code><code>/openSearcher</code><code>> </code><code>//</code><code>設定為</code><code>false</code><code>時,雖然commit會導緻index的變更flush到磁盤上,但是用戶端不會看到更新</code>
<code> </code><code><</code><code>/autoCommit</code><code>></code>
<code> </code>
<code> </code><code><autoSoftCommit></code>
<code> </code><code><maxTime>${solr.autoSoftCommit.maxTime:10000}<</code><code>/maxTime</code><code>></code>
<code> </code><code><</code><code>/autoSoftCommit</code><code>></code>
這裡autoCommit是指hard commit,如果不使用autoCommit也可以在add document時帶上commitWithin的參數autoSoftCommit和autoCommit類似,但是它是一個solf類型的commit,可以確定資料可見但是沒有把資料flush到磁盤,機器crash會導緻資料丢失。
save也導緻性能損耗,save會消耗6ms左右的時間,需要放到一個list中進行save操作(batch操作)
6.outbuffer的問題
初始的代碼,因為對用solrtable來說隻有一個入口(solrcloud時也一樣),這樣solrtable隻有一個執行個體,這裡用到了靜态變量,每個solrtable不能按自己的buffer進行操作
改成非靜态變量,并且使用靜态代碼塊初始化table和server,放到一個hashmap中,用的時候去取,保證隻有幾個的執行個體。否則如果在使用時進行執行個體化,每次的對象都不同,導緻buffer一直為1。
7.close的問題
如果設定了buffer,可能會導緻不能flush
<code>public</code> <code>void</code> <code>save(SolrInputDocument doc) {</code>
<code> </code><code>outputBuffer.add(doc); </code><code>//使用save放到buffer list中</code>
<code> </code><code>if</code> <code>(outputBuffer.size() >= numOutputBufferRows) { </code><code>//隻有list的大小>=設定的buffer大小時才會觸發flush的操作</code>
<code> </code><code>flush();</code>
<code>}</code>
而flush中會調用server.add(outputBuffer)操作。filesink關閉時調用SolrWriter.close
調用SolrTable的commit(commit中調用flush和server.commit),發現隻有最後一個table執行個體會調用commit.
解決方法,在SolrWriter.close中循環調用SolrTable.commit方法:
<code>public</code> <code>void</code> <code>close(</code><code>boolean</code> <code>abort) </code><code>throws</code> <code>IOException {</code>
<code> </code><code>if</code> <code>(!abort) {</code>
<code> </code><code>Map<Integer,SolrTable> maps = </code><code>new</code> <code>SolrServerCustom().solrTable;</code>
<code> </code><code>for</code><code>(Map.Entry<Integer, SolrTable> entry:maps.entrySet()){</code>
<code> </code><code>entry.getValue().commit();</code>
<code> </code><code>}</code>
<code> </code><code>} </code><code>else</code> <code>{</code>
<code> </code><code>table.rollback();</code>
8.鎖的問題,從nginx端看到大量的302 ,solr日志看到有鎖的問題,調整參數,在solr啟動時釋放鎖
solr端日志:
<code>userinfo: org.apache.solr.common.SolrException:org.apache.solr.common.SolrException: Index locked </code><code>for</code> <code>write </code><code>for</code> <code>core userinfo</code>
解決:solrconfig.xml中設定
<code><unlockOnStartup></code><code>true</code><code><</code><code>/unlockOnStartup</code><code>></code>
原因:
org.apache.solr.core.SolrCore初始化時使用IndexWriter.isLocked(dir)判斷是否加鎖,如果已經加了鎖,則分為兩種情況,一種是在solrconfig.xml中配置了unlockOnStartup,會嘗試unlock,如果沒有配置unlockStartup,則會抛出Index locked for write for core異常
根據堆棧可以看對應代碼:
org.apache.solr.core.SolrCore 構造函數中會調用initIndex方法:
32
33
34
35
36
37
38
39
40
41
42
43
<code> </code><code>void</code> <code>initIndex(</code><code>boolean</code> <code>reload) </code><code>throws</code> <code>IOException {</code>
<code> </code><code>String indexDir = getNewIndexDir();</code>
<code> </code><code>boolean</code> <code>indexExists = getDirectoryFactory().exists(indexDir);</code>
<code> </code><code>boolean</code> <code>firstTime;</code>
<code> </code><code>synchronized</code> <code>(SolrCore.</code><code>class</code><code>) {</code>
<code> </code><code>firstTime = dirs.add(getDirectoryFactory().normalize(indexDir));</code>
<code> </code><code>}</code>
<code> </code><code>boolean</code> <code>removeLocks = solrConfig.unlockOnStartup; </code><code>// unlockOnStartup = getBool(indexConfigPrefix+"/unlockOnStartup", false); 預設為false</code>
<code> </code><code>initIndexReaderFactory();</code>
<code> </code><code>if</code> <code>(indexExists && firstTime && !reload) {</code>
<code> </code>
<code> </code><code>Directory dir = directoryFactory.get(indexDir, DirContext.DEFAULT,</code>
<code> </code><code>getSolrConfig().indexConfig.lockType);</code>
<code> </code><code>try</code> <code>{</code>
<code> </code><code>if</code> <code>(IndexWriter.isLocked(dir)) {</code>
<code> </code><code>if</code> <code>(removeLocks) {</code>
<code> </code><code>log.warn(</code>
<code> </code><code>logid</code>
<code> </code><code>+ </code><code>"WARNING: Solr index directory '{}' is locked. Unlocking..."</code><code>,</code>
<code> </code><code>indexDir);</code>
<code> </code><code>IndexWriter.unlock(dir); </code><code>//解鎖</code>
<code> </code><code>} </code><code>else</code> <code>{</code>
<code> </code><code>log.error(logid</code>
<code> </code><code>+ </code><code>"Solr index directory '{}' is locked. Throwing exception"</code><code>,</code>
<code> </code><code>throw</code> <code>new</code> <code>LockObtainFailedException(</code>
<code> </code><code>"Index locked for write for core "</code> <code>+ name);</code>
<code> </code><code>}</code>
<code> </code>
<code> </code><code>} </code><code>finally</code> <code>{</code>
<code> </code><code>directoryFactory.release(dir);</code>
<code> </code><code>}</code>
<code> </code><code>// Create the index if it doesn't exist.</code>
<code> </code><code>if</code><code>(!indexExists) {</code>
<code> </code><code>log.warn(logid+</code><code>"Solr index directory '"</code> <code>+ </code><code>new</code> <code>File(indexDir) + </code><code>"' doesn't exist."</code>
<code> </code><code>+ </code><code>" Creating new index..."</code><code>);</code>
<code> </code><code>SolrIndexWriter writer = SolrIndexWriter.create(</code><code>"SolrCore.initIndex"</code><code>, indexDir, getDirectoryFactory(), </code><code>true</code><code>,</code>
<code> </code><code>getLatestSchema(), solrConfig.indexConfig, solrDelPolicy, codec);</code>
<code> </code><code>writer.close();</code>
<code> </code><code>}</code>
9.tomcat的配置導緻的問題,每台機器兩個solr執行個體,其中一個一直不能啟動(在執行個體化core時會嘗試擷取鎖,這裡擷取鎖失敗,可以手動删除write.lock)
最終發現是兩個tomcat寫到了一個solr目錄裡面
<code>Caused by: org.apache.lucene.store.LockObtainFailedException: Lock obtain timed out: NativeFSLock@</code><code>/apps/dat/web/working/solr/cloud/storage/data/userinfo/data/index/write</code><code>.lock</code>
<code> </code><code>at org.apache.lucene.store.Lock.obtain(Lock.java:89)</code>
<code> </code><code>at org.apache.lucene.index.IndexWriter.<init>(IndexWriter.java:710)</code>
<code> </code><code>at org.apache.solr.update.SolrIndexWriter.<init>(SolrIndexWriter.java:77)</code>
<code> </code><code>at org.apache.solr.update.SolrIndexWriter.create(SolrIndexWriter.java:64)</code>
<code> </code><code>at org.apache.solr.update.DefaultSolrCoreState.createMainIndexWriter(DefaultSolrCoreState.java:267)</code>
<code> </code><code>at org.apache.solr.update.DefaultSolrCoreState.getIndexWriter(DefaultSolrCoreState.java:110)</code>
<code> </code><code>at org.apache.solr.core.SolrCore.openNewSearcher(SolrCore.java:1513)</code>
<code> </code><code>... 12 </code><code>more</code>
10.部分job運作緩慢,其中一個job運作了11個小時。。
資料寫入時發生在mapoperator或者reduceoperator中,多少個map或者reduce就是多少個并發線程寫入。job隻有一個reduce,導緻寫入緩慢,調整reduce的數量到100(set mapreduce.job.reduces=100)後,性能大幅度提升,3kw資料導入時間由40916s下降到993s。
本文轉自菜菜光 51CTO部落格,原文連結:http://blog.51cto.com/caiguangguang/1612601,如需轉載請自行聯系原作者