天天看點

es搜尋引擎

1、es簡介

2、es優缺點

3、es使用

4、es可以解決的問題

5、es舉例

6、es執行結果截圖

7、es資料增量方案

8、使用es搜尋

  一、es簡介

    es是一個是一個實時的分布式搜尋和分析引擎。它可以幫助你用前所未有的速度去處理大規模資料。

    它可以用于全文搜尋,結構化搜尋以及分析,當然你也可以将這三者進行組合。

    es是一個建立在全文搜尋引擎 Apache Lucene™ 基礎上的搜尋引擎,可以說Lucene是當今最先進,最高效的全功能開源搜尋引擎架構。

    es使用Lucene作為内部引擎,但是在使用它做全文搜尋時,隻需要使用統一開發好的API即可,而不需要了解其背後複雜的Lucene的運作原理。      

    es除了做全索引外,還可以做如下工作:

    分布式實時檔案存儲,并将每一個字段都編入索引,使其可以被搜尋。

    實時分析的分布式搜尋引擎。

    可以擴充到上百台伺服器,處理PB級别的結構化或非結構化資料。

    以上功能可以通過你喜歡的程式設計語言或用戶端與es的restful api進行通訊。

  二、概念說了半天了,說下優缺點吧

    優點:

    1、es是分布式的,不需要其它元件,分發是實時由es主節點内部自動完成的。

    2、處理多組使用者,而不需特殊配置。

    3、es擦用gateway的概念,(gateway:網關是網絡連接配接裝置的重要組成部分,它不僅具有路由的功能,而且能在兩個不同的協定集之間進行轉換,進而使不同的網絡之間進行互聯。例如:一個Netware區域網路通過網關可以通路IBM的SNA網絡,這樣使用IPX協定的PC就可和SNA網絡上的IBM主機進行通信。)是得備份更簡單。

    4、es節點發生故障時,可以進行自動配置設定其它節點替代。

    缺點:

    1、文檔太少,不易維護。

    2、目前覺得,建索引的速度不夠快,期待有更好的方法。

  三、es使用(包括建立和搜尋以及關閉)        

es的擷取和關閉方法:

package com.elasticsearch.config;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

import java.util.ArrayList;

import java.util.List;

import org.apache.commons.lang3.StringUtils;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.elasticsearch.client.transport.TransportClient;

import org.elasticsearch.common.settings.ImmutableSettings;

import org.elasticsearch.common.settings.Settings;

import org.elasticsearch.common.transport.InetSocketTransportAddress;

import org.elasticsearch.common.xcontent.XContentBuilder;

/**

* 初始化連接配接es服務端,這裡相當于dao層..

* @author:jackkang

* 2013-1-12 下午11:27:37

*/

