十、微服务进阶之 ElasticSearch(接口封装)

十、微服务进阶之 ElasticSearch(接口封装)

功能介绍

对 es 模块的功能补充,由于 java 和 es 的每次交互都需要编写大量冗余的代码,所以对交互 Api 进行封装,提高交互的便捷性。

成熟的es插件:Easy-Es,使用方式类似 Mybatis-Plus

需求说明
  1. 调用接口可以直接将实体类转换成es的索引,存在就更新;否则创建新索引;
  2. 调用接口可以直接将对象数据保存在对应的索引中,只需要根据类名找到对应的索引名即可;
  3. 提供基本的 CRUD 接口,包括分页请求接口,返回的数据可以直接用 java 实体接收。
需求目的

将 es 功能模块细节透明化,对用户完全不可见,用户只需要调用默认的 Api 即可,但是不会限制用户使用初始的方法和 es 进行交互

需求来源

功能设计主要参考 mybatis-plus, MP 就是封装了所有单表的 CRUD 的交互,用户仅需要调用简单的 Api 接口就可以使用特定的实体接收到数据库返回的数据;但是针对于多表关联的情况 MP 无法解决,这一部分需要用户自己编写 SQL 语句完成具体的业务逻辑;mybatis-plus 是一个功能非常强大的中间件,博主自己封装的 elastic 模块仅仅只是借鉴了其中很小的一部分思路而已;

需求实现

step1创建注解标签

用于标注当前类为一个 es 的实体类,value 即索引名称;类似于 mybatis-plus 的 @TableName 注解

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface EsIndex {
    /** * 索引名称 */
    @AliasFor("value") String name() default "";

    /** * 索引名称,必填 */
    @AliasFor("name") String value() default "";
}

用于标注当前字段是否需要被记录在 es 的索引中或者标注类似于 mybatis-plus 的 @TableField 注解

