1. 引入依賴 2. 配置信息 3. ES 配置啟動類 4. 操作工具類 ...
1.引入依賴
<!-- Elasticsearch core library; the version is taken from the elasticsearch.version property. -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<!-- Transport client (provides PreBuiltTransportClient used by the Java code in this file). -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<!-- Netty 4 transport module for the transport client. -->
<dependency>
    <groupId>org.elasticsearch.plugin</groupId>
    <artifactId>transport-netty4-client</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<!-- REST high-level client. NOTE(review): not referenced by the Java snippets in this file. -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<!-- Scala runtime. NOTE(review): not referenced by the Java snippets in this file; confirm it is needed. -->
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.0</version>
</dependency>
2.配置信息:
/**
 * Holds the Elasticsearch client settings read from the application configuration.
 *
 * <p>Each field is bound through {@code @Value} to one {@code elasticsearch.*} property;
 * accessors are generated by Lombok ({@code @Getter} / {@code @Setter}).
 */
@Getter
@Setter
@Configuration
public class ClientConfig {

    /** Address of the Elasticsearch (ELK) cluster, bound to {@code elasticsearch.ip}. */
    @Value("${elasticsearch.ip}")
    private String esHostName;

    /** Port of the Elasticsearch node, bound to {@code elasticsearch.port}. */
    @Value("${elasticsearch.port}")
    private Integer esPort;

    /** Cluster name, bound to {@code elasticsearch.cluster.name}. */
    @Value("${elasticsearch.cluster.name}")
    private String esClusterName;

    /** Pool size, bound to {@code elasticsearch.pool}. */
    @Value("${elasticsearch.pool}")
    private Integer esPoolSize;

    /**
     * Whether indices should be re-created when the service starts,
     * bound to {@code elasticsearch.regenerateIndexEnabled}.
     */
    @Value("${elasticsearch.regenerateIndexEnabled}")
    private Boolean esRegenerateIndexFlag;

