Sqoop可以在HDFS/Hive和關系型資料庫之間進行資料的導入導出,其中主要使用了import和export這兩個工具。這兩個工具非常強大,提供了很多選項幫助我們完成資料的遷移和同步。比如,下面兩個潛在的需求:
業務資料存放在關系資料庫中,如果資料量達到一定規模後需要對其進行分析或同統計,單純使用關系資料庫可能會成為瓶頸,這時可以将資料從業務資料庫資料導入(import)到Hadoop平台進行離線分析。
對大規模的資料在Hadoop平台上進行分析以後,可能需要将結果同步到關系資料庫中作為業務的輔助資料,這時候需要将Hadoop平台分析後的資料導出(export)到關系資料庫。
這裡,我們介紹Sqoop完成上述基本應用場景所使用的import和export工具,通過一些簡單的例子來說明這兩個工具是如何做到的。
工具通用選項
import和export工具有些通用的選項,如下表所示:
<b>選項</b>
<b>含義說明</b>
<code>--connect <jdbc-uri></code>
指定JDBC連接配接字元串
<code>--connection-manager <class-name></code>
指定要使用的連接配接管理器類
<code>--driver <class-name></code>
指定要使用的JDBC驅動類
<code>--hadoop-mapred-home <dir></code>
指定$HADOOP_MAPRED_HOME路徑
<code>--help</code>
列印用法幫助資訊
<code>--password-file</code>
設定用于存放認證的密碼資訊檔案的路徑
<code>-P</code>
從控制台讀取輸入的密碼
<code>--password <password></code>
設定認證密碼
<code>--username <username></code>
設定認證使用者名
<code>--verbose</code>
列印詳細的運作資訊
<code>--connection-param-file <filename></code>
可選,指定存儲資料庫連接配接參數的屬性檔案
資料導入工具import
import工具,是将HDFS平台外部的結構化存儲系統中的資料導入到Hadoop平台,便于後續分析。我們先看一下import工具的基本選項及其含義,如下表所示:
<code>--append</code>
将資料追加到HDFS上一個已存在的資料集上
<code>--as-avrodatafile</code>
将資料導入到Avro資料檔案
<code>--as-sequencefile</code>
将資料導入到SequenceFile
<code>--as-textfile</code>
将資料導入到普通文本檔案(預設)
<code>--boundary-query <statement></code>
邊界查詢,用于建立分片(InputSplit)
<code>--columns <col,col,col…></code>
從表中導出指定的一組列的資料
<code>--delete-target-dir</code>
如果指定目錄存在,則先删除掉
<code>--direct</code>
使用直接導入模式(優化導入速度)
<code>--direct-split-size <n></code>
分割輸入stream的位元組大小(在直接導入模式下)
<code>--fetch-size <n></code>
從資料庫中批量讀取記錄數
<code>--inline-lob-limit <n></code>
設定内聯的LOB對象的大小
<code>-m,--num-mappers <n></code>
使用n個map任務并行導入資料
<code>-e,--query <statement></code>
導入的查詢語句
<code>--split-by <column-name></code>
指定按照哪個列去分割資料
<code>--table <table-name></code>
導入的源表表名
<code>--target-dir <dir></code>
導入HDFS的目标路徑
<code>--warehouse-dir <dir></code>
HDFS存放表的根路徑
<code>--where <where clause></code>
指定導出時所使用的查詢條件
<code>-z,--compress</code>
啟用壓縮
<code>--compression-codec <c></code>
指定Hadoop的codec方式(預設gzip)
<code>--null-string <null-string></code>
果指定列為字元串類型,使用指定字元串替換值為null的該類列的值
<code>--null-non-string <null-string></code>
如果指定列為非字元串類型,使用指定字元串替換值為null的該類列的值
下面,我們通過執行個體來說明,在實際中如何使用這些選項。
将MySQL資料庫中整個表資料導入到Hive表
<code>1</code>
将MySQL資料庫workflow中project表的資料導入到Hive表中。
将MySQL資料庫中多表JION後的資料導入到HDFS
這裡,使用了<code>--query</code>選項,不能同時與<code>--table</code>選項使用。而且,變量$CONDITIONS必須在WHERE語句之後,供Sqoop程序運作指令過程中使用。上面的<code>--target-dir</code>指向的其實就是Hive表存儲的資料目錄。
将MySQL資料庫中某個表的資料增量同步到Hive表
這裡,每次運作增量導入到Hive表之前,都要修改<code>--last-value</code>的值,否則Hive表中會出現重複記錄。
将MySQL資料庫中某個表的幾個字段的資料導入到Hive表
我們這裡将MySQL資料庫workflow中tags表的id和tag字段的值導入到Hive表tag_db.tags。其中<code>--create-hive-table</code>選項會自動建立Hive表,<code>--hive-import</code>選項會将選擇的指定列的資料導入到Hive表。如果在Hive中通過SHOW TABLES無法看到導入的表,可以在conf/hive-site.xml中顯式修改如下配置選項:
<code><</code><code>property</code><code>></code>
<code>2</code>
<code></code><code><</code><code>name</code><code>>javax.jdo.option.ConnectionURL</</code><code>name</code><code>></code>
<code>3</code>
<code></code><code><</code><code>value</code><code>>jdbc:derby:;databaseName=hive_metastore_db;create=true</</code><code>value</code><code>></code>
<code>4</code>
<code></</code><code>property</code><code>></code>
然後再重新運作,就能看到了。
使用驗證配置選項
上面這個是官方使用者手冊上給出的用法,我們在實際中還沒用過這個,有感興趣的可以驗證嘗試一下。
資料導出工具export
export工具,是将HDFS平台的資料,導出到外部的結構化存儲系統中,可能會為一些應用系統提供資料支援。我們看一下export工具的基本選項及其含義,如下表所示:
<code>--validate <class-name></code>
啟用資料副本驗證功能,僅支援單表拷貝,可以指定驗證使用的實作類
<code>--validation-threshold <class-name></code>
指定驗證門限所使用的類
使用直接導出模式(優化速度)
<code>--export-dir <dir></code>
導出過程中HDFS源路徑
使用n個map任務并行導出
導出的目的表名稱
<code>--call <stored-proc-name></code>
導出資料調用的指定存儲過程名
<code>--update-key <col-name></code>
更新參考的列名稱,多個列名使用逗号分隔
<code>--update-mode <mode></code>
指定更新政策,包括:updateonly(預設)、allowinsert
<code>--input-null-string <null-string></code>
使用指定字元串,替換字元串類型值為null的列
<code>--input-null-non-string <null-string></code>
使用指定字元串,替換非字元串類型值為null的列
<code>--staging-table <staging-table-name></code>
在資料導出到資料庫之前,資料臨時存放的表名稱
<code>--clear-staging-table</code>
清除工作區中臨時存放的資料
<code>--batch</code>
使用批量模式導出
下面,我們通過執行個體來說明,在實際中如何使用這些選項。這裡,我們主要結合一個執行個體,講解如何将Hive中的資料導入到MySQL資料庫。
首先,我們準備幾個表,MySQL資料庫為tag_db,裡面有兩個表,定義如下所示:
<code>01</code>
<code>CREATE</code> <code>TABLE</code> <code>tag_db.users (</code>
<code>02</code>
<code></code><code>id</code><code>INT</code><code>(11)</code><code>NOT</code> <code>NULL</code> <code>AUTO_INCREMENT,</code>
<code>03</code>
<code></code><code>name</code> <code>VARCHAR</code><code>(100)</code><code>NOT</code> <code>NULL</code><code>,</code>
<code>04</code>
<code></code><code>PRIMARY</code> <code>KEY</code> <code>(`id`)</code>
<code>05</code>
<code>) ENGINE=InnoDB</code><code>DEFAULT</code> <code>CHARSET=utf8;</code>
<code>06</code>
<code>07</code>
<code>CREATE</code> <code>TABLE</code> <code>tag_db.tags (</code>
<code>08</code>
<code>09</code>
<code></code><code>user_id</code><code>INT</code> <code>NOT</code> <code>NULL</code><code>,</code>
<code>10</code>
<code></code><code>tag</code><code>VARCHAR</code><code>(100)</code><code>NOT</code> <code>NULL</code><code>,</code>
<code>11</code>
<code>12</code>
這兩個表中存儲的是基礎資料,同時對應着Hive中如下兩個表:
<code>CREATE</code> <code>TABLE</code> <code>users (</code>
<code></code><code>id</code><code>INT</code><code>,</code>
<code></code><code>name</code> <code>STRING</code>
<code>);</code>
<code>CREATE</code> <code>TABLE</code> <code>tags (</code>
<code></code><code>user_id</code><code>INT</code><code>,</code>
<code></code><code>tag STRING</code>
我們首先在上述MySQL的兩個表中插入一些測試資料:
<code>INSERT</code> <code>INTO</code> <code>tag_db.users(</code><code>name</code><code>)</code><code>VALUES</code><code>(</code><code>'jeffery'</code><code>);</code>
<code>INSERT</code> <code>INTO</code> <code>tag_db.users(</code><code>name</code><code>)</code><code>VALUES</code><code>(</code><code>'shirdrn'</code><code>);</code>
<code>INSERT</code> <code>INTO</code> <code>tag_db.users(</code><code>name</code><code>)</code><code>VALUES</code><code>(</code><code>'sulee'</code><code>);</code>
<code>5</code>
<code>INSERT</code> <code>INTO</code> <code>tag_db.tags(user_id, tag)</code><code>VALUES</code><code>(1,</code><code>'Music'</code><code>);</code>
<code>6</code>
<code>INSERT</code> <code>INTO</code> <code>tag_db.tags(user_id, tag)</code><code>VALUES</code><code>(1,</code><code>'Programming'</code><code>);</code>
<code>7</code>
<code>INSERT</code> <code>INTO</code> <code>tag_db.tags(user_id, tag)</code><code>VALUES</code><code>(2,</code><code>'Travel'</code><code>);</code>
<code>8</code>
<code>INSERT</code> <code>INTO</code> <code>tag_db.tags(user_id, tag)</code><code>VALUES</code><code>(3,</code><code>'Sport'</code><code>);</code>
然後,使用Sqoop的import工具,将MySQL兩個表中的資料導入到Hive表,執行如下指令行:
導入成功以後,再在Hive中建立一個用來存儲users和tags關聯後資料的表:
<code>CREATE</code> <code>TABLE</code> <code>user_tags (</code>
<code></code><code>id STRING,</code>
<code></code><code>name</code> <code>STRING,</code>
執行如下HQL語句,将關聯資料插入user_tags表:
<code>FROM</code> <code>users u</code><code>JOIN</code> <code>tags t</code><code>ON</code> <code>u.id=t.user_id</code><code>INSERT</code> <code>INTO</code> <code>TABLE</code> <code>user_tags</code><code>SELECT</code><code>CONCAT(</code><code>CAST</code><code>(u.id</code><code>AS</code> <code>STRING),</code><code>CAST</code><code>(t.id</code><code>AS</code> <code>STRING)), u.</code><code>name</code><code>, t.tag;</code>
将users.id與tags.id拼接的字元串,作為新表的唯一字段id,name是使用者名,tag是标簽名稱。
再在MySQL中建立一個對應的user_tags表,如下所示:
<code>CREATE</code> <code>TABLE</code> <code>tag_db.user_tags (</code>
<code></code><code>id</code><code>varchar</code><code>(200)</code><code>NOT</code> <code>NULL</code><code>,</code>
<code></code><code>name</code> <code>varchar</code><code>(100)</code><code>NOT</code> <code>NULL</code><code>,</code>
<code></code><code>tag</code><code>varchar</code><code>(100)</code><code>NOT</code> <code>NULL</code>
使用Sqoop的export工具,将Hive表user_tags的資料同步到MySQL表tag_db.user_tags中,執行如下指令行:
執行導出成功後,可以在MySQL的tag_db.user_tags表中看到對應的資料。
如果在導出的時候出現類似如下的錯誤:
<code>14/02/27 17:59:06 INFO mapred.JobClient: Task Id : attempt_201402260008_0057_m_000001_0, Status : FAILED</code>
<code>java.io.IOException: Can't export data, please check task tracker logs</code>
<code></code><code>at org.apache.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:112)</code>
<code></code><code>at org.apache.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:39)</code>
<code></code><code>at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)</code>
<code></code><code>at org.apache.sqoop.mapreduce.AutoProgressMapper.run(AutoProgressMapper.java:64)</code>
<code></code><code>at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)</code>
<code></code><code>at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)</code>
<code></code><code>at org.apache.hadoop.mapred.Child$4.run(Child.java:255)</code>
<code></code><code>at java.security.AccessController.doPrivileged(Native Method)</code>
<code></code><code>at javax.security.auth.Subject.doAs(Subject.java:396)</code>
<code></code><code>at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)</code>
<code>13</code>
<code></code><code>at org.apache.hadoop.mapred.Child.main(Child.java:249)</code>
<code>14</code>
<code>Caused by: java.util.NoSuchElementException</code>
<code>15</code>
<code></code><code>at java.util.AbstractList$Itr.next(AbstractList.java:350)</code>
<code>16</code>
<code></code><code>at user_tags.__loadFromFields(user_tags.java:225)</code>
<code>17</code>
<code></code><code>at user_tags.parse(user_tags.java:174)</code>
<code>18</code>
<code></code><code>at org.apache.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:83)</code>
<code>19</code>
<code></code><code>... 10 more</code>
通過指定字段分隔符選項<code>--input-fields-terminated-by</code>,指定Hive中表字段之間使用的分隔符,供Sqoop讀取解析,就不會報錯了。