千家信息网

如何使用Quarkus在Elasticsearch进行响应式

发表于:2025-01-21 作者:千家信息网编辑
千家信息网最后更新 2025年01月21日,本篇内容主要讲解"如何使用Quarkus在Elasticsearch进行响应式",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"如何使用Quarkus在Ela
千家信息网最后更新 2025年01月21日如何使用Quarkus在Elasticsearch进行响应式

本篇内容主要讲解"如何使用Quarkus在Elasticsearch进行响应式",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"如何使用Quarkus在Elasticsearch进行响应式"吧!

创建 Quarkus 项目

mvn io.quarkus:quarkus-maven-plugin:1.0.1.Final:create \    -DprojectGroupId=org.otaibe.quarkus.elasticsearch.example \    -DprojectArtifactId=otaibe-quarkus-elasticsearch-example \    -DclassName="org.otaibe.quarkus.elasticsearch.example.web.controller.FruitResource" \    -Dpath="/fruits" \    -Dextensions="resteasy-jackson,elasticsearch-rest-client"

Maven 设置

如您所见,Quarkus 中存在一个elasticsearch -rest-client ;然而,这是一个 Elasticsearch Java 低级 REST 客户端。如果我们想使用 Elasticsearch Java High Level REST Client,我们只需要将它作为依赖添加到pom.xml文件中:

    org.elasticsearch.client    elasticsearch-rest-high-level-client   7.4.0

请确保 Elasticsearch Java Low Level REST Client 的版本与 Elasticsearch Java High Level REST Client匹配。

由于我们以响应式方式使用 Elasticsearch,因此我更喜欢使用 Project Reactor。我们必须在依赖管理部分添加 BOM:

    io.projectreactor    reactor-bom    Dysprosium-SR2    pom    import

我们还必须添加 reactor-core 作为依赖项:

    io.projectreactor  reactor-core

我已经在一个库中分离了公共代码,所以我们应该将这个库添加到我们的示例项目中。为此,我们将使用Jitpack。这是一项很棒的服务。你只需要为 你的 Github 项目指出 正确的方法,它就会为它构建一个工件。这是我使用它的方式:

    com.github.tpenakov.otaibe-commons-quarkus    otaibe-commons-quarkus-core    elasticsearch-example.02    com.github.tpenakov.otaibe-commons-quarkus    otaibe-commons-quarkus-elasticsearch    elasticsearch-example.02    com.github.tpenakov.otaibe-commons-quarkus    otaibe-commons-quarkus-rest    elasticsearch-example.02

通过 Docker 启动 Elasticsearch

此外,我们应该启动 Elastisearch。最简单的方法是通过 Docker 运行它:

docker run -it --rm=true --name elasticsearch_quarkus_test \    -p 11027:9200 -p 11028:9300 \    -e "discovery.type=single-node" \    docker.elastic.co/elasticsearch/elasticsearch:7.4.0

连接到 Elasticsearch

让我们从将我们的服务连接到 Elasticsearch 开始--示例项目中的实现很简单--因此它将侦听 Quarkus 启动和关闭事件并初始化或终止连接:

package org.otaibe.quarkus.elasticsearch.example.service;import io.quarkus.runtime.ShutdownEvent;import io.quarkus.runtime.StartupEvent;import lombok.Getter;import lombok.Setter;import lombok.extern.slf4j.Slf4j;import org.otaibe.commons.quarkus.elasticsearch.client.service.AbstractElasticsearchService;import javax.enterprise.context.ApplicationScoped;import javax.enterprise.event.Observes;@ApplicationScoped@Getter@Setter@Slf4jpublic class ElasticsearchService extends AbstractElasticsearchService {    public void init(@Observes StartupEvent event) {        log.info("init started");        super.init();        log.info("init completed");    }    public void shutdown(@Observes ShutdownEvent event) {        log.info("shutdown started");        super.shutdown();        log.info("shutdown completed");    }}

连接到 Elasticsearch 的实际工作是在AbstractElasticsearchService 中完成的:

public abstract class AbstractElasticsearchService {    @ConfigProperty(name = "service.elastic-search.hosts")    String[] hosts;    @ConfigProperty(name = "service.elastic-search.num-threads", defaultValue = "10")    Optional numThreads;    private RestHighLevelClient restClient;    private Sniffer sniffer;    @PostConstruct    public void init() {        log.info("init started");        List httpHosts = Arrays.stream(hosts)                .map(s -> StringUtils.split(s, ':'))                .map(strings -> new HttpHost(strings[0], Integer.valueOf(strings[1])))                .collect(Collectors.toList());        RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()]));        getNumThreads().ifPresent(integer ->                builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultIOReactorConfig(                        IOReactorConfig                                .custom()                                .setIoThreadCount(integer)                                .build())                ));        restClient = new RestHighLevelClient(builder);        sniffer = Sniffer.builder(getRestClient().getLowLevelClient()).build();        log.info("init completed");    }}

如您所见,此处的连接是按照Elasticsearch 文档 中建议的方式完成的。我的实现取决于两个配置属性:

属性文件:

service.elastic-search.hosts=localhost:11027

这是从 Docker 启动后的 Elasticsearch 连接字符串。第二个可选属性是:属性文件

service.elastic-search.num-threads

这是客户端所需的线程数。

创建 POJO

现在,让我们创建域对象(Fruit):

package org.otaibe.quarkus.elasticsearch.example.domain;import com.fasterxml.jackson.annotation.JsonProperty;import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;@Data@NoArgsConstructor@AllArgsConstructor(staticName = "of")public class Fruit {    public static final String ID = "id";    public static final String EXT_REF_ID = "ext_ref_id";    public static final String NAME = "name";    public static final String DESCRIPTION = "description";    public static final String VERSION = "version";    @JsonProperty(ID)    public String id;    @JsonProperty(EXT_REF_ID)    public String extRefId;    @JsonProperty(NAME)    public String name;    @JsonProperty(DESCRIPTION)    public String description;    @JsonProperty(VERSION)    public Long version;}

创建和实现 DAO

创建索引

让我们创建 FruitDaoImpl。它是一个高级类,用于填充 AbstractElasticsearchReactiveDaoImplementation 并实现所需的业务逻辑。这里的另一个重要部分是为 Fruit 类创建索引:

@Overrideprotected Mono createIndex() {    CreateIndexRequest request = new CreateIndexRequest(getTableName());    Map mapping = new HashMap();    Map propsMapping = new HashMap<>();    propsMapping.put(Fruit.ID, getKeywordTextAnalizer());    propsMapping.put(Fruit.EXT_REF_ID, getKeywordTextAnalizer());    propsMapping.put(Fruit.NAME, getTextAnalizer(ENGLISH));    propsMapping.put(Fruit.DESCRIPTION, getTextAnalizer(ENGLISH));    propsMapping.put(Fruit.VERSION, getLongFieldType());    mapping.put(PROPERTIES, propsMapping);    request.mapping(mapping);    return createIndex(request);}

对 Elasticsearch 的真正创建索引调用是在父类 ( AbstractElasticsearchReactiveDaoImplementation ) 中实现的:

protected Mono createIndex(CreateIndexRequest request) {    return Flux.create(fluxSink -> getRestClient().indices().createAsync(request, RequestOptions.DEFAULT, new ActionListener() {        @Override        public void onResponse(CreateIndexResponse createIndexResponse) {            log.info("CreateIndexResponse: {}", createIndexResponse);            fluxSink.next(createIndexResponse.isAcknowledged());            fluxSink.complete();        }        @Override        public void onFailure(Exception e) {            log.error("unable to create index", e);            fluxSink.error(new RuntimeException(e));        }    }))            .next();}

玩转 DAO

大多数 CRUD 操作在AbstractElasticsearchReactiveDaoImplementation中实现 。

它有 save、 update、 findById和 deleteById 公共方法。它也有findByExactMatch和 findByMatch保护方法。FindBy*当需要填充业务逻辑时,这些 方法在后代类中非常有用。

业务查找方法在FruitDaoImpl 类中实现 :

public Flux findByExternalRefId(String value) {    return findByMatch(Fruit.EXT_REF_ID, value);}public Flux findByName(String value) {    return findByMatch(Fruit.NAME, value);}public Flux findByDescription(String value) {    return findByMatch(Fruit.NAME, value);}public Flux findByNameOrDescription(String value) {    Map query = new HashMap<>();    query.put(Fruit.NAME, value);    query.put(Fruit.DESCRIPTION, value);    return findByMatch(query);}

