  • RabbitMQ connector extension
  • Configuring the RabbitMQ Broker access
  • Receiving RabbitMQ messages
  • Inbound Metadata
  • Deserialization
  • Acknowledgement
  • Failure Management
  • Sending RabbitMQ messages
  • Serialization
  • Outbound Metadata
  • Acknowledgement
  • Configuring the RabbitMQ Exchange/Queue
  • Execution model and Blocking processing
  • Customizing the underlying RabbitMQ client
  • Health reporting
  • Dynamic Credentials
  • RabbitMQ Connector Configuration Reference
  • Incoming channel configuration
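  • Outgoing channel configuration

    This technology is considered preview. In preview, backward compatibility and presence in the ecosystem is not guaranteed. Specific improvements might require changing configuration or APIs, and plans to become stable are under way. Feedback is welcome on our mailing list or as issues in our GitHub issue tracker.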

    For a full list of possible statuses, check our FAQ entry.

    To use the connector, you need to add the quarkus-smallrye-reactive-messaging-rabbitmq extension.

    You can add the extension to your project using:

    > ./mvnw quarkus:add-extensions -Dextensions="quarkus-smallrye-reactive-messaging-rabbitmq"

    Or just add the following dependency to your project:

    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-smallrye-reactive-messaging-rabbitmq</artifactId>
    </dependency>

    Once added to your project, you can map channels to RabbitMQ exchanges or queues by configuring the connector attribute:

    # Inbound
    mp.messaging.incoming.[channel-name].connector=smallrye-rabbitmq
    # Outbound
    mp.messaging.outgoing.[channel-name].connector=smallrye-rabbitmq

    Outgoing channels are mapped to RabbitMQ exchanges and incoming channels are mapped to RabbitMQ queues, as required by the broker.

    Configuring the RabbitMQ Broker access

      • Broker host name: set it per channel (using the host attribute) or globally using rabbitmq-host.
      • Broker port: set it per channel (using the port attribute) or globally using rabbitmq-port. The default is 5672.
      • Broker username, if required: set it per channel (using the username attribute) or globally using rabbitmq-username.
      • Broker password, if required: set it per channel (using the password attribute) or globally using rabbitmq-password.
      • Setting the connector attribute on the prices channel instructs the RabbitMQ connector to manage that channel.

    Receiving RabbitMQ messages

    import org.eclipse.microprofile.reactive.messaging.Incoming;
    import jakarta.enterprise.context.ApplicationScoped;

    @ApplicationScoped
    public class RabbitMQPriceConsumer {
        @Incoming("prices")
        public void consume(double price) {
            // process your price.
        }
    }

    Or, you can retrieve the Message<Double>:

    package inbound;
    import org.eclipse.microprofile.reactive.messaging.Incoming;
    import org.eclipse.microprofile.reactive.messaging.Message;
    import jakarta.enterprise.context.ApplicationScoped;
    import java.util.concurrent.CompletionStage;
    @ApplicationScoped
    public class RabbitMQPriceMessageConsumer {
        @Incoming("prices")
        public CompletionStage<Void> consume(Message<Double> price) {
            // process your price.
            // Acknowledge the incoming message, marking the RabbitMQ message as `accepted`.
            return price.ack();
        }
    }
    

    Inbound Metadata

    Messages coming from RabbitMQ contain an instance of IncomingRabbitMQMetadata in the metadata.

    Optional<IncomingRabbitMQMetadata> metadata = incoming.getMetadata(IncomingRabbitMQMetadata.class);
    metadata.ifPresent(meta -> {
        final Optional<String> contentEncoding = meta.getContentEncoding();
        final Optional<String> contentType = meta.getContentType();
        final Optional<String> correlationId = meta.getCorrelationId();
        final Optional<ZonedDateTime> creationTime = meta.getCreationTime(ZoneId.systemDefault());
        final Optional<Integer> priority = meta.getPriority();
        final Optional<String> replyTo = meta.getReplyTo();
        final Optional<String> userId = meta.getUserId();
        // Access a single String-valued header
        final Optional<String> stringHeader = meta.getHeader("my-header", String.class);
        // Access all headers
        final Map<String,Object> headers = meta.getHeaders();
        // ...
    });
    

    Deserialization

    content_encoding    content_type        Type
    No value            application/json    a JSON element, which can be a JsonArray, JsonObject, String, etc., depending on whether the buffer contains an array, object, string, etc.
    No value            Anything else       byte[]
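    The mapping above can be sketched as a small decision function. This is a minimal illustration, not the connector's code. It assumes the "No value" cells refer to the message's content_encoding property and the next cell to its content_type. The class PayloadTypeSketch and its enum are hypothetical names, and two rules are assumptions that do not appear in the rows above: a message with a content_encoding is left as raw bytes, and text/plain maps to a String.

```java
final class PayloadTypeSketch {

    /** Payload categories; STRING is an assumption, the other two come from the rows above. */
    enum PayloadKind { STRING, JSON_ELEMENT, BYTES }

    /**
     * Picks the payload category from the two message properties.
     * Null or empty contentEncoding stands for "No value".
     */
    static PayloadKind payloadKind(String contentEncoding, String contentType) {
        boolean hasEncoding = contentEncoding != null && !contentEncoding.isEmpty();
        if (hasEncoding) {
            return PayloadKind.BYTES; // assumption: encoded bodies are not decoded
        }
        if ("application/json".equalsIgnoreCase(contentType)) {
            return PayloadKind.JSON_ELEMENT;
        }
        if ("text/plain".equalsIgnoreCase(contentType)) {
            return PayloadKind.STRING; // assumption: not listed in the rows above
        }
        return PayloadKind.BYTES; // "Anything else"
    }

    public static void main(String[] args) {
        System.out.println(payloadKind(null, "application/json"));
        System.out.println(payloadKind(null, "image/png"));
    }
}
```

    A method that expects a JsonObject therefore only makes sense on channels whose messages carry application/json and no content_encoding.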

    @Outgoing("to-rabbitmq")
    public Multi<Price> prices() {
        AtomicInteger count = new AtomicInteger();
        return Multi.createFrom().ticks().every(Duration.ofMillis(1000))
                .map(l -> new Price().setPrice(count.incrementAndGet()))
                .onOverflow().drop();
    }

    @ApplicationScoped
    public static class Consumer {
        List<Price> prices = new CopyOnWriteArrayList<>();

        @Incoming("from-rabbitmq")
        public void consume(JsonObject p) {
            // Map the received JSON object back to a Price instance.
            Price price = p.mapTo(Price.class);
            prices.add(price);
        }

        public List<Price> list() {
            return prices;
        }
    }

    Acknowledgement

    When a Reactive Messaging Message associated with a RabbitMQ Message is acknowledged, it informs the broker that the message has been accepted.

    Whether you need to explicitly acknowledge the message depends on the auto-acknowledgement setting for the channel; if that is set to true then your message will be automatically acknowledged on receipt.

    Failure Management

    If a message produced from a RabbitMQ message is nacked, a failure strategy is applied. The RabbitMQ connector supports three strategies, controlled by the failure-strategy channel setting:

    fail - fail the application; no more RabbitMQ messages will be processed. The RabbitMQ message is marked as rejected.

    accept - this strategy marks the RabbitMQ message as accepted. The processing continues ignoring the failure.

    reject - this strategy marks the RabbitMQ message as rejected (default). The processing continues with the next message.
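    The three strategies differ in two respects: whether the RabbitMQ message ends up accepted or rejected, and whether processing continues. The following sketch models just that decision. It is an illustration under stated assumptions, not the connector's implementation: FailureStrategySketch and its members are hypothetical names, and treating an unset value as reject simply follows the default stated above.

```java
import java.util.Locale;

final class FailureStrategySketch {

    /** Models the outcome of each failure-strategy value after a nack. */
    enum FailureStrategy {
        FAIL(false, false),   // message rejected, application fails
        ACCEPT(true, true),   // message accepted, failure ignored
        REJECT(false, true);  // message rejected, next message is processed

        private final boolean marksMessageAccepted;
        private final boolean continuesProcessing;

        FailureStrategy(boolean marksMessageAccepted, boolean continuesProcessing) {
            this.marksMessageAccepted = marksMessageAccepted;
            this.continuesProcessing = continuesProcessing;
        }

        boolean marksMessageAccepted() {
            return marksMessageAccepted;
        }

        boolean continuesProcessing() {
            return continuesProcessing;
        }

        /** Parses the channel setting; an unset or blank value falls back to reject. */
        static FailureStrategy from(String setting) {
            if (setting == null || setting.trim().isEmpty()) {
                return REJECT;
            }
            return valueOf(setting.trim().toUpperCase(Locale.ROOT));
        }
    }

    public static void main(String[] args) {
        for (String setting : new String[] {"fail", "accept", "reject"}) {
            FailureStrategy strategy = FailureStrategy.from(setting);
            System.out.println(setting + ": accepted=" + strategy.marksMessageAccepted()
                    + ", continues=" + strategy.continuesProcessing());
        }
    }
}
```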

    Serialization

    Payload type                          RabbitMQ message body
    JsonObject or JsonArray               Serialized String payload with content_type set to application/json
    io.vertx.mutiny.core.buffer.Buffer    Binary content, with content_type set to application/octet-stream
    byte[]                                Binary content, with content_type set to application/octet-stream
    Any other class                       The payload is converted to JSON (using a Json Mapper) then serialized with content_type set to application/json

    Outbound Metadata

    final OutgoingRabbitMQMetadata metadata = new OutgoingRabbitMQMetadata.Builder()
            .withHeader("my-header", "xyzzy")
            .withRoutingKey("urgent")
            .withTimestamp(ZonedDateTime.now())
            .build();
    // Add `metadata` to the metadata of the outgoing message.
    return Message.of("Hello", Metadata.of(metadata));

    Configuring the RabbitMQ Exchange/Queue

    mp.messaging.incoming.prices.connector=smallrye-rabbitmq
    mp.messaging.incoming.prices.queue.name=my-queue
    mp.messaging.outgoing.orders.connector=smallrye-rabbitmq
    mp.messaging.outgoing.orders.exchange.name=my-order-queue

    If the exchange.name or queue.name attribute is not set, the connector uses the channel name.
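    The fallback to the channel name can be pictured with a few lines of plain Java. This is a sketch only: ChannelNameSketch and resolve are hypothetical names, and the map stands in for a channel's configuration rather than any real connector type. An explicitly configured empty string is kept as is, since the configuration reference states that an empty exchange.name selects the default exchange.

```java
import java.util.Map;

final class ChannelNameSketch {

    /**
     * Returns the configured exchange.name or queue.name,
     * or the channel name when the attribute is not set.
     */
    static String resolve(Map<String, String> channelConfig, String attribute, String channelName) {
        String configured = channelConfig.get(attribute);
        return configured != null ? configured : channelName;
    }

    public static void main(String[] args) {
        // queue.name is set, so it wins over the channel name "prices".
        System.out.println(resolve(Map.of("queue.name", "my-queue"), "queue.name", "prices"));
        // exchange.name is not set, so the channel name "orders" is used.
        System.out.println(resolve(Map.of(), "exchange.name", "orders"));
    }
}
```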

    To use an existing queue, you need to configure the name and set the exchange’s or queue’s declare property to false. For example, if you have a RabbitMQ broker configured with a people exchange and queue, you need the following configuration:

    mp.messaging.incoming.people.connector=smallrye-rabbitmq
    mp.messaging.incoming.people.queue.name=people
    mp.messaging.incoming.people.queue.declare=false
    mp.messaging.outgoing.people.connector=smallrye-rabbitmq
    mp.messaging.outgoing.people.exchange.name=people
    mp.messaging.outgoing.people.exchange.declare=false

    Execution model and Blocking processing

    Reactive Messaging invokes your method on an I/O thread. See the Quarkus Reactive Architecture documentation for further details on this topic. But, you often need to combine Reactive Messaging with blocking processing such as database interactions. For this, you need to use the @Blocking annotation indicating that the processing is blocking and should not be run on the caller thread.

    For example, the following code illustrates how you can store incoming payloads to a database using Hibernate with Panache:

    import io.smallrye.reactive.messaging.annotations.Blocking;
    import org.eclipse.microprofile.reactive.messaging.Incoming;
    import jakarta.enterprise.context.ApplicationScoped;
    import jakarta.transaction.Transactional;
    @ApplicationScoped
    public class PriceStorage {
        @Incoming("prices")
        @Blocking
        @Transactional
        public void store(int priceInUsd) {
            Price price = new Price();
            price.value = priceInUsd;
            price.persist();
        }
    }
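    To make the idea of running work off the caller thread concrete, here is a conceptual sketch in plain Java. It does not show how Quarkus implements @Blocking. The class BlockingOffloadSketch, the offload method and the thread name "worker" are hypothetical, and a fixed thread pool stands in for a worker pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class BlockingOffloadSketch {

    /**
     * Submits the blocking task to the worker pool and returns immediately.
     * The future completes with the name of the thread that ran the task.
     */
    static CompletableFuture<String> offload(ExecutorService workers, Runnable blockingTask) {
        return CompletableFuture.supplyAsync(() -> {
            blockingTask.run();
            return Thread.currentThread().getName();
        }, workers);
    }

    public static void main(String[] args) {
        ExecutorService workers = Executors.newFixedThreadPool(2, r -> new Thread(r, "worker"));
        try {
            // The empty task stands in for a blocking call such as a database write.
            CompletableFuture<String> result = offload(workers, () -> { });
            // join() is used here only so the demo can print the result before exiting.
            System.out.println("caller=" + Thread.currentThread().getName()
                    + ", task ran on=" + result.join());
        } finally {
            workers.shutdown();
        }
    }
}
```

    The caller gets a future back without waiting, which is the property that keeps an I/O thread free while the blocking work runs elsewhere.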
    

    Customizing the underlying RabbitMQ client

    The connector uses the Vert.x RabbitMQ client underneath. More details about this client can be found on the Vert.x website.

    You can customize the underlying client configuration by producing an instance of RabbitMQOptions as follows:

    @Produces
    @Identifier("my-named-options")
    public RabbitMQOptions getNamedOptions() {
      PemKeyCertOptions keycert = new PemKeyCertOptions()
            .addCertPath("./tls/tls.crt")
            .addKeyPath("./tls/tls.key");
      PemTrustOptions trust = new PemTrustOptions().addCertPath("./tls/ca.crt");
      // You can use the produced options to configure the TLS connection
      return new RabbitMQOptions()
            .setSsl(true)
            .setPemKeyCertOptions(keycert)
            .setPemTrustOptions(trust)
            .setUser("user1")
            .setPassword("password1")
            .setHost("localhost")
            .setPort(5672)
            .setVirtualHost("vhost1")
            .setConnectionTimeout(6000) // in milliseconds
            .setRequestedHeartbeat(60) // in seconds
            .setHandshakeTimeout(6000) // in milliseconds
            .setRequestedChannelMax(5)
            .setNetworkRecoveryInterval(500) // in milliseconds
            .setAutomaticRecoveryEnabled(true);
    }
    

    This instance is retrieved and used to configure the client used by the connector. You need to indicate the name of the client using the client-options-name attribute:

    mp.messaging.incoming.prices.client-options-name=my-named-options

    Health reporting

    If you use the RabbitMQ connector with the quarkus-smallrye-health extension, it contributes to the readiness and liveness probes. The RabbitMQ connector reports the readiness and liveness of each channel managed by the connector.

    To disable health reporting, set the health-enabled attribute for the channel to false.

    On the inbound side (receiving messages from RabbitMQ), the check verifies that the receiver is connected to the broker.

    On the outbound side (sending records to RabbitMQ), the check verifies that the sender is not disconnected from the broker; the sender may still be in an initialised state (connection not yet attempted), but this is regarded as live/ready.

    Note that a message processing failure nacks the message, which is then handled by the failure-strategy. It’s the responsibility of the failure-strategy to report the failure and influence the outcome of the checks. The fail failure strategy reports the failure, and so the check will report the fault.

    Dynamic Credentials

    Quarkus and the RabbitMQ connector support Vault’s RabbitMQ secrets engine for generating short-lived dynamic credentials. This allows Vault to create and retire RabbitMQ credentials on a regular basis.

    First we need to enable Vault’s rabbitmq secret engine, configure it with RabbitMQ’s connection and authentication information, and create a Vault role my-role (replace 10.0.0.3 by the actual host that is running the RabbitMQ container):

    vault secrets enable rabbitmq
    vault write rabbitmq/config/connection \
        connection_uri=http://10.0.0.3:15672 \
        username=guest \
        password=guest
    vault write rabbitmq/roles/my-role \
        vhosts='{"/":{"write": ".*", "read": ".*"}}'
    cat <<EOF | vault policy write vault-rabbitmq-policy -
    path "secret/data/myapps/vault-rabbitmq-test/*" {
      capabilities = ["read"]
    }
    path "rabbitmq/creds/my-role" {
      capabilities = [ "read" ]
    }
    EOF

    Now that Vault knows how to create users in RabbitMQ, we need to configure Quarkus to use a credentials-provider for RabbitMQ.

    First we tell Quarkus to request dynamic credentials using a credentials-provider named rabbitmq.

    quarkus.rabbitmq.credentials-provider = rabbitmq

    Next we configure the rabbitmq credentials provider. The credentials-role option must be set to the name of the role we created in Vault, in our case my-role. The credentials-mount option must be set to rabbitmq.

    quarkus.vault.credentials-provider.rabbitmq.credentials-role=my-role
    quarkus.vault.credentials-provider.rabbitmq.credentials-mount=rabbitmq

    RabbitMQ Connector Configuration Reference

    Incoming channel configuration

    Each entry lists the attribute (with its alias in parentheses, if any), its description, its type, whether it is mandatory, and its default value where one is given.

    credentials-provider-name (rabbitmq-credentials-provider-name)
        The name of the RabbitMQ Credentials Provider bean used to provide dynamic credentials to the RabbitMQ client.
        Type: string. Mandatory: false.

    connection-timeout
        The TCP connection timeout (ms); 0 is interpreted as no timeout.
        Type: int. Mandatory: false. Default: 60000.

    handshake-timeout
        The AMQP 0-9-1 protocol handshake timeout (ms).
        Type: int. Mandatory: false. Default: 10000.

    automatic-recovery-enabled
        Whether automatic connection recovery is enabled.
        Type: boolean. Mandatory: false. Default: false.

    automatic-recovery-on-initial-connection
        Whether automatic recovery on initial connections is enabled.
        Type: boolean. Mandatory: false.

    reconnect-attempts (rabbitmq-reconnect-attempts)
        The number of reconnection attempts.
        Type: int. Mandatory: false.

    reconnect-interval (rabbitmq-reconnect-interval)
        The interval (in seconds) between two reconnection attempts.
        Type: int. Mandatory: false.

    network-recovery-interval
        How long (ms) automatic recovery waits before attempting to reconnect.
        Type: int. Mandatory: false.

    user
        The AMQP username to use when connecting to the broker.
        Type: string. Mandatory: false. Default: guest.

    include-properties
        Whether to include properties when a broker message is passed on the event bus.
        Type: boolean. Mandatory: false. Default: false.

    requested-channel-max
        The initially requested maximum channel number.
        Type: int. Mandatory: false.

    requested-heartbeat
        The initially requested heartbeat interval (seconds), zero for none.
        Type: int. Mandatory: false.

    use-nio
        Whether usage of NIO Sockets is enabled.
        Type: boolean. Mandatory: false. Default: false.

    virtual-host (rabbitmq-virtual-host)
        The virtual host to use when connecting to the broker.
        Type: string. Mandatory: false.

    exchange.name
        The exchange that messages are published to or consumed from. If not set, the channel name is used. If set to "", the default exchange is used.
        Type: string. Mandatory: false.

    exchange.durable
        Whether the exchange is durable.
        Type: boolean. Mandatory: false.

    exchange.auto-delete
        Whether the exchange should be deleted after use.
        Type: boolean. Mandatory: false. Default: false.

    exchange.type
        The exchange type: direct, fanout, headers or topic (default).
        Type: string. Mandatory: false. Default: topic.

    exchange.declare
        Whether to declare the exchange; set to false if the exchange is expected to be set up independently.
        Type: boolean. Mandatory: false.

    tracing.enabled
        Whether tracing is enabled (default) or disabled.
        Type: boolean. Mandatory: false.

    tracing.attribute-headers
        A comma-separated list of headers that should be recorded as span attributes. Relevant only if tracing.enabled=true.
        Type: string. Mandatory: false.

    queue.name
        The queue from which messages are consumed.
        Type: string.

    queue.durable
        Whether the queue is durable.
        Type: boolean. Mandatory: false.

    queue.exclusive
        Whether the queue is for exclusive use.
        Type: boolean. Mandatory: false. Default: false.

    queue.auto-delete
        Whether the queue should be deleted after use.
        Type: boolean. Mandatory: false. Default: false.

    queue.declare
        Whether to declare the queue and binding; set to false if these are expected to be set up independently.
        Type: boolean. Mandatory: false.

    queue.ttl
        If specified, the time (ms) for which a message can remain in the queue undelivered before it is considered dead.
        Type: long. Mandatory: false.

    queue.single-active-consumer
        If set to true, only one consumer can actively consume messages.
        Type: boolean. Mandatory: false. Default: false.

    queue.x-queue-type
        When the queue is declared automatically, the type of queue to create: quorum, classic or stream.
        Type: string. Mandatory: false. Default: classic.

    queue.x-queue-mode
        When the queue is declared automatically, the mode of the queue: lazy or default.
        Type: string. Mandatory: false. Default: default.

    max-incoming-internal-queue-size
        The maximum size of the incoming internal queue.
        Type: int. Mandatory: false.

    connection-count
        The number of RabbitMQ connections to create for consuming from this queue. This might be necessary to consume from a sharded queue with a single client.
        Type: int. Mandatory: false.

    auto-bind-dlq
        Whether to automatically declare the DLQ and bind it to the binder DLX.
        Type: boolean. Mandatory: false. Default: false.

    dead-letter-queue-name
        The name of the DLQ; if not supplied, defaults to the queue name with '.dlq' appended.
        Type: string. Mandatory: false.

    dead-letter-exchange
        A DLX to assign to the queue. Relevant only if auto-bind-dlq is true.
        Type: string. Mandatory: false.

    dead-letter-exchange-type
        The type of the DLX to assign to the queue. Relevant only if auto-bind-dlq is true.
        Type: string. Mandatory: false. Default: direct.

    dead-letter-routing-key
        A dead letter routing key to assign to the queue; if not supplied, defaults to the queue name.
        Type: string. Mandatory: false.

    dlx.declare
        Whether to declare the dead letter exchange binding. Relevant only if auto-bind-dlq is true; set to false if these are expected to be set up independently.
        Type: boolean. Mandatory: false. Default: false.

    dead-letter-queue-type
        When the DLQ is declared automatically, the type of DLQ to create: quorum, classic or stream.
        Type: string. Mandatory: false. Default: classic.

    dead-letter-queue-mode
        When the DLQ is declared automatically, the mode of the DLQ: lazy or default.
        Type: string. Mandatory: false. Default: default.

    failure-strategy
        The failure strategy to apply when a RabbitMQ message is nacked. Accepted values are fail, accept, reject (default).
        Type: string. Mandatory: false. Default: reject.

    broadcast
        Whether the received RabbitMQ messages must be dispatched to multiple subscribers.
        Type: boolean. Mandatory: false. Default: false.

    auto-acknowledgement
        Whether the received RabbitMQ messages must be acknowledged when received; if true then delivery constitutes acknowledgement.
        Type: boolean. Mandatory: false. Default: false.

    keep-most-recent
        Whether to discard old messages instead of recent ones.
        Type: boolean. Mandatory: false. Default: false.

    routing-keys
        A comma-separated list of routing keys to bind the queue to the exchange.
        Type: string. Mandatory: false.

    content-type-override
        Overrides the content_type attribute of the incoming message; should be a valid MIME type.
        Type: string. Mandatory: false.

    max-outstanding-messages
        The maximum number of outstanding/unacknowledged messages being processed by the connector at a time; must be a positive number.
        Type: int. Mandatory: false.
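    The dead-letter-queue-name and dead-letter-routing-key entries above each describe a default derived from the queue name. A minimal sketch of those two defaults follows. DeadLetterDefaultsSketch and its methods are hypothetical names used for illustration, with null standing for "not supplied"; this is not the connector's code.

```java
final class DeadLetterDefaultsSketch {

    /** DLQ name: the configured value, or the queue name with ".dlq" appended. */
    static String deadLetterQueueName(String configured, String queueName) {
        return configured != null ? configured : queueName + ".dlq";
    }

    /** Dead letter routing key: the configured value, or the queue name itself. */
    static String deadLetterRoutingKey(String configured, String queueName) {
        return configured != null ? configured : queueName;
    }

    public static void main(String[] args) {
        // Neither attribute is supplied, so both defaults derive from "my-queue".
        System.out.println(deadLetterQueueName(null, "my-queue"));
        System.out.println(deadLetterRoutingKey(null, "my-queue"));
    }
}
```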

    Outgoing channel configuration

    Each entry lists the attribute (with its alias in parentheses, if any), its description, its type, whether it is mandatory, and its default value where one is given.

    automatic-recovery-on-initial-connection
        Whether automatic recovery on initial connections is enabled.
        Type: boolean. Mandatory: false.

    connection-timeout
        The TCP connection timeout (ms); 0 is interpreted as no timeout.
        Type: int. Mandatory: false. Default: 60000.

    default-routing-key
        The default routing key to use when sending messages to the exchange.
        Type: string. Mandatory: false.

    default-ttl
        If specified, the time (ms) sent messages can remain in queues undelivered before they are considered dead.
        Type: long. Mandatory: false.

    exchange.auto-delete
        Whether the exchange should be deleted after use.
        Type: boolean. Mandatory: false. Default: false.

    exchange.declare
        Whether to declare the exchange; set to false if the exchange is expected to be set up independently.
        Type: boolean. Mandatory: false.

    exchange.durable
        Whether the exchange is durable.
        Type: boolean. Mandatory: false.

    exchange.name
        The exchange that messages are published to or consumed from. If not set, the channel name is used. If set to "", the default exchange is used.
        Type: string. Mandatory: false.

    exchange.type
        The exchange type: direct, fanout, headers or topic (default).
        Type: string. Mandatory: false. Default: topic.

    handshake-timeout
        The AMQP 0-9-1 protocol handshake timeout (ms).
        Type: int. Mandatory: false. Default: 10000.

    host (rabbitmq-host)
        The broker hostname.
        Type: string. Mandatory: false. Default: localhost.

    include-properties
        Whether to include properties when a broker message is passed on the event bus.
        Type: boolean. Mandatory: false. Default: false.

    max-inflight-messages
        The maximum number of messages to be written to RabbitMQ concurrently; must be a positive number.
        Type: long. Mandatory: false.

    max-outgoing-internal-queue-size
        The maximum size of the outgoing internal queue.
        Type: int. Mandatory: false.

    network-recovery-interval
        How long (ms) automatic recovery waits before attempting to reconnect.
        Type: int. Mandatory: false.

    password (rabbitmq-password)
        The password used to authenticate to the broker.
        Type: string. Mandatory: false.

    port (rabbitmq-port)
        The broker port.
        Type: int. Mandatory: false.

    reconnect-attempts (rabbitmq-reconnect-attempts)
        The number of reconnection attempts.
        Type: int. Mandatory: false.

    reconnect-interval (rabbitmq-reconnect-interval)
        The interval (in seconds) between two reconnection attempts.
        Type: int. Mandatory: false.

    requested-channel-max
        The initially requested maximum channel number.
        Type: int. Mandatory: false.

    requested-heartbeat
        The initially requested heartbeat interval (seconds), zero for none.
        Type: int. Mandatory: false.

    ssl (rabbitmq-ssl)
        Whether the connection should use SSL.
        Type: boolean. Mandatory: false. Default: false.

    tracing.attribute-headers
        A comma-separated list of headers that should be recorded as span attributes. Relevant only if tracing.enabled=true.
        Type: string. Mandatory: false.

    tracing.enabled
        Whether tracing is enabled (default) or disabled.
        Type: boolean. Mandatory: false.

    trust-all (rabbitmq-trust-all)
        Whether to skip trust certificate verification.
        Type: boolean. Mandatory: false. Default: false.

    trust-store-password (rabbitmq-trust-store-password)
        The password of the JKS trust store.
        Type: string. Mandatory: false.

    trust-store-path (rabbitmq-trust-store-path)
        The path to a JKS trust store.
        Type: string. Mandatory: false.

    credentials-provider-name (rabbitmq-credentials-provider-name)
        The name of the RabbitMQ Credentials Provider bean used to provide dynamic credentials to the RabbitMQ client.
        Type: string. Mandatory: false.

    use-nio
        Whether usage of NIO Sockets is enabled.
        Type: boolean. Mandatory: false. Default: false.

    user
        The AMQP username to use when connecting to the broker.
        Type: string. Mandatory: false. Default: guest.

    username (rabbitmq-username)
        The username used to authenticate to the broker.
        Type: string. Mandatory: false.

    virtual-host (rabbitmq-virtual-host)
        The virtual host to use when connecting to the broker.
        Type: string. Mandatory: false.