public class InitES {

static Log log = LogFactory.getLog(InitES.class);

/**

* 靜态,單例...

*/

private static TransportClient client;

public static TransportClient initESClient() {

try {

if (client == null) {

// 配置你的es,現在這裡隻配置了叢集的名,預設是elasticsearch,跟伺服器的相同

Settings settings = ImmutableSettings

.settingsBuilder()

.put("cluster.name", "elasticsearch")

.put("discovery.type", "zen")//發現叢集方式

.put("discovery.zen.minimum_master_nodes", 2)//最少有2個master存在

.put("discovery.zen.ping_timeout", "200ms")//叢集ping時間,太小可能會因為網絡通信而導緻不能發現叢集

.put("discovery.initial_state_timeout", "500ms")

.put("gateway.type", "local")//(fs, none, local)

.put("index.number_of_shards", 1)

.put("action.auto_create_index", false)//配置是否自動建立索引

.put("cluster.routing.schedule", "50ms")//發現新節點時間 

.build();

// 從屬性檔案中擷取搜尋伺服器相對域位址

String transportAddresses = Config.getProperty(

"transportAddresses", "");

// 叢集位址配置

List<InetSocketTransportAddress> list = new ArrayList<InetSocketTransportAddress>();

if (StringUtils.isNotEmpty(transportAddresses)) {

String[] strArr = transportAddresses.split(",");

for (String str : strArr) {

String[] addressAndPort = str.split(":");

String address = addressAndPort[0];

int port = Integer.valueOf(addressAndPort[1]);

InetSocketTransportAddress inetSocketTransportAddress = new InetSocketTransportAddress(

address, port);

list.add(inetSocketTransportAddress);

}

}

// 這裡可以同時連接配接叢集的伺服器,可以多個,并且連接配接服務是可通路的

InetSocketTransportAddress addressList[] = (InetSocketTransportAddress[]) list

.toArray(new InetSocketTransportAddress[list.size()]);

// Object addressList[]=(Object [])list.toArray();

client = new TransportClient(settings)

.addTransportAddresses(addressList);

// 這裡可以同時連接配接叢集的伺服器,可以多個,并且連接配接服務是可通路的 192.168.1.102

// client = new TransportClient(settings).addTransportAddresses(

// new InetSocketTransportAddress("192.168.1.103", 9300));

//

// Client client = new TransportClient()

// .addTransportAddress(new

// InetSocketTransportAddress("192.168.0.149", 9300))

// .addTransportAddress(new

// InetSocketTransportAddress("192.168.0.162", 9300));

// 改變shards數目:

/*client.admin().indices().prepareUpdateSettings("test")

.setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_replicas", 2)).execute().actionGet();*/

}

} catch (Exception e) {

// if (log.isDebugEnabled()) {

// log.debug("方法AppCommentAction-deleteAppComment,參數資訊:commentid" );

// }

log.error("擷取用戶端對象異常:" + e.getMessage());

}

return client;

}

public static void closeESClient() {

if (client != null) {

client.close();

}

}

}

搜尋:

package com.elasticsearch.action;

import java.io.UnsupportedEncodingException;

import java.net.MalformedURLException;

import java.util.ArrayList;

import java.util.Calendar;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.regex.Pattern;

import org.elasticsearch.action.search.SearchResponse;

import org.elasticsearch.search.SearchHit;

import org.elasticsearch.search.SearchHits;

import com.elasticsearch.config.ElasticsearchUtil;

import com.elasticsearch.pojo.Pager;

import com.opensymphony.xwork2.ActionSupport;

