ElasticSearch原生接口

最后更新:2024-09-27 05:30:00 | 状态:未完成 | 相关数据库: ElasticSearch-Elasticsearch

import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
/*
PUT index_user/_bulk
{"index":{"_index":"index_user", "_id":"10011"}}
{"id":1001, "name":"a b", "age":20}
{"index":{"_index":"index_user", "_id":"10012"}}
{"id":1002, "name":"b c", "age":20}
{"index":{"_index":"index_user", "_id":"10013"}}
{"id":1003, "name":"c d", "age":30}*/


/**
 * Bulk-indexes a collection of entities into {@code table} through the {@code _bulk} endpoint.
 * <p>
 * Two newline-terminated lines are written per entity: an {@code index} action line
 * followed by the entity serialized with {@code BeanUtil.object2json}, e.g.
 * <pre>
 * {"index":{"_index":"index_user", "_id":"10011"}}
 * {"id":1001, "name":"a b", "age":20}
 * </pre>
 * The document id is read from the entity's {@code _id} field, falling back to {@code id}.
 * When neither is present the {@code _id} attribute is left out of the action line
 * instead of being written as the string "null".
 *
 * @param table target index name
 * @param list  entities to index; {@code null} or empty sends no request
 * @return {@code true} when the request finished with HTTP status 200 or 201
 */
public boolean inserts(String table, Collection list){
	if (null == list || list.isEmpty()) {
		// Nothing to index, so no request is sent.
		return false;
	}
	String method = "PUT";
	String endpoint = table+"/_bulk";
	StringBuilder builder = new StringBuilder();
	for(Object entity:list){
		// Resolve the id per entity: a fallback to "id" for one entity must not
		// stop later entities from being checked for "_id" first.
		Object _id = BeanUtil.getFieldValue(entity, "_id");
		if (null == _id) {
			_id = BeanUtil.getFieldValue(entity, "id");
		}
		builder.append("{\"index\":{\"_index\":\"").append(table).append("\"");
		if (null != _id) {
			builder.append(", \"_id\":\"").append(_id).append("\"");
		}
		builder.append("}}\n");
		builder.append(BeanUtil.object2json(entity)).append("\n");
	}
	Request request = new Request(
		method,
		endpoint);
	// The builder already holds the newline-delimited payload; it is sent as-is
	// (the same way insert(String, DataSet) does) rather than being serialized a second time.
	request.setJsonEntity(builder.toString());
	HttpResponse response = exe(request);
	// NOTE(review): the bulk API can answer 200 while individual items failed
	// ("errors": true in the body) - confirm whether callers need per-item checks.
	return response.getStatus() == 200 || response.getStatus() == 201;
}

/**
 * Bulk-indexes every row of {@code set} into {@code table} through the {@code _bulk} endpoint.
 * <p>Example request:
 * <pre>
 * PUT index_user/_bulk
 * {"index":{"_index":"index_user", "_id":"10011"}}
 * {"id":1001, "name":"a b", "age":20}
 * {"index":{"_index":"index_user", "_id":"10012"}}
 * {"id":1002, "name":"b c", "age":20}
 * {"index":{"_index":"index_user", "_id":"10013"}}
 * {"id":1003, "name":"c d", "age":30}
 * </pre>
 * The document id is read from each row's {@code _id} field, falling back to {@code id};
 * the {@code _id} entry is then removed from the row (the caller's row is modified)
 * before the row is serialized as the document source.
 *
 * @param table target index name
 * @param set   rows to index; {@code null} or empty sends no request
 * @return {@code true} when the request finished with HTTP status 200 or 201
 */
