
Big Data Project in Practice: News Topic Statistical Analysis (Java exception: "Invalid byte 2 of 2-byte UTF-8 sequence")

Preface: This article walks through a complete, hands-on big data project that performs real-time and offline statistical analysis of users' search topics and presents the results through a JavaEE web front end. These metrics are very useful for a site's precision marketing and operations. The architecture roughly follows enterprise practice, covering the full pipeline from log collection, transformation, and real-time computation to Java back-end development and web front-end presentation; every node uses a highly available setup, with failover and fault tolerance taken into account. The frameworks used include Hadoop (HDFS + MapReduce + YARN), Flume, Kafka, HBase, Hive, Spark (SQL and Streaming), MySQL, Spring MVC, MyBatis, WebSocket, AngularJS, and Echarts. The languages involved are Java, Scala, and Shell.

Project deployment address:

http://120.79.35.74:443/Hot_News_Web/    For the project source code, contact: [email protected]

Project architecture diagram:

Part 1: Data source processing (news data obtained from Sogou Labs, XML ---> TXT: parsing a large batch of XML files in Java; code is attached below)

Approach: use SAXReader to read the XML file content and build a News entity class so the records can be written to a txt file; then write a ReadWebLog class and a shell script that runs on Linux to simulate the generation of news search logs.

Command to run the jar on Linux: java -jar <path-to-your-uploaded-jar> args0 args1

Or via a shell script:

#!/bin/bash

echo "start log"

java -jar <path-to-your-uploaded-jar> args0 args1

Code:

Converting the Sogou Labs source data: .xml -----> txt

package cn.yusys.hotnews.datasource;

import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;

/**
 * Parses the Sogou Labs news XML file into a txt file ----> the project data source
 * @author Tangzhi mail:
 * Created on 2018-11-12
 */
public class MyXMLReader2JDOM {
    public static void main(String[] args) {
        // Create the XML reader
        SAXReader reader = new SAXReader();
        // Set the character encoding
        reader.setEncoding("utf-8");
        Document document;
        Element rootElement;
        List<Element> docList;
        Iterator<Element> iterator;
        // Holds the parsed records before they are written to news.log
        ArrayList<News> list = new ArrayList<News>();
        try {
            document = reader.read(new File("D:\\Downloads\\大資料資料源\\news_tensite_xml.smarty.dat"));
            // Root element <docs>...</docs>
            rootElement = document.getRootElement();
            // <doc>...</doc>
            docList = rootElement.elements("doc");
            /*
             * Walk the individual <doc> entries
             */
            iterator = docList.iterator();
            for (Element e : docList) {
                News news = new News();
                // Copy each child node of the news record into the entity
                if (e.element("url") != null && !" ".equals(e.element("url"))) {
                    news.setUrl(e.element("url").getStringValue().trim());
                }
                if (e.element("docno") != null && !" ".equals(e.element("docno"))) {
                    news.setDocno(e.element("docno").getStringValue().trim());
                }
                if (e.element("contenttitle") != null && !" ".equals(e.element("contenttitle"))) {
                    news.setContenttitle(e.element("contenttitle").getStringValue().trim());
                }
                if (e.element("content") != null && !" ".equals(e.element("content"))) {
                    news.setContent(e.element("content").getStringValue().trim());
                }
                list.add(news);
            }
            // Write the records to the txt file
            writwToFile(list);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Writes to the txt file (shard the output later if the source file grows too large)
     * @throws IOException
     */
    public static void writwToFile(List<News> list) throws IOException {
        File file = new File("D:\\Downloads\\大資料資料源\\news2.log");
        if (!file.exists()) {
            try {
                file.createNewFile();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        BufferedWriter bw = new BufferedWriter(new FileWriter(file));
        for (News news : list) {
            Date date = new Date();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String dateStr = sdf.format(date);
            bw.write("datetime" + "=" + dateStr + "|");
            bw.write("url" + "=" + news.getUrl() + "|");
            bw.write("docno" + "=" + news.getDocno() + "|");
            bw.write("contenttitle" + "=" + news.getContenttitle() + "|");
            bw.write("content" + "=" + news.getContent());
            bw.write("\n");
            bw.flush();
        }
        bw.close();
    }
}

----------------------------------------------------------------------------------------------------------------

// News entity class used during XML parsing
import java.io.Serializable;

public class News implements Serializable {
    // Implements Serializable so that multiple machines can parse in parallel

    private String url;
    private String docno;
    private String contenttitle;
    private String content;

    public News() {
    }

    public News(String url, String docno, String contenttitle, String content) {
        super();
        this.url = url;
        this.docno = docno;
        this.contenttitle = contenttitle;
        this.content = content;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getDocno() {
        return docno;
    }

    public void setDocno(String docno) {
        this.docno = docno;
    }

    public String getContenttitle() {
        return contenttitle;
    }

    public void setContenttitle(String contenttitle) {
        this.contenttitle = contenttitle;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}

-----------------------------------------------------------------------------------------

// Runs on Linux to simulate log generation; the output file is then collected by Flume
import java.io.*;

/**
 * Simulates a log server producing logs (reads lines from news.log / news1.log,
 * switching files at random, and appends them to a log file that Flume then collects)
 * @date 2018-11-12
 */
public class ReadWebLog {
    public static String readFileName;
    public static String writeFileName;

    public static void main(String[] args) {
        readFileName = args[0];
        writeFileName = args[1];
        readFile(readFileName);
    }

    /**
     * Reads log lines from news.log / news1.log
     */
    public static void readFile(String fileName) {
        try {
            FileInputStream fs = new FileInputStream(fileName);
            // Decode the bytes as UTF-8
            InputStreamReader isr = new InputStreamReader(fs, "utf-8");
            BufferedReader br = new BufferedReader(isr);
            int count = 0;
            String line;
            while ((line = br.readLine()) != null) {
                count++;
                // Configurable read interval in milliseconds
                Thread.sleep(1000);
                System.out.println("row:" + count + ">>>>>>>>" + line);
                // Write to the target file (must match the Flume configuration)
                writeFile(writeFileName, line);
            }
            br.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Appends a line to the target file
     */
    public static void writeFile(String fileName, String line) {
        try {
            FileOutputStream fs = new FileOutputStream(fileName, true);
            OutputStreamWriter osw = new OutputStreamWriter(fs);
            BufferedWriter bw = new BufferedWriter(osw);
            // Append the line
            bw.write(line);
            bw.write("\n");
            bw.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Q&A

Q1:

Java exception: "Invalid byte 2 of 2-byte UTF-8 sequence."

A1: Open the file in Notepad and save a copy with UTF-8 encoding, then open it in Notepad++ (any other editor works too).
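If you prefer to convert the encoding programmatically instead of re-saving the file by hand, a minimal sketch along these lines works; the file names and the assumption that the source file is GBK-encoded are illustrative only (adjust the source charset to whatever your file actually uses):

import java.io.*;
import java.nio.charset.Charset;

public class ReencodeToUtf8 {
    public static void main(String[] args) throws IOException {
        // Assumed paths and source charset; adjust to your environment
        BufferedReader in = new BufferedReader(new InputStreamReader(
                new FileInputStream("news_gbk.log"), Charset.forName("GBK")));
        BufferedWriter out = new BufferedWriter(new OutputStreamWriter(
                new FileOutputStream("news_utf8.log"), Charset.forName("UTF-8")));
        String line;
        while ((line = in.readLine()) != null) {
            out.write(line);
            out.newLine();
        }
        in.close();
        out.close();
    }
}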

Q2:

When running the jar on Linux, the main class cannot be found.

A2: When building with IDEA, add the following plugin to pom.xml and put the fully qualified name of your main class inside <mainClass></mainClass>.

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass></mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
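As a usage sketch, building and running the shaded jar then looks roughly like this; the jar file name is hypothetical (it depends on your artifactId and version), and args0/args1 are the file ReadWebLog reads from and the file it appends to (the one Flume tails later in the article):

mvn clean package
java -jar target/hot-news-1.0.jar /home/hotnews/news2.log /home/hotnews/weblogs.log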

Screenshot of the result on Linux:

Note: when the source data file is very large, ReadWebLog can be set up to read randomly from multiple log files, which also makes the simulated data more realistic (see the sketch below).
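A minimal sketch of that idea, assuming the candidate log files are passed in as a list (the file names and the helper class are hypothetical; ReadWebLog.readFile would then be called with the chosen file):

import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class RandomLogSource {
    private static final Random RANDOM = new Random();

    // Pick one of the candidate source files at random for the next read cycle
    public static String pickSourceFile(List<String> candidates) {
        return candidates.get(RANDOM.nextInt(candidates.size()));
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("news.log", "news1.log", "news2.log");
        String next = pickSourceFile(files);
        System.out.println("Reading from: " + next);
        // ReadWebLog.readFile(next) would be invoked here
    }
}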

Sogou Labs: Chinese language processing and part of the data set

http://www.sogou.com/labs/

Note: pre-clean the data according to the project requirements.

At this point, data source acquisition and basic cleaning (uid assignment, timestamp binding, filtering of some records) are done.

Part 2: Wiring Flume to HBase and Kafka

Approach: Flume collects the source data and integrates with HBase -----> HBase is connected to Hive ------> Hadoop MapReduce performs the offline analysis -----> front-end Echarts

Flume collects the source data and integrates with Kafka -----> Kafka feeds Spark Streaming -----> Spark + MySQL perform the real-time analysis -----> front-end Echarts

1. Flume and HBase

Tips: here the Flume sink writes to HBase (prerequisites: HBase installed on Linux and a basic understanding of how HBase works). Edit hbase-site.xml to set the HBase data directory and the ZooKeeper address.

Common HBase shell commands:

Start HBase: start-hbase.sh

Interactive shell: ./hbase shell

Create a table: create 'table_name', 'column_family_1', 'column_family_2', ..., 'column_family_N' (an example for this project follows)
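For example, the table this project writes to (the name and the column family match the Flume sink configuration shown later in the article) could be created and inspected like this:

create 'weblogs', 'info'
list
describe 'weblogs'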

HBase configuration files (shown below):

##hbase-env.sh
export JAVA_HOME=/opt/jdk1.7.0_65   # JDK path on your own VM (see /etc/profile)
export JAVA_CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export HBASE_MANAGES_ZK=false       # do not use HBase's bundled ZooKeeper; use the external ZooKeeper cluster

##hbase-site.xml
<configuration>
  <property>
    <name>hbase.master</name>
    <value>192.168.25.136:60000</value>
  </property>
  <property>
    <name>hbase.master.maxclockskew</name>
    <value>180000</value>
  </property>
  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://192.168.25.136:9000/hbase</value>
  </property>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>192.168.25.136</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/tmp/hbase_data</value>
  </property>
</configuration>

##regionservers (list the cluster IP addresses or host names)
localhost

Q1: Creating an HBase table fails with: ERROR: Can not get address from Zookeeper; znode data == null

A1: 1. Make sure the ZooKeeper cluster and the Hadoop cluster are available and the HBase service started correctly.

2. Check in hbase-site.xml (vi hbase-site.xml) whether the HBase data directory is readable and writable.

Q2: With HBase 0.9.4 started via the scripts, creating a table fails with:

ERROR: org.apache.hadoop.hbase.MasterNotRunningException: Retried 7 times

A2:

1. Check whether the Hadoop and ZooKeeper clusters are available.

2. HBase 0.9.4 has poor compatibility with Hadoop 2.x; the version mismatch prevents RPC communication. A recommended pairing is HBase 0.9.9.x with Hadoop 2.x, but note that in that case

Flume 1.7.x is the better choice; the reason is shown in the figure below:

2. Flume and Kafka

Tips: here the Flume sink writes to Kafka (prerequisites: Kafka installed on Linux and a basic understanding of how Kafka works).

Common Kafka shell commands:

Start Kafka: bin/kafka-server-start.sh config/server.properties (start with a specific config file)

Create a topic: bin/kafka-topics.sh --create --zookeeper 192.168.25.136:2181 --replication-factor 1 --partitions 1 --topic weblogs

Delete a topic: bin/kafka-topics.sh --delete --zookeeper 192.168.25.136:2181 --topic weblogs

Consume a topic from the console: bin/kafka-console-consumer.sh --zookeeper 192.168.25.136:2181 --topic weblogs --from-beginning

Produce data from the console: bin/kafka-console-producer.sh --broker-list 192.168.25.136:9092 --topic weblogs

Show topic details: bin/kafka-topics.sh --zookeeper 192.168.25.136:2181 --describe --topic weblogs

Kafka configuration file (in big data projects Kafka mostly serves as a data buffer, in a producer-consumer pattern):

# Globally unique broker id; must not be duplicated
broker.id=0
# Port used for connections from producers and consumers
port=9092
# Number of threads handling network requests
num.network.threads=3
# Number of threads handling disk I/O
num.io.threads=8
# Send socket buffer size
socket.send.buffer.bytes=102400
# Receive socket buffer size
socket.receive.buffer.bytes=102400
# Maximum request socket buffer size
socket.request.max.bytes=104857600
# Directory where Kafka stores its log data
log.dirs=/export/logs/kafka
# Number of partitions per topic on this broker
num.partitions=2
# Threads used to recover and clean data under the log dirs
num.recovery.threads.per.data.dir=1
# Maximum time a segment file is retained before deletion
log.retention.hours=168
# Maximum time before rolling a new segment file
log.roll.hours=168
# Size of each segment file; default 1 GB
log.segment.bytes=1073741824
# Interval for checking file sizes
log.retention.check.interval.ms=300000
# Whether log cleaning is enabled
log.cleaner.enable=true
# ZooKeeper connection used by the broker to store metadata
zookeeper.connect=192.168.25.136:2181,192.168.25.136:2182,192.168.25.136:2183
# ZooKeeper connection timeout
zookeeper.connection.timeout.ms=6000
# Flush to disk once this many messages accumulate in the partition buffer
log.flush.interval.messages=10000
# Flush to disk once messages have been buffered this long
log.flush.interval.ms=3000
# delete.topic.enable=true is required for topic deletion; otherwise topics are only marked as deleted
delete.topic.enable=true
# host.name must be this machine's IP (important); otherwise clients throw
# "Producer connection to localhost:9092 unsuccessful"
host.name=192.168.25.136

When the number of records consumed from Kafka equals the total row count of the HBase weblogs table, the Flume-to-HBase and Flume-to-Kafka wiring is complete (a quick way to compare the two is sketched below).
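A minimal way to compare the two sides, assuming the topic and table names used in this article; the consumer must read from the beginning, and the temporary output file name is just an example:

# Row count on the HBase side
echo "count 'weblogs'" | hbase shell

# Message count on the Kafka side (stop the consumer with Ctrl+C once it catches up, then count lines)
bin/kafka-console-consumer.sh --zookeeper 192.168.25.136:2181 --topic weblogs --from-beginning > /tmp/weblogs_consumed.txt
wc -l /tmp/weblogs_consumed.txt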

Flume startup log (screenshot):

Row count of the HBase table:

Data consumed from Kafka:

Attached: the Flume configuration that sinks the collected data into HBase (mapped to specific columns) and Kafka (important):

a1.sources = r1
a1.channels = kafkaC hbaseC
a1.sinks = kafkaS hbaseS

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hotnews/weblogs.log
a1.sources.r1.channels = kafkaC hbaseC

# flume + hbase
# sink configured with HBaseSink and RegexHbaseEventSerializer
a1.channels.hbaseC.type = memory
a1.channels.hbaseC.capacity = 10000
a1.channels.hbaseC.transactionCapacity = 10000
# HBase table name
a1.sinks.hbaseS.type = org.apache.flume.sink.hbase.HBaseSink
a1.sinks.hbaseS.table = weblogs
# HBase column family
a1.sinks.hbaseS.columnFamily = info
a1.sinks.hbaseS.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
# Regex that maps the news record fields to the corresponding columns (xxxx|xxxx|xxxx|xxxx|xxxx)
a1.sinks.hbaseS.serializer.regex = ^(.*)\\|(.*)\\|(.*)\\|(.*)\\|(.*)
a1.sinks.hbaseS.serializer.colNames = datatime,url,docno,contenttitle,content
#a1.sinks.hbaseS.serializer.rowKeyIndex = 0
# bind sink and channel
a1.sinks.hbaseS.channel = hbaseC

# flume + kafka
a1.channels.kafkaC.type = memory
a1.channels.kafkaC.capacity = 10000
a1.channels.kafkaC.transactionCapacity = 10000
a1.sinks.kafkaS.channel = kafkaC
a1.sinks.kafkaS.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.kafkaS.topic = weblogs
a1.sinks.kafkaS.brokerList = 192.168.25.136:9092
a1.sinks.kafkaS.zookeeperConnect = 192.168.25.136:2181
a1.sinks.kafkaS.requiredAcks = 1
a1.sinks.kafkaS.batchSize = 20
a1.sinks.kafkaS.serializer.class = kafka.serializer.StringEncoder
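A sketch of starting this agent; the agent name a1 matches the configuration above, while the config file name and directory are examples:

bin/flume-ng agent --conf conf --conf-file conf/weblogs-hbase-kafka.conf --name a1 -Dflume.root.logger=INFO,console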

Part 3: Real-time analysis with Kafka + Spark Streaming + MySQL

1. Writing a database connection pool (plain Java version + Scala c3p0 version)

Note: MySQL is deployed on Linux; the connection information is configured in db.properties (an example layout is sketched below).
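A minimal db.properties sketch with the keys the two connection pools below read; the article does not show the actual file, so the database name and credentials here are placeholders (the host matches the one used elsewhere in the article):

driverName=com.mysql.jdbc.Driver
url=jdbc:mysql://192.168.25.136:3306/hotnews
username=root
password=123456
maxPoolSize=20
minPoolSize=5
acquireIncrement=5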

Plain Java version:

package cn.yuysy.hotnews.realtime.db;

import java.io.File;
import java.io.FileInputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.Properties;

/**
 * Database connection pool
 * Created on 2018-11-15
 * @author tangzhi mail:
 */
public class ConnectionPool {
    private static LinkedList<Connection> connectionQueue;
    private static Properties prop;

    /*
     * Load the connection properties and register the driver class
     */
    static {
        try {
            prop = new Properties();
            prop.load(new FileInputStream(new File("C:\\Users\\Administrator\\Hot_News\\src\\main\\scala\\cn\\yuysy\\hotnews\\realtime\\db\\db.properties")));
            Class.forName(prop.getProperty("driverName").toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Get a connection from the pool
     */
    public synchronized Connection getConnection() {
        if (connectionQueue == null || connectionQueue.size() == 0) {
            connectionQueue = new LinkedList<Connection>();
            for (int i = 0; i < 5; i++) {
                try {
                    Connection connection = DriverManager.getConnection(
                            prop.getProperty("url").toString(),
                            prop.getProperty("username").toString(),
                            prop.getProperty("password").toString());
                    connectionQueue.add(connection);
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
        return connectionQueue.poll();
    }

    /**
     * Return a connection to the pool
     */
    public void returnConnection(Connection connection) {
        connectionQueue.add(connection);
    }
}

Scala c3p0 version:

package cn.yuysy.hotnews.realtime.db

import java.io.{File, FileInputStream, InputStream}
import java.sql.Connection
import java.util.Properties

import com.mchange.v2.c3p0.ComboPooledDataSource
import org.apache.spark.SparkFiles

/**
 * C3P0 database connection pool
 * Created on 2018-11-15
 * @author tangzhi mail:
 */
class c3p0ConnectionPool(isLocal: Boolean) extends Serializable {
  private val cpds: ComboPooledDataSource = new ComboPooledDataSource(true)
  private val prop = new Properties()
  private var in: InputStream = _

  isLocal match {
    case true  => in = getClass.getResourceAsStream("db.properties")                   // local IDEA mode
    case false => in = new FileInputStream(new File(SparkFiles.get("db.properties")))  // Spark cluster mode on Linux
  }

  /**
   * Register the connection settings
   */
  try {
    prop.load(in)
    cpds.setJdbcUrl(prop.getProperty("url").toString())
    cpds.setDriverClass(prop.getProperty("driverName").toString())
    cpds.setUser(prop.getProperty("username").toString())
    cpds.setPassword(prop.getProperty("password").toString())
    cpds.setMaxPoolSize(Integer.valueOf(prop.getProperty("maxPoolSize").toString()))
    cpds.setMinPoolSize(Integer.valueOf(prop.getProperty("minPoolSize").toString()))
    cpds.setAcquireIncrement(Integer.valueOf(prop.getProperty("acquireIncrement").toString()))
  } catch {
    case ex: Exception => ex.printStackTrace()
  }

  def getConnection: Connection = {
    try {
      cpds.getConnection()
    } catch {
      case ex: Exception =>
        ex.printStackTrace()
        null
    }
  }
}

object c3p0ConnectionPool {
  var connectionPool: c3p0ConnectionPool = _

  def getc3p0ConnectionPool(isLocal: Boolean): c3p0ConnectionPool = {
    synchronized {
      if (connectionPool == null) {
        connectionPool = new c3p0ConnectionPool(isLocal)
      }
    }
    connectionPool
  }
}

Q1: Running the Spark Streaming program locally to read from Kafka fails with:

Exception in thread "main" java.lang.AssertionError: assertion failed: No output streams registered, so nothing to execute at scala

A1: The Spark Streaming job registered no output operation (action) to trigger execution. Common output operations are shown below.
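A minimal sketch of registering an output operation, assuming a DStream named line like the one in the code further below (the HDFS path prefix is just an example):

// Any one of these output operations makes the stream executable:
line.print()                                   // print the first elements of each batch
line.saveAsTextFiles("hdfs:///hotnews/out")    // write each batch out as text files
line.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    partition.foreach(record => println(record))   // push each record to an external store here
  }
}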

Q2: The data written to MySQL after Spark Streaming processing contains garbled characters.

A2: Append ?useUnicode=true&characterEncoding=utf8 to the URL in the database connection configuration file.
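For example, with the db.properties layout sketched earlier (the database name is still a placeholder):

url=jdbc:mysql://192.168.25.136:3306/hotnews?useUnicode=true&characterEncoding=utf8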

2. Real-time analysis approach + partial code

Real-time analysis approach:

Read data from Kafka (taking _._2) -----> the news records ------> first map each value into a Map[String,String] -----> split, group by key, and aggregate -----> build the SQL from the key ------> write to MySQL

Partial code:

package cn.yuysy.hotnews.realtime

import java.io.{File, FileInputStream}
import java.sql.{Connection, Statement}
import java.util.Properties

import cn.yuysy.hotnews.realtime.db.c3p0ConnectionPool
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Spark Streaming pulls data from Kafka, computes in real time, and sinks the results to MySQL
 */
object HotnewsRealTimeStreaming {

  // Running total per key: add the counts of the current batch to the previous state
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    //iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x)))
    iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(i => (x, i)) }
  }

  def main(args: Array[String]): Unit = {
    // Set the log level
    LoggerLevels.setStreamingLogLevels()
    val prop = new Properties()
    prop.load(new FileInputStream(new File("C:\\Users\\Administrator\\Hot_News\\src\\main\\scala\\cn\\yuysy\\hotnews\\realtime\\sparkstreaming-kafka.properties")))
    val sparkConf = new SparkConf().setAppName("HotnewsRealTimeStreaming").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    // Checkpoint directory
    ssc.checkpoint("d://ck")
    // Kafka topics (there may be more than one)
    val topicMap = prop.getProperty("topics").split(",").map((_, prop.getProperty("numThreads").toInt)).toMap
    // Pull batches of news data from Kafka
    val line = KafkaUtils.createStream(ssc, prop.getProperty("zkQuorum"), prop.getProperty("group"), topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)

    /*
     * Spark pulls data from Kafka in real time and stores the results in MySQL
     */
    // Connection and statement handles
    var conn: Connection = null
    var smt: Statement = null

    // 1. Turn a Kafka value (a news record) into a Map[String,String] ---> ["datetime" -> "2018-11-18", ...]
    def valueSplit(value: String): Map[String, String] = {
      val x = value.split("\\|")
      var valueMap: Map[String, String] = Map()
      x.foreach(kvs => {
        val kv = kvs.split("=")
        if (kv != null && kv.length == 2) {
          valueMap += (kv(0) -> kv(1))
        }
      })
      valueMap
    }

    /*
     * Real-time computation, then write the results to MySQL
     */
    line.flatMap(_.split("\\|")).map(valueSplit).map(x => {
      (x.getOrElse("contenttitle", null), 1)
    }).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true).map(data => {
      if (data._1 != null) {
        try {
          conn = c3p0ConnectionPool.getc3p0ConnectionPool(true).getConnection
          conn.setAutoCommit(false)
          smt = conn.createStatement()
          val updateSql = "UPDATE testweblogs SET count = '" + data._2 + "' where contenttitle = '" + data._1 + "'"
//        val insertSql = "INSERT INTO testweblogs(contenttitle,count) VALUES ('" + data._1 + "','" + data._2 + "')"
          smt.execute(updateSql)
          conn.commit()
        } catch {
          case ex: Exception => ex.printStackTrace()
        } finally {
          if (smt != null) {
            smt.close()
          }
          if (conn != null) {
            conn.close()
          }
        }
      }
      data
    }).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Part 4: Offline analysis with Kafka + HBase + Hive (Hadoop)

1. The Kafka and HBase wiring is already done.

2. Connecting HBase to Hive

Approach: create an external table weblogs in Hive that maps to the corresponding HBase table weblogs.

CREATE EXTERNAL TABLE weblogs(
  `id` string,
  `datatime` string,
  `url` string,
  `docno` string,
  `contenttitle` string,
  `content` string)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'   -- storage handler
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:datatime,info:url,info:docno,info:contenttitle,info:content")   -- column family and column mapping
TBLPROPERTIES ("hbase.table.name" = "weblogs", "hbase.mapred.output.outputtable" = "weblogs");
-- hbase.table.name declares the HBase table name; it is optional and defaults to the Hive table name.
-- hbase.mapred.output.outputtable names the table that data is written to; it is required if you later insert into the table.

Offline analysis approach:

Data lands in HBase at high speed ----> synced to Hive through the external table ------> SQL performs the offline statistics ----> results are stored in MySQL at a chosen interval -----> front-end presentation

Note: to access Hive remotely from the IDE you must start the hiveserver2 service, otherwise the connection will be refused.

Start command: ./hive --service hiveserver2
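The offline job below reads its Hive connection settings (url, user, password) from db.properties; with HiveServer2 they would look roughly like this. The host, database, and credentials are assumptions for illustration; 10000 is the default HiveServer2 port:

url=jdbc:hive2://192.168.25.136:10000/default
user=hive
password=hive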

package cn.yusys.hotnews.offline;

import cn.yusys.hotnews.dataconnection.ConnectionPool;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.sql.*;
import java.util.Properties;

/**
 * @ClassName HotnewsOfflineStreaming
 * @Description Reads data from Hive and stores the results in MySQL
 * @Author tangzhi mail:
 *
 * Created on 2018-11-20
 **/
public class HotnewsOfflineStreaming {
    public static void main(String[] args) throws IOException {
        Connection connection = null, connection1 = null;
        Statement statement = null, statement1 = null;
        Properties prop = new Properties();
        prop.load(new FileInputStream(new File("C:\\Users\\Administrator\\Hot_News\\src\\main\\java\\cn\\yusys\\hotnews\\offline\\db.properties")));
        try {
            /**
             * 1. Read the data from Hive
             */
            connection = DriverManager.getConnection(prop.getProperty("url").toString(), prop.getProperty("user").toString(), prop.getProperty("password").toString());
            connection.setAutoCommit(false);
            statement = connection.createStatement();
            String hiveSql = "select count(1) from weblogs";
            ResultSet resultSet = statement.executeQuery(hiveSql);
            while (resultSet.next()) {
                // Adjust the SQL to the project requirements
                int count = resultSet.getInt(1);
                /*
                 * 2. Store the aggregated result in MySQL
                 */
                ConnectionPool connPool = new ConnectionPool();
                connection1 = connPool.getConnection();
                connection1.setAutoCommit(false);
                statement1 = connection1.createStatement();
                String sql = "INSERT INTO testweblogs2(count) VALUE('" + count + "')";
                boolean execute = statement1.execute(sql);
                connection1.commit();
                if (!execute) {
                    // execute == false means the insert/update succeeded; true means a query returned a result set
                    System.out.println("Offline analysis database update succeeded");
                } else {
                    System.out.println("Offline analysis database update failed");
                }
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            // Release resources
            try {
                if (connection != null && connection1 != null && statement != null && statement1 != null) {
                    statement.close();
                    statement1.close();
                    connection.close();
                    connection1.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}

This wraps up the big data analysis part; next comes building the JavaEE project for the front-end web presentation.

Part 5: JavaEE front-end presentation (SSM)

1. Setting up the SSM project (omitted)

2. Using WebSocket full-duplex communication so the front end and back end exchange data in real time

Q1: While testing the SSM setup, the data fails to load.

A1: 1. If the configuration does not specify an explicit path for the mapper.xml files, put the Mapper class and the Mapper.xml file in the same package.

2. It may also be that Spring cannot load the mapper.xml files; in that case add the following to the project's pom.xml:

<resources>
    <resource>
        <directory>src/main/java</directory>
        <includes>
            <include>**/*.xml</include>
        </includes>
    </resource>
    <resource>
        <directory>src/main/resources</directory>
    </resource>
</resources>

Q2: WebSocket communication fails with an error:

A2: 1. If you are running under Spring MVC, make sure the class is annotated with @ServerEndpoint(value = "/websocket", configurator = SpringConfigurator.class).

2. The server version requirements:

3. The dependency in the pom.xml should be configured as follows;

if <scope>provided</scope> is missing, it causes a runtime conflict and can also lead to a 404 Not Found.

<dependency>
    <groupId>javax.websocket</groupId>
    <artifactId>javax.websocket-api</artifactId>
    <version>1.0</version>
    <scope>provided</scope>
</dependency>

Main code (excerpt):

package service.impl;

import com.alibaba.fastjson.JSON;
import entity.Weblog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.socket.server.standard.SpringConfigurator;
import service.WeblogService;

import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @ClassName WebSocket
 * @Description Full-duplex communication between the front end and the database
 * @Author Administrator
 * @Version 1.0
 */
@ServerEndpoint(value = "/websocket", configurator = SpringConfigurator.class)
public class WebSocket {

    @Autowired
    private WeblogService weblogService;

    @OnMessage
    public void onMessage(String message, Session session) throws IOException, InterruptedException {
        String[] titleNames = new String[10];
        Long[] titleCounts = new Long[10];
        Long[] titleSum = new Long[1];
        while (true) {
            Map<String, Object> map = new HashMap<String, Object>();
            // Top news titles and their counts
            List<Weblog> weblogList = weblogService.webcount();
            System.out.println(weblogList);
            for (int i = 0; i < weblogList.size(); i++) {
                titleNames[i] = weblogList.get(i).getContenttitle();
                titleCounts[i] = weblogList.get(i).getCount();
            }
            // Total number of records
            titleSum[0] = weblogService.websum();
            map.put("titleName", titleNames);
            map.put("titleCount", titleCounts);
            map.put("titleSum", titleSum);
            System.out.print(map);
            // Push the latest figures to the browser as JSON
            session.getBasicRemote().sendText(JSON.toJSONString(map));
            Thread.sleep(1000);
            map.clear();
        }
    }

    @OnOpen
    public void onOpen() {
        System.out.println("Client connected");
    }

    @OnClose
    public void onClose() {
        System.out.println("Connection closed");
    }
}

With this, the front-end presentation project is also finished, which means phase one of the project is complete.

Note: the original project contributor's article on the Alibaba Cloud Yunqi community:

https://yq.aliyun.com/articles/557454