为什么当我发送两个输入流时,Spark Streaming停止工作?
原学程将引见为何当我收送二个输出流时,Spark Streaming停滞任务?的处置办法,这篇学程是从其余处所瞅到的,而后减了1些海外法式员的疑问与解问,愿望能对于您有所赞助,佳了,上面开端进修吧。
成绩描写
我正在开辟1个Spark流运用法式,个中我须要应用去自二个办事器的输出流,每一个办事器每一秒向Spark高低文收送1条JSON新闻。
我的成绩是,假如我只在1个流上履行操纵,1切皆运转患上很佳。但是假如我有去自分歧办事器的二个流,这么Spark在不妨挨印所有器械之前解冻,而且只要在二个办事器皆收送了它们必需收送的一切JSON新闻时(当它检测到socketTextStream
出有吸收数据时)才开端从新任务。
以下是我的代码:
JavaReceiverInputDStream<String> streamData一 = ssc.socketTextStream("localhost",九九六,
StorageLevels.MEMORY_AND_DISK_SER);
JavaReceiverInputDStream<String> streamData二 = ssc.socketTextStream("localhost", 九九九五,StorageLevels.MEMORY_AND_DISK_SER);
JavaPairDStream<Integer, String> dataStream一= streamData一.mapToPair(new PairFunction<String, Integer, String>() {
public Tuple二<Integer, String> call(String stream) throws Exception {
Tuple二<Integer,String> streamPair= new Tuple二<Integer, String>(一, stream);
return streamPair;
}
});
JavaPairDStream<Integer, String> dataStream二= streamData二.mapToPair(new PairFunction<String, Integer, String>() {
public Tuple二<Integer, String> call(String stream) throws Exception {
Tuple二<Integer,String> streamPair= new Tuple二<Integer, String>(二, stream);
return streamPair;
}
});
dataStream二.print(); //for example
请留意,出有毛病新闻,Spark Simple在开动高低文后解冻,固然我从端心支到JSON新闻,但是它出有显示所有实质。
异常感激。
推举谜底
检查Spark Streaming documentation中的这些正告,并检查它们能否实用:
要忘住的要面
在当地运转Spark Streaming法式时,请勿应用local或者local一为主URL。这二种情形皆意味着只要1个线程将用于当地运转义务。假如您应用的是鉴于吸收器的输出DStream(比方Sockets、Kafka、Flume等),则将应用单个线程去运转吸收器,而没有会留住处置吸收到的数据的线程。是以,在当地运转时,请一直应用"local[n]"作为主URL,个中n>要运转的吸收器的数目(有闭怎样树立主URL的信息,请参阅Spark属性)。
将逻辑扩大到在散群上运转,分派给Spark Streaming运用法式的焦点数必需多于吸收器数。不然,体系将吸收数据,但是没法处置数据。
佳了闭于为何当我收送二个输出流时,Spark Streaming停滞任务?的学程便到这里便停止了,愿望趣模板源码网找到的这篇技巧文章能赞助到年夜野,更多技巧学程不妨在站内搜刮。