Preface: This article is a complete hands-on big data project that performs real-time and offline statistical analysis of users' search topics and displays the results through a JavaEE web front end. These metrics are of great help for a website's precision marketing and operations. The architecture roughly follows enterprise practice: log collection, transformation, real-time computation, Java back-end development, and web front-end display form one complete pipeline, and every node uses a highly available architecture with failover and fault tolerance taken into account. The stack includes: Hadoop (HDFS + MapReduce + YARN) + Flume + Kafka + HBase + Hive + Spark (SQL, Streaming) + MySQL + SpringMVC + MyBatis + WebSocket + AngularJS + ECharts. Languages involved: Java, Scala, and Shell.
Project deployment address:
http://120.79.35.74:443/Hot_News_Web/
Contact email for the project source code: [email protected]
Project architecture diagram:
Part 1: Data source processing (news data from Sogou Labs, XML -> TXT: Java parses the large batch of XML files; code follows)
Approach: use SAXReader to read the XML file content and build a News entity class so the records can be written to a txt file; then write a ReadWebLog class plus a shell script that runs on Linux to simulate the generation of news search logs.
Command to run the jar on Linux: java -jar <path to your uploaded jar> args0 args1
Or as a shell script:
#!/bin/bash
echo "start log"
java -jar <path to your uploaded jar> args0 args1
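For instance, with the jar and data uploaded to /home/hotnews (the jar file name here is only an assumption; args0 is the source file to read and args1 is the target log file that Flume tails, matching the ReadWebLog class and the Flume configuration shown in Part 2):
java -jar /home/hotnews/readweblog.jar /home/hotnews/news.log /home/hotnews/weblogs.log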
Code:
Converting the Sogou Labs source data from .xml to .txt:
package cn.yusys.hotnews.datasource;

import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * Parses the Sogou Labs news XML file into a txt file ----> project data source
 * @author Tangzhi mail:
 * Created on 2018-11-12
 */
public class MyXMLReader2JDOM {
    public static void main(String[] args) {
        // Create the XML reader
        SAXReader reader = new SAXReader();
        // Set the character encoding
        reader.setEncoding("utf-8");
        Document document;
        Element rootElement;
        List<Element> docList;
        // Holds the parsed records before they are written to news.log
        ArrayList<News> list = new ArrayList<News>();
        try {
            document = reader.read(new File("D:\\Downloads\\大資料資料源\\news_tensite_xml.smarty.dat"));
            // Root element <docs>...</docs>
            rootElement = document.getRootElement();
            // Child elements <doc>...</doc>
            docList = rootElement.elements("doc");
            // Walk the child nodes and copy each news record into the entity class
            for (Element e : docList) {
                News news = new News();
                if (e.element("url") != null && !" ".equals(e.element("url"))) {
                    news.setUrl(e.element("url").getStringValue().trim());
                }
                if (e.element("docno") != null && !" ".equals(e.element("docno"))) {
                    news.setDocno(e.element("docno").getStringValue().trim());
                }
                if (e.element("contenttitle") != null && !" ".equals(e.element("contenttitle"))) {
                    news.setContenttitle(e.element("contenttitle").getStringValue().trim());
                }
                if (e.element("content") != null && !" ".equals(e.element("content"))) {
                    news.setContent(e.element("content").getStringValue().trim());
                    list.add(news);
                }
            }
            // Write the collected records to the txt file
            writwToFile(list);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Writes the records to the txt file (shard the output later when the source file grows too large)
     * @throws IOException
     */
    public static void writwToFile(List<News> list) throws IOException {
        File file = new File("D:\\Downloads\\大資料資料源\\news2.log");
        if (!file.exists()) {
            try {
                file.createNewFile();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        BufferedWriter bw = new BufferedWriter(new FileWriter(file));
        for (News news : list) {
            Date date = new Date();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String dateStr = sdf.format(date);
            bw.write("datetime" + "=" + dateStr + "|");
            bw.write("url" + "=" + news.getUrl() + "|");
            bw.write("docno" + "=" + news.getDocno() + "|");
            bw.write("contenttitle" + "=" + news.getContenttitle() + "|");
            bw.write("content" + "=" + news.getContent());
            bw.write("\n");
            bw.flush();
        }
        bw.close();
    }
}

----------------------------------------------------------------------------------------------------------------

News entity class used during XML parsing:

public class News implements Serializable { // implements Serializable so several machines can parse in parallel

    private String url;
    private String docno;
    private String contenttitle;
    private String content;

    public News() {
    }

    public News(String url, String docno, String contenttitle, String content) {
        super();
        this.url = url;
        this.docno = docno;
        this.contenttitle = contenttitle;
        this.content = content;
    }

    public String getUrl() {
        return url;
    }
    public void setUrl(String url) {
        this.url = url;
    }
    public String getDocno() {
        return docno;
    }
    public void setDocno(String docno) {
        this.docno = docno;
    }
    public String getContenttitle() {
        return contenttitle;
    }
    public void setContenttitle(String contenttitle) {
        this.contenttitle = contenttitle;
    }
    public String getContent() {
        return content;
    }
    public void setContent(String content) {
        this.content = content;
    }
}

-----------------------------------------------------------------------------------------

Runs on Linux to simulate log generation; the output file is then collected by Flume:

import java.io.*;

/**
 * Simulates a log server producing logs: reads lines from news.log / news1.log
 * and writes them into a log file, which Flume then collects.
 * @date 2018-11-12
 */
public class ReadWebLog {
    public static String readFileName;
    public static String writeFileName;

    public static void main(String[] args) {
        readFileName = args[0];
        writeFileName = args[1];
        readFile(readFileName);
    }

    /**
     * Reads log records from news.log / news1.log
     */
    public static void readFile(String fileName) {
        try {
            FileInputStream fs = new FileInputStream(fileName);
            // Wrap the byte stream in a character stream
            InputStreamReader isr = new InputStreamReader(fs, "utf-8");
            BufferedReader br = new BufferedReader(isr);
            int count = 0;
            String line;
            while ((line = br.readLine()) != null) {
                count++;
                // Configurable read interval in milliseconds
                Thread.sleep(1000);
                System.out.println("row:" + count + ">>>>>>>>" + line);
                // Write to the target file (must match the Flume configuration)
                writeFile(writeFileName, line);
            }
            br.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Appends one line to the target file
     */
    public static void writeFile(String fileName, String line) {
        try {
            FileOutputStream fs = new FileOutputStream(fileName, true);
            OutputStreamWriter osw = new OutputStreamWriter(fs);
            BufferedWriter bw = new BufferedWriter(osw);
            bw.write(line);
            bw.write("\n");
            bw.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Q&A
Q1:
Java exception: "Invalid byte 2 of 2-byte UTF-8 sequence."
A1: Open the file in a plain text editor, save it again with UTF-8 encoding, then reopen it in Notepad++ (any other editor also works).
Q2 :
The main class cannot be found when running the jar on Linux
A2: When using IDEA, add the following plugin to pom.xml and put the fully qualified name of your main class inside <mainClass></mainClass>:
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.4.3</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <filters>
              <filter>
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass></mainClass>
              </transformer>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

Linux output screenshot:
Note: when the source data file is very large, ReadWebLog can be set up to read from several log files at random, which also makes the simulated data more realistic.
Sogou Labs: Chinese information processing and part of the project data:
http://www.sogou.com/labs/
Note: pre-clean the data according to the project requirements.
At this point, data source acquisition and simple cleaning (uid assignment, timestamp binding, filtering out part of the data) are done.
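Each cleaned record is one pipe-delimited line in the format produced by writwToFile above and expected later by the Flume regex serializer (the field values here are only placeholders):
datetime=2018-11-12 10:00:00|url=<url>|docno=<docno>|contenttitle=<title>|content=<content>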
Part 2: Wiring Flume to HBase and Kafka
Approach:
Flume collects the source data into HBase -----> HBase is mapped to Hive ------> Hadoop MapReduce completes the offline analysis -----> ECharts on the front end
Flume collects the source data into Kafka -----> Kafka feeds Spark Streaming -----> Spark + MySQL complete the real-time analysis -----> ECharts on the front end
1. Flume and HBase
Tips: here the Flume sink target is HBase (prerequisite: HBase installed in the Linux environment and an understanding of how HBase works). Edit hbase-site.xml to set the HBase data directory and the ZooKeeper address.
Common HBase shell commands:
Start HBase: start-hbase.sh
Enter the interactive shell: ./hbase shell
Create a table: create '<table name>', '<column family 1>', '<column family 2>', ... (see the project-specific example below)
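For this project, the table that the Flume HBase sink writes into can be created like this (a minimal sketch; the table name weblogs and the column family info must match the Flume sink configuration shown later):
create 'weblogs', 'info'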
HBase configuration files:
## hbase-env.sh
export JAVA_HOME=/opt/jdk1.7.0_65        # JDK path on your own machine (see /etc/profile)
export JAVA_CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export HBASE_MANAGES_ZK=false            # do not use HBase's bundled ZooKeeper; use the external ZooKeeper cluster

## hbase-site.xml
<configuration>
  <property>
    <name>hbase.master</name>
    <value>192.168.25.136:60000</value>
  </property>
  <property>
    <name>hbase.master.maxclockskew</name>
    <value>180000</value>
  </property>
  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://192.168.25.136:9000/hbase</value>
  </property>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>192.168.25.136</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/tmp/hbase_data</value>
  </property>
</configuration>

## regionservers
localhost        # fill in the cluster IP addresses or host names

Q1: Creating an HBase table fails with: ERROR: Can not get address from Zookeeper; znode data == null
A1:1、確定zookeeper叢集可用 Hadoop叢集可用 HBase服務正常啟動
2、vi hbase-site.xml 檢視HBase資料存放目錄權限是否為可讀可寫
Q2:HBase0.9.4 通過腳本啟動後建立表時報錯
ERROR: org.apache.hadoop.hbase.MasterNotRunningException: Retried 7 times
A2:
1. Check whether the Hadoop and ZooKeeper clusters are available.
2. HBase 0.94 has poor compatibility with Hadoop 2.x; the version mismatch prevents RPC communication. Recommended pairing: HBase 0.98.x + Hadoop 2.x. Note that in that case
Flume 1.7.x is the better choice, for the reason shown below:
2. Flume and Kafka
Tips: here the Flume sink target is Kafka (prerequisite: Kafka installed in the Linux environment and an understanding of how Kafka works).
Common Kafka shell commands:
Start Kafka: bin/kafka-server-start.sh config/server.properties (start with a specific config file)
Create a topic: bin/kafka-topics.sh --create --zookeeper 192.168.25.136:2181 --replication-factor 1 --partitions 1 --topic weblogs
Delete a topic: bin/kafka-topics.sh --delete --zookeeper 192.168.25.136:2181 --topic weblogs
Consume a topic from the console: bin/kafka-console-consumer.sh --zookeeper 192.168.25.136:2181 --topic weblogs --from-beginning
Produce data from the console: bin/kafka-console-producer.sh --broker-list 192.168.25.136:9092 --topic weblogs
Describe a topic: bin/kafka-topics.sh --zookeeper 192.168.25.136:2181 --describe --topic weblogs
Kafka configuration file (in big data projects Kafka mostly acts as a data buffer, producer-consumer pattern):
# Globally unique broker id; must not be duplicated
broker.id=0
# Port to listen on; producers and consumers connect to this port
port=9092
# Number of threads handling network requests
num.network.threads=3
# Number of threads handling disk I/O
num.io.threads=8
# Send socket buffer size
socket.send.buffer.bytes=102400
# Receive socket buffer size
socket.receive.buffer.bytes=102400
# Maximum request socket buffer size
socket.request.max.bytes=104857600
# Directory where Kafka stores its log data
log.dirs=/export/logs/kafka
# Default number of partitions per topic on this broker
num.partitions=2
# Number of threads used to recover and clean data under log.dirs
num.recovery.threads.per.data.dir=1
# Maximum time a segment file is retained before deletion
log.retention.hours=168
# Maximum time before rolling a new segment file
log.roll.hours=168
# Size of each segment file; default 1 GB
log.segment.bytes=1073741824
# Interval for checking segment sizes
log.retention.check.interval.ms=300000
# Whether log cleaning is enabled
log.cleaner.enable=true
# ZooKeeper connection used by the broker to store metadata
zookeeper.connect=192.168.25.136:2181,192.168.25.136:2182,192.168.25.136:2183
# ZooKeeper connection timeout
zookeeper.connection.timeout.ms=6000
# Flush to disk once this many messages accumulate in the partition buffer
log.flush.interval.messages=10000
# Flush to disk once the message buffer has existed this long
log.flush.interval.ms=3000
# delete.topic.enable must be true in server.properties, otherwise deleted topics are only marked as deleted
delete.topic.enable=true
# host.name must be this machine's IP (important); otherwise clients throw "Producer connection to localhost:9092 unsuccessful"
host.name=192.168.25.136

When the number of records consumed from Kafka equals the total row count of the HBase weblogs table, the Flume-to-HBase and Flume-to-Kafka wiring is complete.
Flume startup log:
Row count of the HBase weblogs table:
Data consumed from Kafka:
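For the comparison mentioned above, the HBase side can be counted non-interactively (a sketch; run from the HBase bin directory and assuming the weblogs table created earlier):
echo "count 'weblogs'" | ./hbase shell
The Kafka side is simply the console consumer command listed in the Kafka shell section; the two totals should match once the wiring is complete.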
Appendix: Flume configuration that sinks the collected data by column into HBase and into Kafka (important):

a1.sources = r1
a1.channels = kafkaC hbaseC
a1.sinks = kafkaS hbaseS

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hotnews/weblogs.log
a1.sources.r1.channels = kafkaC hbaseC

# flume + hbase
# sink configured as HBaseSink with RegexHbaseEventSerializer
a1.channels.hbaseC.type = memory
a1.channels.hbaseC.capacity = 10000
a1.channels.hbaseC.transactionCapacity = 10000
# HBase table name
a1.sinks.hbaseS.type = org.apache.flume.sink.hbase.HBaseSink
a1.sinks.hbaseS.table = weblogs
# HBase column family
a1.sinks.hbaseS.columnFamily = info
a1.sinks.hbaseS.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
# Regex that maps each news record (xxxx|xxxx|xxxx|xxxx|xxxx) to the corresponding columns of the column family
a1.sinks.hbaseS.serializer.regex = ^(.*)\\|(.*)\\|(.*)\\|(.*)\\|(.*)
a1.sinks.hbaseS.serializer.colNames = datatime,url,docno,contenttitle,content
#a1.sinks.hbaseS.serializer.rowKeyIndex = 0
# bind sink and channel
a1.sinks.hbaseS.channel = hbaseC

# flume + kafka
a1.channels.kafkaC.type = memory
a1.channels.kafkaC.capacity = 10000
a1.channels.kafkaC.transactionCapacity = 10000
a1.sinks.kafkaS.channel = kafkaC
a1.sinks.kafkaS.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.kafkaS.topic = weblogs
a1.sinks.kafkaS.brokerList = 192.168.25.136:9092
a1.sinks.kafkaS.zookeeperConnect = 192.168.25.136:2181
a1.sinks.kafkaS.requiredAcks = 1
a1.sinks.kafkaS.batchSize = 20
a1.sinks.kafkaS.serializer.class = kafka.serializer.StringEncoder

Part 3: Kafka + Spark Streaming + MySQL real-time computation and analysis
1. Writing the database connection pool (plain Java version + Scala c3p0 version)
Note: MySQL is deployed on Linux; the connection settings live in db.properties.
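A sketch of what db.properties might contain: the keys are the ones read by the two pools below, while the host, database name, and credentials are placeholders to adapt to your environment:
driverName=com.mysql.jdbc.Driver
url=jdbc:mysql://192.168.25.136:3306/hotnews?useUnicode=true&characterEncoding=utf8
username=root
password=123456
maxPoolSize=20
minPoolSize=5
acquireIncrement=5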
Plain Java version:
package cn.yuysy.hotnews.realtime.db;

import java.io.File;
import java.io.FileInputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.Properties;

/**
 * Database connection pool
 * Created on 2018-11-15
 * @author tangzhi mail:
 */
public class ConnectionPool {
    private static LinkedList<Connection> connectionQueue;
    private static Properties prop;

    // Load the driver class and connection properties
    static {
        try {
            prop = new Properties();
            prop.load(new FileInputStream(new File("C:\\Users\\Administrator\\Hot_News\\src\\main\\scala\\cn\\yuysy\\hotnews\\realtime\\db\\db.properties")));
            Class.forName(prop.getProperty("driverName").toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Borrow a connection from the pool
     */
    public synchronized Connection getConnection() {
        if (connectionQueue == null || connectionQueue.size() == 0) {
            connectionQueue = new LinkedList<Connection>();
            for (int i = 0; i < 5; i++) {
                try {
                    Connection connection = DriverManager.getConnection(
                            prop.getProperty("url").toString(),
                            prop.getProperty("username").toString(),
                            prop.getProperty("password").toString());
                    connectionQueue.add(connection);
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
        return connectionQueue.poll();
    }

    /**
     * Return a connection to the pool
     */
    public void returnConnection(Connection connection) {
        connectionQueue.add(connection);
    }
}

Scala c3p0 version:
package cn.yuysy.hotnews.realtime.db

import java.io.{File, FileInputStream, InputStream}
import java.sql.Connection
import java.util.Properties

import com.mchange.v2.c3p0.ComboPooledDataSource
import org.apache.spark.SparkFiles

/**
 * C3P0 database connection pool
 * Created on 2018-11-15
 * @author tangzhi mail:
 */
class c3p0ConnectionPool(isLocal: Boolean) extends Serializable {
  private val cpds: ComboPooledDataSource = new ComboPooledDataSource(true)
  private val prop = new Properties()
  private var in: InputStream = _

  isLocal match {
    case true  => in = getClass.getResourceAsStream("db.properties")                   // local IDEA mode
    case false => in = new FileInputStream(new File(SparkFiles.get("db.properties")))  // Spark cluster mode on Linux
  }

  /**
   * Register the connection settings
   */
  try {
    prop.load(in)
    cpds.setJdbcUrl(prop.getProperty("url").toString())
    cpds.setDriverClass(prop.getProperty("driverName").toString())
    cpds.setUser(prop.getProperty("username").toString())
    cpds.setPassword(prop.getProperty("password").toString())
    cpds.setMaxPoolSize(Integer.valueOf(prop.getProperty("maxPoolSize").toString()))
    cpds.setMinPoolSize(Integer.valueOf(prop.getProperty("minPoolSize").toString()))
    cpds.setAcquireIncrement(Integer.valueOf(prop.getProperty("acquireIncrement").toString()))
  } catch {
    case ex: Exception => ex.printStackTrace()
  }

  def getConnection: Connection = {
    try {
      cpds.getConnection()
    } catch {
      case ex: Exception => ex.printStackTrace()
        null
    }
  }
}

object c3p0ConnectionPool {
  var connectionPool: c3p0ConnectionPool = _

  def getc3p0ConnectionPool(isLocal: Boolean): c3p0ConnectionPool = {
    synchronized {
      if (connectionPool == null) {
        connectionPool = new c3p0ConnectionPool(isLocal)
      }
    }
    connectionPool
  }
}

Q1: Running the Spark Streaming program locally to read data from Kafka fails with:
Exception in thread "main" java.lang.AssertionError: assertion failed: No output streams registered, so nothing to execute at scala
A1: No output action was triggered on the Spark Streaming operators. The common output operations are shown below:
Q2: The data written to MySQL by the Spark Streaming real-time job is garbled.
A2: Append ?useUnicode=true&characterEncoding=utf8 to the JDBC URL in the database connection configuration file, e.g. url=jdbc:mysql://<host>:3306/<database>?useUnicode=true&characterEncoding=utf8.
2. Real-time analysis approach + partial code
Real-time analysis approach:
Read the value from Kafka (_._2) -----> one news record ------> map the value into Map[String,String] -----> split, group by key, aggregate -----> build the SQL according to the key ------> write to MySQL
Partial code:
package cn.yuysy.hotnews.realtime

import java.io.{File, FileInputStream}
import java.sql.{Connection, Statement}
import java.util.Properties

import cn.yuysy.hotnews.realtime.db.c3p0ConnectionPool
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Spark Streaming: pull data from Kafka, compute in real time, then sink the results into MySQL
 */
object HotnewsRealTimeStreaming {

  // Update function for updateStateByKey: the new counts for each key plus the previous state
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    //iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x)))
    iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(i => (x, i)) }
  }

  def main(args: Array[String]): Unit = {
    // Set the logging level
    LoggerLevels.setStreamingLogLevels()

    val prop = new Properties()
    prop.load(new FileInputStream(new File("C:\\Users\\Administrator\\Hot_News\\src\\main\\scala\\cn\\yuysy\\hotnews\\realtime\\sparkstreaming-kafka.properties")))

    val sparkConf = new SparkConf().setAppName("HotnewsRealTimeStreaming").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    // Checkpoint directory
    ssc.checkpoint("d://ck")

    // Kafka topics (possibly more than one)
    val topicMap = prop.getProperty("topics").split(",").map((_, prop.getProperty("numThreads").toInt)).toMap
    // Pull batches of news data from Kafka
    val line = KafkaUtils.createStream(ssc, prop.getProperty("zkQuorum"), prop.getProperty("group"), topicMap,
      StorageLevel.MEMORY_AND_DISK_SER).map(_._2)

    /**
     * Spark pulls the data from Kafka in real time and stores the results in the MySQL database
     */
    // Connection and statement used when writing to MySQL
    var conn: Connection = null
    var smt: Statement = null

    // 1. Turn a Kafka value (one news record) into Map[String,String], e.g. ["datetime" -> "2018-11-18"]
    def valueSplit(value: String): Map[String, String] = {
      val x = value.split("\\|")
      var valueMap: Map[String, String] = Map()
      x.foreach(kvs => {
        val kv = kvs.split("=")
        if (kv != null && kv.length == 2) {
          valueMap += (kv(0) -> kv(1))
        }
      })
      valueMap
    }

    /**
     * 2. Spark Streaming real-time computation, then write the results into MySQL
     */
    line.flatMap(_.split("\\|")).map(valueSplit).map(x => {
      (x.getOrElse("contenttitle", null), 1)
    }).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true).map(data => {
      if (data._1 != null) {
        try {
          conn = c3p0ConnectionPool.getc3p0ConnectionPool(true).getConnection
          conn.setAutoCommit(false)
          smt = conn.createStatement()
          val updateSql = "UPDATE testweblogs SET count = '" + data._2 + "' where contenttitle = '" + data._1 + "'"
          // val insertSql = "INSERT INTO testweblogs(contenttitle,count) VALUES ('" + data._1 + "','" + data._2 + "')"
          smt.execute(updateSql)
          conn.commit()
        } catch {
          case ex: Exception => ex.printStackTrace()
        } finally {
          if (smt != null) {
            smt.close()
          }
          if (conn != null) {
            conn.close()
          }
        }
      }
    }).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Part 4: Kafka + HBase + Hive (Hadoop) offline computation and analysis
1. The data ingestion into HBase has already been completed (Part 2).
2. Connecting HBase to Hive
Approach: create an external table weblogs in Hive that maps to the corresponding HBase table weblogs.

CREATE EXTERNAL TABLE weblogs(`id` string, `datatime` string, `url` string, `docno` string, `contenttitle` string, `content` string)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'   -- storage handler
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:datatime,info:url,info:docno,info:contenttitle,info:content")   -- column family / column mapping
TBLPROPERTIES ("hbase.table.name" = "weblogs", "hbase.mapred.output.outputtable" = "weblogs");
-- hbase.table.name is the HBase table name (optional; defaults to the Hive table name);
-- hbase.mapred.output.outputtable is the table written to on insert; it must be set if you ever insert into this table.

Offline analysis approach:
Data is written into HBase at high speed ----> synced to Hive through the external table ------> SQL produces the offline statistics ----> results are stored into MySQL at the chosen interval -----> front-end display
Note: to access Hive remotely from an IDE, the hiveserver2 service must be running, otherwise the connection is refused.
Start command: ./hive --service hiveserver2
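The offline statistics themselves are plain HiveQL against the weblogs external table; the exact SQL depends on the reporting requirements, so the query below is only a sketch of the kind of aggregation used, run here through the Hive CLI:
hive -e "SELECT contenttitle, COUNT(*) AS cnt FROM weblogs GROUP BY contenttitle ORDER BY cnt DESC LIMIT 10"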
package cn.yusys.hotnews.offline;

import cn.yusys.hotnews.dataconnection.ConnectionPool;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.sql.*;
import java.util.Properties;

/**
 * @ClassName HotnewsOfflineStreaming
 * @Description Reads the statistics from Hive and stores them into MySQL
 * @Author tangzhi mail:
 * Created on 2018-11-20
 **/
public class HotnewsOfflineStreaming {
    public static void main(String[] args) throws IOException, SQLException {
        Connection connection = null, connection1 = null;
        Statement statement = null, statement1 = null;

        Properties prop = new Properties();
        prop.load(new FileInputStream(new File("C:\\Users\\Administrator\\Hot_News\\src\\main\\java\\cn\\yusys\\hotnews\\offline\\db.properties")));

        /**
         * 1. Read the data from Hive
         */
        connection = DriverManager.getConnection(prop.getProperty("url").toString(),
                prop.getProperty("user").toString(), prop.getProperty("password").toString());
        connection.setAutoCommit(false);
        statement = connection.createStatement();
        String hiveSql = "select count(1) from weblogs";
        ResultSet resultSet = statement.executeQuery(hiveSql);

        while (resultSet.next()) {
            // Build the SQL according to the project requirements
            int count = resultSet.getInt(1);

            /**
             * 2. Store the aggregated data into MySQL
             */
            ConnectionPool connPool = new ConnectionPool();
            connection1 = connPool.getConnection();
            connection1.setAutoCommit(false);
            statement1 = connection1.createStatement();
            String sql = "INSERT INTO testweblogs2(count) VALUE('" + count + "')";
            boolean execute = statement1.execute(sql);
            connection1.commit();
            if (!execute) {
                // execute == false for an insert/update, true when the statement returns a result set
                System.out.println("Offline analysis: database updated successfully");
            } else {
                System.out.println("Offline analysis: database update failed");
            }
        }

        // Release the resources
        if (connection != null && connection1 != null && statement != null && statement1 != null) {
            statement.close();
            statement1.close();
            connection.close();
            connection1.close();
        }
    }
}

This concludes the big data analysis and processing part; next comes building the JavaEE project for the front-end web display.
Part 5: JavaEE front-end display (SSM)
1. Setting up the SSM project (omitted)
2. Use WebSocket full-duplex communication so the front end and the back end exchange data in real time
Q1: Data fails to load when testing the newly set-up SSM project
A1: 1. If the configuration files do not specify the exact location of the mapper.xml files, put the Mapper interface and its Mapper.xml in the same package.
2. It is also possible that Spring cannot load the mapper.xml files; in that case add the following to the project's pom.xml:
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.xml</include>
</includes>
</resource>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
Q2: An error occurs when communicating over WebSocket:
A2: 1. In MVC mode, confirm that the class is annotated with @ServerEndpoint(value = "/websocket", configurator = SpringConfigurator.class).
2. Server version requirement:
3. The xml configuration should look like the following;
if <scope>provided</scope> is missing it causes a runtime conflict and can also lead to a 404 not found:
<dependency>
<groupId>javax.websocket</groupId>
<artifactId>javax.websocket-api</artifactId>
<version>1.0</version>
<scope>provided</scope>
</dependency>
Part of the main code:
package service.impl;
import com.alibaba.fastjson.JSON;
import entity.Weblog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.socket.server.standard.SpringConfigurator;
import service.WeblogService;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * @ClassName WebSocket
 * @Description Full-duplex communication between the front end and the database
 * @Author Administrator
 * @Version 1.0
 */
@ServerEndpoint(value = "/websocket", configurator = SpringConfigurator.class)
public class WebSocket {
@Autowired
private WeblogService weblogService;
@OnMessage
public void onMessage(String message, Session session) throws IOException, InterruptedException {
String[] titleNames = new String[10];
Long[] titleCounts = new Long[10];
Long[] titleSum = new Long[1];
while (true) {
Map<String,Object> map = new HashMap<String, Object>();
List<Weblog> weblogList = weblogService.webcount();
System.out.println(weblogList);
for (int i = 0;i<weblogList.size();i++) {
titleNames[i] = weblogList.get(i).getContenttitle();
titleCounts[i] = weblogList.get(i).getCount();
}
titleSum[0] = weblogService.websum();
map.put("titleName", titleNames);
map.put("titleCount", titleCounts);
map.put("titleSum", titleSum);
System.out.print(map);
session.getBasicRemote().sendText(JSON.toJSONString(map));
Thread.sleep(1000);
map.clear();
}
}
@OnOpen
public void onOpen() {
System.out.println("Client connected");
}
@OnClose
public void onClose() {
System.out.println("Connection closed");
}
}
At this point the front-end display project is also finished, which means phase one of the project is complete.
Note: the original project contributor's post on the Alibaba Cloud Yunqi community:
https://yq.aliyun.com/articles/557454