@Target({ElementType.FIELD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface EsDocField {
    /** * 字段类型 */
    FieldEnum type() default FieldEnum.TEXT;

    /** * 分词器 */
    AnalyzerEnum analyzer() default AnalyzerEnum.STANDARD;
}

用于标注当前字段是否属于文档的 id 值,类似于 mybatis-plus 的 @TableId 注解;
注:当前注解可以缺省,但是会影响部分 CRUD 功能,因为 es 文档不指定 id 值会生成默认的无规则 id,但是指定字段为 id 后,封装的 Api 每次 CRUD 都会将此字段作为文档的 id 值进行交互;

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EsDocField
public @interface EsDocId {
    /** * 字段类型 */
    @AliasFor(annotation = EsDocField.class) FieldEnum type() default FieldEnum.KEYWORD;

    /** * 分词器 */
    @AliasFor(annotation = EsDocField.class) AnalyzerEnum analyzer() default AnalyzerEnum.NO;
}

step2:创建功能枚举

字段类型枚举,只列举了常用的几种类型,特殊类型可继续拓展

@AllArgsConstructor
@Getter
public enum FieldEnum {
    /** * 普通文本类型,一般使用分词器 */ 
    TEXT("text"),
    /** * 主键类型,会做等值比较,不分词 */ 
    KEYWORD("keyword"),
    /** * 整型 */ 
    INTEGER("integer"),
    /** * 浮点型 */ 
    DOUBLE("double"),
    /** * 布尔型 */ 
    BOOLEAN("boolean"),
    /** * 日期类型 */ 
    DATE("date"),
    /** * 单条数据(嵌套对象) */ 
    OBJECT("object"),
    /** * 嵌套数组 */ 
    NESTED("nested"),
    ;
    private String type;
}

分词器枚举,包含 IK 中文分词器

@AllArgsConstructor
@Getter
public enum AnalyzerEnum {
    /** * 指定不使用分词 */ 
    NO(null),
    /** * 标准分词,默认分词器 */ 
    STANDARD("standard"),
    /** * ik_smart:会做最粗粒度的拆分;已被分出的词语将不会再次被其它词语占有 */ 
    IK_SMART("ik_smart"),
    /** * ik_max_word :会将文本做最细粒度的拆分;尽可能多的拆分出词语 */ 
    IK_MAX_WORD("ik_max_word");
    private String type;
}

step3:创建基础类

创建分页请求的参数体 EsPage

@Data
public class EsPage implements Serializable {
    private static final long serialVersionUID = 4818672803196769363L;
    /** * 用于查询的实体参数设置 */
    private T search;
    /** * 当前页码(从 1 开始) */
    private int current = 1;
    /** * 每页大小 */
    private int size = 10;
    /** * 结果总数目 */
    private long total;
    /** * 结果详情条目 */
    private List records;
    /** * 排序字段和规则 */
    private List orders;
}

创建分页请求的参数体 EsOrder

@Data
public class EsOrder implements Serializable {
    private static final long serialVersionUID = -8662841283448869349L;
    /** * 字段名称 */
    private String name;
    /** * 默认升序排序 */
    private Boolean asc = Boolean.TRUE;
}
交互模板

交互模板即自定义用于和 es 进行交互的工具类模板,对原 RestHighLevelClient 中的功能二次封装

模板1:索引模板

@SpringBootConfiguration
@Slf4j
public class ElasticIndexTemplate {
    @Autowired
    private RestHighLevelClient restHighLevelClient;
    /** * 默认分片数 */
    private static int indexNumberOfShards = 2;
    /** * 默认副本数 单节点 */
    private static int indexNumberOfReplicas = 1;

    /** * 自定义分片数 */
    public void setIndexNumber(int indexNumberOfShards, int indexNumberOfReplicas) {
        ElasticIndexTemplate.indexNumberOfShards = indexNumberOfShards;
        ElasticIndexTemplate.indexNumberOfReplicas = indexNumberOfReplicas;
    }

    /** * 创建索引(默认分片数为5和副本数为1) * * @param clazz 根据实体自动映射es索引 * @return boolean 表示创建失败与否 */
    public boolean createIndex(Class clazz) throws IOException {
        // 获取索引名
        String indexName = createIndexNameByClass(clazz);
        // 判断当前索引是否存在
        if (isIndexExists(indexName)) {
            // 存在返回失败标志
            return Boolean.FALSE;
        }
        // 如果不存在就创建
        return createRootIndex(indexName, clazz);
    }

    /**
     *  更新索引(默认分片数为2和副本数为1):
     * * 只能给索引上添加一些不存在的字段
     * * 已经存在的映射不能改
     * * * @param clazz 根据实体自动映射es索引
     * * @return 是否删除成功 */
    public boolean updateIndex(Class clazz) throws IOException {
        // 获取索引名
        String indexName = createIndexNameByClass(clazz);
        PutMappingRequest request = new PutMappingRequest(indexName);
        // 开始组装json
        XContentBuilder builder = XContentFactory.jsonBuilder();
        request.source(generateBuilder(builder, clazz, null));
        AcknowledgedResponse response = restHighLevelClient.indices().putMapping(request, RequestOptions.DEFAULT);
        // 指示是否所有节点都已确认请求
        return response.isAcknowledged();
    }

    /** * 删除索引 * * @param clazz 类 * @return 是否删除成功 */
    public boolean delIndex(Class clazz) throws IOException {
        String indexName = createIndexNameByClass(clazz);
        return delIndex(indexName);
    }

    /** * 删除索引 * * @param indexName 索引名称 * @return 是否删除成功 */
    public boolean delIndex(String indexName) throws IOException {
        DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
        deleteIndexRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
        AcknowledgedResponse delete = restHighLevelClient.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
        return delete.isAcknowledged();
    }

    /** * 创建一个索引 * * @param indexName 索引名称 * @param clazz 用来映射的类 * @return 是否成功 */
    private boolean createRootIndex(String indexName, Class clazz) throws IOException {
        CreateIndexRequest request = new CreateIndexRequest(indexName);
        request.settings(Settings.builder()
                // 设置分片数, 副本数
                .put("index.number_of_shards", indexNumberOfShards).put("index.number_of_replicas", indexNumberOfReplicas));
        // 组装json
        XContentBuilder builder = XContentFactory.jsonBuilder();
        // 创建根节点,根节点没有名称
        request.mapping(generateBuilder(builder, clazz, null));
        CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
        // 指示是否所有节点都已确认请求
        boolean acknowledged = response.isAcknowledged();
        // 指示是否在超时之前为索引中的每个分片启动了必需的分片副本数
        boolean shardsAcknowledged = response.isShardsAcknowledged();
        return acknowledged || shardsAcknowledged;
    }

    /** * 判断索引是否存在 * * @param indexName 索引名称 * @return true:存在,false:不存在 */
    private boolean isIndexExists(String indexName) {
        boolean exists = Boolean.FALSE;
        try {
            GetIndexRequest getIndexRequest = new GetIndexRequest(indexName);
            getIndexRequest.humanReadable(Boolean.TRUE);
            exists = restHighLevelClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("es 访问失败", e);
        }
        return exists;
    }

    /** * 构建更新的json * * @param builder 组装器 * @param clazz 用于映射的类 * @return 组装后的json */
    private XContentBuilder generateBuilder(XContentBuilder builder, Class clazz, String nodeName) throws IOException {
        // 节点名称,根节点为空
        if (StrUtil.isBlank(nodeName)) {
            builder.startObject();
        } else {
            builder.startObject(nodeName);
        }
        // 创建 properties 节点 
        builder.startObject("properties");
        // 利用反射获取当前类的所有字段,getFields() 只能获取公共字段,但是也可以获取继承的公共字段 
        Field[] fields = clazz.getDeclaredFields();
        for (Field field : fields) {
            if (!AnnotatedElementUtils.isAnnotated(field, EsDocField.class)) {
                // 当前字段不含有指定注解,直接忽略 
                continue;
            }
            // 通过 ElementUtils 工具类可以获取 AliasFor 标注的注解 
            EsDocField docField = AnnotatedElementUtils.getMergedAnnotation(field, EsDocField.class);
            builder.startObject(field.getName());
            builder.field("type", docField.type().getType());
            if (FieldEnum.TEXT.equals(docField.type())) {
                // 文本类型写上分词器 
                builder.field("analyzer", docField.analyzer().getType());
                // TODO 暂时不对时间格式的字段格式化,时间格式字段序列化目前主流工具都是转换成时间戳 
                // } else if (FieldEnum.DATE.equals(docField.type())) { 
                // // 日期类型写上日期格式 
                // builder.field("format", "yyyy-MM-dd HH:mm:ss"); 
            } else if (FieldEnum.OBJECT.equals(docField.type())) {
                // 对象类型(嵌套类型)递归组装,除根节点外的子节点含有名称 
                generateBuilder(builder, field.getType(), field.getName());
            }
            builder.endObject();
        }
        // 关闭 properties 节点 
        builder.endObject();
        // 关闭当前节点 
        builder.endObject();
        return builder;
    }
}

模板2: 文档模板

@SpringBootConfiguration
@Slf4j
public class ElasticDocTemplate {
    /** * 从文档中读取到的最大数量,控制在 100 */
    @Value("${spring.elastic.max-return-record:100}")
    private int maxReturnRecord = 100;
    @Autowired
    private RestHighLevelClient restHighLevelClient;

    /** * 查询所有符合条件的数据 * * @param query 用于过滤的参数集合 * @param 索引类型 * @return 结果集 */
    public EsPage page(EsPage query, Class clazz) throws Exception {
        // 获取查询过滤条件
        T search = query.getSearch();
        // 获取索引名称
        String indexName = ElasticUtil.createIndexNameByClass(clazz);
        SearchSourceBuilder queryBuilder = new SearchSourceBuilder();
        // 构建查询体
        buildQueryBuilder(queryBuilder, search);
        // 设置分页
        queryBuilder.from(query.getCurrent());
        queryBuilder.size(Math.min(query.getSize(), maxReturnRecord));
        // 构建排序条件
        buildSortOrder(queryBuilder, query.getOrders());
        SearchRequest searchRequest = new SearchRequest(indexName);
        searchRequest.source(queryBuilder);
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        List list = getHitResult(searchResponse, clazz);
        query.setRecords(list);
        query.setTotal(searchResponse.getHits().getTotalHits().value);
        return query;
    }

    /** * 查询所有符合条件的数据 * * @param object 用于过滤的参数,null 字段不参与过滤条件 * @param 索引类型 * @return 结果集 */
    public List list(T object, Class clazz) throws Exception {
        SearchSourceBuilder queryBuilder = new SearchSourceBuilder();
        // 构建查询体
        buildQueryBuilder(queryBuilder, object);
        // 默认返回100条结果
        queryBuilder.size(maxReturnRecord);
        String indexName = ElasticUtil.createIndexNameByClass(object.getClass());
        SearchRequest searchRequest = new SearchRequest(indexName);
        searchRequest.source(queryBuilder);
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        return getHitResult(searchResponse, clazz);
    }

    /** * 插入或更新一条文本 * * @param object es 索引映射的实体 * @return 插入是否成功 */
    public boolean saveOrUpdate(T object) throws Exception {
        // 获取请求封装体 
        IndexRequest request = getIndexRequest(object);
        // 执行请求 
        IndexResponse response = restHighLevelClient.index(request, RequestOptions.DEFAULT);
        RestStatus status = response.status();
        return RestStatus.CREATED.equals(status) || RestStatus.OK.equals(status);
    }

    /** * 插入或更新一条文本 * * @param indexName 索引名称 * @param id 数据id,不提供无自定义id的插入接口 * @param json 序列化后的插入数据 * @return 插入是否成功 */
    public boolean saveOrUpdate(String indexName, String id, String json) throws IOException {
        // 获取请求封装体 
        IndexRequest request = getIndexRequest(indexName, id, json);
        // 执行请求 
        IndexResponse response = restHighLevelClient.index(request, RequestOptions.DEFAULT);
        RestStatus status = response.status();
        return RestStatus.CREATED.equals(status) || RestStatus.OK.equals(status);
    }

    /** * 批量请求体-同步执行 * * @param list 数据集 * @return 执行是否全部成功 */
    public boolean saveOrUpdateBatch(List list) throws Exception {
        // 组装批量请求体 
        BulkRequest bulkRequest = new BulkRequest();
        for (T object : list) {
            bulkRequest.add(getIndexRequest(object));
        }
        // 执行批量请求 
        BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        RestStatus status = bulkResponse.status();
        return RestStatus.CREATED.equals(status) || RestStatus.OK.equals(status);
    }

    /** * 批量请求体-异步执行 * * @param list 数据集 */
    public void saveOrUpdateBatchAsync(List list) throws Exception {
        // 组装批量请求体
        BulkRequest bulkRequest = new BulkRequest();
        for (T object : list) {
            bulkRequest.add(getIndexRequest(object));
        }
        // 执行批量请求 
        restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, createAsyncListener());
    }

    /** * 获取指定id的文档 * * @param indexName 文档所在索引名称 * @param id 文档id * @return 查询的数据 */
    public String getDocById(String indexName, String id) throws Exception {
        GetRequest request = new GetRequest(indexName, id);
        GetResponse response = restHighLevelClient.get(request, RequestOptions.DEFAULT);
        return response.getSourceAsString();
    }

    /** * 获取指定id的文档 * * @param indexName 文档所在索引名称 * @param clazz 具体的类对象 * @param id 文档id * @return 查询的数据 */
    public T getDocById(String indexName, Class clazz, String id) throws Exception {
        GetRequest request = new GetRequest(indexName, id);
        GetResponse response = restHighLevelClient.get(request, RequestOptions.DEFAULT);
        String json = response.getSourceAsString();
        return JSONUtil.toBean(json, clazz);
    }

    /** * 删除指定id的文档 * * @param indexName 索引名 * @param id 文档id * @return 删除成功与否 */
    public boolean delDocById(String indexName, String id) throws Exception {
        DeleteRequest request = new DeleteRequest(indexName, id);
        DeleteResponse response = restHighLevelClient.delete(request, RequestOptions.DEFAULT);
        return response.getShardInfo().getSuccessful() > 0;
    }

    /** * 批量删除指定id的文档-同步 * * @param indexName 索引名称 * @param ids 文档id * @return 成功与否 */
    public boolean delDocBatch(String indexName, List ids) throws Exception {
        BulkRequest bulkRequest = new BulkRequest();
        for (String id : ids) {
            DeleteRequest request = new DeleteRequest(indexName, id);
            bulkRequest.add(request);
        }
        BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        return RestStatus.OK.equals(bulkResponse.status());
    }

    /** * 批量删除指定id的文档-异步 * * @param indexName 索引名称 * @param ids 文档id */
    public void delDocBatchAsync(String indexName, List ids) throws Exception {
        BulkRequest bulkRequest = new BulkRequest();
        for (String id : ids) {
            DeleteRequest request = new DeleteRequest(indexName, id);
            bulkRequest.add(request);
        }
        restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, createAsyncListener());
    }

    /** * 构建查询请求体 * * @param queryBuilder 主查询体 * @param search 查询参数实体 * @param 实体类型 */
    private void buildQueryBuilder(SearchSourceBuilder queryBuilder, T search) throws IllegalAccessException {
        // 根据查询条件构建 
        QueryBuilder Field[] fields = search.getClass().getDeclaredFields();
        // 默认查询所有,如果存在某个字段不为空,则查询指定字段 
        Boolean searchAll = Boolean.TRUE;
        for (Field field : fields) {
            field.setAccessible(Boolean.TRUE);
            if (!AnnotatedElementUtils.isAnnotated(field, EsDocField.class) || ObjectUtil.isNull(field.get(search))) {
                // 如果当前字段值为 null 不查询,如果字段不包含在索引里面 不查询 
                continue;
            }
            searchAll = Boolean.FALSE;
            EsDocField docField = AnnotatedElementUtils.getMergedAnnotation(field, EsDocField.class);
            if (FieldEnum.NESTED.equals(docField.type()) || FieldEnum.OBJECT.equals(docField.type())) {
                // TODO 内嵌查询,内嵌查询过于麻烦,暂时不封装 
            } else if (!AnalyzerEnum.NO.equals(docField.analyzer())) {
                // 指定分词类型匹配,匹配到一个即可
                queryBuilder.query(QueryBuilders.matchQuery(field.getName(), field.get(search)).analyzer(docField.analyzer().getType()));
            } else {
                // 精准匹配 
                queryBuilder.query(QueryBuilders.termQuery(field.getName(), field.get(search)));
            }
        }
        if (searchAll) {
            queryBuilder.query(QueryBuilders.matchAllQuery());
        }
    }

    /** * 从查询的结果中获取到对应的数据并转换成实体 * * @param searchResponse es查询响应结果 * @param clazz 实体类型 * @param 泛型类 */
    private List getHitResult(SearchResponse searchResponse, Class clazz) {
        if (ObjectUtil.isNull(searchResponse)) {
            return CollUtil.newArrayList();
        }
        SearchHits hits = searchResponse.getHits();
        int total = (int) hits.getTotalHits().value;
        List list = new ArrayList<>(total);
        // 获取泛型的class类型 
        for (SearchHit hit : hits) {
            String sourceAsString = hit.getSourceAsString();
            list.add(JSONUtil.toBean(sourceAsString, clazz));
        }
        return list;
    }

    /** * 设置排序条件 * * @param queryBuilder 查询构造体 * @param orders 排序集合 */
    private void buildSortOrder(SearchSourceBuilder queryBuilder, List orders) {
        if (CollUtil.isEmpty(orders)) {
            return;
        }
        // 设置排序条件 
        for (EsOrder order : orders) {
            queryBuilder.sort(order.getName(), order.getAsc() ? SortOrder.ASC : SortOrder.DESC);
        }
    }

    /** * 返回插入或更新一条记录请求的封装 * * @param object es 索引映射的实体 * @return 插入是否成功 */
    private IndexRequest getIndexRequest(T object) throws Exception {
        Class<?> clazz = object.getClass();
        // 获取索引名称 
        String indexName = createIndexNameByClass(clazz);
        // 创建请求体 
        IndexRequest request = new IndexRequest(indexName);
        // 遍历获取id字段 
        Field[] fields = clazz.getDeclaredFields();
        for (Field field : fields) {
            if (field.isAnnotationPresent(EsDocId.class)) {
                // 获取当前值 
                field.setAccessible(Boolean.TRUE);
                request.id(field.get(object).toString());
            }
        }
        request.source(JSONUtil.toJsonStr(object), XContentType.JSON);
        return request;
    }

    /** * 执行插入或更新一条文本 * * @param indexName 索引名称 * @param id 数据id,不提供无自定义id的插入接口 * @param json 序列化后的插入数据 * @return 插入是否成功 */
    private IndexRequest getIndexRequest(String indexName, String id, String json) {
        IndexRequest request = new IndexRequest(indexName);
        request.id(id);
        request.source(json, XContentType.JSON);
        return request;
    }

    /** * 创建异步执行的监听器 * * @return 监听器 */
    private ActionListener createAsyncListener() {
        return new ActionListener() {
            @Override
            public void onResponse(BulkResponse bulkResponse) {
                // 响应回调 
                for (BulkItemResponse itemResponse : bulkResponse.getItems()) {
                    if (itemResponse.isFailed()) {
                        log.info("当前记录执行失败:{}", itemResponse);
                    }
                }
            }

            @Override
            public void onFailure(Exception ex) {
                // 错误回调,忽略当前错误,防止执行中断 
                log.error("批量执行错误", ex);
            }
        };
    }
}