天天看点

小米Elasticsearch 服务化实践

转载自 :小米运维(公众号 ID:MI-SRE)

摘要

使用 Elasticsearch 做日志检索、分析服务已成为当前互联网公司首选工具之一了。那么怎么与公司内部系统(如数据采集、数据 schema 管理、数据权限等系统)打通,怎么提供一套易用的内部 Elasticsearch 系统便成为了首先需要解决的问题。本文将介绍小米内部 Elasticsearch 服务化之路的演进。

数据链路图

首先给大家介绍一下我米的数据链路图(老版)

小米Elasticsearch 服务化实践

简单划分了四层,数据采集、数据控制与传输、数据存储与消息队列、数据应用层。

整个数据链路中,由数据工场负责源数据注册、schema 定义,权限控制。数据注册后会更新 scribe 白名单(刷新 scribe 配置),创建 hdfs 目录,hive 表创建、授权等。还有一个重要功能就是数据 schema 定义。包括 schema 序列化与反序列化协议,字段定义等等。业务方在数据工场注册好数据之后就可以通过 agent 打数据了。一般常用的方式就是 xlogger。xlogger 是我们内部开发的一个 java log sdk,支持 scribe 协议,能够通过 xlogger 直接往 scribe-agent 或者 scribe-server 打数据。scribe-server 根据数据工场定义的规则,将数据写入 hdfs 或者 kafka。然后数据应用层开始通过各种工具进行分析。

怎么将 elasticsearch 服务无缝对接到公司成熟的数据链路中?

我们主要考虑以下几个因素

  1. Elasticsearch 输入源从哪里对接最合适?
  2. 怎么结合数据工场定义的 schema 去做反序列化?
  3. 怎么给数据做 ETL?
  4. 怎么方便管理和监控?

对与第 1 点,我们选择了对接 kafka 和 hdfs。其中 kafka 走实时数据摄入,hdfs 走离线数据摄入。

综合 2、3、4 点,我们只有两种方案可选,一是在现有开源组件上做二次开发,与我们内部数据工场打通,能通过数据工场定义的 schema 结构去做反序列化个 ETL。二是自研类似 logstash 的中间件。

最终我们选择了自研 kafka2es 组件(用 java 开发)。主要实现第 2 和第 3 点。当定位清楚了,那么实现起来,也不复杂。第一版做的非常简单,通过 pom 去把业务在数据工场定义的 schema 的反序列化类引入进来,实现一个通用的 ETL 类(也就是存粹把从 kafka 读出来的 byte 数组的反序列化出来,再转成 json 结构写入 elasticsearch),然后再留一个可配置的空间,让业务自己实现 ETL 逻辑,我们 kafka2es 在 elk 时通过动态加载的方式,调用业务实现的 ETL 类做处理。监控方面就是在各个环节加计数:read kafka 条数、write es 条数、write 失败条数、etl 失败条数,将点打入 falcon 做监控告警。

具体实现

我们将读 kafka 和写 es 的逻辑全部封装好(这里不详细描述了,代码都非常简单),当业务接入的时候只需要准备一个配置文件和一个 ETL 类,提到我们 kafka2es 的项目中即可。如果不需要特殊处理的数据,则只需一份配置文件即可,样例如下:

server.conf

kafka_topic_name=xxx                                #kafka topic 名称
kafka_zk_url=xxx                                    #kafka zk 地址
kafka_consumer_groupid=xxx                          #kafka 消费者 group id
decoder_class=xxx                                   #数据分序列化类
es_url=xxx                                          #elasticsearch 集群地址
es_cluster_name=es-test                             #elasticsearch 集群名称
es_index_name=xxx                                   #写入索引名称
es_index_name_suffix_format=yyyy.MM.dd              #索引日期后缀规则,如按天建索引则是yyyy.MM.dd 按月建索引则是yyyy.MM
es_index_type_name=doc                              #索引type名,统一默认使用doc
es_authtoken=xxx                                    #权限token,通过账号密码加密得到
parser_class=com.xiaomi.data.CommonParser           #ETL类 一般为业务自行实现,主要用于数据清洗和过滤
thread_num=1                                        #处理线程数
es_index_pipeline=test-pipeline                     #pipeline,如nginx日志,apache日志通过pipeline处理更为简单,默认所有data节点都开了ingest功能,master节点和client节点都不开ingest功能           

