千家信息网

如何使用Quarkus在Elasticsearch进行响应式

发表于:2024-11-20 作者:千家信息网编辑
千家信息网最后更新 2024年11月20日,本篇内容主要讲解"如何使用Quarkus在Elasticsearch进行响应式",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"如何使用Quarkus在Ela
千家信息网最后更新 2024年11月20日如何使用Quarkus在Elasticsearch进行响应式

本篇内容主要讲解"如何使用Quarkus在Elasticsearch进行响应式",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"如何使用Quarkus在Elasticsearch进行响应式"吧!

创建 Quarkus 项目

mvn io.quarkus:quarkus-maven-plugin:1.0.1.Final:create \    -DprojectGroupId=org.otaibe.quarkus.elasticsearch.example \    -DprojectArtifactId=otaibe-quarkus-elasticsearch-example \    -DclassName="org.otaibe.quarkus.elasticsearch.example.web.controller.FruitResource" \    -Dpath="/fruits" \    -Dextensions="resteasy-jackson,elasticsearch-rest-client"

Maven 设置

如您所见,Quarkus 中存在一个elasticsearch -rest-client ;然而,这是一个 Elasticsearch Java 低级 REST 客户端。如果我们想使用 Elasticsearch Java High Level REST Client,我们只需要将它作为依赖添加到pom.xml文件中:

    org.elasticsearch.client    elasticsearch-rest-high-level-client   7.4.0

请确保 Elasticsearch Java Low Level REST Client 的版本与 Elasticsearch Java High Level REST Client匹配。

由于我们以响应式方式使用 Elasticsearch,因此我更喜欢使用 Project Reactor。我们必须在依赖管理部分添加 BOM:

    io.projectreactor    reactor-bom    Dysprosium-SR2    pom    import

我们还必须添加 reactor-core 作为依赖项:

    io.projectreactor  reactor-core

我已经在一个库中分离了公共代码,所以我们应该将这个库添加到我们的示例项目中。为此,我们将使用Jitpack。这是一项很棒的服务。你只需要为 你的 Github 项目指出 正确的方法,它就会为它构建一个工件。这是我使用它的方式:

    com.github.tpenakov.otaibe-commons-quarkus    otaibe-commons-quarkus-core    elasticsearch-example.02    com.github.tpenakov.otaibe-commons-quarkus    otaibe-commons-quarkus-elasticsearch    elasticsearch-example.02    com.github.tpenakov.otaibe-commons-quarkus    otaibe-commons-quarkus-rest    elasticsearch-example.02

通过 Docker 启动 Elasticsearch

此外,我们应该启动 Elastisearch。最简单的方法是通过 Docker 运行它:

docker run -it --rm=true --name elasticsearch_quarkus_test \    -p 11027:9200 -p 11028:9300 \    -e "discovery.type=single-node" \    docker.elastic.co/elasticsearch/elasticsearch:7.4.0

连接到 Elasticsearch

让我们从将我们的服务连接到 Elasticsearch 开始--示例项目中的实现很简单--因此它将侦听 Quarkus 启动和关闭事件并初始化或终止连接:

package org.otaibe.quarkus.elasticsearch.example.service;import io.quarkus.runtime.ShutdownEvent;import io.quarkus.runtime.StartupEvent;import lombok.Getter;import lombok.Setter;import lombok.extern.slf4j.Slf4j;import org.otaibe.commons.quarkus.elasticsearch.client.service.AbstractElasticsearchService;import javax.enterprise.context.ApplicationScoped;import javax.enterprise.event.Observes;@ApplicationScoped@Getter@Setter@Slf4jpublic class ElasticsearchService extends AbstractElasticsearchService {    public void init(@Observes StartupEvent event) {        log.info("init started");        super.init();        log.info("init completed");    }    public void shutdown(@Observes ShutdownEvent event) {        log.info("shutdown started");        super.shutdown();        log.info("shutdown completed");    }}

连接到 Elasticsearch 的实际工作是在AbstractElasticsearchService 中完成的:

