功能介绍
对 es 模块的功能补充,由于 java 和 es 的每次交互都需要编写大量冗余的代码,所以对交互 Api 进行封装,提高交互的便捷性。
成熟的es插件:Easy-Es,使用方式类似 Mybatis-Plus
需求说明
- 调用接口可以直接将实体类转换成es的索引,存在就更新;否则创建新索引;
- 调用接口可以直接将对象数据保存在对应的索引中,只需要根据类名找到对应的索引名即可;
- 提供基本的 CRUD 接口,包括分页请求接口,返回的数据可以直接用 java 实体接收。
需求目的
将 es 功能模块细节透明化,对用户完全不可见,用户只需要调用默认的 Api 即可,但是不会限制用户使用初始的方法和 es 进行交互
需求来源
功能设计主要参考 mybatis-plus, MP 就是封装了所有单表的 CRUD 的交互,用户仅需要调用简单的 Api 接口就可以使用特定的实体接收到数据库返回的数据;但是针对于多表关联的情况 MP 无法解决,这一部分需要用户自己编写 SQL 语句完成具体的业务逻辑;mybatis-plus 是一个功能非常强大的中间件,博主自己封装的 elastic 模块仅仅只是借鉴了其中很小的一部分思路而已;
需求实现
step1:创建注解标签:
用于标注当前类为一个 es 的实体类,value 即索引名称;类似于 mybatis-plus 的 @TableName 注解
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface EsIndex {
/** * 索引名称 */
@AliasFor("value") String name() default "";
/** * 索引名称,必填 */
@AliasFor("name") String value() default "";
}
用于标注当前字段是否需要被记录在 es 的索引中或者标注类似于 mybatis-plus 的 @TableField 注解
@Target({ElementType.FIELD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface EsDocField {
/** * 字段类型 */
FieldEnum type() default FieldEnum.TEXT;
/** * 分词器 */
AnalyzerEnum analyzer() default AnalyzerEnum.STANDARD;
}
用于标注当前字段是否属于文档的 id 值,类似于 mybatis-plus 的 @TableId 注解;
注:当前注解可以缺省,但是会影响部分 CRUD 功能,因为 es 文档不指定 id 值会生成默认的无规则 id,但是指定字段为 id 后,封装的 Api 每次 CRUD 都会将此字段作为文档的 id 值进行交互;
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EsDocField
public @interface EsDocId {
/** * 字段类型 */
@AliasFor(annotation = EsDocField.class) FieldEnum type() default FieldEnum.KEYWORD;
/** * 分词器 */
@AliasFor(annotation = EsDocField.class) AnalyzerEnum analyzer() default AnalyzerEnum.NO;
}
step2:创建功能枚举
字段类型枚举,只列举了常用的几种类型,特殊类型可继续拓展
@AllArgsConstructor
@Getter
public enum FieldEnum {
/** * 普通文本类型,一般使用分词器 */
TEXT("text"),
/** * 主键类型,会做等值比较,不分词 */
KEYWORD("keyword"),
/** * 整型 */
INTEGER("integer"),
/** * 浮点型 */
DOUBLE("double"),
/** * 布尔型 */
BOOLEAN("boolean"),
/** * 日期类型 */
DATE("date"),
/** * 单条数据(嵌套对象) */
OBJECT("object"),
/** * 嵌套数组 */
NESTED("nested"),
;
private String type;
}
分词器枚举,包含 IK 中文分词器
@AllArgsConstructor
@Getter
public enum AnalyzerEnum {
/** * 指定不使用分词 */
NO(null),
/** * 标准分词,默认分词器 */
STANDARD("standard"),
/** * ik_smart:会做最粗粒度的拆分;已被分出的词语将不会再次被其它词语占有 */
IK_SMART("ik_smart"),
/** * ik_max_word :会将文本做最细粒度的拆分;尽可能多的拆分出词语 */
IK_MAX_WORD("ik_max_word");
private String type;
}
step3:创建基础类
创建分页请求的参数体 EsPage
@Data
public class EsPage implements Serializable {
private static final long serialVersionUID = 4818672803196769363L;
/** * 用于查询的实体参数设置 */
private T search;
/** * 当前页码(从 1 开始) */
private int current = 1;
/** * 每页大小 */
private int size = 10;
/** * 结果总数目 */
private long total;
/** * 结果详情条目 */
private List records;
/** * 排序字段和规则 */
private List orders;
}
创建分页请求的参数体 EsOrder
@Data
public class EsOrder implements Serializable {
private static final long serialVersionUID = -8662841283448869349L;
/** * 字段名称 */
private String name;
/** * 默认升序排序 */
private Boolean asc = Boolean.TRUE;
}
交互模板
交互模板即自定义用于和 es 进行交互的工具类模板,对原 RestHighLevelClient 中的功能二次封装
模板1:索引模板
@SpringBootConfiguration
@Slf4j
public class ElasticIndexTemplate {
@Autowired
private RestHighLevelClient restHighLevelClient;
/** * 默认分片数 */
private static int indexNumberOfShards = 2;
/** * 默认副本数 单节点 */
private static int indexNumberOfReplicas = 1;
/** * 自定义分片数 */
public void setIndexNumber(int indexNumberOfShards, int indexNumberOfReplicas) {
ElasticIndexTemplate.indexNumberOfShards = indexNumberOfShards;
ElasticIndexTemplate.indexNumberOfReplicas = indexNumberOfReplicas;
}
/** * 创建索引(默认分片数为5和副本数为1) * * @param clazz 根据实体自动映射es索引 * @return boolean 表示创建失败与否 */
public boolean createIndex(Class clazz) throws IOException {
// 获取索引名
String indexName = createIndexNameByClass(clazz);
// 判断当前索引是否存在
if (isIndexExists(indexName)) {
// 存在返回失败标志
return Boolean.FALSE;
}
// 如果不存在就创建
return createRootIndex(indexName, clazz);
}
/**
* 更新索引(默认分片数为2和副本数为1):
* * 只能给索引上添加一些不存在的字段
* * 已经存在的映射不能改
* * * @param clazz 根据实体自动映射es索引
* * @return 是否删除成功 */
public boolean updateIndex(Class clazz) throws IOException {
// 获取索引名
String indexName = createIndexNameByClass(clazz);
PutMappingRequest request = new PutMappingRequest(indexName);
// 开始组装json
XContentBuilder builder = XContentFactory.jsonBuilder();
request.source(generateBuilder(builder, clazz, null));
AcknowledgedResponse response = restHighLevelClient.indices().putMapping(request, RequestOptions.DEFAULT);
// 指示是否所有节点都已确认请求
return response.isAcknowledged();
}
/** * 删除索引 * * @param clazz 类 * @return 是否删除成功 */
public boolean delIndex(Class clazz) throws IOException {
String indexName = createIndexNameByClass(clazz);
return delIndex(indexName);
}
/** * 删除索引 * * @param indexName 索引名称 * @return 是否删除成功 */
public boolean delIndex(String indexName) throws IOException {
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
deleteIndexRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
AcknowledgedResponse delete = restHighLevelClient.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
return delete.isAcknowledged();
}
/** * 创建一个索引 * * @param indexName 索引名称 * @param clazz 用来映射的类 * @return 是否成功 */
private boolean createRootIndex(String indexName, Class clazz) throws IOException {
CreateIndexRequest request = new CreateIndexRequest(indexName);
request.settings(Settings.builder()
// 设置分片数, 副本数
.put("index.number_of_shards", indexNumberOfShards).put("index.number_of_replicas", indexNumberOfReplicas));
// 组装json
XContentBuilder builder = XContentFactory.jsonBuilder();
// 创建根节点,根节点没有名称
request.mapping(generateBuilder(builder, clazz, null));
CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
// 指示是否所有节点都已确认请求
boolean acknowledged = response.isAcknowledged();
// 指示是否在超时之前为索引中的每个分片启动了必需的分片副本数
boolean shardsAcknowledged = response.isShardsAcknowledged();
return acknowledged || shardsAcknowledged;
}
/** * 判断索引是否存在 * * @param indexName 索引名称 * @return true:存在,false:不存在 */
private boolean isIndexExists(String indexName) {
boolean exists = Boolean.FALSE;
try {
GetIndexRequest getIndexRequest = new GetIndexRequest(indexName);
getIndexRequest.humanReadable(Boolean.TRUE);
exists = restHighLevelClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error("es 访问失败", e);
}
return exists;
}
/** * 构建更新的json * * @param builder 组装器 * @param clazz 用于映射的类 * @return 组装后的json */
private XContentBuilder generateBuilder(XContentBuilder builder, Class clazz, String nodeName) throws IOException {
// 节点名称,根节点为空
if (StrUtil.isBlank(nodeName)) {
builder.startObject();
} else {
builder.startObject(nodeName);
}
// 创建 properties 节点
builder.startObject("properties");
// 利用反射获取当前类的所有字段,getFields() 只能获取公共字段,但是也可以获取继承的公共字段
Field[] fields = clazz.getDeclaredFields();
for (Field field : fields) {
if (!AnnotatedElementUtils.isAnnotated(field, EsDocField.class)) {
// 当前字段不含有指定注解,直接忽略
continue;
}
// 通过 ElementUtils 工具类可以获取 AliasFor 标注的注解
EsDocField docField = AnnotatedElementUtils.getMergedAnnotation(field, EsDocField.class);
builder.startObject(field.getName());
builder.field("type", docField.type().getType());
if (FieldEnum.TEXT.equals(docField.type())) {
// 文本类型写上分词器
builder.field("analyzer", docField.analyzer().getType());
// TODO 暂时不对时间格式的字段格式化,时间格式字段序列化目前主流工具都是转换成时间戳
// } else if (FieldEnum.DATE.equals(docField.type())) {
// // 日期类型写上日期格式
// builder.field("format", "yyyy-MM-dd HH:mm:ss");
} else if (FieldEnum.OBJECT.equals(docField.type())) {
// 对象类型(嵌套类型)递归组装,除根节点外的子节点含有名称
generateBuilder(builder, field.getType(), field.getName());
}
builder.endObject();
}
// 关闭 properties 节点
builder.endObject();
// 关闭当前节点
builder.endObject();
return builder;
}
}
模板2: 文档模板
@SpringBootConfiguration
@Slf4j
public class ElasticDocTemplate {
/** * 从文档中读取到的最大数量,控制在 100 */
@Value("${spring.elastic.max-return-record:100}")
private int maxReturnRecord = 100;
@Autowired
private RestHighLevelClient restHighLevelClient;
/** * 查询所有符合条件的数据 * * @param query 用于过滤的参数集合 * @param 索引类型 * @return 结果集 */
public EsPage page(EsPage query, Class clazz) throws Exception {
// 获取查询过滤条件
T search = query.getSearch();
// 获取索引名称
String indexName = ElasticUtil.createIndexNameByClass(clazz);
SearchSourceBuilder queryBuilder = new SearchSourceBuilder();
// 构建查询体
buildQueryBuilder(queryBuilder, search);
// 设置分页
queryBuilder.from(query.getCurrent());
queryBuilder.size(Math.min(query.getSize(), maxReturnRecord));
// 构建排序条件
buildSortOrder(queryBuilder, query.getOrders());
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.source(queryBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
List list = getHitResult(searchResponse, clazz);
query.setRecords(list);
query.setTotal(searchResponse.getHits().getTotalHits().value);
return query;
}
/** * 查询所有符合条件的数据 * * @param object 用于过滤的参数,null 字段不参与过滤条件 * @param 索引类型 * @return 结果集 */
public List list(T object, Class clazz) throws Exception {
SearchSourceBuilder queryBuilder = new SearchSourceBuilder();
// 构建查询体
buildQueryBuilder(queryBuilder, object);
// 默认返回100条结果
queryBuilder.size(maxReturnRecord);
String indexName = ElasticUtil.createIndexNameByClass(object.getClass());
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.source(queryBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
return getHitResult(searchResponse, clazz);
}
/** * 插入或更新一条文本 * * @param object es 索引映射的实体 * @return 插入是否成功 */
public boolean saveOrUpdate(T object) throws Exception {
// 获取请求封装体
IndexRequest request = getIndexRequest(object);
// 执行请求
IndexResponse response = restHighLevelClient.index(request, RequestOptions.DEFAULT);
RestStatus status = response.status();
return RestStatus.CREATED.equals(status) || RestStatus.OK.equals(status);
}
/** * 插入或更新一条文本 * * @param indexName 索引名称 * @param id 数据id,不提供无自定义id的插入接口 * @param json 序列化后的插入数据 * @return 插入是否成功 */
public boolean saveOrUpdate(String indexName, String id, String json) throws IOException {
// 获取请求封装体
IndexRequest request = getIndexRequest(indexName, id, json);
// 执行请求
IndexResponse response = restHighLevelClient.index(request, RequestOptions.DEFAULT);
RestStatus status = response.status();
return RestStatus.CREATED.equals(status) || RestStatus.OK.equals(status);
}
/** * 批量请求体-同步执行 * * @param list 数据集 * @return 执行是否全部成功 */
public boolean saveOrUpdateBatch(List list) throws Exception {
// 组装批量请求体
BulkRequest bulkRequest = new BulkRequest();
for (T object : list) {
bulkRequest.add(getIndexRequest(object));
}
// 执行批量请求
BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
RestStatus status = bulkResponse.status();
return RestStatus.CREATED.equals(status) || RestStatus.OK.equals(status);
}
/** * 批量请求体-异步执行 * * @param list 数据集 */
public void saveOrUpdateBatchAsync(List list) throws Exception {
// 组装批量请求体
BulkRequest bulkRequest = new BulkRequest();
for (T object : list) {
bulkRequest.add(getIndexRequest(object));
}
// 执行批量请求
restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, createAsyncListener());
}
/** * 获取指定id的文档 * * @param indexName 文档所在索引名称 * @param id 文档id * @return 查询的数据 */
public String getDocById(String indexName, String id) throws Exception {
GetRequest request = new GetRequest(indexName, id);
GetResponse response = restHighLevelClient.get(request, RequestOptions.DEFAULT);
return response.getSourceAsString();
}
/** * 获取指定id的文档 * * @param indexName 文档所在索引名称 * @param clazz 具体的类对象 * @param id 文档id * @return 查询的数据 */
public T getDocById(String indexName, Class clazz, String id) throws Exception {
GetRequest request = new GetRequest(indexName, id);
GetResponse response = restHighLevelClient.get(request, RequestOptions.DEFAULT);
String json = response.getSourceAsString();
return JSONUtil.toBean(json, clazz);
}
/** * 删除指定id的文档 * * @param indexName 索引名 * @param id 文档id * @return 删除成功与否 */
public boolean delDocById(String indexName, String id) throws Exception {
DeleteRequest request = new DeleteRequest(indexName, id);
DeleteResponse response = restHighLevelClient.delete(request, RequestOptions.DEFAULT);
return response.getShardInfo().getSuccessful() > 0;
}
/** * 批量删除指定id的文档-同步 * * @param indexName 索引名称 * @param ids 文档id * @return 成功与否 */
public boolean delDocBatch(String indexName, List ids) throws Exception {
BulkRequest bulkRequest = new BulkRequest();
for (String id : ids) {
DeleteRequest request = new DeleteRequest(indexName, id);
bulkRequest.add(request);
}
BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
return RestStatus.OK.equals(bulkResponse.status());
}
/** * 批量删除指定id的文档-异步 * * @param indexName 索引名称 * @param ids 文档id */
public void delDocBatchAsync(String indexName, List ids) throws Exception {
BulkRequest bulkRequest = new BulkRequest();
for (String id : ids) {
DeleteRequest request = new DeleteRequest(indexName, id);
bulkRequest.add(request);
}
restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, createAsyncListener());
}
/** * 构建查询请求体 * * @param queryBuilder 主查询体 * @param search 查询参数实体 * @param 实体类型 */
private void buildQueryBuilder(SearchSourceBuilder queryBuilder, T search) throws IllegalAccessException {
// 根据查询条件构建
QueryBuilder Field[] fields = search.getClass().getDeclaredFields();
// 默认查询所有,如果存在某个字段不为空,则查询指定字段
Boolean searchAll = Boolean.TRUE;
for (Field field : fields) {
field.setAccessible(Boolean.TRUE);
if (!AnnotatedElementUtils.isAnnotated(field, EsDocField.class) || ObjectUtil.isNull(field.get(search))) {
// 如果当前字段值为 null 不查询,如果字段不包含在索引里面 不查询
continue;
}
searchAll = Boolean.FALSE;
EsDocField docField = AnnotatedElementUtils.getMergedAnnotation(field, EsDocField.class);
if (FieldEnum.NESTED.equals(docField.type()) || FieldEnum.OBJECT.equals(docField.type())) {
// TODO 内嵌查询,内嵌查询过于麻烦,暂时不封装
} else if (!AnalyzerEnum.NO.equals(docField.analyzer())) {
// 指定分词类型匹配,匹配到一个即可
queryBuilder.query(QueryBuilders.matchQuery(field.getName(), field.get(search)).analyzer(docField.analyzer().getType()));
} else {
// 精准匹配
queryBuilder.query(QueryBuilders.termQuery(field.getName(), field.get(search)));
}
}
if (searchAll) {
queryBuilder.query(QueryBuilders.matchAllQuery());
}
}
/** * 从查询的结果中获取到对应的数据并转换成实体 * * @param searchResponse es查询响应结果 * @param clazz 实体类型 * @param 泛型类 */
private List getHitResult(SearchResponse searchResponse, Class clazz) {
if (ObjectUtil.isNull(searchResponse)) {
return CollUtil.newArrayList();
}
SearchHits hits = searchResponse.getHits();
int total = (int) hits.getTotalHits().value;
List list = new ArrayList<>(total);
// 获取泛型的class类型
for (SearchHit hit : hits) {
String sourceAsString = hit.getSourceAsString();
list.add(JSONUtil.toBean(sourceAsString, clazz));
}
return list;
}
/** * 设置排序条件 * * @param queryBuilder 查询构造体 * @param orders 排序集合 */
private void buildSortOrder(SearchSourceBuilder queryBuilder, List orders) {
if (CollUtil.isEmpty(orders)) {
return;
}
// 设置排序条件
for (EsOrder order : orders) {
queryBuilder.sort(order.getName(), order.getAsc() ? SortOrder.ASC : SortOrder.DESC);
}
}
/** * 返回插入或更新一条记录请求的封装 * * @param object es 索引映射的实体 * @return 插入是否成功 */
private IndexRequest getIndexRequest(T object) throws Exception {
Class<?> clazz = object.getClass();
// 获取索引名称
String indexName = createIndexNameByClass(clazz);
// 创建请求体
IndexRequest request = new IndexRequest(indexName);
// 遍历获取id字段
Field[] fields = clazz.getDeclaredFields();
for (Field field : fields) {
if (field.isAnnotationPresent(EsDocId.class)) {
// 获取当前值
field.setAccessible(Boolean.TRUE);
request.id(field.get(object).toString());
}
}
request.source(JSONUtil.toJsonStr(object), XContentType.JSON);
return request;
}
/** * 执行插入或更新一条文本 * * @param indexName 索引名称 * @param id 数据id,不提供无自定义id的插入接口 * @param json 序列化后的插入数据 * @return 插入是否成功 */
private IndexRequest getIndexRequest(String indexName, String id, String json) {
IndexRequest request = new IndexRequest(indexName);
request.id(id);
request.source(json, XContentType.JSON);
return request;
}
/** * 创建异步执行的监听器 * * @return 监听器 */
private ActionListener createAsyncListener() {
return new ActionListener() {
@Override
public void onResponse(BulkResponse bulkResponse) {
// 响应回调
for (BulkItemResponse itemResponse : bulkResponse.getItems()) {
if (itemResponse.isFailed()) {
log.info("当前记录执行失败:{}", itemResponse);
}
}
}
@Override
public void onFailure(Exception ex) {
// 错误回调,忽略当前错误,防止执行中断
log.error("批量执行错误", ex);
}
};
}
}