Java code for creating, deleting, updating, and querying data in an Elasticsearch (ES) database

import com.alibaba.fastjson.JSONObject;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.*;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.GetIndexResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;

public class ESService {
    private Logger logger=LoggerFactory.getLogger(this.getClass());

    private RestHighLevelClient client;

    public ESService(String host,int port,int connectTimeOut,int socketTimeOut){
        RestClientBuilder restClientBuilder=restClientBuilder(host,port);
        restClientBuilder.setRequestConfigCallback(
                builder -> builder.setConnectTimeOut(connectTimeOut).setSocketTimeOut(socketTimeOut));
        client = new RestHighLevelClient(restClientBuilder);
    }

    public boolean deleteDoc(String index,String id){
        try{
            DeleteRequest deleteRequest=new DeleteRequest(index,id);
            DeleteRequest deleteResponse=client.delete(deleteRequest,RequestOptions.DEFAULT);

            int status=deleteResponse.status().getStatus();

            if(status != 200){
                return false;
            }
            return true;
        }catch (Excption e){
            logger.error(e.getMessage());
            return false;
        }
    }

    public boolean deleteIndex(String index){
        try{
            DeleteRequest deleteRequest=new DeleteRequest(index);
            DeleteRequest deleteResponse=client.delete(deleteRequest,RequestOptions.DEFAULT);

            int status=deleteResponse.status().getStatus();

            if(status != 200){
                return false;
            }
            return true;
        }catch (Exception e){
            logger.error(e.getMessage());
            return false;
        }
    }

    /**
     * 如果没有索引,创建一个
     * @param query
     */
    public void createIndex(String indexName){
        boolean indexExists = isExists(indexName);
        if(!indexExists){
            try{
                CreateIndexRequest indexRequest=new CreateIndexRequest(indexName);
                indexRequest.settings(
                        Settings.builder().put("index.number_of_shards",1).put("index.number_of_replicas",1));
                client.indices().create(indexRequest,RequestOptions.DEFAULT);
            }catch (IOException ioe){
                logger.error(ioe.getMessage());
            }
        }
    }

    /**
     * 按指定索引字段和主键字段批量提交
     *
     * @param list              待提交数据
     * @param isEnforceIndex    是否强制索引名次
     * @param indexField        索引字段(索引名称)
     * @param KeyFiled          ID字段
     * @Name bulkPutIndex 批量导入
     * @Author John
     * @Date 2021/1/21 12:00:00
     * @Exception
     **/
    public boolean bulkPutIndex(List<Map<String,Object>> list,Boolean isEnforceIndex,String indexField,String KeyFiled){
        boolean flag=false;
        try {
            int size=list.size();
            BulkRequest bulkRequest=new BulkRequest();
            for(int i=0;i<size;i++){
                Map<String,Object> map =list.get(i);
                IndexRequest indexRequest=new IndexRequest();

                if(isEnforceIndex){
                    indexRequest=new IndexRequest(indexField);
                }else{
                    String indexName=map.get(indexField).toString();
                    if(indexName != null && !indexName.isEmpty()){
                        indexRequest=new IndexRequest(indexName);
                        map.remove(indexField);
                    }
                }

                if(!KeyFiled.isEmpty()){
                    String keyid=map.get(KeyFiled).toString();
                    indexRequest.id(keyid);
                }
                bulkRequest.add(indexRequest.source(map,XContentType.JSON));
            }

            if(bulkRequest.numberOfActions() > 0){
                BulkResponse bulkResponse=client.bulk(bulkRequest,RequestOptions.DEFAULT);
                if(!bulkResponse.hasFailures()){
                    flag=true;
                }else {
                    logger.error("批量提交失败");
                    //放入缓存记录
                    //logger.info(list);
                }
            }
        }catch (IOException io){
            logger.error(io.getMessage());
        }
        return flag;

    }

