设为首页 收藏本站
查看: 965|回复: 0

[经验分享] APDPlat拓展搜索之集成ElasticSearch

[复制链接]

尚未签到

发表于 2017-5-21 06:11:28 | 显示全部楼层 |阅读模式
  APDPlat充分利用Compass的OSEMORM integration特性,提供了简单易用功能强大内置搜索特性。
  APDPlat的内置搜索,在设计简洁优雅的同时,还具备了强大的实时搜索能力,用户只需用注解的方式在模型中指定需要搜索哪些字段(还可在模型之间进行关联搜索)就获得了搜索能力,而不用编写任何代码。平台自动处理索引维护、查询解析、结果高亮等支撑功能。
  然而APDPlat的内置搜索只能在单机上面使用,不支持分布式,只能用于中小规模的场景。为了支持大规模的分布式搜索和实时分析,APDPlat选用Compass的进化版ElasticSearch (Compass和ElasticSearch的关系)。
  ElasticSearch提供了Java Client API,但是由于该API依赖于Lucene的org.apache.lucene.util包中的几个类,以致于无法和APDPlat集成,原因是APDPlat中Compass依赖的Lucene的版本和ElasticSearch依赖的版本冲突。
  从这里可以得知,ElasticSearch的Java Client API如果完全移除对Lucene的依赖,仅仅作为用户和ElasticSearch集群之间通信的接口,使用起来就会更方便。
  因此,APDPlat只能采用ElasticSearch的RESTful API
  接下来我们看一个APDPlat和ElasticSearch集成的例子:
  APDPlat提供了可扩展的日志处理接口,用户可编写自己的插件并在配置文件中指定启用哪些插件,日志处理接口如下:

/**
* 日志处理接口:
* 可将日志存入独立日志数据库(非业务数据库)
* 可将日志传递到activemq\rabbitmq\zeromq等消息队列
* 可将日志传递到kafka\flume\chukwa\scribe等日志聚合系统
* 可将日志传递到elasticsearch\solr等搜索服务器
* @author 杨尚川
*/
public interface LogHandler {
public <T extends Model> void handle(List<T> list);
}
  将日志传递到ElasticSearch搜索服务器的实现使用了几个配置信息,这些配置信息默认存放在config.properties中,如下所示:

#elasticsearch服务器配置
elasticsearch.host=localhost
elasticsearch.port=9200
elasticsearch.log.index.name=apdplat_for_log
  因为LogHandler接口中定义的参数List<T> list为泛型,只知道T是Model的子类,而不知道具体是哪一个类,所以我们使用反射的机制来获取具体对象类型:

String simpleName = model.getClass().getSimpleName();
LOG.debug((j++)+"、simpleName: 【"+simpleName+"】");
json.append("{\"index\":{\"_index\":\"")
  .append(INDEX_NAME)
  .append("\",\"_type\":\"")
  .append(simpleName)
  .append("\"}}")
  .append("\n");
json.append("{");
  同时,我们利用反射的方式获取对象的字段以及相应的值,并正确处理类型问题:

Field[] fields = model.getClass().getDeclaredFields();
int len = fields.length;
for(int i = 0; i < len; i++){
Field field = fields;
String name = field.getName();
field.setAccessible(true);
Object value = field.get(model);
//小心空指针异常,LogHandler线程会悄无声息地退出!
if(value == null){
LOG.debug("忽略空字段:"+name);
continue;
}
if(i>0){
json.append(",");
}
String valueClass=value.getClass().getSimpleName();
LOG.debug("name: "+name+"   type: "+valueClass);
if("Timestamp".equals(valueClass) || "Date".equals(valueClass)){
//提交给ES的日期时间值要为"2014-01-31T13:53:54"这样的形式
value=DateTypeConverter.toDefaultDateTime((Date)value).replace(" ", "T");
}
String prefix = "\"";
String suffix = "\"";
//提交给ES的数字和布尔值不要加双引号
if("Float".equals(valueClass)
|| "Double".equals(valueClass)
|| "Long".equals(valueClass)
|| "Integer".equals(valueClass)
|| "Short".equals(valueClass)
|| "Boolean".equals(valueClass)){
prefix="";
suffix="";
}
json.append("\"")
.append(name)
.append("\":")
.append(prefix)
.append(value)
.append(suffix);
}
json.append("}\n");
  构造完要提交的JSON数据之后,向服务器发送HTTP PUT请求:

HttpURLConnection conn = (HttpURLConnection) URL.openConnection();
conn.setRequestMethod("PUT");
conn.setDoOutput(true);
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(conn.getOutputStream(),"utf-8"));   
writer.write(json.toString());
writer.flush();
StringBuilder result = new StringBuilder();
try (BufferedReader reader = new BufferedReader (new InputStreamReader (conn.getInputStream()))) {
String line = reader.readLine();
while(line != null){
result.append(line);
line = reader.readLine();
}
}
  服务器会以JSON数据格式返回处理结果,我们使用Jackson解析返回的JSON字符串:

JsonNode node = MAPPER.readTree(resultStr);
for(JsonNode item : node.get("items")){
JsonNode createJsonNode = item.get("create");
JsonNode okJsonNode = createJsonNode.get("ok");
if(okJsonNode != null){
boolean r = okJsonNode.getBooleanValue();
if(r){
success++;
}
}else{
JsonNode errorJsonNode = createJsonNode.get("error");
if(errorJsonNode != null){
String errorMessage = errorJsonNode.getTextValue();
LOG.error("索引失败:"+errorMessage);
}
}
}
  下面是ElasticSearchLogHandler完整的实现:

