Java code for create, read, update and delete (CRUD) operations on an Elasticsearch (ES) database
•
Java
import com.alibaba.fastjson.JSONObject; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexReqest; import org.elasticsearch.action.search.*; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.client.indices.GetIndexRequest; import org.elasticsearch.client.indices.GetIndexResponse; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.*; import org.elasticsearch.search.Scroll; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.sortOrder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.apache.http.Header; import org.apache.http.HttpHost; import org.apache.http.message.BasicHeader; import java.io.IOException; import java.util.*; import java.util.concurrent.TimeUnit; public class ESService { private Logger logger=LoggerFactory.getLogger(this.getClass()); private RestHighLevelClient client; public ESService(String host,int port,int connectTimeOut,int socketTimeOut){ RestClientBuilder restClientBuilder=restClientBuilder(host,port); restClientBuilder.setRequestConfigCallback( builder -> builder.setConnectTimeOut(connectTimeOut).setSocketTimeOut(socketTimeOut)); client = new RestHighLevelClient(restClientBuilder); } public boolean deleteDoc(String index,String id){ try{ DeleteRequest deleteRequest=new DeleteRequest(index,id); DeleteRequest deleteResponse=client.delete(deleteRequest,RequestOptions.DEFAULT); 
int status=deleteResponse.status().getStatus(); if(status != 200){ return false; } return true; }catch (Excption e){ logger.error(e.getMessage()); return false; } } public boolean deleteIndex(String index){ try{ DeleteRequest deleteRequest=new DeleteRequest(index); DeleteRequest deleteResponse=client.delete(deleteRequest,RequestOptions.DEFAULT); int status=deleteResponse.status().getStatus(); if(status != 200){ return false; } return true; }catch (Exception e){ logger.error(e.getMessage()); return false; } } /** * 如果没有索引,创建一个 * @param query */ public void createIndex(String indexName){ boolean indexExists = isExists(indexName); if(!indexExists){ try{ CreateIndexRequest indexRequest=new CreateIndexRequest(indexName); indexRequest.settings( Settings.builder().put("index.number_of_shards",1).put("index.number_of_replicas",1)); client.indices().create(indexRequest,RequestOptions.DEFAULT); }catch (IOException ioe){ logger.error(ioe.getMessage()); } } } /** * 按指定索引字段和主键字段批量提交 * * @param list 待提交数据 * @param isEnforceIndex 是否强制索引名次 * @param indexField 索引字段(索引名称) * @param KeyFiled ID字段 * @Name bulkPutIndex 批量导入 * @Author John * @Date 2021/1/21 12:00:00 * @Exception **/ public boolean bulkPutIndex(List<Map<String,Object>> list,Boolean isEnforceIndex,String indexField,String KeyFiled){ boolean flag=false; try { int size=list.size(); BulkRequest bulkRequest=new BulkRequest(); for(int i=0;i<size;i++){ Map<String,Object> map =list.get(i); IndexRequest indexRequest=new IndexRequest(); if(isEnforceIndex){ indexRequest=new IndexRequest(indexField); }else{ String indexName=map.get(indexField).toString(); if(indexName != null && !indexName.isEmpty()){ indexRequest=new IndexRequest(indexName); map.remove(indexField); } } if(!KeyFiled.isEmpty()){ String keyid=map.get(KeyFiled).toString(); indexRequest.id(keyid); } bulkRequest.add(indexRequest.source(map,XContentType.JSON)); } if(bulkRequest.numberOfActions() > 0){ BulkResponse 
bulkResponse=client.bulk(bulkRequest,RequestOptions.DEFAULT); if(!bulkResponse.hasFailures()){ flag=true; }else { logger.error("批量提交失败"); //放入缓存记录 //logger.info(list); } } }catch (IOException io){ logger.error(io.getMessage()); } return flag; } public boolean insertOne(JSONObject jsonObject,String indexName){ boolean flag=false; try{ BulkRequest bulkRequest=new BulkRequest(); bulkRequest.add(new IndexRequest(indexName).source(jsonObject.getInnerMap(),XContentType.JSON)); if(bulkRequest.numberOfActions()>0){ BulkResponse bulkResponse=client.bulk(bulkRequest,RequestOptions.DEFAULT); if(!bulkResponse.hasFailures()){ flag=true; }else { logger.error("提交失败:"+jsonObject.toJSONString()); } } }catch (IOException io){ logger.error(io.getMessage()); } return flag; } /** * 分页查询 * * @param page 页号 * @param size 每页大小 * @param indexName 索引名称 * @Name searchall * @Author John * @Date 2021/1/21 14:00:00 * @Return org.elasticsearch.action.search.SearchResponse * @Exception */ public JSONObject searchPageData(int page,int size,JSONObject filterObject,String[] fileds,String orderName,SortOrder sortOrder,String indexName) throws IOException{ //请求游标数据集合 JSONObject allObject; List<Map<String,Object>> allMap = new ArrayList<>(); long totalCount=0; if(isExists(indexName)){ // 请求最大数据量 Long maxNumber =Long.valueOf(page * size); Long minNumber =Long.valueOf((page-1) * size); Long minCount =minNumber % 10000; //游标一次性取10000条数据 SearchRequest request=new SearchRequest(indexName); SearchSouceBuilder builder=new SearchSourceBuilder(); //多条件查询处理 BoolQueryBuilder booleanQueryBuilder =GetBoolQueryBuilder(filterObject); builder.query(booleanQueryBuilder).sort(orderName,sortOrder).fetchSource(fileds,null).size(10000) .timeout(new TimeValue(60,TimeUnit.SECONDS)); request.source(builder.trackTotalHits(true)); //设置游标时间1分钟有效期 Scroll scroll=new Scroll(TimeValue.timeValueMinutes(1L)); request.scroll(scroll); SearchResponse searchResponse=client.search(request,RequestOptions.DEFAULT); String scrollId 
=searchResponse.getScrollId(); //获取最大记录数 totalCount=searchResponse.getHits().getTotalHits().value; //小分页在第几个游标数据中 if(totalCount > 0){ int pageNum =1; Long maxCount =0L; //变动 if(maxNumber > 10000){ pageNum=(int) Math.ceil(maxNumber / 10000); } if(maxNumber > totalCount){ maxCount = totalCount % 10000; //变动 if(maxCount == 0){ maxCount = 10000L; } } else { maxCount = maxNumber % 10000; //变动 if(maxCount == 0){ maxCount = 10000L; } } for(int i=1;i<=pageNum;i++){ // 取出游标数据 if(pageNum == i){ SearchHit[] hits=searchResponse.getHits().getHits(); int fromHit = minCount.intValue(); int toHit = maxCount.intValue(); //数组赋值 SearchHit[] pageHits=Arrays.copyOfRange(hits,fromHit,toHit); for(SearchHit hit : pageHits){ allMap.add(hit.getSourceAsMap()); } break; } SearchScrollRequest searchScrollRequest=new SearchScrollRequest(scrollId); searchScrollRequest.scroll(scroll); SearchResponse searchScrollResponse = client.scroll(searchScrollRequest,RequestOptions.DEFAULT); scrollId=searchScrollResponse.getScrollId(); } //及时清理ES快照,释放资源 ClearScrollRequest clearScrollRequest=new ClearScrollRequest(); clearScrollRequest.addScrollId(scrollId); ClearScrollResponse clearScrollResponse=client.clearScroll(clearScrollRequest,RequestOptions.DEFAULT); clearScrollResponse.isSucceeded(); } } allObject = pageData(totalCount,page,size,allMap); return allObject; } /** * 分页查询数据转换 * * @param total * @param page * @param size * @param mapList * @Name pageAuditData * @Author John * @Date 2021/1/21 15:00:00 * @Return com.alibaba.fastjson.JSONObject * @Exception */ private JSONObject pageData(long total,int page,List<Map<String,Object>> mapList){ JSONObject allObject = new JSONObject(); JSONObject pageObject = new JSONObject(); //分页数据组装 pageObject.put("page",page); pageObject.put("size",size); pageObject.put("total",total); //分页数据返回 allObject.put("data",mapList); allObject.put("total",pageObject); return allObject; } /** * 查询单条数据 * * @param key 查询字段(主键字段) * @param value 查询字段值(主键字段值) * @param fileds 显示字段 * @param 
indexName 索引名称 * @return JSONObject * @param IOException */ public JSONObject termOneSearch(String key,String value,String indexName)throws IOException{ Map<String,Object> preMap=termOne(key,value,fileds,indexName); JSONObject rootObject =new JSONObject(); rootObject.put("data",preMap); return rootObject; } /** * 单条查询数据转换 * * @param searchResponse * @Name searchAuditData * @Author * @Date 2021/1/22 * @Return com.alibab.fastjson.JSONObject * @Exception */ private Map<String,Object> searchOneData(SearchResponse searchResponse){ Map<String,Object> sourceAsMap = new HashMap<>(); //获得单条响应的文档 if(searchResponse.getHits().getTotalHits().value > 0){ SearchHit searchHit=searchResponse.getHits().getHits()[0]; sourceAsMap=searchHit.getSourceAsMap(); } return sourceAsMap; } /** * 获取索引列表 * * @param indexName 索引名称 */ public JSONObject getIndexList(String indexName) throws IOException{ JSONObject allObject = new JSONObject(); List<String> list = new ArrayList<String>(); if(isExists(indexName + "*")){ GetIndexRequest getIndexRequest = new GetIndexRequest(indexName + "*").includeDeFaults(true); GetIndexResponse indexResponse = client.indices().get(getIndexRequest,RequestOptions.DEFAULT); String[] indexs = indexResponse.getIndices(); //倒序处理 //Collections.reverse(Array.asList(indexs); list=Arrays.asList(indexs); } allObject.put("data",list); return allObject; } /** * 判断索引index是否存在 * * @param indexName 索引名称 */ public boolean isExists(String indexName){ boolean flag = false; try{ GetIndexRequest getIndexRequest = new GetIndexRequest(indexName); flag = client.indices().exists(getIndexRequest,RequestOptions.DEFAULT); }catch (IOException io){ logger.error(io.getMessage()); } return flag; } /** * 构造ES Bool查询条件 * * @param filterObject */ private BoolQueryBuilder GetBoolQueryBuilder(JSONObject filterObject){ BoolQueryBuilder booleanQueryBuilder = new BoolQueryBuilder(); //循环加载多条件 for(Map.Entry<String,Object> entry : filterObject.entrySet()){ if(entry.getKey() != null){ 
booleanQueryBuilder.must(QueryBuilder.termsQuery(entry.getKey(),entry.getValue())); } } return booleanQueryBuilder; } /** * 单一主键精确查找 * * @param key 查询字段(主键字段) * @param value 查询字段值(主键字段值) * @param fileds 显示字段 * @param indexName 索引名称 */ private Map<String,Object> termOne(String key,String indexName) throws IOException{ Map<String,Object> jsonObject = new HashMap<>(); if(isExists(indexName)){ SearchRequest request =new SearchRequest(indexName); SearchSourceBuilder builder =new SearchSourceBuilder(); if(fileds.length > 0){ builder.query(QueryBuilders.termsQuery(key,value)).fetchSource(fileds,null).from(0).size(1); }else { builder.query(QueryBuilders.termsQuery(key,value)).from(0).size(1); } request.source(builder); SearchResponse searchResponse =client.search(request,RequestOptions.DEFAULT); jsonObject =searchOneData(searchResponse); } return jsonObject; } /** * 单一主键精确查找 * * @param id 字段名 * @param indexName 索引名称 */ public Map<String,Object> deleteOne(String id,String indexName) throws IOException{ Map<String,Object> jsonObject = new HashMap<>(); if(isExists(indexName)){ DeleteRequest request= new DeleteRequest(indexName); request.id(id); DeleteResponse searchResponse =client.delete(request,RequestOptions.DEFAULT); jsonObject.put("status",searchResponse.status().getStatus()); } return jsonObject; } /** * 构造ES客户端初始化参数 * * @param host ES主机 * @param port ES主机端口 */ private RestClientBuilder restClientBuilder(String host,int port){ RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(host,port,"http")); Header[] defaultHeaders = new Header[]{new BasicHeader("Accept","*/*"),new BasicHeader("Charset","UTF-8")}; restClientBuilder.setDefaultHeaders(defaultHeaders); restClientBuilder.setFailureListener(new RestClient.FailureListener(){ //@Override //public void onFailure(Node node){ // "监听某个es节点失败" // } }); restClientBuilder.setRequestConfigCallback(builder -> builder.setConnectTimeout(50000).setSocketTimeout(1500000)); return restClientBuilder; } /** * 获取全部数据 * 
* @param filterObject * @param fileds * @param orderName * @param sortOrder * @param indexName */ public List<Map<String,Objtc>> getAllData(JSONObject filterObject,String indexName){ List<Map<String,Object>> datalist = new ArrayList<>(); //Set<Map<String,Object>> datalist=new HashSet<>(); //去重可以考虑使用Set集合 if(isExists(indexName)){ SearchRequest searchRequest = new SearchRequest(indexName); //其他参数过滤 SearchSourceBuilder builder=new SearchSourceBuilder(); BoolQueryBuilder booleanQueryBuilder = GetBoolQueryBuilder(filterObject); builder.query(booleanQueryBuilder).size(10000).fetchSource(fileds,null); searchRequest.source(builder.trackTotalHits(true)); Scroll scroll =new Scroll(TimeValue.timeValueMinutes(1L)); searchRequest.scroll(scroll); try { SearchResponse searchResponse = client.search(searchRequest,RequestOptions.DEFAULT); String scrollId =searchResponse.getScrollUd(); SearchHit[] hits=searchResponse.getHits().getHits(); while(hits != null && hits.length >0){ for(SearchHit hit : hits){ datalist.add(hit.getSourceAsMap()); } //每次游标数据 SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId); searchScrollRequest.scroll(scroll); SearchResponse searchScrollResponse =client.scroll(searchScrollRequest,RequestOptions.DEFAULT); scrollId=searchScrollResponse.getScrollId(); hits=searchScrollResponse.getHits().getHits(); } ClearScrollRequest clearScrollRequest=new ClearScrollRequest(); clearScrollRequest.addScrollId(scrollId); ClearScrollResponse clearScrollResponse=client.scroll(clearScrollRequest,RequestOptions.DEFAULT); clearScrollResponse.isSucceeded(); }catch (IOException ex){ logger.error("ES查询IO异常:"+ex.getMessage()); } }else { logger.error("ES indexName:"+indexName +"不存在!"); } return datalist; } }
The content of this article was collected from the internet by users and is provided for learning reference only. Copyright belongs to the original author.
THE END
二维码