    public boolean insertOne(JSONObject jsonObject,String indexName){
        boolean flag=false;
        try{
            BulkRequest bulkRequest=new BulkRequest();
            bulkRequest.add(new IndexRequest(indexName).source(jsonObject.getInnerMap(),XContentType.JSON));
            if(bulkRequest.numberOfActions()>0){

                BulkResponse bulkResponse=client.bulk(bulkRequest,RequestOptions.DEFAULT);
                if(!bulkResponse.hasFailures()){
                    flag=true;
                }else {
                    logger.error("提交失败:"+jsonObject.toJSONString());
                }
            }
        }catch (IOException io){
            logger.error(io.getMessage());
        }
        return  flag;
    }

    /**
     * 分页查询
     *
     * @param page      页号
     * @param size      每页大小
     * @param indexName 索引名称
     * @Name searchall
     * @Author John
     * @Date 2021/1/21 14:00:00
     * @Return org.elasticsearch.action.search.SearchResponse
     * @Exception
     */
    public JSONObject searchPageData(int page,int size,JSONObject filterObject,String[] fileds,String orderName,SortOrder sortOrder,String indexName) throws IOException{

        //请求游标数据集合
        JSONObject allObject;
        List<Map<String,Object>> allMap = new ArrayList<>();
        long totalCount=0;

        if(isExists(indexName)){
            // 请求最大数据量
            Long maxNumber =Long.valueOf(page * size);
            Long minNumber =Long.valueOf((page-1) * size);
            Long minCount =minNumber % 10000;

            //游标一次性取10000条数据
            SearchRequest request=new SearchRequest(indexName);
            SearchSouceBuilder builder=new SearchSourceBuilder();

            //多条件查询处理
            BoolQueryBuilder booleanQueryBuilder =GetBoolQueryBuilder(filterObject);
            builder.query(booleanQueryBuilder).sort(orderName,sortOrder).fetchSource(fileds,null).size(10000)
                    .timeout(new TimeValue(60,TimeUnit.SECONDS));

            request.source(builder.trackTotalHits(true));

            //设置游标时间1分钟有效期
            Scroll scroll=new Scroll(TimeValue.timeValueMinutes(1L));
            request.scroll(scroll);
            SearchResponse searchResponse=client.search(request,RequestOptions.DEFAULT);
            String scrollId =searchResponse.getScrollId();

            //获取最大记录数
            totalCount=searchResponse.getHits().getTotalHits().value;

            //小分页在第几个游标数据中
            if(totalCount > 0){
                int pageNum =1;
                Long maxCount =0L;  //变动
                if(maxNumber > 10000){
                    pageNum=(int) Math.ceil(maxNumber / 10000);
                }

                if(maxNumber > totalCount){
                    maxCount = totalCount % 10000; //变动
                    if(maxCount == 0){
                        maxCount = 10000L;
                    }
                } else {
                        maxCount = maxNumber % 10000; //变动
                        if(maxCount == 0){
                            maxCount = 10000L;
                        }
                }
                for(int i=1;i<=pageNum;i++){
                    // 取出游标数据
                    if(pageNum == i){
                        SearchHit[] hits=searchResponse.getHits().getHits();
                        int fromHit = minCount.intValue();
                        int toHit = maxCount.intValue();
                        //数组赋值
                        SearchHit[] pageHits=Arrays.copyOfRange(hits,fromHit,toHit);

                        for(SearchHit hit : pageHits){
                            allMap.add(hit.getSourceAsMap());
                        }
                        break;
                    }
                    SearchScrollRequest searchScrollRequest=new SearchScrollRequest(scrollId);
                    searchScrollRequest.scroll(scroll);
                    SearchResponse searchScrollResponse = client.scroll(searchScrollRequest,RequestOptions.DEFAULT);
                    scrollId=searchScrollResponse.getScrollId();
                }
                //及时清理ES快照,释放资源
                ClearScrollRequest clearScrollRequest=new ClearScrollRequest();
                clearScrollRequest.addScrollId(scrollId);
                ClearScrollResponse clearScrollResponse=client.clearScroll(clearScrollRequest,RequestOptions.DEFAULT);
                clearScrollResponse.isSucceeded();
            }
        }
        allObject = pageData(totalCount,page,size,allMap);
        return allObject;
    }