以上配置分为三个部分:kafka 相关,elasticsearch 集群相关,ETL 相关。整个 kafka2es 项目也封装了 三个部分,写读 kafka,做 ETL,写 elasticsearch。

ETL 类怎么抽象出来让业务自行实现?

public abstract class BaseParser {
/**
 * @param log byte[]
 * @return json string
 */
public abstract  String parser(byte[] log);
}           

通过定义个一个抽象类 BaseParser,抽象方法 parser,业务 ETL 类只需要基础 BaseParser,实现 parser,parser 方法的参数是从 kafka 读出来的消息体 byte[],返回值必须是 json string。

可以说是非常简单,整个数据流就这样跑通了(目前主要做了实时接入,对接 kafka)。

小米Elasticsearch 服务化实践

怎么实现 elasticsearch 的多租户权限管理?权限管理系统账号怎么与公司内部账号系统打通?

调研了几个 es 权限管理插件,如: x-pack 、search-guard、elasticsearch-http-user-auth 等。x-pack 为商用,放弃;search-guard 实现略复杂,从可维护性和二次开发上看都不太满意。elasticsearch-http-user-auth 非常非常简洁,但是只支持 http 鉴权,没有 transport 鉴权。

github 地址

search-guard:https://github.com/floragunncom/search-guard

elasticsearch-http-user-auth:https://github.com/elasticfence/elasticsearch-http-user-auth

于是我们还是决定自己写一个:es-authority-manager-mi

主要功能实现了:

  1. 索引的读写权限管理(http+transport)
  2. 账号体系与公司内部 kerberos 账号打通
  3. 所有用户行为记录

下面我简单分析一下鉴权插件的实现 (elasticsearch-5.6.2):

首先看插件的主类

public class AuthorityManagerMiPlugin extends Plugin implements ActionPlugin, NetworkPlugin {

    public AuthorityManagerMiPlugin(final Settings settings) {
        //插件初始化
    }

    //restful 接口注册, AuthorityManagerAction实现插件api接口
    @Override
    public List<RestHandler> getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings,
                                         IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter,
                                         IndexNameExpressionResolver indexNameExpressionResolver, Supplier<DiscoveryNodes> nodesInCluster) {
        final List<RestHandler> handlers = new ArrayList<RestHandler>(1);
        if (!AMMPluginIsDisabled) {
            handlers.add(new AuthorityManagerAction(settings, restController));
        }
        return handlers;
    }

    //transport拦截器注册
    @Override
    public List<Class<? extends ActionFilter>> getActionFilters() {
        List<Class<? extends ActionFilter>> filters = new ArrayList<>(1);
        if (!AMMPluginIsDisabled) {
            filters.add(AuthorityManagerFilter.class);
        }
        return filters;
    }

    //restful拦截器注册
    @Override
    public UnaryOperator<RestHandler> getRestHandlerWrapper(final ThreadContext threadContext) {

        if (!AMMPluginIsDisabled) {
            if (AMMiRestHandler == null) {
                AMMiLogger.warn("AMMiRestHandler is null");
            }
            return (rh) -> AMMiRestHandler.wrap(rh);
        }
        return null;
    }
//.......
}           

插件 resful api 的实现:

public class AuthorityManagerAction extends BaseRestHandler {

    @Inject
    public AuthorityManagerAction(final Settings settings, final RestController controller) {
        super(settings);
        controller.registerHandler(GET, "/_authority_manager/auth", this);
        controller.registerHandler(POST, "/_authority_manager/auth", this);
    }

    @Override
    protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
    }
//......
}           

transport 拦截器实现

public class AuthorityManagerFilter implements ActionFilter {

    public AuthorityManagerFilter(Settings settings, Client client, ClusterService clusterService, IndexNameExpressionResolver resolver) {
     //....
    }

    @Override
    public int order() {
        return Integer.MIN_VALUE;
    }

    @Override
    public void apply(Task task, final String action, final ActionRequest request, final ActionListener listener, final ActionFilterChain chain) {
        //权限校验
    }
}           

restful 拦截器实现

public class AuthorityManagerRestFilter {

    public AuthorityManagerRestFilter(Settings settings) {
       //....
    }
    public RestHandler wrap(RestHandler original) {
        return new RestHandler() {
            //....
        }
    }
}           

