天天看點

scala/java es連接工具

import java.io.IOException
import java.util

import com.google.gson.{Gson, JsonParser}
import com.typesafe.scalalogging.slf4j.Logger
import org.apache.http.HttpHost
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest
import org.elasticsearch.action.bulk.{BulkRequest, BulkResponse}
import org.elasticsearch.action.index.{IndexRequest, IndexResponse}
import org.elasticsearch.client._
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.xcontent.{XContentBuilder, XContentFactory, XContentType}
import org.slf4j
import org.slf4j.LoggerFactory

import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.util.parsing.json._
import collection.JavaConverters._

/**
 * Helper around the Elasticsearch `RestHighLevelClient` for single and bulk inserts.
 *
 * Instances are obtained through the companion object. Every call to [[getClient]] builds a
 * brand-new client; the insert methods open one client per call and always close it again,
 * including when a request throws.
 */
class ElasticSearchUtil private extends Serializable {
  @transient lazy val logger: slf4j.Logger = LoggerFactory.getLogger(this.getClass.getName)
  // (Array(host), port) as returned by EnvUtil.getEsConf
  private val conf: (Array[String], String) = EnvUtil.getEsConf
  // Most recently created client / settings. Kept public for existing callers and marked
  // @transient so that a live client is never dragged along when this instance is serialized.
  @transient var client: RestHighLevelClient = null
  @transient var myes: Settings = null

  /**
   * Builds a new REST client against the first configured host and stores it in `client`.
   *
   * @return the freshly created client; the caller is responsible for closing it
   */
  def getClient: RestHighLevelClient = {
    // Not consumed by the REST client built below; still assigned because `myes` is public.
    myes = Settings.builder.put("cluster.name", "es-yunsom-dev").build
    val clientBuilder = RestClient
      .builder(new HttpHost(conf._1(0), conf._2.toInt, "http"))
      .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
        // Identity callback: the HTTP client builder is returned unchanged.
        override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder =
          httpClientBuilder
      })
    client = new RestHighLevelClient(clientBuilder)
    logger.info("成功擷取用戶端")
    client
  }

  /**
   * Closes the given client if it is non-null and reports that the client was closed.
   *
   * @param client client to close; `null` is tolerated
   */
  def close(client: RestHighLevelClient): Unit = {
    if (client != null) client.close()
    logger.info("Client已關閉。。。")
  }

  /**
   * Indexes the first element of `action_list` as a single document.
   *
   * The index name and document id are read from the `"index"` and `"id"` entries (empty string
   * when absent); every entry of the map is sent as a field of the document.
   *
   * @param action_list actions to insert; only the head is used
   * @throws NoSuchElementException if `action_list` is empty
   * @throws RuntimeException       if the head map is empty (raised by [[toArgs]])
   */
  def insert_one(action_list: List[Map[String, Any]]): Unit = {
    val insert_data: Map[String, Any] = action_list.headOption.getOrElse(
      throw new NoSuchElementException("insert_one requires a non-empty action_list"))
    val index_name: String = insert_data.getOrElse("index", "").toString
    val id: String = insert_data.getOrElse("id", "").toString
    // Validate and convert the payload before opening a client, so bad input costs no connection.
    val fields: Array[Object] = toArgs(insert_data).map(toJava)
    val request: IndexRequest = new IndexRequest(index_name).id(id).source(XContentType.JSON, fields: _*)
    withClient[Unit] { es =>
      es.index(request, RequestOptions.DEFAULT)
      ()
    }
  }

  /**
   * Indexes all actions through the bulk API, in batches of 5000 documents.
   *
   * Each action must carry a Scala map under `"source"`; `"index"` and `"id"` default to the
   * empty string. Each document is written as `{"index": ..., "id": ..., "source": {...}}`.
   * Item-level failures reported by a bulk response are logged and do not abort later batches.
   *
   * @param action_list actions to insert; an empty list is a no-op
   * @throws IllegalArgumentException if an action has no map under `"source"`
   */
  def insert_many(action_list: List[Map[String, Any]]): Unit = {
    val batch_num = 5000
    if (action_list.nonEmpty) {
      withClient[Unit] { es =>
        // grouped walks the list once; indexing a List by position would be quadratic.
        action_list.grouped(batch_num).foreach { batch =>
          val bulkRequest = new BulkRequest
          batch.foreach(data => bulkRequest.add(buildBulkItem(data)))
          val response: BulkResponse = es.bulk(bulkRequest, RequestOptions.DEFAULT)
          if (response.hasFailures()) {
            logger.error(s"Bulk insert reported failures: ${response.buildFailureMessage()}")
          }
        }
      }
    }
  }

  /**
   * Flattens a map into an alternating key/value array suitable for a varargs `source(...)` call.
   *
   * @param data non-empty map of field names to values
   * @return array of the form `[key1, value1, key2, value2, ...]`
   * @throws RuntimeException if `data` is empty
   */
  def toArgs(data: Map[String, Any]): Array[Object] = {
    if (data.isEmpty) {
      throw new RuntimeException("批量操作資料不可為空 !")
    }
    data.iterator.flatMap { case (key, value) =>
      // asInstanceOf only boxes primitives here; it cannot fail for Any -> Object.
      Iterator[Object](key, value.asInstanceOf[Object])
    }.toArray
  }

  /** Runs `use` with a newly created client and closes that client afterwards, even on failure. */
  private def withClient[A](use: RestHighLevelClient => A): A = {
    val es = getClient
    try use(es) finally close(es)
  }

  /** Builds the index request for one bulk action (see [[insert_many]] for the expected shape). */
  private def buildBulkItem(data: Map[String, Any]): IndexRequest = {
    val index_name: String = data.getOrElse("index", "").toString
    val id: String = data.getOrElse("id", "").toString
    val source: scala.collection.Map[_, _] = data.get("source") match {
      case Some(fields: scala.collection.Map[_, _]) => fields
      case other =>
        throw new IllegalArgumentException(
          s"Bulk action for index '$index_name', id '$id' needs a Map under 'source', got: $other")
    }
    val builder: XContentBuilder = XContentFactory.jsonBuilder
    builder.startObject()
    builder.field("index", index_name)
    builder.field("id", id)
    // The nested object is written in place instead of through a second builder.
    builder.startObject("source")
    source.foreach { case (key, value) => builder.field(String.valueOf(key), toJava(value)) }
    builder.endObject()
    builder.endObject()
    new IndexRequest(index_name).id(id).source(builder)
  }

  /**
   * Recursively converts Scala maps and collections into `java.util` maps and lists so that
   * nested values are handed to Elasticsearch as Java collections; other values are only boxed.
   */
  private def toJava(value: Any): AnyRef = value match {
    case null => null
    // Map must be tested before Iterable: a Scala Map is also an Iterable of tuples.
    case fields: scala.collection.Map[_, _] =>
      val converted = new util.LinkedHashMap[String, AnyRef]()
      fields.foreach { case (key, item) => converted.put(String.valueOf(key), toJava(item)) }
      converted
    case items: scala.collection.Iterable[_] =>
      val converted = new util.ArrayList[AnyRef]()
      items.foreach(item => converted.add(toJava(item)))
      converted
    case other => other.asInstanceOf[AnyRef]
  }
}
/** Companion holding the single shared [[ElasticSearchUtil]] instance. */
object ElasticSearchUtil {
  // Created once when this object is first initialised.
  val elasticSearchUtil: ElasticSearchUtil = new ElasticSearchUtil()

  /** Returns the shared instance; `ElasticSearchUtil()` never creates a new one. */
  def apply(): ElasticSearchUtil = elasticSearchUtil
}