    /**
     * 分页查询数据转换
     *
     * @param total
     * @param page
     * @param size
     * @param mapList
     * @Name pageAuditData
     * @Author John
     * @Date 2021/1/21 15:00:00
     * @Return com.alibaba.fastjson.JSONObject
     * @Exception
     */
    private JSONObject pageData(long total,int page,List<Map<String,Object>> mapList){
        JSONObject allObject = new JSONObject();
        JSONObject pageObject = new JSONObject();

        //分页数据组装
        pageObject.put("page",page);
        pageObject.put("size",size);
        pageObject.put("total",total);

        //分页数据返回
        allObject.put("data",mapList);
        allObject.put("total",pageObject);
        return  allObject;
    }

    /**
     * 查询单条数据
     *
     * @param key           查询字段(主键字段)
     * @param value         查询字段值(主键字段值)
     * @param fileds        显示字段
     * @param indexName     索引名称
     * @return JSONObject
     * @param IOException
     */
    public JSONObject termOneSearch(String key,String value,String indexName)throws IOException{
        Map<String,Object> preMap=termOne(key,value,fileds,indexName);
        JSONObject rootObject =new JSONObject();
        rootObject.put("data",preMap);

        return rootObject;
    }

    /**
     * 单条查询数据转换
     *
     * @param searchResponse
     * @Name searchAuditData
     * @Author
     * @Date 2021/1/22
     * @Return com.alibab.fastjson.JSONObject
     * @Exception
     */
    private Map<String,Object> searchOneData(SearchResponse searchResponse){
        Map<String,Object> sourceAsMap = new HashMap<>();
        //获得单条响应的文档
        if(searchResponse.getHits().getTotalHits().value > 0){
            SearchHit searchHit=searchResponse.getHits().getHits()[0];
            sourceAsMap=searchHit.getSourceAsMap();
        }
        return sourceAsMap;
    }

    /**
     * 获取索引列表
     *
     * @param indexName 索引名称
     */
    public JSONObject getIndexList(String indexName) throws IOException{
        JSONObject allObject = new JSONObject();
        List<String> list = new ArrayList<String>();
        if(isExists(indexName + "*")){
            GetIndexRequest getIndexRequest = new GetIndexRequest(indexName + "*").includeDeFaults(true);
            GetIndexResponse indexResponse = client.indices().get(getIndexRequest,RequestOptions.DEFAULT);
            String[] indexs = indexResponse.getIndices();
            //倒序处理
            //Collections.reverse(Array.asList(indexs);
            list=Arrays.asList(indexs);
        }

        allObject.put("data",list);
        return allObject;
    }

    /**
     * 判断索引index是否存在
     *
     * @param indexName 索引名称
     */
    public boolean isExists(String indexName){
        boolean flag = false;
        try{
            GetIndexRequest getIndexRequest = new GetIndexRequest(indexName);
            flag = client.indices().exists(getIndexRequest,RequestOptions.DEFAULT);
        }catch (IOException io){
            logger.error(io.getMessage());
        }
        return flag;
    }

    /**
     * 构造ES Bool查询条件
     *
     * @param filterObject
     */
    private BoolQueryBuilder GetBoolQueryBuilder(JSONObject filterObject){
        BoolQueryBuilder booleanQueryBuilder = new BoolQueryBuilder();

        //循环加载多条件
        for(Map.Entry<String,Object> entry : filterObject.entrySet()){
            if(entry.getKey() != null){
                booleanQueryBuilder.must(QueryBuilder.termsQuery(entry.getKey(),entry.getValue()));
            }
        }
        return booleanQueryBuilder;
    }

    /**
     * 单一主键精确查找
     *
     * @param key       查询字段(主键字段)
     * @param value     查询字段值(主键字段值)
     * @param fileds    显示字段
     * @param indexName 索引名称
     */
    private Map<String,Object> termOne(String key,String indexName) throws IOException{
        Map<String,Object> jsonObject = new HashMap<>();
        if(isExists(indexName)){
            SearchRequest request =new SearchRequest(indexName);
            SearchSourceBuilder builder =new SearchSourceBuilder();
            if(fileds.length > 0){
                builder.query(QueryBuilders.termsQuery(key,value)).fetchSource(fileds,null).from(0).size(1);
            }else {
                builder.query(QueryBuilders.termsQuery(key,value)).from(0).size(1);
            }

            request.source(builder);
            SearchResponse searchResponse =client.search(request,RequestOptions.DEFAULT);
            jsonObject =searchOneData(searchResponse);
        }
        return jsonObject;
    }

