APDPlat充分利用Compass的OSEM和ORM integration特性,提供了简单易用且功能强大的内置搜索特性。
APDPlat的内置搜索,在设计简洁优雅的同时,还具备了强大的实时搜索能力,用户只需用注解的方式在模型中指定需要搜索哪些字段(还可在模型之间进行关联搜索)就获得了搜索能力,而不用编写任何代码。平台自动处理索引维护、查询解析、结果高亮等支撑功能。
然而APDPlat的内置搜索只能在单机上面使用,不支持分布式,只能用于中小规模的场景。为了支持大规模的分布式搜索和实时分析,APDPlat选用Compass的进化版ElasticSearch (Compass和ElasticSearch的关系)。
ElasticSearch提供了Java Client API,但是由于该API依赖于Lucene的org.apache.lucene.util包中的几个类,以致于无法和APDPlat集成,原因是APDPlat中Compass依赖的Lucene的版本和ElasticSearch依赖的版本冲突。
从这里可以得知,ElasticSearch的Java Client API如果完全移除对Lucene的依赖,仅仅作为用户和ElasticSearch集群之间通信的接口,使用起来就会更方便。
因此,APDPlat只能采用ElasticSearch的RESTful API。
接下来我们看一个APDPlat和ElasticSearch集成的例子:
APDPlat提供了可扩展的日志处理接口,用户可编写自己的插件并在配置文件中指定启用哪些插件,日志处理接口如下:
/** * 日志处理接口: * 可将日志存入独立日志数据库(非业务数据库) * 可将日志传递到activemq\rabbitmq\zeromq等消息队列 * 可将日志传递到kafka\flume\chukwa\scribe等日志聚合系统 * 可将日志传递到elasticsearch\solr等搜索服务器 * @author 杨尚川 */ public interface LogHandler { public <T extends Model> void handle(List<T> list); }
将日志传递到ElasticSearch搜索服务器的实现使用了几个配置信息,这些配置信息默认存放在config.properties中,如下所示:
#elasticsearch服务器配置 elasticsearch.host=localhost elasticsearch.port=9200 elasticsearch.log.index.name=apdplat_for_log
因为LogHandler接口中定义的参数List<T> list为泛型,只知道T是Model的子类,而不知道具体是哪一个类,所以我们使用反射的机制来获取具体对象类型:
String simpleName = model.getClass().getSimpleName(); LOG.debug((j++)+"、simpleName: 【"+simpleName+"】"); json.append("{\"index\":{\"_index\":\"") .append(INDEX_NAME) .append("\",\"_type\":\"") .append(simpleName) .append("\"}}") .append("\n"); json.append("{");
同时,我们利用反射的方式获取对象的字段以及相应的值,并正确处理类型问题:
Field[] fields = model.getClass().getDeclaredFields(); int len = fields.length; for(int i = 0; i < len; i++){ Field field = fields[i]; String name = field.getName(); field.setAccessible(true); Object value = field.get(model); //小心空指针异常,LogHandler线程会悄无声息地退出! if(value == null){ LOG.debug("忽略空字段:"+name); continue; } if(i>0){ json.append(","); } String valueClass=value.getClass().getSimpleName(); LOG.debug("name: "+name+" type: "+valueClass); if("Timestamp".equals(valueClass) || "Date".equals(valueClass)){ //提交给ES的日期时间值要为"2014-01-31T13:53:54"这样的形式 value=DateTypeConverter.toDefaultDateTime((Date)value).replace(" ", "T"); } String prefix = "\""; String suffix = "\""; //提交给ES的数字和布尔值不要加双引号 if("Float".equals(valueClass) || "Double".equals(valueClass) || "Long".equals(valueClass) || "Integer".equals(valueClass) || "Short".equals(valueClass) || "Boolean".equals(valueClass)){ prefix=""; suffix=""; } json.append("\"") .append(name) .append("\":") .append(prefix) .append(value) .append(suffix); } json.append("}\n");
构造完要提交的JSON数据之后,向服务器发送HTTP PUT请求:
HttpURLConnection conn = (HttpURLConnection) URL.openConnection(); conn.setRequestMethod("PUT"); conn.setDoOutput(true); BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(conn.getOutputStream(),"utf-8")); writer.write(json.toString()); writer.flush(); StringBuilder result = new StringBuilder(); try (BufferedReader reader = new BufferedReader (new InputStreamReader (conn.getInputStream()))) { String line = reader.readLine(); while(line != null){ result.append(line); line = reader.readLine(); } }
服务器会以JSON数据格式返回处理结果,我们使用Jackson解析返回的JSON字符串:
JsonNode node = MAPPER.readTree(resultStr); for(JsonNode item : node.get("items")){ JsonNode createJsonNode = item.get("create"); JsonNode okJsonNode = createJsonNode.get("ok"); if(okJsonNode != null){ boolean r = okJsonNode.getBooleanValue(); if(r){ success++; } }else{ JsonNode errorJsonNode = createJsonNode.get("error"); if(errorJsonNode != null){ String errorMessage = errorJsonNode.getTextValue(); LOG.error("索引失败:"+errorMessage); } } }
下面是ElasticSearchLogHandler完整的实现:
/** * * 日志处理实现: * 将日志保存到ElasticSearch中 * 进行高性能实时搜索和分析 * 支持大规模分布式搜索 * * @author 杨尚川 */ @Service public class ElasticSearchLogHandler implements LogHandler{ private static final APDPlatLogger LOG = new APDPlatLogger(ElasticSearchLogHandler.class); private static final String INDEX_NAME = PropertyHolder.getProperty("elasticsearch.log.index.name"); private static final String HOST = PropertyHolder.getProperty("elasticsearch.host"); private static final String PORT = PropertyHolder.getProperty("elasticsearch.port"); private static final ObjectMapper MAPPER = new ObjectMapper(); private static URL URL; private int success; public ElasticSearchLogHandler(){ LOG.info("elasticsearch.log.index.name: "+INDEX_NAME); LOG.info("elasticsearch.host: "+HOST); LOG.info("elasticsearch.port: "+PORT); try { URL = new URL("http://"+HOST+":"+PORT+"/_bulk"); } catch (MalformedURLException ex) { LOG.error("构造URL失败",ex); } } /** * 批量索引 * 批量提交 * * @param <T> 泛型参数 * @param list 批量模型 */ public <T extends Model> void index(List<T> list){ success = 0; StringBuilder json = new StringBuilder(); int j = 1; //构造批量索引请求 for(T model : list){ try{ String simpleName = model.getClass().getSimpleName(); LOG.debug((j++)+"、simpleName: 【"+simpleName+"】"); json.append("{\"index\":{\"_index\":\"") .append(INDEX_NAME) .append("\",\"_type\":\"") .append(simpleName) .append("\"}}") .append("\n"); json.append("{"); Field[] fields = model.getClass().getDeclaredFields(); int len = fields.length; for(int i = 0; i < len; i++){ Field field = fields[i]; String name = field.getName(); field.setAccessible(true); Object value = field.get(model); //小心空指针异常,LogHandler线程会悄无声息地退出! if(value == null){ LOG.debug("忽略空字段:"+name); continue; } if(i>0){ json.append(","); } String valueClass=value.getClass().getSimpleName(); LOG.debug("name: "+name+" type: "+valueClass); if("Timestamp".equals(valueClass) || "Date".equals(valueClass)){ //提交给ES的日期时间值要为"2014-01-31T13:53:54"这样的形式 value=DateTypeConverter.toDefaultDateTime((Date)value).replace(" ", "T"); } String prefix = "\""; String suffix = "\""; //提交给ES的数字和布尔值不要加双引号 if("Float".equals(valueClass) || "Double".equals(valueClass) || "Long".equals(valueClass) || "Integer".equals(valueClass) || "Short".equals(valueClass) || "Boolean".equals(valueClass)){ prefix=""; suffix=""; } json.append("\"") .append(name) .append("\":") .append(prefix) .append(value) .append(suffix); } json.append("}\n"); }catch(SecurityException | IllegalArgumentException | IllegalAccessException e){ LOG.error("构造索引请求失败【"+model.getMetaData()+"】\n"+model, e); } } //批量提交索引 try{ LOG.debug("提交JSON数据:\n"+json.toString()); HttpURLConnection conn = (HttpURLConnection) URL.openConnection(); conn.setRequestMethod("PUT"); conn.setDoOutput(true); BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(conn.getOutputStream(),"utf-8")); writer.write(json.toString()); writer.flush(); StringBuilder result = new StringBuilder(); try (BufferedReader reader = new BufferedReader (new InputStreamReader (conn.getInputStream()))) { String line = reader.readLine(); while(line != null){ result.append(line); line = reader.readLine(); } } String resultStr = result.toString(); LOG.debug(resultStr); //使用Jackson解析返回的JSON JsonNode node = MAPPER.readTree(resultStr); for(JsonNode item : node.get("items")){ JsonNode createJsonNode = item.get("create"); JsonNode okJsonNode = createJsonNode.get("ok"); if(okJsonNode != null){ boolean r = okJsonNode.getBooleanValue(); if(r){ success++; } }else{ JsonNode errorJsonNode = createJsonNode.get("error"); if(errorJsonNode != null){ String errorMessage = errorJsonNode.getTextValue(); LOG.error("索引失败:"+errorMessage); } } } }catch(IOException e){ LOG.error("批量提交索引失败", e); } } @Override public <T extends Model> void handle(List<T> list) { LOG.info("开始将 "+list.size()+" 个日志对象索引到ElasticSearch服务器"); long start = System.currentTimeMillis(); index(list); long cost = System.currentTimeMillis() - start; if(success != list.size()){ LOG.info("索引失败: "+(list.size()-success)+" 个"); } if(success > 0){ LOG.info("索引成功: "+success+" 个"); } LOG.info("耗时:"+ConvertUtils.getTimeDes(cost)); } }
最后我们在配置文件config.local.properties中指定log.handlers的值为ElasticSearchLogHandler类的Spring bean name elasticSearchLogHandler,因为ElasticSearchLogHandler类加了Spring的@Service注解:
log.handlers=elasticSearchLogHandler
相关推荐
spring spingmvc 集成elasticSearch 5.5.x版本 ,基本的增删改查.完成
一般来说我们开发Elasticsearch会选择使用集成springboot,在网上找的springboot集成elasticsearch文章几乎都是extends ElasticsearchRepository的方式来实现。但是新版本ElasticsearchRepository里的方法基本上已经...
springboot 集成elasticsearch ,api的使用, elasticsearch版本为7.13.4, springboot版本 2.5.3 jdk8
本书以实例讲述如何在Spring框架之上搭建ElasticSearch开发,以及如何利用JPA建立、更新和删除索引,如何配置ElasticSearch Server的applicationContext等。
本实例为博主原创,属于简单易上手并且能够拿来就用的SpringBoot ES 项目,全文使用的是ElasticsearchTemplate进行开发。 本实例涵盖ES中的各类操作,如索引操作、CRUD操作、批处理、结果排序、分页查询、检索查询、...
SpringBoot 集成ElasticSearch两个依赖的jar下载,大家可以看一下,我的项目启动了,不懂加Q 243517277,请注明来源
好记性不如烂笔头哦~,ElasticSearch,简称es,es是一个开源的高拓展的分布式全文搜索引擎它可以近乎实时的存储、检索数据;本身拓展性很好,可以拓展到上百台服务器,处理PB级别的数据。es也是用Java开发并使用...
本实例属于简单易上手并且能够拿来就用的SpringBoot ES 项目,全文使用的是ElasticsearchTemplate进行开发。 本实例涵盖ES中的各类操作,如索引操作、CRUD操作、批处理、结果排序、分页查询、检索查询、关键字查询、...
Atlas2.2.0编译、安装及使用(集成ElasticSearch,导入Hive数据).doc
springboot 2.0.2集成elasticsearch5.5.1,并使用集群模式,亲测可用!!!
ElasticSearch8.x配置类 非加密模式与加密模式 最新配置,使用elastic客户端-co.elastic.clients.transport+springboot开发配置类,配置包含加密客户端和非加密客户端两种模式,开发可用
Springboot集成Elasticsearch+京东搜索实战代码
原elasticsearch sql(不是官方收费版)只支持select查询查找,现已完善支持增删改查功能且支持mybatis集成
springboot集成elasticSearch的完整demo,包含从实体创建、controller、server、dap及具体实现。
elasticsearch-ruby - Ruby集成Elasticsearch
ElasticSearch 官方 java API
基于SSM架构结合全文搜索引擎ElasticSearch的电影搜索系统项目源码.zip 基于SSM架构结合全文搜索引擎ElasticSearch的电影搜索系统项目源码.zip 基于SSM架构结合全文搜索引擎ElasticSearch的电影搜索系统项目源码.zip...
网上的学习资料有限,正好我最近在研究es,所以写了一个demo。 springboot集成elasticsearch5.1,支持关键字的拼音和汉字检索。 接口: 1.查询部门树 2.查询产品信息
扫描完整版带书签 Elasticsearch集成Hadoop最佳实践 Elasticsearch集成Hadoop最佳实践