public class SearchAction extends ActionSupport {

private static final long serialVersionUID = 1L;

/** 關鍵字 **/

private String wd;

/** 消耗時間 **/

private double spendTime;

/** 查詢結果集對象 **/

private List<Map<String, Object>> pageList = new ArrayList<Map<String, Object>>();

/** 分頁對象 **/

private Pager pager;

/** 總記錄數 使用靜态變量的方式緩存 **/

private Long total;

private SearchResponse response;

/**

* 條件檢索action

* @throws MalformedURLException

* @throws SolrServerException

* @throws UnsupportedEncodingException

**/

public String search() throws MalformedURLException,

UnsupportedEncodingException {

/** 檢索開始時間 **/

long startTime = System.currentTimeMillis();

/** 擷取頁面封裝好的分頁對象 **/

if (pager == null) {

pager = new Pager();

pager.setMaxPageItems(10);

}

wd = new String(wd.getBytes("ISO-8859-1"), "UTF-8"); // 解決亂碼

pager.setDefaultMaxPageItems(1);

/**高亮字段**/

String[] highFields=new String[]{"content","title"};

response = ElasticsearchUtil.searcher("medcl", "news",

pager.getOffset(), pager.getMaxPageItems(), wd,highFields);

/** 總記錄數 **/

total = response.getHits().totalHits();

System.out.println("命中總數:" + total);

SearchHits searchHits = response.getHits();

SearchHit[] hits = searchHits.getHits();

for (int i = 0; i < hits.length; i++) {

Map<String, Object> map = new HashMap<String, Object>();

SearchHit hit = hits[i];

String id=hit.getId();

String content = ElasticsearchUtil.getHighlightFields(hit,"content");

String title = ElasticsearchUtil.getHighlightFields(hit,"title");

map.put("id", hit.getSource().get("id"));

map.put("content", content);

map.put("title", title);

map.put("create_time", hit.getSource().get("create_time"));

map.put("links", hit.getSource().get("link"));

pageList.add(map);

}

/** 檢索完成時間 **/

long endTime = System.currentTimeMillis();

/** 檢索花費時間 **/

//spendTime = (double) (endTime - startTime) / 1000;

Calendar c = Calendar.getInstance(); 

c.setTimeInMillis(endTime - startTime); 

spendTime = c.get(Calendar.MILLISECOND);

return SUCCESS;

}

public static String Html2Text(String inputString) {

String htmlStr = inputString; // 含html标簽的字元串

String textStr = "";

java.util.regex.Pattern p_script;

java.util.regex.Matcher m_script;

java.util.regex.Pattern p_style;

java.util.regex.Matcher m_style;

java.util.regex.Pattern p_html;

java.util.regex.Matcher m_html;

try {

String regEx_script = "<[\\s]*?script[^>]*?>[\\s\\S]*?<[\\s]*?\\/[\\s]*?script[\\s]*?>"; // 定義script的正規表達式{或<script[^>]*?>[\\s\\S]*?<\\/script>

// }

String regEx_style = "<[\\s]*?style[^>]*?>[\\s\\S]*?<[\\s]*?\\/[\\s]*?style[\\s]*?>"; // 定義style的正規表達式{或<style[^>]*?>[\\s\\S]*?<\\/style>

// }

String regEx_html = "<[^>]+>"; // 定義HTML标簽的正規表達式

p_script = Pattern.compile(regEx_script, Pattern.CASE_INSENSITIVE);

m_script = p_script.matcher(htmlStr);

htmlStr = m_script.replaceAll(""); // 過濾script标簽

p_style = Pattern.compile(regEx_style, Pattern.CASE_INSENSITIVE);

m_style = p_style.matcher(htmlStr);

htmlStr = m_style.replaceAll(""); // 過濾style标簽

p_html = Pattern.compile(regEx_html, Pattern.CASE_INSENSITIVE);

m_html = p_html.matcher(htmlStr);

htmlStr = m_html.replaceAll(""); // 過濾html标簽

textStr = htmlStr;

} catch (Exception e) {

System.err.println("Html2Text: " + e.getMessage());

}

return textStr;// 傳回文本字元串

}

public String getWd() {

return wd;

}

public void setWd(String wd) {

this.wd = wd;

}

public double getSpendTime() {

return spendTime;

}

public void setSpendTime(double spendTime) {

this.spendTime = spendTime;

}

public List<Map<String, Object>> getPageList() {

return pageList;

}

public void setPageList(List<Map<String, Object>> pageList) {

this.pageList = pageList;

}

public Pager getPager() {

return pager;

}

public void setPager(Pager pager) {

this.pager = pager;

}

public Long getTotal() {

return total;

}

public void setTotal(Long total) {

this.total = total;

}

}

  四、可以決絕基金的問題

    随着基金系統的逐漸完善,資料量的增加,使用es可以緩解檢索資料給資料庫帶來的壓力。

    比如,基金中報帳的病例清單,報帳記錄,消費記錄等

  五、es舉例

    es使用java連結,建立mapping,儲存資料

    demo的javabean

package com.elasticsearch;

import com.google.common.collect.Lists;

import com.util.date.Joda_Time;

import org.elasticsearch.common.xcontent.XContentBuilder;

import org.elasticsearch.common.xcontent.XContentFactory;

import java.io.IOException;

import java.util.Date;

import java.util.List;

/**

 * javabean

 */

public class User {

    private String name;

    private String home;//家鄉