以上就是 es 权限管理插件实现的整体结构与过程,细节上大家自行看看对应继承类和实现类的源码即可。

不过上面仅仅是实现了初步的鉴权相关的功能,但是既然所有请求都能拦截到了,那么我们也可以做更进一步的控制了:

如:

  1. 查询请求的 qps 限制(search 配额、请求排队等)
  2. 查询的慢日志分析
  3. 集群高负载情况下的自我保护机制
  4. 多租户资源隔离机制等等

    以上也是我们未来规划。

kafka2es 进化版 ->es-sink

kafka2es 在用户量剧增的情况下怎么改进,怎么进一步减少接入成本以及运维成本。怎么与公司数据链路升级去公共进化?

首先来看下面新的数据链路图:

小米Elasticsearch 服务化实践

两个新名词:lcs talos

lcs 和 talos 是我米自研的两个组件,简单来说,lcs 对标 scribe,talos 对标 kafka。具体介绍可参考如下文章:

浅析什么是小米数据流服务

在这一套新的数据链路中,我们将 kafka2es 所有的功能集成到小米新版数据流中,作为 es-sink 提供服务。于是,新的架构变成了这样:

小米Elasticsearch 服务化实践

es-sink 还是继承了 kafka2es 的所有功能

  1. 关联数据工场 schema 定义,反序列化数据
  2. 自定义 ETL 处理
  3. 监控

其中 es-sink 与 kafka2es 最大的区别是通过 spark streaming 实现。而 kafka2es 还是独立的 java 项目,每次业务接入还需要业务提交配置文件与 ETL 代码,我们 review 后需要部署该服务,但是 es-sink 的实现方式可以让业务注册好索引之后,通过管理系统前端配置 es-sink,对应的 spark streaming 就起来了,还不需要关心资源问题(资源动态调整),很大程度减少了我们业务支持和运维的工作。

总结

本文主要介绍了数据摄入链路与公司数据链路整合以及索引权限管理插件的实现。可能大家也存在一些疑问,比如说:为什么我们不用社区开源的 filebeat,从客户端直接打到 elasticsearch,反而增加了好几个中间层的组件,这样导致架构复杂性的增加以及数据延时的增大?

要打造内部公共的 Elasticsearch 服务,责权分离一定要搞清楚:如果把直接写 Elasticsearch 的权限放开给业务方,带来的安全隐患是非常大的。业务的流量怎么控制?并发写入怎么控制?出错如何定位?怎么拿到业务的序列化类去反序列化数据等等。这些问题都可能是致命的。所以必须在中间加一次消息队列做缓冲。这样来做到与业务客户端的解耦。业务方只需要按照公司统一标准采集数据,写入消息队列(kafka/talos),这时候业务客户端的使命已经完成了。而我们,则负责将数据消费出来,经过业务自定义的 ETL 逻辑,写入 Elasticsearch。这个过程由我们控制,优势由以下几点:

1:数据接入 Elasticsearch 不需要改造业务现有架构(依然走公司统一数据收集方式)

2:减少 client 的并发连接;Elasticsearch 不管写入还是搜索,都是配了线程池。并发连接过多,非常容易把写入线程池打满,导致拒绝请求

3:提高写入效率;通过 transport bulk 模式实现写入,相比 filebeat 的 http 方式从写入效率上来说,有比较大的提升

4:增强可控性;当集群数据量非常大的时候,如果要对集群做升级或者重启,如果同时还在大量写数据,分片恢复的时间非常慢长。但是可以与大头业务协商,暂停写入(只需要把服务端的数据摄入停止即可,不用业务客户端操作),操作集群,恢复写入

5:集群升级完全不用业务方做适配

6:数据摄入监控很好收敛,故障定位非常明确

以上虽然表面上增加了系统的复杂度,和一定程度的写入延迟,但是都是可接受的。对于复杂度并没有过多的增大,由于与内部数据流的对接,对业务来说反而更加简单。虽然实时性牺牲了一些(整条链路数据延时在 1 分钟以内),但还是可接受的。

另外就是权限控制,权限控制其实是分为两个方面的,一、数据读写的权限控制(安全与隐私),二、资源权限控制(如:并发读写量控制);在 elasticsearch 中,读写权限控制容易做,资源权限控制并不太好做,我们也正在研究中,欢迎大家一起交流。

参考文献

https://www.elastic.co

https://github.com/elastic/elasticsearch

继续阅读