在Service类中封装 DAO

FruitDaoImpl 封装在 FruitService 中:

@ApplicationScoped@Getter@Setter@Slf4jpublic class FruitService {    @Inject    FruitDaoImpl dao;    public Mono save(Fruit entity) {        return getDao().save(entity);    }    public Mono findById(Fruit entity) {        return getDao().findById(entity);    }    public Mono findById(String id) {        return Mono.just(Fruit.of(id, null, null, null, null))                .flatMap(entity -> findById(entity));    }    public Flux findByExternalRefId(String value) {        return getDao().findByExternalRefId(value);    }    public Flux findByName(String value) {        return getDao().findByName(value);    }    public Flux findByDescription(String value) {        return getDao().findByDescription(value);    }    public Flux findByNameOrDescription(String value) {        return getDao().findByNameOrDescription(value);    }    public Mono delete(Fruit entity) {        return Mono.just(entity.getId())                .filter(s -> StringUtils.isNotBlank(s))                .flatMap(s -> getDao().deleteById(entity))                .defaultIfEmpty(false);    }}

测试 FruitService

该 FruitServiceTests 写入,以测试基本功能。它还用于确保 Fruit 类字段被正确索引并且全文搜索按预期工作:

@Testpublic void manageFruitTest() {    Fruit apple = getTestUtils().createApple();    Fruit apple1 = getFruitService().save(apple).block();    Assertions.assertNotNull(apple1.getId());    Assertions.assertTrue(apple1.getVersion() > 0);    log.info("saved result: {}", getJsonUtils().toStringLazy(apple1));    List fruitList = getFruitService().findByExternalRefId(TestUtils.EXT_REF_ID).collectList().block();    Assertions.assertTrue(fruitList.size() > 0);    List fruitList1 = getFruitService().findByNameOrDescription("bulgaria").collectList().block();    Assertions.assertTrue(fruitList1.size() > 0);    //Ensure that the full text search is working - it is 'Apples' in description    List fruitList2 = getFruitService().findByDescription("apple").collectList().block();    Assertions.assertTrue(fruitList2.size() > 0);    //Ensure that the full text search is working - it is 'Apple' in name    List fruitList3 = getFruitService().findByName("apples").collectList().block();    Assertions.assertTrue(fruitList3.size() > 0);    Boolean deleteAppleResult = getFruitService().getDao().deleteById(apple1).block();    Assertions.assertTrue(deleteAppleResult);}

添加 REST 端点

因为这是一个示例项目,完整的 CRUD 功能不会作为 REST 端点添加。只有save和 findById被添加为 REST 端点。它们被添加到 FruitResource 中。那里的方法返回 CompletionStage,这确保我们的应用程序中不会有阻塞的线程。

测试 REST 端点

添加FruitResourceTest以测试 RESTendpoints:

package org.otaibe.quarkus.elasticsearch.example.web.controller;import io.quarkus.test.junit.QuarkusTest;import lombok.AccessLevel;import lombok.Getter;import lombok.extern.slf4j.Slf4j;import org.apache.commons.lang3.StringUtils;import org.eclipse.microprofile.config.inject.ConfigProperty;import org.junit.jupiter.api.Assertions;import org.junit.jupiter.api.Test;import org.otaibe.commons.quarkus.core.utils.JsonUtils;import org.otaibe.quarkus.elasticsearch.example.domain.Fruit;import org.otaibe.quarkus.elasticsearch.example.service.FruitService;import org.otaibe.quarkus.elasticsearch.example.utils.TestUtils;import javax.inject.Inject;import javax.ws.rs.core.HttpHeaders;import javax.ws.rs.core.MediaType;import javax.ws.rs.core.Response;import javax.ws.rs.core.UriBuilder;import java.net.URI;import java.util.Optional;import static io.restassured.RestAssured.given;@QuarkusTest@Getter(value = AccessLevel.PROTECTED)@Slf4jpublic class FruitResourceTest {    @ConfigProperty(name = "service.http.host")    Optional host;    @Inject    TestUtils testUtils;    @Inject    JsonUtils jsonUtils;    @Inject    FruitService service;    @Test    public void restEndpointsTest() {        log.info("restEndpointsTest start");        Fruit apple = getTestUtils().createApple();        Fruit savedApple = given()                .when()                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)                .body(apple)                .post(getUri(FruitResource.ROOT_PATH))                .then()                .statusCode(200)                .extract()                .as(Fruit.class);        String id = savedApple.getId();        Assertions.assertTrue(StringUtils.isNotBlank(id));        URI findByIdPath = UriBuilder.fromPath(FruitResource.ROOT_PATH)                .path(id)                .build();        Fruit foundApple = given()                .when().get(getUri(findByIdPath.getPath()).getPath())                .then()                .statusCode(200)                .extract()                .as(Fruit.class);        Assertions.assertEquals(savedApple, foundApple);        Boolean deleteResult = getService().delete(foundApple).block();        Assertions.assertTrue(deleteResult);        given()                .when().get(findByIdPath.getPath())                .then()                .statusCode(Response.Status.NOT_FOUND.getStatusCode()) ;        log.info("restEndpointsTest end");    }    private URI getUri(String path) {        return getUriBuilder(path)                .build();    }    private UriBuilder getUriBuilder(String path) {        return getHost()                .map(uri -> UriBuilder.fromUri(uri))                .map(uriBuilder -> uriBuilder.path(path))                .orElse(UriBuilder                        .fromPath(path)                );    }}