    private double height;//身高

    private int age;

    private Date birthday;

    public User() {

    }

    public User(String name, String home, double height, int age, Date birthday) {

        this.name = name;

        this.home = home;

        this.height = height;

        this.age = age;

        this.birthday = birthday;

    }

    /**

     * 随機生成一個使用者資訊

     *

     * @return

     */

    public static User getOneRandomUser() {

        return new User("葫蘆" + (int) (Math.random() * 1000) + "娃", "山西省太原市" + (int) (Math.random() * 1000) + "街道", (Math.random() * 1000), (int) (Math.random() * 100), new Date(System.currentTimeMillis() - (long) (Math.random() * 100000)));

    }

    /**

     * 随機生成num個使用者資訊

     *

     * @param num 生成數量

     * @return

     */

    public static List<User> getRandomUsers(int num) {

        List<User> users = Lists.newArrayList();

        if (num < 0) num = 10;

        for (int i = 0; i < num; i++) {

            users.add(new User("葫蘆" + (int) (Math.random() * 1000) + "娃", "山西省太原市" + (int) (Math.random() * 1000) + "街道", (Math.random() * 1000), (int) (Math.random() * 100), new Date(System.currentTimeMillis() - (long) (Math.random() * 100000))));

        }

        return users;

    }

    /**

     * 封裝對象的Json資訊

     *

     * @param user

     * @return

     * @throws IOException

     */

    public static XContentBuilder getXContentBuilder(User user) throws IOException {

        return XContentFactory.jsonBuilder()

                .startObject()

                .field("name", user.getName())//該字段在上面的方法中mapping定義了,是以該字段就有了自定義的屬性,比如 age等

                .field("home", user.getHome())

                .field("height", user.getHeight())

                .field("age", user.getAge())

                .field("birthday", user.getBirthday())

                .field("state", "預設屬性,mapping中沒有定義")//該字段在上面方法中的mapping中沒有定義,是以該字段的屬性使用es預設的.

                .endObject();

    }

    public String getName() {

        return name;

    }

    public void setName(String name) {

        this.name = name;

    }

    public String getHome() {

        return home;

    }

    public void setHome(String home) {

        this.home = home;

    }

    public double getHeight() {

        return height;

    }

    public void setHeight(double height) {

        this.height = height;

    }

    public int getAge() {

        return age;

    }

    public void setAge(int age) {

        this.age = age;

    }

    public Date getBirthday() {

        return birthday;

    }

    public void setBirthday(Date birthday) {

        this.birthday = birthday;

    }

}

  2、java與es互動demo

package com.framework_technology.elasticsearch;

import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;

import org.elasticsearch.action.admin.indices.alias.exists.AliasesExistRequestBuilder;

import org.elasticsearch.action.admin.indices.alias.exists.AliasesExistResponse;

import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;

import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;

import org.elasticsearch.action.admin.indices.stats.IndexStats;

import org.elasticsearch.action.bulk.BulkRequestBuilder;

import org.elasticsearch.action.bulk.BulkResponse;

import org.elasticsearch.action.index.IndexResponse;

import org.elasticsearch.common.xcontent.XContentBuilder;

import org.elasticsearch.common.xcontent.XContentFactory;

import java.io.IOException;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

/**

 * mapping建立

 * 添加記錄到es

 */

public class Es_BuildIndex {

    /**

     * 索引的mapping

     * <p>

     * 預定義一個索引的mapping,使用mapping的好處是可以個性的設定某個字段等的屬性

     * Es_Setting.INDEX_DEMO_01類似于資料庫

     * mapping 類似于預設某個表的字段類型

     * <p>

     * Mapping,就是對索引庫中索引的字段名及其資料類型進行定義,類似于關系資料庫中表建立時要定義字段名及其資料類型那樣,

     * 不過es的 mapping比資料庫靈活很多,它可以動态添加字段。

     * 一般不需要要指定mapping都可以,因為es會自動根據資料格式定義它的類型,

     * 如果你需要對某 些字段添加特殊屬性(如:定義使用其它分詞器、是否分詞、是否存儲等),就必須手動添加mapping。

     * 有兩種添加mapping的方法,一種是定義在配 置檔案中,一種是運作時手動送出mapping,兩種選一種就行了。

     *

     * @throws Exception Exception

     */

