如何使用Quarkus在Elasticsearch进行响应式
本篇内容主要讲解"如何使用Quarkus在Elasticsearch进行响应式",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"如何使用Quarkus在Elasticsearch进行响应式"吧!
创建 Quarkus 项目
mvn io.quarkus:quarkus-maven-plugin:1.0.1.Final:create \ -DprojectGroupId=org.otaibe.quarkus.elasticsearch.example \ -DprojectArtifactId=otaibe-quarkus-elasticsearch-example \ -DclassName="org.otaibe.quarkus.elasticsearch.example.web.controller.FruitResource" \ -Dpath="/fruits" \ -Dextensions="resteasy-jackson,elasticsearch-rest-client"
Maven 设置
如您所见,Quarkus 中存在一个elasticsearch -rest-client ;然而,这是一个 Elasticsearch Java 低级 REST 客户端。如果我们想使用 Elasticsearch Java High Level REST Client,我们只需要将它作为依赖添加到pom.xml文件中:
org.elasticsearch.client elasticsearch-rest-high-level-client 7.4.0
请确保 Elasticsearch Java Low Level REST Client 的版本与 Elasticsearch Java High Level REST Client匹配。
由于我们以响应式方式使用 Elasticsearch,因此我更喜欢使用 Project Reactor。我们必须在依赖管理部分添加 BOM:
io.projectreactor reactor-bom Dysprosium-SR2 pom import
我们还必须添加 reactor-core 作为依赖项:
io.projectreactor reactor-core
我已经在一个库中分离了公共代码,所以我们应该将这个库添加到我们的示例项目中。为此,我们将使用Jitpack。这是一项很棒的服务。你只需要为 你的 Github 项目指出 正确的方法,它就会为它构建一个工件。这是我使用它的方式:
com.github.tpenakov.otaibe-commons-quarkus otaibe-commons-quarkus-core elasticsearch-example.02 com.github.tpenakov.otaibe-commons-quarkus otaibe-commons-quarkus-elasticsearch elasticsearch-example.02 com.github.tpenakov.otaibe-commons-quarkus otaibe-commons-quarkus-rest elasticsearch-example.02
通过 Docker 启动 Elasticsearch
此外,我们应该启动 Elastisearch。最简单的方法是通过 Docker 运行它:
docker run -it --rm=true --name elasticsearch_quarkus_test \ -p 11027:9200 -p 11028:9300 \ -e "discovery.type=single-node" \ docker.elastic.co/elasticsearch/elasticsearch:7.4.0
连接到 Elasticsearch
让我们从将我们的服务连接到 Elasticsearch 开始--示例项目中的实现很简单--因此它将侦听 Quarkus 启动和关闭事件并初始化或终止连接:
package org.otaibe.quarkus.elasticsearch.example.service;import io.quarkus.runtime.ShutdownEvent;import io.quarkus.runtime.StartupEvent;import lombok.Getter;import lombok.Setter;import lombok.extern.slf4j.Slf4j;import org.otaibe.commons.quarkus.elasticsearch.client.service.AbstractElasticsearchService;import javax.enterprise.context.ApplicationScoped;import javax.enterprise.event.Observes;@ApplicationScoped@Getter@Setter@Slf4jpublic class ElasticsearchService extends AbstractElasticsearchService { public void init(@Observes StartupEvent event) { log.info("init started"); super.init(); log.info("init completed"); } public void shutdown(@Observes ShutdownEvent event) { log.info("shutdown started"); super.shutdown(); log.info("shutdown completed"); }}
连接到 Elasticsearch 的实际工作是在AbstractElasticsearchService 中完成的:
public abstract class AbstractElasticsearchService { @ConfigProperty(name = "service.elastic-search.hosts") String[] hosts; @ConfigProperty(name = "service.elastic-search.num-threads", defaultValue = "10") Optional numThreads; private RestHighLevelClient restClient; private Sniffer sniffer; @PostConstruct public void init() { log.info("init started"); List httpHosts = Arrays.stream(hosts) .map(s -> StringUtils.split(s, ':')) .map(strings -> new HttpHost(strings[0], Integer.valueOf(strings[1]))) .collect(Collectors.toList()); RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()])); getNumThreads().ifPresent(integer -> builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultIOReactorConfig( IOReactorConfig .custom() .setIoThreadCount(integer) .build()) )); restClient = new RestHighLevelClient(builder); sniffer = Sniffer.builder(getRestClient().getLowLevelClient()).build(); log.info("init completed"); }}
如您所见,此处的连接是按照Elasticsearch 文档 中建议的方式完成的。我的实现取决于两个配置属性:
属性文件:
service.elastic-search.hosts=localhost:11027
这是从 Docker 启动后的 Elasticsearch 连接字符串。第二个可选属性是:属性文件
service.elastic-search.num-threads
这是客户端所需的线程数。
创建 POJO
现在,让我们创建域对象(Fruit):
package org.otaibe.quarkus.elasticsearch.example.domain;import com.fasterxml.jackson.annotation.JsonProperty;import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;@Data@NoArgsConstructor@AllArgsConstructor(staticName = "of")public class Fruit { public static final String ID = "id"; public static final String EXT_REF_ID = "ext_ref_id"; public static final String NAME = "name"; public static final String DESCRIPTION = "description"; public static final String VERSION = "version"; @JsonProperty(ID) public String id; @JsonProperty(EXT_REF_ID) public String extRefId; @JsonProperty(NAME) public String name; @JsonProperty(DESCRIPTION) public String description; @JsonProperty(VERSION) public Long version;}
创建和实现 DAO
创建索引
让我们创建 FruitDaoImpl。它是一个高级类,用于填充 AbstractElasticsearchReactiveDaoImplementation 并实现所需的业务逻辑。这里的另一个重要部分是为 Fruit 类创建索引:
@Overrideprotected Mono createIndex() { CreateIndexRequest request = new CreateIndexRequest(getTableName()); Map mapping = new HashMap(); Map propsMapping = new HashMap<>(); propsMapping.put(Fruit.ID, getKeywordTextAnalizer()); propsMapping.put(Fruit.EXT_REF_ID, getKeywordTextAnalizer()); propsMapping.put(Fruit.NAME, getTextAnalizer(ENGLISH)); propsMapping.put(Fruit.DESCRIPTION, getTextAnalizer(ENGLISH)); propsMapping.put(Fruit.VERSION, getLongFieldType()); mapping.put(PROPERTIES, propsMapping); request.mapping(mapping); return createIndex(request);}
对 Elasticsearch 的真正创建索引调用是在父类 ( AbstractElasticsearchReactiveDaoImplementation ) 中实现的:
protected Mono createIndex(CreateIndexRequest request) { return Flux.create(fluxSink -> getRestClient().indices().createAsync(request, RequestOptions.DEFAULT, new ActionListener() { @Override public void onResponse(CreateIndexResponse createIndexResponse) { log.info("CreateIndexResponse: {}", createIndexResponse); fluxSink.next(createIndexResponse.isAcknowledged()); fluxSink.complete(); } @Override public void onFailure(Exception e) { log.error("unable to create index", e); fluxSink.error(new RuntimeException(e)); } })) .next();}
玩转 DAO
大多数 CRUD 操作在AbstractElasticsearchReactiveDaoImplementation中实现 。
它有 save、 update、 findById和 deleteById 公共方法。它也有findByExactMatch和 findByMatch保护方法。FindBy*当需要填充业务逻辑时,这些 方法在后代类中非常有用。
业务查找方法在FruitDaoImpl 类中实现 :
public Flux findByExternalRefId(String value) { return findByMatch(Fruit.EXT_REF_ID, value);}public Flux findByName(String value) { return findByMatch(Fruit.NAME, value);}public Flux findByDescription(String value) { return findByMatch(Fruit.NAME, value);}public Flux findByNameOrDescription(String value) { Map query = new HashMap<>(); query.put(Fruit.NAME, value); query.put(Fruit.DESCRIPTION, value); return findByMatch(query);}
在Service类中封装 DAO
FruitDaoImpl 封装在 FruitService 中:
@ApplicationScoped@Getter@Setter@Slf4jpublic class FruitService { @Inject FruitDaoImpl dao; public Mono save(Fruit entity) { return getDao().save(entity); } public Mono findById(Fruit entity) { return getDao().findById(entity); } public Mono findById(String id) { return Mono.just(Fruit.of(id, null, null, null, null)) .flatMap(entity -> findById(entity)); } public Flux findByExternalRefId(String value) { return getDao().findByExternalRefId(value); } public Flux findByName(String value) { return getDao().findByName(value); } public Flux findByDescription(String value) { return getDao().findByDescription(value); } public Flux findByNameOrDescription(String value) { return getDao().findByNameOrDescription(value); } public Mono delete(Fruit entity) { return Mono.just(entity.getId()) .filter(s -> StringUtils.isNotBlank(s)) .flatMap(s -> getDao().deleteById(entity)) .defaultIfEmpty(false); }}
测试 FruitService
该 FruitServiceTests 写入,以测试基本功能。它还用于确保 Fruit 类字段被正确索引并且全文搜索按预期工作:
@Testpublic void manageFruitTest() { Fruit apple = getTestUtils().createApple(); Fruit apple1 = getFruitService().save(apple).block(); Assertions.assertNotNull(apple1.getId()); Assertions.assertTrue(apple1.getVersion() > 0); log.info("saved result: {}", getJsonUtils().toStringLazy(apple1)); List fruitList = getFruitService().findByExternalRefId(TestUtils.EXT_REF_ID).collectList().block(); Assertions.assertTrue(fruitList.size() > 0); List fruitList1 = getFruitService().findByNameOrDescription("bulgaria").collectList().block(); Assertions.assertTrue(fruitList1.size() > 0); //Ensure that the full text search is working - it is 'Apples' in description List fruitList2 = getFruitService().findByDescription("apple").collectList().block(); Assertions.assertTrue(fruitList2.size() > 0); //Ensure that the full text search is working - it is 'Apple' in name List fruitList3 = getFruitService().findByName("apples").collectList().block(); Assertions.assertTrue(fruitList3.size() > 0); Boolean deleteAppleResult = getFruitService().getDao().deleteById(apple1).block(); Assertions.assertTrue(deleteAppleResult);}
添加 REST 端点
因为这是一个示例项目,完整的 CRUD 功能不会作为 REST 端点添加。只有save和 findById被添加为 REST 端点。它们被添加到 FruitResource 中。那里的方法返回 CompletionStage
测试 REST 端点
添加FruitResourceTest以测试 RESTendpoints:
package org.otaibe.quarkus.elasticsearch.example.web.controller;import io.quarkus.test.junit.QuarkusTest;import lombok.AccessLevel;import lombok.Getter;import lombok.extern.slf4j.Slf4j;import org.apache.commons.lang3.StringUtils;import org.eclipse.microprofile.config.inject.ConfigProperty;import org.junit.jupiter.api.Assertions;import org.junit.jupiter.api.Test;import org.otaibe.commons.quarkus.core.utils.JsonUtils;import org.otaibe.quarkus.elasticsearch.example.domain.Fruit;import org.otaibe.quarkus.elasticsearch.example.service.FruitService;import org.otaibe.quarkus.elasticsearch.example.utils.TestUtils;import javax.inject.Inject;import javax.ws.rs.core.HttpHeaders;import javax.ws.rs.core.MediaType;import javax.ws.rs.core.Response;import javax.ws.rs.core.UriBuilder;import java.net.URI;import java.util.Optional;import static io.restassured.RestAssured.given;@QuarkusTest@Getter(value = AccessLevel.PROTECTED)@Slf4jpublic class FruitResourceTest { @ConfigProperty(name = "service.http.host") Optional host; @Inject TestUtils testUtils; @Inject JsonUtils jsonUtils; @Inject FruitService service; @Test public void restEndpointsTest() { log.info("restEndpointsTest start"); Fruit apple = getTestUtils().createApple(); Fruit savedApple = given() .when() .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON) .body(apple) .post(getUri(FruitResource.ROOT_PATH)) .then() .statusCode(200) .extract() .as(Fruit.class); String id = savedApple.getId(); Assertions.assertTrue(StringUtils.isNotBlank(id)); URI findByIdPath = UriBuilder.fromPath(FruitResource.ROOT_PATH) .path(id) .build(); Fruit foundApple = given() .when().get(getUri(findByIdPath.getPath()).getPath()) .then() .statusCode(200) .extract() .as(Fruit.class); Assertions.assertEquals(savedApple, foundApple); Boolean deleteResult = getService().delete(foundApple).block(); Assertions.assertTrue(deleteResult); given() .when().get(findByIdPath.getPath()) .then() .statusCode(Response.Status.NOT_FOUND.getStatusCode()) ; log.info("restEndpointsTest end"); } private URI getUri(String path) { return getUriBuilder(path) .build(); } private UriBuilder getUriBuilder(String path) { return getHost() .map(uri -> UriBuilder.fromUri(uri)) .map(uriBuilder -> uriBuilder.path(path)) .orElse(UriBuilder .fromPath(path) ); }}
构建本地可执行文件
在构建本机可执行文件之前,我们必须注册我们的 Fruit 域对象。这样做的原因是我们的 FruitResource 返回 CompletionStage
通过 @RegisterForReflection 注释。
通过 反射-config.json。
我个人更喜欢第二种方法,因为您要注册的类可能在第三方库中,并且不可能将 @RegisterForReflection 放在 那里。
现在, reflection-config.json 看起来像这样:
[ { "name" : "org.otaibe.quarkus.elasticsearch.example.domain.Fruit", "allDeclaredConstructors" : true, "allPublicConstructors" : true, "allDeclaredMethods" : true, "allPublicMethods" : true, "allDeclaredFields" : true, "allPublicFields" : true }]
下一步是让 Quarkus 知道 reflection-config.json 文件。您应该将此行添加到pom.xml文件中的 native
配置文件中:
-H:ReflectionConfigurationFiles=${project.basedir}/src/main/resources/reflection-config.json
您现在可以构建您的本机应用程序:
mvn clean package -Pnative
并启动它:
./target/otaibe-quarkus-elasticsearch-example-1.0-SNAPSHOT-runner
该服务将在http://localhost:11025上可用,因为这是application.properties 中明确指定的端口。
quarkus.http.port=11025
测试本机构建
该 FruitResourceTest 预计以下可选属性:
属性文件:
service.http.host
如果存在,测试请求将命中指定的主机。如果您启动本机可执行文件:
shell:
./target/otaibe-quarkus-elasticsearch-example-1.0-SNAPSHOT-runner
并使用以下代码执行测试/构建:
shell:
mvn package -D %test.service.http .host = http://localhost:11025
测试将针对本机构建运行。
到此,相信大家对"如何使用Quarkus在Elasticsearch进行响应式"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!