Как использовать Kafka Spring Cloud Stream по умолчанию, а также использовать сообщение Kafka, сгенерированное слиянием API?

Я создаю микросервисный компонент, который будет по умолчанию принимать сообщения Kafka Spring Cloud Stream (SCS), генерируемые другими (SCS) компонентами.

Но у меня также есть требование использовать сообщения Kafka от других компонентов, которые используют сливной API.

У меня есть пример репозитория, который показывает, что я пытаюсь сделать.

https://github.com/donalthurley/KafkaConsumeScsAndConfluent

Ниже приведена конфигурация приложения с привязкой к входу SCS и слитной привязкой к входу.

spring:
  application:
    name: kafka
  kafka:
    consumer:
      properties.schema.registry.url: http://192.168.99.100:8081
  cloud:
    stream:
      kafka:
        binder:
          brokers: PLAINTEXT://192.168.99.100:9092
#          configuration:
#            specific:
#              avro:
#                reader: true
#            key:
#              deserializer: org.apache.kafka.common.serialization.StringDeserializer
#            value:
#              deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer

      bindings:
        inputConfluent:
          contentType: application/*+avro
          destination: confluent-destination
          group: input-confluent-group
        inputScs:
          contentType: application/*+avro
          destination: scs-destination
          group: input-scs-group

С помощью описанной выше конфигурации я получаю обоих потребителей, созданных в конфигурации SCS по умолчанию Например, класс org.apache.kafka.common.serialization.ByteArrayDeserializer является десериализатором значений для обеих входных привязок.

Если я удаляю комментарии в вышеупомянутой конфигурации, я получаю обоих потребителей с конфигурацией, отправляемой из моего клиента Confluent Например, класс io.confluent.kafka.serializers.KafkaAvroDeserializer является десериализатором значений для обеих входных привязок.

Я понимаю, потому что конфигурация находится в связывателе Кафки, она будет применяться ко всем потребителям, определенным с этим связывателем.

Можно ли как-то определить эти конкретные свойства, чтобы они применялись только к конкретной привязке потребителя, а все остальные входные привязки могли использовать конфигурацию SCS по умолчанию?

4 голоса | спросил Donal Hurley 8 PM00000040000000031 2018, 16:19:00

1 ответ


0
Вы можете установить привязанные к привязке свойства потребителя и производителя через свойство ---- +: = 0 =: + ---- .Смотрите справочное руководство .При использовании нестандартных сериализаторов /десериализаторов вы должны установить ---- +: = 2 =: + ---- и ---- +: = 3 =: + ---- для производителей и потребителей соответственно.Снова смотрите справочное руководство.
ответил Gary Russell 8 PM00000050000005031 2018, 17:22:50

Похожие вопросы

Популярные теги

security × 330linux × 316macos × 2827 × 268performance × 244command-line × 241sql-server × 235joomla-3.x × 222java × 189c++ × 186windows × 180cisco × 168bash × 158c# × 142gmail × 139arduino-uno × 139javascript × 134ssh × 133seo × 132mysql × 132