    protected static void buildIndexMapping() throws Exception {

        Map<String, Object> settings = new HashMap<>();

        settings.put("number_of_shards", 4);//分片數量

        settings.put("number_of_replicas", 0);//複制數量

        settings.put("refresh_interval", "10s");//重新整理時間

        //在本例中主要得注意,ttl及timestamp如何用java ,這些字段的具體含義,請去到es官網檢視

        CreateIndexRequestBuilder cib = Es_Utils.client.admin().indices().prepareCreate(Es_Utils.LOGSTASH_YYYY_MM_DD);

        cib.setSettings(settings);

        XContentBuilder mapping = XContentFactory.jsonBuilder()

                .startObject()

                .startObject("we3r")//

                .startObject("_ttl")//有了這個設定,就等于在這個給索引的記錄增加了失效時間,

                        //ttl的使用地方如在分布式下,web系統使用者登入狀态的維護.

                .field("enabled", true)//預設的false的

                .field("default", "5m")//預設的失效時間,d/h/m/s 即天/小時/分鐘/秒

                .field("store", "yes")

                .field("index", "not_analyzed")

                .endObject()

                .startObject("_timestamp")//這個字段為時間戳字段.即你添加一條索引記錄後,自動給該記錄增加個時間字段(記錄的建立時間),搜尋中可以直接搜尋該字段.

                .field("enabled", true)

                .field("store", "no")

                .field("index", "not_analyzed")

                .endObject()

                        //properties下定義的name等等就是屬于我們需要的自定義字段了,相當于資料庫中的表字段 ,此處相當于建立資料庫表

                .startObject("properties")

                .startObject("@timestamp").field("type", "long").endObject()

                .startObject("name").field("type", "string").field("store", "yes").endObject()

                .startObject("home").field("type", "string").field("index", "not_analyzed").endObject()

                .startObject("now_home").field("type", "string").field("index", "not_analyzed").endObject()

                .startObject("height").field("type", "double").endObject()

                .startObject("age").field("type", "integer").endObject()

                .startObject("birthday").field("type", "date").field("format", "YYYY-MM-dd").endObject()

                .startObject("isRealMen").field("type", "boolean").endObject()

                .startObject("location").field("lat", "double").field("lon", "double").endObject()

                .endObject()

                .endObject()

                .endObject();

        cib.addMapping(Es_Utils.LOGSTASH_YYYY_MM_DD_MAPPING, mapping);

        cib.execute().actionGet();

    }

    /**

     * 給 []index 建立别名

     * 重載方法可以按照過濾器或者Query 作為一個别名

     *

     * @param aliases aliases别名

     * @param indices 多個 index

     * @return 是否完成

     */

    protected static boolean createAliases(String aliases, String... indices) {

        IndicesAliasesRequestBuilder builder = Es_Utils.client.admin().indices().prepareAliases();

        return builder.addAlias(indices, aliases).execute().isDone();

    }

    /**

     * 查詢此别名是否存在

     *

     * @param aliases aliases

     * @return 是否存在

     */

    protected static boolean aliasesExist(String... aliases) {

        AliasesExistRequestBuilder builder =

                Es_Utils.client.admin().indices().prepareAliasesExist(aliases);

        AliasesExistResponse response = builder.execute().actionGet();

        return response.isExists();

    }

    /**

     * 添加記錄到es

     * <p>

     * 增加索引記錄

     *

     * @param user 添加的記錄

     * @throws Exception Exception

     */