public abstract class AbstractElasticsearchService {    @ConfigProperty(name = "service.elastic-search.hosts")    String[] hosts;    @ConfigProperty(name = "service.elastic-search.num-threads", defaultValue = "10")    Optional numThreads;    private RestHighLevelClient restClient;    private Sniffer sniffer;    @PostConstruct    public void init() {        log.info("init started");        List httpHosts = Arrays.stream(hosts)                .map(s -> StringUtils.split(s, ':'))                .map(strings -> new HttpHost(strings[0], Integer.valueOf(strings[1])))                .collect(Collectors.toList());        RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()]));        getNumThreads().ifPresent(integer ->                builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultIOReactorConfig(                        IOReactorConfig                                .custom()                                .setIoThreadCount(integer)                                .build())                ));        restClient = new RestHighLevelClient(builder);        sniffer = Sniffer.builder(getRestClient().getLowLevelClient()).build();        log.info("init completed");    }}

如您所见,此处的连接是按照Elasticsearch 文档 中建议的方式完成的。我的实现取决于两个配置属性:

属性文件:

service.elastic-search.hosts=localhost:11027

这是从 Docker 启动后的 Elasticsearch 连接字符串。第二个可选属性是:属性文件

service.elastic-search.num-threads

这是客户端所需的线程数。

创建 POJO

现在,让我们创建域对象(Fruit):

package org.otaibe.quarkus.elasticsearch.example.domain;import com.fasterxml.jackson.annotation.JsonProperty;import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;@Data@NoArgsConstructor@AllArgsConstructor(staticName = "of")public class Fruit {    public static final String ID = "id";    public static final String EXT_REF_ID = "ext_ref_id";    public static final String NAME = "name";    public static final String DESCRIPTION = "description";    public static final String VERSION = "version";    @JsonProperty(ID)    public String id;    @JsonProperty(EXT_REF_ID)    public String extRefId;    @JsonProperty(NAME)    public String name;    @JsonProperty(DESCRIPTION)    public String description;    @JsonProperty(VERSION)    public Long version;}

创建和实现 DAO

创建索引

让我们创建 FruitDaoImpl。它是一个高级类,用于填充 AbstractElasticsearchReactiveDaoImplementation 并实现所需的业务逻辑。这里的另一个重要部分是为 Fruit 类创建索引:

@Overrideprotected Mono createIndex() {    CreateIndexRequest request = new CreateIndexRequest(getTableName());    Map mapping = new HashMap();    Map propsMapping = new HashMap<>();    propsMapping.put(Fruit.ID, getKeywordTextAnalizer());    propsMapping.put(Fruit.EXT_REF_ID, getKeywordTextAnalizer());    propsMapping.put(Fruit.NAME, getTextAnalizer(ENGLISH));    propsMapping.put(Fruit.DESCRIPTION, getTextAnalizer(ENGLISH));    propsMapping.put(Fruit.VERSION, getLongFieldType());    mapping.put(PROPERTIES, propsMapping);    request.mapping(mapping);    return createIndex(request);}

对 Elasticsearch 的真正创建索引调用是在父类 ( AbstractElasticsearchReactiveDaoImplementation ) 中实现的:

protected Mono createIndex(CreateIndexRequest request) {    return Flux.create(fluxSink -> getRestClient().indices().createAsync(request, RequestOptions.DEFAULT, new ActionListener() {        @Override        public void onResponse(CreateIndexResponse createIndexResponse) {            log.info("CreateIndexResponse: {}", createIndexResponse);            fluxSink.next(createIndexResponse.isAcknowledged());            fluxSink.complete();        }        @Override        public void onFailure(Exception e) {            log.error("unable to create index", e);            fluxSink.error(new RuntimeException(e));        }    }))            .next();}

玩转 DAO

大多数 CRUD 操作在AbstractElasticsearchReactiveDaoImplementation中实现 。

它有 save、 update、 findById和 deleteById 公共方法。它也有findByExactMatch和 findByMatch保护方法。FindBy*当需要填充业务逻辑时,这些 方法在后代类中非常有用。