    /**
     * 单一主键精确查找
     *
     * @param id        字段名
     * @param indexName 索引名称
     */
    public Map<String,Object> deleteOne(String id,String indexName) throws IOException{

        Map<String,Object> jsonObject = new HashMap<>();
        if(isExists(indexName)){
            DeleteRequest request= new DeleteRequest(indexName);
            request.id(id);
            DeleteResponse searchResponse =client.delete(request,RequestOptions.DEFAULT);
            jsonObject.put("status",searchResponse.status().getStatus());
        }
        return jsonObject;
    }

    /**
     * 构造ES客户端初始化参数
     *
     * @param host  ES主机
     * @param port  ES主机端口
     */
    private RestClientBuilder restClientBuilder(String host,int port){
        RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(host,port,"http"));

        Header[] defaultHeaders = new Header[]{new BasicHeader("Accept","*/*"),new BasicHeader("Charset","UTF-8")};
        restClientBuilder.setDefaultHeaders(defaultHeaders);
        restClientBuilder.setFailureListener(new RestClient.FailureListener(){
            //@Override
            //public void onFailure(Node node){
            // "监听某个es节点失败"
            // }
        });
        restClientBuilder.setRequestConfigCallback(builder -> builder.setConnectTimeout(50000).setSocketTimeout(1500000));
        return restClientBuilder;
    }

    /**
     * 获取全部数据
     *
     * @param filterObject
     * @param fileds
     * @param orderName
     * @param sortOrder
     * @param indexName
     */
    public List<Map<String,Objtc>> getAllData(JSONObject filterObject,String indexName){
        List<Map<String,Object>> datalist = new ArrayList<>();
        //Set<Map<String,Object>> datalist=new HashSet<>();  //去重可以考虑使用Set集合
        if(isExists(indexName)){
            SearchRequest searchRequest = new SearchRequest(indexName);
            //其他参数过滤
            SearchSourceBuilder builder=new SearchSourceBuilder();
            BoolQueryBuilder booleanQueryBuilder = GetBoolQueryBuilder(filterObject);
            builder.query(booleanQueryBuilder).size(10000).fetchSource(fileds,null);
            searchRequest.source(builder.trackTotalHits(true));
            Scroll scroll =new Scroll(TimeValue.timeValueMinutes(1L));
            searchRequest.scroll(scroll);
            try {
                SearchResponse searchResponse = client.search(searchRequest,RequestOptions.DEFAULT);
                String scrollId =searchResponse.getScrollUd();
                SearchHit[] hits=searchResponse.getHits().getHits();
                while(hits != null && hits.length >0){
                    for(SearchHit hit : hits){
                        datalist.add(hit.getSourceAsMap());
                    }

                    //每次游标数据
                    SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
                    searchScrollRequest.scroll(scroll);
                    SearchResponse searchScrollResponse =client.scroll(searchScrollRequest,RequestOptions.DEFAULT);
                    scrollId=searchScrollResponse.getScrollId();
                    hits=searchScrollResponse.getHits().getHits();
                }

                ClearScrollRequest clearScrollRequest=new ClearScrollRequest();
                clearScrollRequest.addScrollId(scrollId);
                ClearScrollResponse clearScrollResponse=client.scroll(clearScrollRequest,RequestOptions.DEFAULT);
                clearScrollResponse.isSucceeded();
            }catch (IOException ex){
                logger.error("ES查询IO异常:"+ex.getMessage());
            }
        }else {
            logger.error("ES indexName:"+indexName +"不存在!");
        }
        return datalist;
    }

}
The content of this article was collected from the internet by users and is provided as a learning reference only. Copyright belongs to the original author.
THE END
分享
二维码
< <上一篇
下一篇>>