    protected static void buildIndex(User user) throws Exception {

        // INDEX_DEMO_01_MAPPING為上個方法中定義的索引,prindextype為類型.jk8231為id,以此可以代替memchche來進行資料的緩存

        IndexResponse response = Es_Utils.client.prepareIndex(Es_Utils.LOGSTASH_YYYY_MM_DD, Es_Utils.LOGSTASH_YYYY_MM_DD_MAPPING)

                .setSource(

                        User.getXContentBuilder(user)

                )

                .setTTL(8000)//這樣就等于單獨設定了該條記錄的失效時間,機關是毫秒,必須在mapping中打開_ttl的設定開關

                .execute()

                .actionGet();

    }

    /**

     * 批量添加記錄到索引

     *

     * @param userList 批量添加資料

     * @throws java.io.IOException IOException

     */

    protected static void buildBulkIndex(List<User> userList) throws IOException {

        BulkRequestBuilder bulkRequest = Es_Utils.client.prepareBulk();

        // either use Es_Setting.client#prepare, or use Requests# to directly build index/delete requests

        for (User user : userList) {

            //通過add批量添加

            bulkRequest.add(Es_Utils.client.prepareIndex(Es_Utils.LOGSTASH_YYYY_MM_DD, Es_Utils.LOGSTASH_YYYY_MM_DD_MAPPING)

                            .setSource(

                                    User.getXContentBuilder(user)

                            )

            );

        }

        BulkResponse bulkResponse = bulkRequest.execute().actionGet();

        //如果失敗

        if (bulkResponse.hasFailures()) {

            // process failures by iterating through each bulk response item

            System.out.println("buildFailureMessage:" + bulkResponse.buildFailureMessage());

        }

    }

}

資料檢視、

  通過第三方工具檢視

六、執行時間

七、es資料增量方案

  1、定時任務輪訓,bulk方式操作

    批量添加操作:(可以把以前的資料導進來)

   /**

     * 批量添加記錄到索引

     *

     * @param userList 批量添加資料

     * @throws java.io.IOException IOException

     */

    protected static void buildBulkIndex(List<User> userList) throws IOException {

        BulkRequestBuilder bulkRequest = InitES.buildClient().prepareBulk();

        // either use Es_Setting.client#prepare, or use Requests# to directly build index/delete requests

        for (User user : userList) {

            //通過add批量添加

            bulkRequest.add(InitES.buildClient().prepareIndex(LOGSTASH_YYYY_MM_DD, LOGSTASH_YYYY_MM_DD_MAPPING)

                            .setSource(

                                    User.getXContentBuilder(user)

                            )

            );

        }

        BulkResponse bulkResponse = bulkRequest.execute().actionGet();

        //如果失敗

        if (bulkResponse.hasFailures()) {

            // process failures by iterating through each bulk response item

            System.out.println("buildFailureMessage:" + bulkResponse.buildFailureMessage());

        }

    }

  2、用隊列做資料同步,異步的方式,生産一條,放在隊列裡去消費一條。

   增量使用隊列,當新增一條記錄時往es中添加一條記錄

   protected static void buildIndex(User user) throws Exception {

        // INDEX_DEMO_01_MAPPING為上個方法中定義的索引,prindextype為類型.jk8231為id,以此可以代替memchche來進行資料的緩存

        IndexResponse response = InitES.buildClient().prepareIndex(LOGSTASH_YYYY_MM_DD, LOGSTASH_YYYY_MM_DD_MAPPING)

                .setSource(

                        User.getXContentBuilder(user)

                )

                .setTTL(8000)//這樣就等于單獨設定了該條記錄的失效時間,機關是毫秒,必須在mapping中打開_ttl的設定開關

                .execute()

                .actionGet();

    }

  es中的QueryBuilders的termQuery查詢,

     1. 若value為漢字,則大部分情況下,隻能為一個漢字;

        2. 若value為英文,則是一個單詞;

    queryString支援多個中文查詢