public boolean insert(String table, DataSet set){
	if (null == set) {
		return false;
	}
	String method = "PUT";
	// Address the target index, as the example request above and inserts() do.
	String endpoint = table+"/_bulk";
	StringBuilder builder = new StringBuilder();
	for(DataRow row:set){
		Object _id = BeanUtil.getFieldValue(row, "_id");
		if (null == _id) {
			_id = BeanUtil.getFieldValue(row, "id");
		}
		row.remove("_id");
		builder.append("{\"index\":{\"_index\":\"").append(table).append("\"");
		if (null != _id) {
			// Leave _id out entirely when the row has none, rather than writing the string "null".
			builder.append(", \"_id\":\"").append(_id).append("\"");
		}
		builder.append("}}\n");
		// '>' and '<' in the serialized row are replaced with their HTML entities before sending.
		builder.append(row.toJSON().replace(">","&gt;").replace("<","&lt;")).append("\n");
	}
	if (builder.length() == 0) {
		// Empty set: nothing to index, so no request is sent.
		return false;
	}
	Request request = new Request(
		method,
		endpoint);
	request.setJsonEntity(builder.toString());
	HttpResponse response = exe(request);
	// Report the outcome the same way inserts() does; previously this method always returned false.
	// NOTE(review): the bulk API can answer 200 while individual items failed
	// ("errors": true in the body) - confirm whether callers need per-item checks.
	return response.getStatus() == 200 || response.getStatus() == 201;
}
/**
 * Indexes a single row into {@code table} through the {@code _doc} endpoint.
 * <p>
 * The document id is read from the row's {@code _id} field, falling back to {@code id}.
 * With an id the request is {@code PUT table/_doc/<id>}; without one it is
 * {@code POST table/_doc/} and the id returned in the response is written back
 * to the row. The {@code _id} entry is removed from the row before it is serialized,
 * so the caller's row is modified.
 *
 * @param table  target index name
 * @param entity row to index
 * @return {@code true} when the request finished with HTTP status 200 or 201
 */
public boolean insert(String table, DataRow entity){
	String pk = "_id";
	String method = "POST";
	// Usually a primary key used for indexing should be set, e.g. regulation id = l100, Q&A id = q100
	Object _id = BeanUtil.getFieldValue(entity, pk);
	if (null == _id) {
		pk = "id";
		_id = BeanUtil.getFieldValue(entity, pk);
	}
	String endpoint = table + "/_doc/";
	if (BasicUtil.isNotEmpty(_id)) {
		method = "PUT";
		endpoint += _id;
	}
	entity.remove("_id");
	Request request = new Request(
		method,
		endpoint);
	// '>' and '<' in the serialized row are replaced with their HTML entities before sending.
	String body = BeanUtil.object2json(entity).replace(">","&gt;").replace("<","&lt;");
	request.setJsonEntity(body);
	HttpResponse response = exe(request);
	// Report the outcome the same way inserts() does; previously this method always returned false.
	boolean result = response.getStatus() == 200 || response.getStatus() == 201;
	if(result && BasicUtil.isEmpty(_id)){
		// The response carries the generated id under "_id" regardless of which
		// field of the row was used as the key; parse it as JSON text like the
		// other methods in this file do.
		DataRow row = DataRow.parseJson(response.getText());
		if(null != row){
			_id = row.getString("_id");
			if(BasicUtil.isNotEmpty(_id)){
				BeanUtil.setFieldValue(entity, pk, _id);
			}
		}
	}
	return result;
}

/**
 * Executes {@code request} on the {@code RestClient} obtained from the current runtime
 * and copies the status code and response body into an {@code HttpResponse}.
 * <p>
 * Any exception is logged and not rethrown; in that case the returned object keeps
 * whatever status/text had been set before the failure.
 * NOTE(review): the low-level client is documented to throw {@code ResponseException}
 * for error status codes, so such responses likely end up in the catch branch with the
 * status left at {@code HttpResponse}'s default - confirm whether callers need the real code.
 *
 * @param request request to perform
 * @return the populated response holder, never {@code null}
 */
private HttpResponse exe(Request request){
	HttpResponse result = new HttpResponse();
	try {
		Response response = ((RestClient)RuntimeHolder.runtime().getProcessor()).performRequest(request);
		int status = response.getStatusLine().getStatusCode();
		result.setStatus(status);
		// Sample body: {"_index":"index_user","_id":"102","_version":3,"result":"updated","_shards":{"total":2,"successful":2,"failed":0},"_seq_no":9,"_primary_term":1}
		// A response may carry no entity at all; leave the text unset in that case.
		if(null != response.getEntity()){
			String content = FileUtil.read(response.getEntity().getContent()).toString();
			result.setText(content);
		}
		log.warn("[status:{}]", status);
	}catch (Exception e){
		// Route the failure through the logger (with the request for context)
		// instead of printing the stack trace to stderr.
		log.error("[exe][request:{}]", request, e);
	}
	return result;
}
/**
 * Runs a {@code _search} request against {@code table} and flattens the hits into a {@code DataSet}.
 * <p>
 * Each returned row is the hit's {@code _source} with the hit's {@code _id} added.
 * When a hit has a {@code highlight} section, the first fragment of every highlighted
 * field replaces that field's value in the row. The set's navigator carries the total
 * from {@code hits.total.value} and the page size from the body's {@code size} (default 10).
 *
 * @param table index name to search
 * @param body  search request body; its keys are lower-cased before it is serialized
 * @return the matching rows, or {@code null} when the response status is not 200
 *         or the response body cannot be parsed
 */