业务查找方法在FruitDaoImpl 类中实现 :

public Flux findByExternalRefId(String value) {    return findByMatch(Fruit.EXT_REF_ID, value);}public Flux findByName(String value) {    return findByMatch(Fruit.NAME, value);}public Flux findByDescription(String value) {    return findByMatch(Fruit.NAME, value);}public Flux findByNameOrDescription(String value) {    Map query = new HashMap<>();    query.put(Fruit.NAME, value);    query.put(Fruit.DESCRIPTION, value);    return findByMatch(query);}

在Service类中封装 DAO

FruitDaoImpl 封装在 FruitService 中:

@ApplicationScoped@Getter@Setter@Slf4jpublic class FruitService {    @Inject    FruitDaoImpl dao;    public Mono save(Fruit entity) {        return getDao().save(entity);    }    public Mono findById(Fruit entity) {        return getDao().findById(entity);    }    public Mono findById(String id) {        return Mono.just(Fruit.of(id, null, null, null, null))                .flatMap(entity -> findById(entity));    }    public Flux findByExternalRefId(String value) {        return getDao().findByExternalRefId(value);    }    public Flux findByName(String value) {        return getDao().findByName(value);    }    public Flux findByDescription(String value) {        return getDao().findByDescription(value);    }    public Flux findByNameOrDescription(String value) {        return getDao().findByNameOrDescription(value);    }    public Mono delete(Fruit entity) {        return Mono.just(entity.getId())                .filter(s -> StringUtils.isNotBlank(s))                .flatMap(s -> getDao().deleteById(entity))                .defaultIfEmpty(false);    }}

测试 FruitService

该 FruitServiceTests 写入,以测试基本功能。它还用于确保 Fruit 类字段被正确索引并且全文搜索按预期工作:

@Testpublic void manageFruitTest() {    Fruit apple = getTestUtils().createApple();    Fruit apple1 = getFruitService().save(apple).block();    Assertions.assertNotNull(apple1.getId());    Assertions.assertTrue(apple1.getVersion() > 0);    log.info("saved result: {}", getJsonUtils().toStringLazy(apple1));    List fruitList = getFruitService().findByExternalRefId(TestUtils.EXT_REF_ID).collectList().block();    Assertions.assertTrue(fruitList.size() > 0);    List fruitList1 = getFruitService().findByNameOrDescription("bulgaria").collectList().block();    Assertions.assertTrue(fruitList1.size() > 0);    //Ensure that the full text search is working - it is 'Apples' in description    List fruitList2 = getFruitService().findByDescription("apple").collectList().block();    Assertions.assertTrue(fruitList2.size() > 0);    //Ensure that the full text search is working - it is 'Apple' in name    List fruitList3 = getFruitService().findByName("apples").collectList().block();    Assertions.assertTrue(fruitList3.size() > 0);    Boolean deleteAppleResult = getFruitService().getDao().deleteById(apple1).block();    Assertions.assertTrue(deleteAppleResult);}

添加 REST 端点

因为这是一个示例项目,完整的 CRUD 功能不会作为 REST 端点添加。只有save和 findById被添加为 REST 端点。它们被添加到 FruitResource 中。那里的方法返回 CompletionStage,这确保我们的应用程序中不会有阻塞的线程。

测试 REST 端点

添加FruitResourceTest以测试 RESTendpoints:

package org.otaibe.quarkus.elasticsearch.example.web.controller;import io.quarkus.test.junit.QuarkusTest;import lombok.AccessLevel;import lombok.Getter;import lombok.extern.slf4j.Slf4j;import org.apache.commons.lang3.StringUtils;import org.eclipse.microprofile.config.inject.ConfigProperty;import org.junit.jupiter.api.Assertions;import org.junit.jupiter.api.Test;import org.otaibe.commons.quarkus.core.utils.JsonUtils;import org.otaibe.quarkus.elasticsearch.example.domain.Fruit;import org.otaibe.quarkus.elasticsearch.example.service.FruitService;import org.otaibe.quarkus.elasticsearch.example.utils.TestUtils;import javax.inject.Inject;import javax.ws.rs.core.HttpHeaders;import javax.ws.rs.core.MediaType;import javax.ws.rs.core.Response;import javax.ws.rs.core.UriBuilder;import java.net.URI;import java.util.Optional;import static io.restassured.RestAssured.given;@QuarkusTest@Getter(value = AccessLevel.PROTECTED)@Slf4jpublic class FruitResourceTest {    @ConfigProperty(name = "service.http.host")    Optional host;    @Inject    TestUtils testUtils;    @Inject    JsonUtils jsonUtils;    @Inject    FruitService service;    @Test    public void restEndpointsTest() {        log.info("restEndpointsTest start");        Fruit apple = getTestUtils().createApple();        Fruit savedApple = given()                .when()                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)                .body(apple)                .post(getUri(FruitResource.ROOT_PATH))                .then()                .statusCode(200)                .extract()                .as(Fruit.class);        String id = savedApple.getId();        Assertions.assertTrue(StringUtils.isNotBlank(id));        URI findByIdPath = UriBuilder.fromPath(FruitResource.ROOT_PATH)                .path(id)                .build();        Fruit foundApple = given()                .when().get(getUri(findByIdPath.getPath()).getPath())                .then()                .statusCode(200)                .extract()                .as(Fruit.class);        Assertions.assertEquals(savedApple, foundApple);        Boolean deleteResult = getService().delete(foundApple).block();        Assertions.assertTrue(deleteResult);        given()                .when().get(findByIdPath.getPath())                .then()                .statusCode(Response.Status.NOT_FOUND.getStatusCode()) ;        log.info("restEndpointsTest end");    }    private URI getUri(String path) {        return getUriBuilder(path)                .build();    }    private UriBuilder getUriBuilder(String path) {        return getHost()                .map(uri -> UriBuilder.fromUri(uri))                .map(uriBuilder -> uriBuilder.path(path))                .orElse(UriBuilder                        .fromPath(path)                );    }}

