Java code for create, read, update, and delete (CRUD) operations on an Elasticsearch (ES) database
•
Java
import com.alibaba.fastjson.JSONObject;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.*;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.GetIndexResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
public class ESService {
private Logger logger=LoggerFactory.getLogger(this.getClass());
private RestHighLevelClient client;
/**
 * Creates a service that talks to a single ES node over HTTP.
 *
 * @param host ES host name or address
 * @param port ES HTTP port
 * @param connectTimeOut connect timeout handed to the HTTP request config
 * @param socketTimeOut socket timeout handed to the HTTP request config
 */
public ESService(String host,int port,int connectTimeOut,int socketTimeOut){
    RestClientBuilder restClientBuilder=restClientBuilder(host,port);
    // This callback replaces the one installed by restClientBuilder(...), so the
    // caller-supplied timeouts win over the built-in defaults.
    restClientBuilder.setRequestConfigCallback(
        builder -> builder.setConnectTimeout(connectTimeOut).setSocketTimeout(socketTimeOut));
    client = new RestHighLevelClient(restClientBuilder);
}
/**
 * Deletes a single document by id.
 *
 * @param index name of the index holding the document
 * @param id    id of the document to delete
 * @return {@code true} only when ES answers with HTTP status 200; {@code false} for any
 *         other status and whenever an exception is raised (the exception is logged)
 */
public boolean deleteDoc(String index,String id){
    try{
        DeleteRequest deleteRequest=new DeleteRequest(index,id);
        DeleteResponse deleteResponse=client.delete(deleteRequest,RequestOptions.DEFAULT);
        return deleteResponse.status().getStatus() == 200;
    }catch (Exception e){
        // Pass the throwable so the stack trace is kept in the log.
        logger.error(e.getMessage(),e);
        return false;
    }
}
/**
 * Deletes a whole index.
 *
 * @param index name of the index to delete
 * @return {@code true} when ES acknowledges the deletion; {@code false} when it does not
 *         or when any exception is raised (the exception is logged)
 */
public boolean deleteIndex(String index){
    try{
        // Index deletion goes through the indices client; DeleteRequest only targets documents.
        DeleteIndexRequest deleteIndexRequest=new DeleteIndexRequest(index);
        return client.indices().delete(deleteIndexRequest,RequestOptions.DEFAULT).isAcknowledged();
    }catch (Exception e){
        logger.error(e.getMessage(),e);
        return false;
    }
}
/**
 * Creates the index if it does not exist yet, using one primary shard and one replica.
 * An {@link IOException} raised while creating the index is logged and not rethrown.
 *
 * @param indexName name of the index to create
 */
public void createIndex(String indexName){
    if(isExists(indexName)){
        return;
    }
    try{
        Settings.Builder shardSettings = Settings.builder()
            .put("index.number_of_shards",1)
            .put("index.number_of_replicas",1);
        CreateIndexRequest createRequest = new CreateIndexRequest(indexName);
        createRequest.settings(shardSettings);
        client.indices().create(createRequest,RequestOptions.DEFAULT);
    }catch (IOException ioe){
        logger.error(ioe.getMessage());
    }
}
/**
 * Submits a list of documents in one bulk request.
 *
 * <p>Rows that cannot be routed (no index name) or that lack the requested id value are
 * skipped and logged; they make the method return {@code false}, but the remaining rows
 * are still submitted. The caller's maps are never modified.
 *
 * @param list           documents to submit; {@code null} or empty yields {@code false}
 * @param isEnforceIndex when {@code true}, {@code indexField} is used as the index name for
 *                       every row; otherwise ({@code false} or {@code null}) the index name
 *                       is read from each row under the key {@code indexField} and that
 *                       entry is left out of the indexed document
 * @param indexField     index name, or the key that holds the index name (see above)
 * @param KeyFiled       key whose value becomes the document id; {@code null} or empty
 *                       lets ES generate ids
 * @return {@code true} only when every row was submitted and the bulk response reports
 *         no failures
 */
public boolean bulkPutIndex(List<Map<String,Object>> list,Boolean isEnforceIndex,String indexField,String KeyFiled){
    if(list == null || list.isEmpty()){
        return false;
    }
    // Boolean.TRUE.equals avoids the unboxing NPE on a null flag.
    boolean enforceIndex = Boolean.TRUE.equals(isEnforceIndex);
    boolean hasKeyField = KeyFiled != null && !KeyFiled.isEmpty();
    boolean flag = false;
    int skipped = 0;
    try {
        BulkRequest bulkRequest=new BulkRequest();
        for(Map<String,Object> row : list){
            if(row == null){
                skipped++;
                continue;
            }
            // Work on a copy so removing the routing entry does not alter the caller's data.
            Map<String,Object> document = new LinkedHashMap<>(row);
            String indexName;
            if(enforceIndex){
                indexName = indexField;
            }else{
                Object indexValue = document.remove(indexField);
                indexName = indexValue == null ? null : indexValue.toString();
            }
            if(indexName == null || indexName.isEmpty()){
                logger.error("bulkPutIndex: row skipped, no index name under key '{}'",indexField);
                skipped++;
                continue;
            }
            IndexRequest indexRequest=new IndexRequest(indexName);
            if(hasKeyField){
                Object keyValue = document.get(KeyFiled);
                if(keyValue == null){
                    logger.error("bulkPutIndex: row skipped, no id value under key '{}'",KeyFiled);
                    skipped++;
                    continue;
                }
                indexRequest.id(keyValue.toString());
            }
            bulkRequest.add(indexRequest.source(document,XContentType.JSON));
        }
        if(bulkRequest.numberOfActions() > 0){
            BulkResponse bulkResponse=client.bulk(bulkRequest,RequestOptions.DEFAULT);
            if(!bulkResponse.hasFailures()){
                flag = skipped == 0;
            }else {
                logger.error("批量提交失败: {}",bulkResponse.buildFailureMessage());
            }
        }
    }catch (IOException io){
        logger.error(io.getMessage(),io);
    }
    return flag;
}
/**
 * Indexes one JSON document, sent as a bulk request containing a single action.
 *
 * @param jsonObject document to index
 * @param indexName  target index
 * @return {@code true} when the bulk response reports no failures; {@code false} on a
 *         reported failure or on an {@link IOException} (both are logged)
 */
public boolean insertOne(JSONObject jsonObject,String indexName){
    try{
        IndexRequest docRequest = new IndexRequest(indexName);
        docRequest.source(jsonObject.getInnerMap(),XContentType.JSON);
        BulkRequest singleDocBulk = new BulkRequest();
        singleDocBulk.add(docRequest);
        if(singleDocBulk.numberOfActions() <= 0){
            return false;
        }
        BulkResponse outcome = client.bulk(singleDocBulk,RequestOptions.DEFAULT);
        if(outcome.hasFailures()){
            logger.error("提交失败:"+jsonObject.toJSONString());
            return false;
        }
        return true;
    }catch (IOException io){
        logger.error(io.getMessage());
        return false;
    }
}
/**
 * Returns one page of search results, walking a scroll cursor until the page is reached.
 *
 * <p>The query is executed as a scroll in batches of 10000 hits. Batches before the
 * requested page are skipped, hits inside the page window are collected (also when the
 * window spans two batches), and the scroll context is released afterwards even if a
 * request fails.
 *
 * @param page         1-based page number
 * @param size         page size, at least 1
 * @param filterObject field/value filters, combined by {@code GetBoolQueryBuilder}
 * @param fileds       source fields to return
 * @param orderName    field to sort by; {@code null} or empty skips sorting
 * @param sortOrder    sort direction; when {@code null} the builder's default order is used
 * @param indexName    index to search; a missing index yields an empty page with total 0
 * @return the structure built by {@code pageData}: the page rows plus page/size/total
 * @throws IOException              when a request to ES fails
 * @throws IllegalArgumentException when {@code page} or {@code size} is smaller than 1
 */
public JSONObject searchPageData(int page,int size,JSONObject filterObject,String[] fileds,String orderName,SortOrder sortOrder,String indexName) throws IOException{
    if(page < 1 || size < 1){
        throw new IllegalArgumentException("page and size must be >= 1, got page=" + page + ", size=" + size);
    }
    final int batchSize = 10000;
    List<Map<String,Object>> allMap = new ArrayList<>();
    long totalCount = 0;
    if(isExists(indexName)){
        // Widen to long before multiplying so large page numbers cannot overflow int.
        long fromOffset = (long) (page - 1) * size;
        long toOffset = fromOffset + size;

        SearchSourceBuilder builder = new SearchSourceBuilder();
        builder.query(GetBoolQueryBuilder(filterObject))
            .fetchSource(fileds,null)
            .size(batchSize)
            .timeout(new TimeValue(60,TimeUnit.SECONDS))
            .trackTotalHits(true);
        if(orderName != null && !orderName.isEmpty()){
            if(sortOrder != null){
                builder.sort(orderName,sortOrder);
            }else{
                builder.sort(orderName);
            }
        }
        SearchRequest request = new SearchRequest(indexName);
        request.source(builder);
        // Keep the scroll context alive for one minute between requests.
        Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
        request.scroll(scroll);

        SearchResponse searchResponse = client.search(request,RequestOptions.DEFAULT);
        String scrollId = searchResponse.getScrollId();
        try{
            totalCount = searchResponse.getHits().getTotalHits().value;
            // Exclusive end of the window, clamped to the number of existing hits.
            long upper = Math.min(toOffset,totalCount);
            if(fromOffset < upper){
                long batchStart = 0;
                SearchHit[] hits = searchResponse.getHits().getHits();
                while(hits != null && hits.length > 0){
                    long batchEnd = batchStart + hits.length;
                    if(batchEnd > fromOffset){
                        // Translate the global window into indexes of the current batch.
                        int lo = (int) (Math.max(fromOffset,batchStart) - batchStart);
                        int hi = (int) (Math.min(upper,batchEnd) - batchStart);
                        for(int i = lo; i < hi; i++){
                            allMap.add(hits[i].getSourceAsMap());
                        }
                    }
                    batchStart = batchEnd;
                    if(batchStart >= upper){
                        break;
                    }
                    SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
                    searchScrollRequest.scroll(scroll);
                    searchResponse = client.scroll(searchScrollRequest,RequestOptions.DEFAULT);
                    scrollId = searchResponse.getScrollId();
                    hits = searchResponse.getHits().getHits();
                }
            }
        }finally{
            // Release the scroll context promptly instead of waiting for it to expire.
            if(scrollId != null){
                ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
                clearScrollRequest.addScrollId(scrollId);
                client.clearScroll(clearScrollRequest,RequestOptions.DEFAULT);
            }
        }
    }
    return pageData(totalCount,page,size,allMap);
}
/**
 * Wraps one page of rows and its paging information into a JSON object.
 *
 * <p>The result has two keys: {@code "data"} holds the rows and {@code "total"} holds an
 * object with {@code "page"}, {@code "size"} and {@code "total"}.
 *
 * @param total   total number of matching documents
 * @param page    page number that was requested
 * @param size    page size that was requested
 * @param mapList rows of the page
 * @return the assembled JSON object
 */
private JSONObject pageData(long total,int page,int size,List<Map<String,Object>> mapList){
    // Paging information.
    JSONObject pageObject = new JSONObject();
    pageObject.put("page",page);
    pageObject.put("size",size);
    pageObject.put("total",total);
    // Rows plus paging information.
    JSONObject allObject = new JSONObject();
    allObject.put("data",mapList);
    allObject.put("total",pageObject);
    return allObject;
}
/**
 * Looks up a single document by an exact field value and returns all of its source fields.
 *
 * @param key       field to match (typically the primary-key field)
 * @param value     value the field must have
 * @param indexName index to search
 * @return a JSON object whose {@code "data"} entry is the document source, or an empty
 *         map when nothing matched
 * @throws IOException when the request to ES fails
 */
public JSONObject termOneSearch(String key,String value,String indexName)throws IOException{
    // An empty field list means "no source filtering".
    return termOneSearch(key,value,new String[0],indexName);
}
/**
 * Looks up a single document by an exact field value, returning only selected fields.
 *
 * @param key       field to match (typically the primary-key field)
 * @param value     value the field must have
 * @param fileds    source fields to return; an empty array returns every field
 * @param indexName index to search
 * @return a JSON object whose {@code "data"} entry is the document source, or an empty
 *         map when nothing matched
 * @throws IOException when the request to ES fails
 */
public JSONObject termOneSearch(String key,String value,String[] fileds,String indexName)throws IOException{
    Map<String,Object> preMap=termOne(key,value,fileds,indexName);
    JSONObject rootObject =new JSONObject();
    rootObject.put("data",preMap);
    return rootObject;
}
/**
 * Extracts the source of the first hit from a search response.
 *
 * @param searchResponse response of a search request
 * @return the source map of the first returned hit; an empty map when the response
 *         carries no hits or the hit has no source
 */
private Map<String,Object> searchOneData(SearchResponse searchResponse){
    // Check the returned array rather than the total-hit count: the count can be
    // positive while no hit was actually returned.
    SearchHit[] hits = searchResponse.getHits().getHits();
    if(hits == null || hits.length == 0){
        return new HashMap<>();
    }
    Map<String,Object> sourceAsMap = hits[0].getSourceAsMap();
    return sourceAsMap != null ? sourceAsMap : new HashMap<>();
}
/**
 * Lists the indices whose name starts with the given prefix.
 *
 * @param indexName index name prefix; {@code "*"} is appended before querying
 * @return a JSON object whose {@code "data"} entry is the list of matching index names
 *         (empty when nothing matches)
 * @throws IOException when the request to ES fails
 */
public JSONObject getIndexList(String indexName) throws IOException{
    JSONObject allObject = new JSONObject();
    List<String> list = new ArrayList<String>();
    String pattern = indexName + "*";
    if(isExists(pattern)){
        GetIndexRequest getIndexRequest = new GetIndexRequest(pattern).includeDefaults(true);
        GetIndexResponse indexResponse = client.indices().get(getIndexRequest,RequestOptions.DEFAULT);
        list=Arrays.asList(indexResponse.getIndices());
    }
    allObject.put("data",list);
    return allObject;
}
/**
 * Checks whether an index exists.
 *
 * @param indexName index name to look up
 * @return the answer given by ES; {@code false} when the request fails with an
 *         {@link IOException} (the message is logged)
 */
public boolean isExists(String indexName){
    try{
        GetIndexRequest lookup = new GetIndexRequest(indexName);
        return client.indices().exists(lookup,RequestOptions.DEFAULT);
    }catch (IOException io){
        logger.error(io.getMessage());
        return false;
    }
}
/**
 * Builds a bool query in which every entry of the filter becomes a {@code must} terms clause.
 *
 * @param filterObject field name to value mapping; a collection value matches any of its
 *                     elements; {@code null} produces a bool query without clauses
 * @return the assembled bool query
 */
private BoolQueryBuilder GetBoolQueryBuilder(JSONObject filterObject){
    BoolQueryBuilder booleanQueryBuilder = QueryBuilders.boolQuery();
    if(filterObject == null){
        return booleanQueryBuilder;
    }
    // One must-clause per filter entry.
    for(Map.Entry<String,Object> entry : filterObject.entrySet()){
        String field = entry.getKey();
        Object value = entry.getValue();
        // NOTE(review): entries with a null value are skipped, on the assumption that
        // null means "no filter for this field" -- confirm with callers.
        if(field == null || value == null){
            continue;
        }
        if(value instanceof Collection){
            // Use the Collection overload; the varargs overload would wrap the whole
            // collection as one single term.
            booleanQueryBuilder.must(QueryBuilders.termsQuery(field,(Collection<?>) value));
        }else{
            booleanQueryBuilder.must(QueryBuilders.termsQuery(field,value));
        }
    }
    return booleanQueryBuilder;
}
/**
 * Finds at most one document whose field has exactly the given value.
 *
 * @param key       field to match (typically the primary-key field)
 * @param value     value the field must have
 * @param fileds    source fields to return; {@code null} or empty returns every field
 * @param indexName index to search
 * @return the source of the first match; an empty map when the index does not exist or
 *         nothing matched
 * @throws IOException when the request to ES fails
 */
private Map<String,Object> termOne(String key,String value,String[] fileds,String indexName) throws IOException{
    Map<String,Object> jsonObject = new HashMap<>();
    if(isExists(indexName)){
        SearchSourceBuilder builder =new SearchSourceBuilder();
        builder.query(QueryBuilders.termsQuery(key,value)).from(0).size(1);
        // Restrict the returned source only when specific fields were requested.
        if(fileds != null && fileds.length > 0){
            builder.fetchSource(fileds,null);
        }
        SearchRequest request =new SearchRequest(indexName);
        request.source(builder);
        SearchResponse searchResponse =client.search(request,RequestOptions.DEFAULT);
        jsonObject =searchOneData(searchResponse);
    }
    return jsonObject;
}
/**
 * Deletes one document by id and reports the HTTP status of the operation.
 *
 * @param id        id of the document to delete
 * @param indexName index holding the document
 * @return a map with the single entry {@code "status"} (the numeric response status);
 *         an empty map when the index does not exist
 * @throws IOException when the request to ES fails
 */
public Map<String,Object> deleteOne(String id,String indexName) throws IOException{
    Map<String,Object> result = new HashMap<>();
    if(!isExists(indexName)){
        return result;
    }
    DeleteRequest deleteRequest = new DeleteRequest(indexName).id(id);
    DeleteResponse deleteResponse = client.delete(deleteRequest,RequestOptions.DEFAULT);
    int httpStatus = deleteResponse.status().getStatus();
    result.put("status",httpStatus);
    return result;
}
/**
 * Prepares the low-level client builder: target node, default headers, a no-op failure
 * listener and default connect/socket timeouts.
 *
 * @param host ES host
 * @param port ES HTTP port
 * @return the configured builder
 */
private RestClientBuilder restClientBuilder(String host,int port){
    HttpHost node = new HttpHost(host,port,"http");
    Header acceptAny = new BasicHeader("Accept","*/*");
    Header charset = new BasicHeader("Charset","UTF-8");
    return RestClient.builder(node)
        .setDefaultHeaders(new Header[]{acceptAny,charset})
        // The listener does nothing; override onFailure(Node) to react to a failing ES node.
        .setFailureListener(new RestClient.FailureListener())
        .setRequestConfigCallback(config -> config.setConnectTimeout(50000).setSocketTimeout(1500000));
}
/**
 * Fetches every document matching the filter, with all source fields.
 *
 * @param filterObject field/value filters, combined by {@code GetBoolQueryBuilder}
 * @param indexName    index to search
 * @return the source of every matching document; empty when the index does not exist;
 *         possibly partial when a request fails midway (the failure is logged)
 */
public List<Map<String,Object>> getAllData(JSONObject filterObject,String indexName){
    return getAllData(filterObject,null,indexName);
}
/**
 * Fetches every document matching the filter by walking a scroll cursor in batches of
 * 10000 hits. The scroll context is released when done, also after a failure.
 *
 * @param filterObject field/value filters, combined by {@code GetBoolQueryBuilder}
 * @param fileds       source fields to return; {@code null} or empty returns every field
 * @param indexName    index to search
 * @return the source of every matching document; empty when the index does not exist;
 *         possibly partial when a request fails midway (the failure is logged)
 */
public List<Map<String,Object>> getAllData(JSONObject filterObject,String[] fileds,String indexName){
    List<Map<String,Object>> datalist = new ArrayList<>();
    if(!isExists(indexName)){
        logger.error("ES indexName:"+indexName +"不存在!");
        return datalist;
    }
    SearchSourceBuilder builder=new SearchSourceBuilder();
    builder.query(GetBoolQueryBuilder(filterObject)).size(10000).trackTotalHits(true);
    if(fileds != null && fileds.length > 0){
        builder.fetchSource(fileds,null);
    }
    SearchRequest searchRequest = new SearchRequest(indexName);
    searchRequest.source(builder);
    Scroll scroll =new Scroll(TimeValue.timeValueMinutes(1L));
    searchRequest.scroll(scroll);
    String scrollId = null;
    try {
        SearchResponse searchResponse = client.search(searchRequest,RequestOptions.DEFAULT);
        scrollId = searchResponse.getScrollId();
        SearchHit[] hits=searchResponse.getHits().getHits();
        while(hits != null && hits.length >0){
            for(SearchHit hit : hits){
                datalist.add(hit.getSourceAsMap());
            }
            // Advance the cursor to the next batch.
            SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
            searchScrollRequest.scroll(scroll);
            searchResponse =client.scroll(searchScrollRequest,RequestOptions.DEFAULT);
            scrollId=searchResponse.getScrollId();
            hits=searchResponse.getHits().getHits();
        }
    }catch (IOException ex){
        logger.error("ES查询IO异常:"+ex.getMessage(),ex);
    }finally{
        // Release the scroll context even when a request above failed.
        if(scrollId != null){
            try{
                ClearScrollRequest clearScrollRequest=new ClearScrollRequest();
                clearScrollRequest.addScrollId(scrollId);
                client.clearScroll(clearScrollRequest,RequestOptions.DEFAULT);
            }catch (IOException ex){
                logger.error("ES查询IO异常:"+ex.getMessage(),ex);
            }
        }
    }
    return datalist;
}
}
The content of this article was collected from material shared online by other users. It is provided for learning and reference only; the copyright belongs to the original author.
THE END
二维码
