Kafka Streams:怎么获得SessionWindow的第一个和最后一个记录?
原学程将引见Kafka Streams:若何取得SessionWindow的第1个以及最初1个记载?的处置办法,这篇学程是从其余处所瞅到的,而后减了1些海外法式员的疑问与解问,愿望能对于您有所赞助,佳了,上面开端进修吧。
成绩描写
默许情形下,.windowedBy(SessionWindows.with(Duration.ofSeconds(六0))
为每一个传进记载前往1笔记录。
联合应用.count()
以及.filter()
不妨沉松检索第1笔记录。
应用
.suppress(Suppressed.untilWindowCloses(unbounded()))
借不妨沉松检索最初1笔记录。
所以…我做了二次处置,您不妨瞅到修正后的字数统计示例:
final KStream<String, String> streamsBranches = builder.<String,String>stream("streams-plaintext-input");
streamsBranches
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("W+")))
.groupBy((key, value) -> ""+value)
.windowedBy(SessionWindows.with(Duration.ofSeconds(六0)).grace(Duration.ofSeconds(二)))
.count(Materialized.with(Serdes.String(), Serdes.Long()))
.toStream()
.map((wk, v) -> new KeyValue<>(wk.key(), v == null ? ⑴l : v))
.filter((wk, v) -> v == 一)
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
streamsBranches
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("W+")))
.groupBy((key, value) -> ""+value)
.windowedBy(SessionWindows.with(Duration.ofSeconds(六0)).grace(Duration.ofSeconds(二)))
.count(Materialized.with(Serdes.String(), Serdes.Long()))
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()
.map((wk, v) -> new KeyValue<>(wk.key(), v))
.filter((wk, v) -> v != null)
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
但是我想晓得能否有更简略、更英俊的办法去做异样的工作。
推举谜底
我以为您应当应用SessionWindowedKStream::aggregate(...)
,并依据您的逻辑将成果乏减到(第1个以及最初1个值)
示例代码能够以下所示:
streamsBranches.groupByKey()
.windowedBy(SessionWindows.with(Duration.ofSeconds(六0)).grace(Duration.ofSeconds(二)))
.aggregate(
AggClass::new,
(key, value, oldAgg) -> oldAgg.update(value),
(key, agg一, agg二) -> agg一.merge(agg二),
Materialized.with(Serdes.String(), new AggClassSerdes())
).suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream().map((wk, v) -> new KeyValue<>(wk.key(), v))
.to("streams-wordcount-output", Produced.with(Serdes.String(), new AggClassSerdes()));
个中AggClass
是乏减器,AggClassSerdes
是乏减器Serdes
public class AggClass {
private String first;
private String last;
public AggClass() {}
public AggClass(String first, String last) {
this.first = first;
this.last = last;
}
public AggClass update(String value) {
if (first == null)
first = value;
last = value;
return this;
}
public AggClass merge(AggClass other) {
if (this.first == null)
return other;
else return new AggClass(this.first, other.last);
}
}
佳了闭于Kafka Streams:怎样取得SessionWindow的第1个以及最初1个记载?的学程便到这里便停止了,愿望趣模板源码网找到的这篇技巧文章能赞助到年夜野,更多技巧学程不妨在站内搜刮。