设为首页 收藏本站
查看: 1288|回复: 0

[经验分享] 小米Elasticsearch 服务化实践

[复制链接]

尚未签到

发表于 2019-1-29 08:25:58 | 显示全部楼层 |阅读模式
  转载自 :小米运维(公众号>

摘要

  使用 Elasticsearch 做日志检索、分析服务已成为当前互联网公司首选工具之一了。那么怎么与公司内部系统(如数据采集、数据 schema 管理、数据权限等系统)打通,怎么提供一套易用的内部 Elasticsearch 系统便成为了首先需要解决的问题。本文将介绍小米内部 Elasticsearch 服务化之路的演进。


数据链路图
  首先给大家介绍一下我米的数据链路图(老版)

  简单划分了四层,数据采集、数据控制与传输、数据存储与消息队列、数据应用层。
  整个数据链路中,由数据工场负责源数据注册、schema 定义,权限控制。数据注册后会更新 scribe 白名单(刷新 scribe 配置),创建 hdfs 目录,hive 表创建、授权等。还有一个重要功能就是数据 schema 定义。包括 schema 序列化与反序列化协议,字段定义等等。业务方在数据工场注册好数据之后就可以通过 agent 打数据了。一般常用的方式就是 xlogger。xlogger 是我们内部开发的一个 java log sdk,支持 scribe 协议,能够通过 xlogger 直接往 scribe-agent 或者 scribe-server 打数据。scribe-server 根据数据工场定义的规则,将数据写入 hdfs 或者 kafka。然后数据应用层开始通过各种工具进行分析。

怎么将 elasticsearch 服务无缝对接到公司成熟的数据链路中?
  我们主要考虑以下几个因素


  • Elasticsearch 输入源从哪里对接最合适?
  • 怎么结合数据工场定义的 schema 去做反序列化?
  • 怎么给数据做 ETL?
  • 怎么方便管理和监控?
  对与第 1 点,我们选择了对接 kafka 和 hdfs。其中 kafka 走实时数据摄入,hdfs 走离线数据摄入。
  综合 2、3、4 点,我们只有两种方案可选,一是在现有开源组件上做二次开发,与我们内部数据工场打通,能通过数据工场定义的 schema 结构去做反序列化个 ETL。二是自研类似 logstash 的中间件。
  最终我们选择了自研 kafka2es 组件(用 java 开发)。主要实现第 2 和第 3 点。当定位清楚了,那么实现起来,也不复杂。第一版做的非常简单,通过 pom 去把业务在数据工场定义的 schema 的反序列化类引入进来,实现一个通用的 ETL 类(也就是存粹把从 kafka 读出来的 byte 数组的反序列化出来,再转成 json 结构写入 elasticsearch),然后再留一个可配置的空间,让业务自己实现 ETL 逻辑,我们 kafka2es 在 elk 时通过动态加载的方式,调用业务实现的 ETL 类做处理。监控方面就是在各个环节加计数:read kafka 条数、write es 条数、write 失败条数、etl 失败条数,将点打入 falcon 做监控告警。

具体实现
  我们将读 kafka 和写 es 的逻辑全部封装好(这里不详细描述了,代码都非常简单),当业务接入的时候只需要准备一个配置文件和一个 ETL 类,提到我们 kafka2es 的项目中即可。如果不需要特殊处理的数据,则只需一份配置文件即可,样例如下:
  

server.conf  

  
kafka_topic_name=xxx                                #kafka topic 名称
  
kafka_zk_url=xxx                                    #kafka zk 地址
  
kafka_consumer_groupid=xxx                          #kafka 消费者 group>  
decoder_class=xxx                                   #数据分序列化类
  
es_url=xxx                                          #elasticsearch 集群地址
  
es_cluster_name=es-test                             #elasticsearch 集群名称
  
es_index_name=xxx                                   #写入索引名称
  
es_index_name_suffix_format=yyyy.MM.dd              #索引日期后缀规则,如按天建索引则是yyyy.MM.dd 按月建索引则是yyyy.MM
  
es_index_type_name=doc                              #索引type名,统一默认使用doc
  
es_authtoken=xxx                                    #权限token,通过账号密码加密得到
  
parser_class=com.xiaomi.data.CommonParser           #ETL类 一般为业务自行实现,主要用于数据清洗和过滤
  
thread_num=1                                        #处理线程数
  
es_index_pipeline=test-pipeline                     #pipeline,如nginx日志,apache日志通过pipeline处理更为简单,默认所有data节点都开了ingest功能,master节点和client节点都不开ingest功能
  

  以上配置分为三个部分:kafka 相关,elasticsearch 集群相关,ETL 相关。整个 kafka2es 项目也封装了 三个部分,写读 kafka,做 ETL,写 elasticsearch。
  ETL 类怎么抽象出来让业务自行实现?
  

public abstract>
/**
  * @param log byte[]
  * @return json string
  */
  
public abstract  String parser(byte[] log);
  
}
  

  通过定义个一个抽象类 BaseParser,抽象方法 parser,业务 ETL 类只需要基础 BaseParser,实现 parser,parser 方法的参数是从 kafka 读出来的消息体 byte[],返回值必须是 json string。
  可以说是非常简单,整个数据流就这样跑通了(目前主要做了实时接入,对接 kafka)。


怎么实现 elasticsearch 的多租户权限管理?权限管理系统账号怎么与公司内部账号系统打通?
  调研了几个 es 权限管理插件,如: x-pack 、search-guard、elasticsearch-http-user-auth 等。x-pack 为商用,放弃;search-guard 实现略复杂,从可维护性和二次开发上看都不太满意。elasticsearch-http-user-auth 非常非常简洁,但是只支持 http 鉴权,没有 transport 鉴权。
  github 地址
  search-guard:https://github.com/floragunncom/search-guard
  elasticsearch-http-user-auth:https://github.com/elasticfence/elasticsearch-http-user-auth
  于是我们还是决定自己写一个:es-authority-manager-mi
  主要功能实现了:


  • 索引的读写权限管理(http+transport)
  • 账号体系与公司内部 kerberos 账号打通
  • 所有用户行为记录
  下面我简单分析一下鉴权插件的实现 (elasticsearch-5.6.2):
  首先看插件的主类
  

public>
  public AuthorityManagerMiPlugin(final Settings settings) {
  //插件初始化
  }
  

  //restful 接口注册, AuthorityManagerAction实现插件api接口
  @Override
  public List getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings,
  IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter,
  IndexNameExpressionResolver indexNameExpressionResolver, Supplier nodesInCluster) {
  final List handlers = new ArrayList(1);
  if (!AMMPluginIsDisabled) {
  handlers.add(new AuthorityManagerAction(settings, restController));
  }
  return handlers;
  }
  

  //transport拦截器注册
  @Override
  public List

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-668942-1-1.html 上篇帖子: Elasticsearch索引定时清理 下篇帖子: Elasticsearch在智能运维领域的应用
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表