ElasticSearch原生接口
最后更新:2024-09-27 05:30:00
|
状态:未完成
|
相关数据库:
ElasticSearch-Elasticsearch
import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; /* PUT index_user/_bulk {"index":{"_index":"index_user", "_id":"10011"}} {"id":1001, "name":"a b", "age":20} {"index":{"_index":"index_user", "_id":"10012"}} {"id":1002, "name":"b c", "age":20} {"index":{"_index":"index_user", "_id":"10013"}} {"id":1003, "name":"c d", "age":30}*/ public boolean inserts(String table, Collection list){ boolean result = false; String pk = "_id"; String method = "PUT"; String endpoint = table+"/_bulk"; String body = null; StringBuilder builder = new StringBuilder(); for(Object entity:list){ Object _id = BeanUtil.getFieldValue(entity, pk); if (null == _id) { pk = "id"; _id = BeanUtil.getFieldValue(entity, pk); } builder.append("{\"index\":{\"_index\":\"").append(table).append("\", \"_id\":\"").append(_id).append("\"}}\n"); builder.append(BeanUtil.object2json(entity)).append("\n"); } Request request = new Request( method, endpoint); body = BeanUtil.object2json(builder.toString()); request.setJsonEntity(body); HttpResponse response = exe(request); if(response.getStatus() == 200 || response.getStatus() == 201){ result = true; } return result; } /** *PUT index_user/_bulk * {"index":{"_index":"index_user", "_id":"10011"}} * {"id":1001, "name":"a b", "age":20} * {"index":{"_index":"index_user", "_id":"10012"}} * {"id":1002, "name":"b c", "age":20} * {"index":{"_index":"index_user", "_id":"10013"}} * {"id":1003, "name":"c d", "age":30} * @param table * @param set * @return */ public boolean insert(String table, DataSet set){ boolean result = false; String method = "PUT"; String endpoint = "*/_bulk"; String body = null; StringBuilder builder = new StringBuilder(); for(DataRow row:set){ String pk = "_id"; Object _id = BeanUtil.getFieldValue(row, pk); if (null == _id) { pk = "id"; _id = BeanUtil.getFieldValue(row, pk); } row.remove("_id"); builder.append("{\"index\":{\"_index\":\"").append(table).append("\", \"_id\":\"").append(_id).append("\"}}\n"); builder.append(row.toJSON().replace(">",">").replace("<","<")).append("\n"); } Request request = new Request( method, endpoint); body = builder.toString(); request.setJsonEntity(body); HttpResponse response = exe(request); return result; } public boolean insert(String table, DataRow entity){ boolean result = false; String pk = "_id"; String method = "POST"; String endpoint = null; String body = null; //一般需要设置用于索引的主键 如法规id = l100 问答id = q100 Object _id = BeanUtil.getFieldValue(entity, pk); if (null == _id) { pk = "id"; _id = BeanUtil.getFieldValue(entity, pk); } endpoint = table + "/_doc/"; if (BasicUtil.isNotEmpty(_id)) { method = "PUT"; endpoint += _id; } entity.remove("_id"); Request request = new Request( method, endpoint); body = BeanUtil.object2json(entity).replace(">",">").replace("<","<"); request.setJsonEntity(body); HttpResponse response = exe(request); if(BasicUtil.isEmpty(_id)){ DataRow row = DataRow.parse(response.getText()); _id = row.getString(pk); if(BasicUtil.isNotEmpty(_id)){ BeanUtil.setFieldValue(entity, pk, _id); } } return result; } private HttpResponse exe(Request request){ HttpResponse result = new HttpResponse(); try { Response response = ((RestClient)RuntimeHolder.runtime().getProcessor()).performRequest(request); int status = response.getStatusLine().getStatusCode(); result.setStatus(status); //{"_index":"index_user","_id":"102","_version":3,"result":"updated","_shards":{"total":2,"successful":2,"failed":0},"_seq_no":9,"_primary_term":1} String content = FileUtil.read(response.getEntity().getContent()).toString(); result.setText(content); log.warn("[status:{}]", status); }catch (Exception e){ e.printStackTrace(); } return result; } public DataSet search(String table, DataRow body){ DataSet set = null; String method = "POST"; String endpoint = table+"/_search"; Request request = new Request( method, endpoint); String json = body.toLowerKey(true).toJSON(); log.warn("[search][body:{}]", body); request.setJsonEntity(json); HttpResponse response = exe(request); if(response.getStatus() == 200) { String txt = response.getText(); DataRow row = DataRow.parseJson(txt); Object total = row.get("hits", "total", "value"); PageNavi navi = new DefaultPageNavi(); navi.setTotalRow(BasicUtil.parseInt(total,0)); navi.setPageRows(body.getInt("size", 10)); set = new DataSet(); set.setNavi(navi); DataSet hits = row.getRow("hits").getSet("hits"); for(DataRow hit:hits){ DataRow item = hit.getRow("_source"); item.put("_id", hit.get("_id")); DataRow highlight = hit.getRow("highlight"); if(null != highlight){ for(String key:highlight.keySet()){ List vals = highlight.getList(key); if(null != vals && vals.size()>0){ item.put(key, vals.get(0)); } } } set.add(item); } } return set; } /*GET _analyze { "analyzer": "ik_max_word", "text": ["马铃薯真好吃"] } */ public LinkedHashMap<String,DataRow> analyze(String key){ return analyze(key, null); } public LinkedHashMap<String,DataRow> analyze(String key, String mode){ LinkedHashMap<String,DataRow> maps = new LinkedHashMap<>(); DataRow body = new DataRow(KeyAdapter.KEY_CASE.SRC); if(BasicUtil.isEmpty(mode)){ mode = "ik_smart"; } body.put("analyzer", mode); body.put("text", new String[]{key}); Request request = new Request( "GET", "_analyze"); request.setJsonEntity(BeanUtil.object2json(body)); HttpResponse response = exe(request); if(response.getStatus() == 200) { DataRow row = DataRow.parseJson(response.getText()); DataSet tokens = row.getSet("tokens"); for(DataRow token:tokens){ String k = token.getString("token"); if(k.length() > 1){ maps.put(k, token); } } } return maps; }