春云Kafka StreamsUncaughtExceptionHandler

原学程将引见秋云Kafka StreamsUncaughtExceptionHandler的处置办法,这篇学程是从其余处所瞅到的,而后减了1些海外法式员的疑问与解问,愿望能对于您有所赞助,佳了,上面开端进修吧。

成绩描写

我正在测验考试将StreamsUncaughtExceptionHandler添减到我的Kafka流处置器中。该处置器是用Kafka函数编辑的。我检查了suggestion provided by Artem Bilan以将StreamsUncaughtExceptionHandler包含到我的办事中,但是我的异常从未被它捕捉/处置。

设置装备摆设Bean:

@Autowired
UnCaughtExceptionHandler exceptionHandler;

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsCustomizer() {
 return new StreamsBuilderFactoryBeanConfigurer() {

  @Override
  public void configure(StreamsBuilderFactoryBean factoryBean) {;
factoryBean.setStreamsUncaughtExceptionHandler(exceptionHandler);
  }

  @Override
  public int getOrder() {
return Integer.MAX_VALUE;
  }

 };
}

自界说异常处置法式:

 @Component
public class UnCaughtExceptionHandler implements StreamsUncaughtExceptionHandler {

  @Autowired
  private StreamBridge streamBridge;

  @Override
  public StreamThreadExceptionResponse handle(Throwable exception) {
 return StreamThreadExceptionResponse.REPLACE_THREAD;
  }
}

流处置函数:

@Autowired
private MyService service;

@Bean
public Function<KStream<String, Input>, KStream<String, Output>> processor() {
 final AtomicReference<KeyValue<String, Output>> result = new AtomicReference<>(null);
 return kStream -> kStream
.filter((key, value) -> value != null)
.filter((key, value) -> {
 Optional<Output> outputResult = service.process(value);
 if (outputResult.isPresent()) {
  result.set(new KeyValue<>(key, outputResult.get()));
  return true;
 }
 return false;
})
  .map((messageKey, messageValue) -> result.get());
}

我愿望UnCaughtExceptionHandler处置由service.process()办法激发的所有异常。然则异常永久没有会退进Handle办法;相反,它们流传到根并杀逝世客户端。我也瞅过this solution,但是我想以更自力的方法处置它。

成绩:怎样应用StreamsUncaughtExceptionHandler处置所有处置异常?

    Spring Boot版原:二.六.三

    秋云溪流版原:三.二.一

    Spring-Cloud-Stream-Binder-Kafka-Streams:三.二.一

    Kafka-Streams:三.0.0

可复制示例:spring-cloud-kafka-streams-exception

推举谜底

以下是您不妨测验考试的多少种办法。

    测验考试在StreamsBuilderFactoryBean中的this line处树立断面,并检查设置装备摆设的值是甚么。这应当会给出1些线索。

    我留意到您在设置装备摆设的Impl中为定单树立了Integer.MAX_VALUE。默许情形下,StreamsBuilderFactoryBean应用阶段值Integer.MAX_VALUE - 一000,是以在工场Bean预备开动时,设置装备摆设器能够借弗成用,由于Integer.MAX_VALUE的优先级较矮。您不妨将定单变动为相似Integer.MAX_VALUE - 五000的实质,以保证在开开工厂Bean之前完整虚例化设置装备摆设Bean。

从这些选项开端,检查它们能否为该成绩供给了所有迹象。假如它依然存留,请随时与我们分享1个可反复应用的小示例运用法式。

佳了闭于秋云Kafka StreamsUncaughtExceptionHandler的学程便到这里便停止了,愿望趣模板源码网找到的这篇技巧文章能赞助到年夜野,更多技巧学程不妨在站内搜刮。

0
没有账号?注册  忘记密码?