/**
*
* 日志处理实现:
* 将日志保存到ElasticSearch中
* 进行高性能实时搜索和分析
* 支持大规模分布式搜索
*
* @author 杨尚川
*/
@Service
public class ElasticSearchLogHandler implements LogHandler{
private static final APDPlatLogger LOG = new APDPlatLogger(ElasticSearchLogHandler.class);
private static final String INDEX_NAME = PropertyHolder.getProperty("elasticsearch.log.index.name");
private static final String HOST = PropertyHolder.getProperty("elasticsearch.host");
private static final String PORT = PropertyHolder.getProperty("elasticsearch.port");
private static final ObjectMapper MAPPER = new ObjectMapper();
private static URL URL;
private int success;
public ElasticSearchLogHandler(){
LOG.info("elasticsearch.log.index.name: "+INDEX_NAME);
LOG.info("elasticsearch.host: "+HOST);
LOG.info("elasticsearch.port: "+PORT);
try {
URL = new URL("http://"+HOST+":"+PORT+"/_bulk");
} catch (MalformedURLException ex) {
LOG.error("构造URL失败",ex);
}
}
/**
* 批量索引
* 批量提交
*
* @param <T> 泛型参数
* @param list 批量模型
*/
public <T extends Model> void index(List<T> list){
success = 0;
StringBuilder json = new StringBuilder();
int j = 1;
//构造批量索引请求
for(T model : list){
try{
String simpleName = model.getClass().getSimpleName();
LOG.debug((j++)+"、simpleName: 【"+simpleName+"】");
json.append("{\"index\":{\"_index\":\"")
.append(INDEX_NAME)
.append("\",\"_type\":\"")
.append(simpleName)
.append("\"}}")
.append("\n");
json.append("{");
Field[] fields = model.getClass().getDeclaredFields();
int len = fields.length;
for(int i = 0; i < len; i++){
Field field = fields;
String name = field.getName();
field.setAccessible(true);
Object value = field.get(model);
//小心空指针异常,LogHandler线程会悄无声息地退出!
if(value == null){
LOG.debug("忽略空字段:"+name);
continue;
}
if(i>0){
json.append(",");
}
String valueClass=value.getClass().getSimpleName();
LOG.debug("name: "+name+"   type: "+valueClass);
if("Timestamp".equals(valueClass) || "Date".equals(valueClass)){
//提交给ES的日期时间值要为"2014-01-31T13:53:54"这样的形式
value=DateTypeConverter.toDefaultDateTime((Date)value).replace(" ", "T");
}
String prefix = "\"";
String suffix = "\"";
//提交给ES的数字和布尔值不要加双引号
if("Float".equals(valueClass)
|| "Double".equals(valueClass)
|| "Long".equals(valueClass)
|| "Integer".equals(valueClass)
|| "Short".equals(valueClass)
|| "Boolean".equals(valueClass)){
prefix="";
suffix="";
}
json.append("\"")
.append(name)
.append("\":")
.append(prefix)
.append(value)
.append(suffix);
}
json.append("}\n");
}catch(SecurityException | IllegalArgumentException | IllegalAccessException e){
LOG.error("构造索引请求失败【"+model.getMetaData()+"】\n"+model, e);
}
}
//批量提交索引
try{
LOG.debug("提交JSON数据:\n"+json.toString());
HttpURLConnection conn = (HttpURLConnection) URL.openConnection();
conn.setRequestMethod("PUT");
conn.setDoOutput(true);
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(conn.getOutputStream(),"utf-8"));   
writer.write(json.toString());
writer.flush();
StringBuilder result = new StringBuilder();
try (BufferedReader reader = new BufferedReader (new InputStreamReader (conn.getInputStream()))) {
String line = reader.readLine();
while(line != null){
result.append(line);
line = reader.readLine();
}
}
String resultStr = result.toString();
LOG.debug(resultStr);         
//使用Jackson解析返回的JSON
JsonNode node = MAPPER.readTree(resultStr);
for(JsonNode item : node.get("items")){
JsonNode createJsonNode = item.get("create");
JsonNode okJsonNode = createJsonNode.get("ok");
if(okJsonNode != null){
boolean r = okJsonNode.getBooleanValue();
if(r){
success++;
}
}else{
JsonNode errorJsonNode = createJsonNode.get("error");
if(errorJsonNode != null){
String errorMessage = errorJsonNode.getTextValue();
LOG.error("索引失败:"+errorMessage);
}
}
}
}catch(IOException e){
LOG.error("批量提交索引失败", e);
}
}
@Override
public <T extends Model> void handle(List<T> list) {
LOG.info("开始将 "+list.size()+" 个日志对象索引到ElasticSearch服务器");
long start = System.currentTimeMillis();
index(list);
long cost = System.currentTimeMillis() - start;
if(success != list.size()){
LOG.info("索引失败: "+(list.size()-success)+" 个");            
}
if(success > 0){
LOG.info("索引成功: "+success+" 个");
}
LOG.info("耗时:"+ConvertUtils.getTimeDes(cost));
}
}
  最后我们在配置文件config.local.properties中指定log.handlers的值为ElasticSearchLogHandler类的Spring bean name elasticSearchLogHandler,因为ElasticSearchLogHandler类加了Spring的@Service注解:

log.handlers=elasticSearchLogHandler
  APDPlat托管在Github

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-379408-1-1.html 上篇帖子: Elasticsearch java api——节点实例化 下篇帖子: ElasticSearch入门介绍之安装部署(二)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表