    /**
     * Whether index data should be synchronised when the service starts,
     * bound to {@code elasticsearch.syncDataEnabled}.
     */
    @Value("${elasticsearch.syncDataEnabled}")
    private Boolean esSyncDataEnabled;
}
3.es配置啟動類:
import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.transport.client.PreBuiltTransportClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.net.InetAddress; /** * es配置啟動類 * @author * */ @Configuration public class ElasticsearchConfig { private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfig.class); @Autowired ClientConfig clientConfig; @Bean public TransportClient init() { LOGGER.info("初始化開始。。。。。"); TransportClient transportClient = null; try { /** * 配置信息 * client.transport.sniff 增加嗅探機制,找到ES集群 * thread_pool.search.size 增加線程池個數,暫時設為5 */ Settings esSetting = Settings.builder() .put("client.transport.sniff", true) .put("thread_pool.search.size", clientConfig.getEsPoolSize()) .build(); //配置信息Settings自定義 transportClient = new PreBuiltTransportClient(esSetting); TransportAddress transportAddress = new TransportAddress(InetAddress.getByName(clientConfig.getEsHostName()), clientConfig.getEsPort()); transportClient.addTransportAddresses(transportAddress); } catch (Exception e) { LOGGER.error("elasticsearch TransportClient create error!!!", e); } return transportClient; } }
4.操作工具類:
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.apache.commons.lang3.StringUtils; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetRequestBuilder; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.text.Text; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder; import org.elasticsearch.search.fetch.subphase.highlight.HighlightField; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.transport.client.PreBuiltTransportClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import 
org.springframework.core.io.ClassPathResource; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.io.InputStream; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.UUID; public class ElasticsearchUtils { private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchUtils.class); @Autowired private TransportClient transportClient; private static TransportClient client; @PostConstruct public void init() { client = this.transportClient; } /** * 創建索引以及設置其內容 * @param index * @param indexType * @param filePath:json文件路徑 */ public static void createIndex(String index,String indexType,String filePath) throws RuntimeException { try { StringBuffer strBuf = new StringBuffer(); //解析json配置 ClassPathResource resource = new ClassPathResource(filePath); InputStream inputStream = resource.getInputStream(); int len = 0; byte[] buf = new byte[1024]; while((len=inputStream.read(buf)) != -1) { strBuf.append(new String(buf, 0, len, "utf-8")); } inputStream.close(); //創建索引 createIndex(index); //設置索引元素 putMapping(index, indexType, strBuf.toString()); }catch(Exception e){ throw new RuntimeException(e.getMessage()); } } /** * 創建索引 * * @param index 索引名稱 * @return */ public static boolean createIndex(String index){ try { if (isIndexExist(index)) { //索引庫存在則刪除索引 deleteIndex(index); } CreateIndexResponse indexresponse = client.admin().indices().prepareCreate(index).setSettings(Settings.builder().put("index.number_of_shards", 5) .put("index.number_of_replicas", 1) ) .get(); LOGGER.info("創建索引 {} 執行狀態 {}", index , indexresponse.isAcknowledged()); return indexresponse.isAcknowledged(); }catch (Exception e) { throw new RuntimeException(e.getMessage()); } } /** * 創建索引 * * @param index 索引名稱 * @param indexType 索引類型 * @param mapping 創建的mapping結構 * @return */ public static boolean putMapping(String index,String indexType,String mapping) throws RuntimeException { if 
(!isIndexExist(index)) { throw new RuntimeException("創建索引庫"+index+"mapping"+mapping+"結構失敗,索引庫不存在!"); } try { PutMappingResponse indexresponse = client.admin().indices().preparePutMapping(index).setType(indexType).setSource(mapping, XContentType.JSON).get(); LOGGER.info("索引 {} 設置 mapping {} 執行狀態 {}", index ,indexType, indexresponse.isAcknowledged()); return indexresponse.isAcknowledged(); }catch (Exception e) { throw new RuntimeException(e.getMessage()); } } /** * 判斷索引是否存在 * * @param index * @return */ public static boolean isIndexExist(String index) { IndicesExistsResponse inExistsResponse = client.admin().indices().exists(new IndicesExistsRequest(index)) .actionGet(); return inExistsResponse.isExists(); } /** * 刪除索引 * * @param index * @return */ public static boolean deleteIndex(String index) throws RuntimeException{ if (!isIndexExist(index)) { return true; } try { DeleteIndexResponse dResponse = client.admin().indices().prepareDelete(index).execute().actionGet(); if (dResponse.isAcknowledged()) { LOGGER.info("delete index " + index + " successfully!"); } else { LOGGER.info("Fail to delete index " + index); } return dResponse.isAcknowledged(); } catch (Exception e) { throw new RuntimeException(e.getMessage()); } } /** * 數據添加 * * @param jsonObject * 要增加的數據 * @param index * 索引,類似資料庫 * @param type * 類型,類似表 * @return */ public static String addData(JSONObject jsonObject, String index, String type) { return addData(jsonObject, index, type, UUID.randomUUID().toString().replaceAll("-", "").toUpperCase()); } /** * 數據添加,正定ID * * @param jsonObject * 要增加的數據 * @param index * 索引,類似資料庫 * @param type * 類型,類似表 * @param id * 數據ID * @return */ public static String addData(JSONObject jsonObject, String index, String type, String id)throws RuntimeException { try { IndexResponse response = client.prepareIndex(index, type, id).setSource(jsonObject).get(); LOGGER.info("addData response status:{},id:{}", response.status().getStatus(), response.getId()); return response.getId(); } catch 
(Exception e) { throw new RuntimeException(e.getMessage()); } } /** * 批量數據添加, * * @param list * 要增加的數據 * @param pkName * 主鍵id * @param index * 索引,類似資料庫 * @param type * 類型,類似表 * @return */ public static <T> void addBatchData(List<T> list, String pkName, String index, String type) { if(list == null || list.isEmpty()) { return; } // 創建BulkPorcessor對象 BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { @Override public void beforeBulk(long paramLong, BulkRequest paramBulkRequest) { // TODO Auto-generated method stub } // 執行出錯時執行 @Override public void afterBulk(long paramLong, BulkRequest paramBulkRequest, Throwable paramThrowable) { // TODO Auto-generated method stub } @Override public void afterBulk(long paramLong, BulkRequest paramBulkRequest, BulkResponse paramBulkResponse) { // TODO Auto-generated method stub } }) // 1w次請求執行一次bulk .setBulkActions(1000) // 1gb的數據刷新一次bulk // .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) // 固定5s必須刷新一次 .setFlushInterval(TimeValue.timeValueSeconds(5)) // 併發請求數量, 0不併發, 1併發允許執行 .setConcurrentRequests(1) // 設置退避, 100ms後執行, 最大請求3次 .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)).build(); for (T vo : list) { if(getPkValueByName(vo, pkName)!= null) { String id = getPkValueByName(vo, pkName).toString(); bulkProcessor.add(new IndexRequest(index, type, id).source(JSON.toJSONString(vo), XContentType.JSON)); } } bulkProcessor.close(); } /** * 根據主鍵名稱獲取實體類主鍵屬性值 * * @param clazz * @param pkName * @return */ private static Object getPkValueByName(Object clazz, String pkName) { try { String firstLetter = pkName.substring(0, 1).toUpperCase(); String getter = "get" + firstLetter + pkName.substring(1); Method method = clazz.getClass().getMethod(getter, new Class[] {}); Object value = method.invoke(clazz, new Object[] {}); return value; } catch (Exception e) { return null; } } /** * 通過ID 更新數據 * * @param jsonObject * 要增加的數據 * @param index * 索引,類似資料庫 * @param type * 類型,類似表 * 
@param id * 數據ID * @return */ public static void updateDataById(JSONObject jsonObject, String index, String type, String id) throws RuntimeException { try{ UpdateRequest updateRequest = new UpdateRequest(); updateRequest.index(index).type(type).id(id).doc(jsonObject); client.update(updateRequest); } catch (Exception e) { throw new RuntimeException(e.getMessage()); } } /** * 批量數據更新, * * @param list * 要增加的數據 * @param pkName * 主鍵id * @param index * 索引,類似資料庫 * @param type * 類型,類似表 * @return */ public static <T> void updateBatchData(List<T> list, String pkName, String index, String type) { // 創建BulkPorcessor對象 BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { @Override public void beforeBulk(long paramLong, BulkRequest paramBulkRequest) { // TODO Auto-generated method stub } // 執行出錯時執行 @Override public void afterBulk(long paramLong, BulkRequest paramBulkRequest, Throwable paramThrowable) { // TODO Auto-generated method stub } @Override public void afterBulk(long paramLong, BulkRequest paramBulkRequest, BulkResponse paramBulkResponse) { // TODO Auto-generated method stub } }) // 1w次請求執行一次bulk .setBulkActions(1000) // 1gb的數據刷新一次bulk // .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) // 固定5s必須刷新一次 .setFlushInterval(TimeValue.timeValueSeconds(5)) // 併發請求數量, 0不併發, 1併發允許執行 .setConcurrentRequests(1) // 設置退避, 100ms後執行, 最大請求3次 .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)).build(); for (T vo : list) { String id = getPkValueByName(vo, pkName).toString(); bulkProcessor.add(new UpdateRequest(index, type, id).doc(JSON.toJSONString(vo), XContentType.JSON)); } bulkProcessor.close(); } /** * 通過ID獲取數據 * * @param index * 索引,類似資料庫 * @param type * 類型,類似表 * @param id * 數據ID * @param fields * 需要顯示的欄位,逗號分隔(預設為全部欄位) * @return */ public static Map<String, Object> searchDataById(String index, String type, String id, String fields) { GetRequestBuilder getRequestBuilder = client.prepareGet(index, type, id); if 
(StringUtils.isNotEmpty(fields)) { getRequestBuilder.setFetchSource(fields.split(","), null); } GetResponse getResponse = getRequestBuilder.execute().actionGet(); return getResponse.getSource(); } /** * 使用分詞查詢 * * @param index * 索引名稱 * @param type * 類型名稱,可傳入多個type逗號分隔 * @param clz * 數據對應實體類 * @param fields * 需要顯示的欄位,逗號分隔(預設為全部欄位) * @param boolQuery * 查詢條件 * @return */ public static <T> List<T> searchListData(String index, String type, Class<T> clz, String fields,BoolQueryBuilder boolQuery) { return searchListData(index, type, clz, 0, fields, null, null,boolQuery); } /** * 使用分詞查詢 * * @param index * 索引名稱 * @param type * 類型名稱,可傳入多個type逗號分隔 * @param clz * 數據對應實體類 * @param size * 文檔大小限制 * @param fields * 需要顯示的欄位,逗號分隔(預設為全部欄位) * @param sortField * 排序欄位 * @param highlightField * 高亮欄位 * @param boolQuery * 查詢條件 * @return */ public static <T> List<T> searchListData(String index, String type, Class<T> clz, Integer size, String fields, String sortField, String highlightField,BoolQueryBuilder boolQuery) throws RuntimeException{ SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index); if (StringUtils.isNotEmpty(type)) { searchRequestBuilder.setTypes(type.split(",")); } // 高亮(xxx=111,aaa=222) if (StringUtils.isNotEmpty(highlightField)) { HighlightBuilder highlightBuilder = new HighlightBuilder(); // 設置高亮欄位 highlightBuilder.field(highlightField); searchRequestBuilder.highlighter(highlightBuilder); } searchRequestBuilder.setQuery(boolQuery); if (StringUtils.isNotEmpty(fields)) { searchRequestBuilder.setFetchSource(fields.split(","), null); } searchRequestBuilder.setFetchSource(true); if (StringUtils.isNotEmpty(sortField)) { searchRequestBuilder.addSort(sortField, SortOrder.DESC); } if (size != null && size > 0) { searchRequestBuilder.setSize(size); } searchRequestBuilder.setScroll(new TimeValue(1000)); searchRequestBuilder.setSize(10000); // 列印的內容 可以在 Elasticsearch head 和 Kibana 上執行查詢 LOGGER.info("\n{}", searchRequestBuilder); SearchResponse searchResponse = 
searchRequestBuilder.execute().actionGet(); long totalHits = searchResponse.getHits().totalHits; if(LOGGER.isDebugEnabled()) { long length = searchResponse.getHits().getHits().length; LOGGER.info("共查詢到[{}]條數據,處理數據條數[{}]", totalHits, length); } if (searchResponse.status().getStatus() ==200) { // 解析對象 return setSearchResponse(clz, searchResponse, highlightField); } return null; } /** * 高亮結果集 特殊處理 * * @param clz * 數據對應實體類 * @param searchResponse * * @param highlightField * 高亮欄位 */ private static <T> List<T> setSearchResponse(Class<T> clz, SearchResponse searchResponse, String highlightField) { List<T> sourceList = new ArrayList<T>(); for (SearchHit searchHit : searchResponse.getHits().getHits()) { searchHit.getSourceAsMap().put("id", searchHit.getId()); StringBuffer stringBuffer = new StringBuffer(); if (StringUtils.isNotEmpty(highlightField)) { // System.out.println("遍歷 高亮結果集,覆蓋 正常結果集" + searchHit.getSourceAsMap()); HighlightField highlight = searchHit.getHighlightFields().get(highlightField); if(highlight == null) { continue; } Text[] text = highlight.getFragments(); if (text != null) { for (Text str : text) { stringBuffer.append(str.string()); } // 遍歷 高亮結果集,覆蓋 正常結果集 searchHit.getSourceAsMap().put(highlightField, stringBuffer.toString()); } } T t = JSON.parseObject(JSON.toJSONString(searchHit.getSourceAsMap()), clz); sourceList.add(t); } return sourceList; } }