构建本地可执行文件

在构建本机可执行文件之前,我们必须注册我们的 Fruit 域对象。这样做的原因是我们的 FruitResource 返回 CompletionStage,因此,应用程序的实际返回类型是未知的,因此我们必须显式注册它以进行反射。在 Quarkus 中至少有两种方法可以做到这一点:

  1. 通过 @RegisterForReflection 注释。

  2. 通过 反射-config.json。

我个人更喜欢第二种方法,因为您要注册的类可能在第三方库中,并且不可能将 @RegisterForReflection 放在 那里。

现在, reflection-config.json 看起来像这样:

[  {    "name" : "org.otaibe.quarkus.elasticsearch.example.domain.Fruit",    "allDeclaredConstructors" : true,    "allPublicConstructors" : true,    "allDeclaredMethods" : true,    "allPublicMethods" : true,    "allDeclaredFields" : true,    "allPublicFields" : true  }]

下一步是让 Quarkus 知道 reflection-config.json 文件。您应该将此行添加到pom.xml文件中的 native配置文件中:

-H:ReflectionConfigurationFiles=${project.basedir}/src/main/resources/reflection-config.json

您现在可以构建您的本机应用程序:

mvn clean package -Pnative

并启动它:

./target/otaibe-quarkus-elasticsearch-example-1.0-SNAPSHOT-runner

该服务将在http://localhost:11025上可用,因为这是application.properties 中明确指定的端口。

quarkus.http.port=11025

测试本机构建

该 FruitResourceTest 预计以下可选属性:

属性文件:

service.http.host

如果存在,测试请求将命中指定的主机。如果您启动本机可执行文件:

shell:

./target/otaibe-quarkus-elasticsearch-example-1.0-SNAPSHOT-runner

并使用以下代码执行测试/构建:

shell:

mvn package -D %test.service.http .host = http://localhost:11025

测试将针对本机构建运行。

到此,相信大家对"如何使用Quarkus在Elasticsearch进行响应式"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

文件 方法 测试 这是 属性 项目 本机 端点 索引 业务 实际 应用程序 方式 示例 程序 中实 应用 服务 代码 内容 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 网络安全法强化了 网络安全演练阶总结 计算机网络技术最主要技术 服务器加密文件怎么删除 成都市网络安全游园会 软件开发边界问题 控制系统网络安全防范 井陉网络安全宣传 初步理解互联网科技 中国食品安全的数据库 需要数据库的生活场景 深圳市开创网络技术有限公司 服务器配置如何设置 3360安全卫士网络安全防护 外挂服务器逃跑吧少年 等级保护网络安全解释 网站服务器配置教程 宝山区网络安全工控机生产厂家 家里电脑访问不了公司的服务器 广西统筹建设网络安全态势感知 单片机软件开发用什么语言 信息技术教案《网络安全》 青浦区自动化软件开发口碑推荐 数据库检测sql注入 大兴区咨询软件开发平台 湖北前端软件开发价钱是多少 简要铁路如何落实网络安全 十次啦美国服务器 腾讯 护苗网络安全课 聊天平台服务器一个月多少钱
0