public DataSet search(String table, DataRow body){
	String method = "POST";
	String endpoint = table+"/_search";
	Request request = new Request(
		method,
		endpoint);
	String json = body.toLowerKey(true).toJSON();
	log.warn("[search][body:{}]", body);
	request.setJsonEntity(json);
	HttpResponse response = exe(request);
	if(response.getStatus() != 200) {
		return null;
	}
	DataRow row = DataRow.parseJson(response.getText());
	if(null == row){
		return null;
	}
	Object total = row.get("hits", "total", "value");
	PageNavi navi = new DefaultPageNavi();
	navi.setTotalRow(BasicUtil.parseInt(total,0));
	navi.setPageRows(body.getInt("size", 10));
	DataSet set = new DataSet();
	set.setNavi(navi);
	DataRow outer = row.getRow("hits");
	DataSet hits = null == outer ? null : outer.getSet("hits");
	if(null == hits){
		// No hits section in the response: return the empty set with its navigator.
		return set;
	}
	for(DataRow hit:hits){
		DataRow item = hit.getRow("_source");
		if(null == item){
			// A hit can come back without _source (e.g. when the request disables it);
			// still return a row carrying the id and any highlights.
			item = new DataRow(KeyAdapter.KEY_CASE.SRC);
		}
		item.put("_id", hit.get("_id"));
		DataRow highlight = hit.getRow("highlight");
		if(null != highlight){
			for(String key:highlight.keySet()){
				List<?> vals = highlight.getList(key);
				if(null != vals && !vals.isEmpty()){
					item.put(key, vals.get(0));
				}
			}
		}
		set.add(item);
	}
	return set;
}
/*GET _analyze
{
"analyzer": "ik_max_word",
"text": ["马铃薯真好吃"]
}
*/
/**
 * Tokenizes {@code key} with the default analyzer.
 * Delegates to {@code analyze(key, null)}; the two-argument overload substitutes
 * {@code ik_smart} when the mode is empty.
 *
 * @param key text to analyze
 * @return the result of {@code analyze(key, null)}
 */
public LinkedHashMap<String,DataRow> analyze(String key){
	return analyze(key, null);
}
/**
 * Tokenizes {@code key} through the {@code _analyze} endpoint.
 * <p>
 * The request body carries the analyzer name and the text as a one-element array.
 * Tokens whose text is longer than one character are collected into an
 * insertion-ordered map keyed by the token text; shorter tokens are skipped.
 *
 * @param key  text to analyze
 * @param mode analyzer name; {@code ik_smart} is used when empty
 * @return token text mapped to the token row from the response; empty when the
 *         response status is not 200
 */
public LinkedHashMap<String,DataRow> analyze(String key, String mode){
	String analyzer = BasicUtil.isEmpty(mode) ? "ik_smart" : mode;

	DataRow payload = new DataRow(KeyAdapter.KEY_CASE.SRC);
	payload.put("analyzer", analyzer);
	payload.put("text", new String[]{key});

	Request request = new Request("GET", "_analyze");
	request.setJsonEntity(BeanUtil.object2json(payload));
	HttpResponse response = exe(request);

	LinkedHashMap<String,DataRow> found = new LinkedHashMap<>();
	if(response.getStatus() != 200) {
		return found;
	}
	DataRow parsed = DataRow.parseJson(response.getText());
	for(DataRow token:parsed.getSet("tokens")){
		String word = token.getString("token");
		if(word.length() <= 1){
			// single-character tokens are not reported
			continue;
		}
		found.put(word, token);
	}
	return found;
}
首页 最近更新 搜索 提交 回复