Как реализовать слушатель сообщений с сохранением состояния с помощью Spring Kafka?

Я хотел бы реализовать прослушиватель с отслеживанием состояния с использованием API Spring Kafka.

Учитывая следующее:

  • ConcurrentKafkaListenerContainerFactory с параллелизмом, установленным на "n"
  • Аннотированный метод @KafkaListener в классе Spring @Service

Затем будет создан "n" KafkaMessageListenerContainers. Каждый из них будет иметь свой собственный KafkaConsumer, и, следовательно, будет «n» потребительских потоков - по одному на потребителя.

Когда сообщения используются, метод @KafkaListener будет вызываться с использованием того же потока, который опрашивал базовый KafkaConsumer. Поскольку имеется только экземпляр прослушивателя, этот прослушиватель должен быть потокобезопасным, поскольку будет одновременный доступ из «n» потоков.

Мне бы не хотелось думать о параллельном доступе, а о состоянии хранения в слушателе, который, как я знаю, будет доступен только одному потоку.

Как создать отдельный слушатель для каждого потребителя Kafka с помощью API Spring Kafka?

4 голоса | спросил A_M 2 PM00000070000005831 2018, 19:19:58

1 ответ


0
Ты прав;для каждого контейнера существует один экземпляр прослушивателя (независимо от того, настроен ли он как ---- +: = 0 =: + ---- или ---- +: = 1 =: + ---- ).Обходной путь - использовать прототип с областью действия ---- +: = 2 =: + ---- с n ---- +: = 3 =: + ---- bean (каждый из которых имеет 1 поток).Затем каждый контейнер получит свой экземпляр слушателя.Это невозможно с ---- +: = 4 =: + ---- POJO абстракция.Однако лучше использовать бобы без состояния.РЕДАКТИРОВАТЬЯ нашел другой обходной путь, используя ---- +: = 5 =: + ---- ...(Контейнерный параллелизм равен 3).Работает нормально:Единственная проблема заключается в том, что область не очищает сама по себе (например, когда контейнер останавливается и поток удаляется. Это может быть не критично, в зависимости от вашего варианта использования).Чтобы это исправить, нам понадобится некоторая помощь из контейнера (например, опубликовать событие в потоке слушателя, когда он остановлен).GH-762 .
ответил Gary Russell 2 PM00000070000004931 2018, 19:39:49

Похожие вопросы

Популярные теги

security × 330linux × 316macos × 2827 × 268performance × 244command-line × 241sql-server × 235joomla-3.x × 222java × 189c++ × 186windows × 180cisco × 168bash × 158c# × 142gmail × 139arduino-uno × 139javascript × 134ssh × 133seo × 132mysql × 132