构建本地可执行文件

在构建本机可执行文件之前,我们必须注册我们的 Fruit 域对象。这样做的原因是我们的 FruitResource 返回 CompletionStage,因此,应用程序的实际返回类型是未知的,因此我们必须显式注册它以进行反射。在 Quarkus 中至少有两种方法可以做到这一点:

  1. 通过 @RegisterForReflection 注释。

  2. 通过 反射-config.json。

我个人更喜欢第二种方法,因为您要注册的类可能在第三方库中,并且不可能将 @RegisterForReflection 放在 那里。

现在, reflection-config.json 看起来像这样:

[  {    "name" : "org.otaibe.quarkus.elasticsearch.example.domain.Fruit",    "allDeclaredConstructors" : true,    "allPublicConstructors" : true,    "allDeclaredMethods" : true,    "allPublicMethods" : true,    "allDeclaredFields" : true,    "allPublicFields" : true  }]

下一步是让 Quarkus 知道 reflection-config.json 文件。您应该将此行添加到pom.xml文件中的 native配置文件中:

-H:ReflectionConfigurationFiles=${project.basedir}/src/main/resources/reflection-config.json

您现在可以构建您的本机应用程序:

mvn clean package -Pnative

并启动它:

./target/otaibe-quarkus-elasticsearch-example-1.0-SNAPSHOT-runner

该服务将在http://localhost:11025上可用,因为这是application.properties 中明确指定的端口。

quarkus.http.port=11025

测试本机构建

该 FruitResourceTest 预计以下可选属性:

属性文件:

service.http.host

如果存在,测试请求将命中指定的主机。如果您启动本机可执行文件:

shell:

./target/otaibe-quarkus-elasticsearch-example-1.0-SNAPSHOT-runner

并使用以下代码执行测试/构建:

shell:

mvn package -D %test.service.http .host = http://localhost:11025

测试将针对本机构建运行。

到此,相信大家对"如何使用Quarkus在Elasticsearch进行响应式"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

0