Why does Spark write nulls to a Delta Lake table?


Problem description

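My Java application uses Spark Structured Streaming, connected to a socket, to continuously receive IoT sensor measurement records. Each record is wrapped in an RDMessage object, which also records the message type used for control in the protocol.
When messages arrive, they are checked and converted to a Dataset using Encoder<RDMeasurement> measurementEncoder = Encoders.bean(RDMeasurement.class).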

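Although the stream is read correctly and the RDMeasurement objects are created correctly, every field in the output is null or zero, depending on its data type. I see this both in the Delta table and on the console when I change the format (.format("console")).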

What am I missing here? What is going wrong?

The most important parts of the Java code are shown below.

public final class SocketRDMeasurement {

 public static void main(String[] args) throws Exception {
  SparkSession spark = SparkSession
 .builder()
 .appName("SSSocketRDMeasurement")
 .master("local[*]")
 .getOrCreate();

  Encoder<StringArray> stringArrayEncoder = Encoders.bean(StringArray.class);
  Encoder<RDMessage> messageEncoder = Encoders.bean(RDMessage.class);
  Encoder<RDMeasurement> measurementEncoder = Encoders.bean(RDMeasurement.class);

  Dataset<Row> records = spark
 .readStream()
 .format("socket")
 .option("host", host)
 .option("port", port)
 .load();

  Dataset<String> inputReceived = records.as(Encoders.STRING());

  Dataset<StringArray> input = inputReceived.as(Encoders.STRING())
 .map((MapFunction<String, StringArray>) x ->
  new StringArray(x),
stringArrayEncoder);

  Dataset<RDMessage> messages = input.map(
 (MapFunction<StringArray, RDMessage>) 
  r -> new RDMessage(r), messageEncoder);

  Dataset<RDMeasurement> measurements = messages
 .map((MapFunction<RDMessage, RDMeasurement>) r ->
new RDMeasurement(), measurementEncoder);

  // The code executes without warning or error, but although the
  // objects are created correctly, the dataset output is saved
  // with nulls/NaN
  StreamingQuery query = measurements.writeStream()
 .outputMode("append")
 .format("delta")
 .option("checkpointLocation",
  "/opt/data/delta/_checkpoints/ss-socket-rd-measurement")
 .start("/opt/data/delta/ss-socket-rd-measurement");

  query.awaitTermination();
 }
}

public class StringArray implements Serializable {
 private String[] tokens;
 public StringArray(String tokens) {
  this.tokens = tokens.split(",");
 }

 // getters, setters and toString goes here
}

public class RDMeasurement implements Serializable {
 private String dataSourceName = null;
 private double dt = 0.0f;
 private double t0 = 0f;
 private double endTimestamp = 0L;
 private double[] valuesArray;

 public RDMeasurement() { }

 public RDMeasurement(String dataSourceName, double t0, 
  double dt, double endTimestamp, double[] valuesArray) {

  this.dataSourceName = dataSourceName;
  this.t0 = t0;
  this.dt = dt;
  this.endTimestamp = endTimestamp;
  this.valuesArray = valuesArray;
 }

 // getters, setters and toString goes here
}

public class RDMessage implements Serializable {
 String type;
 RDMeasurement rdMeasurement;

 public RDMessage(String type, RDMeasurement rdMeasurement) {
  this.type = type;
  this.rdMeasurement = rdMeasurement;
 }

 public RDMessage(StringArray stringArray) {
  this(stringArray.getTokens()[0] ,
 new RDMeasurement(stringArray.getTokens()[1],
Double.parseDouble(stringArray.getTokens()[2]),
Double.parseDouble(stringArray.getTokens()[3]),
Double.parseDouble(stringArray.getTokens()[4]),
toDoubleArray(5, stringArray))
  );
 }

 private static double[] toDoubleArray(int skip, StringArray stringArray) {
  // Use the skip parameter consistently instead of a hard-coded 5.
  String[] tokens = stringArray.getTokens();
  double[] ret = new double[tokens.length - skip];
  for (int i = 0; i < ret.length; i++) {
   ret[i] = Double.parseDouble(tokens[i + skip]);
  }
  return ret;
 }

 // getters, setters and toString goes here
}

Each input line follows this format:

V1_start_rd_0,ds_1,1642442598.266,1.0,1642442618.266,1.00,2.00,3.00,4.00,5.00,6.00,7.00,8.00,9.00,10.00,11.00,12.00,13.00,14.00,15.00,16.00,17.00,18.00,19.00,20.00
V1_rd_1,ds_2,1642442619.266,1.0,1642442639.266,1.00,2.00,3.00,4.00,5.00,6.00,7.00,8.00,9.00,10.00,11.00,12.00,13.00,14.00,15.00,16.00,17.00,18.00,19.00,20.00
V1_rd_2,ds_3,1642442640.266,1.0,1642442660.266,1.00,2.00,3.00,4.00,5.00,6.00,7.00,8.00,9.00,10.00,11.00,12.00,13.00,14.00,15.00,16.00,17.00,18.00,19.00,20.00
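
To make the field layout concrete, here is a minimal plain-Java sketch (no Spark required) that splits one line the same way StringArray and RDMessage do. The class name InputLineLayout and the helper parseValues are hypothetical, introduced only for illustration. The field order (type, dataSourceName, t0, dt, endTimestamp, then the values) is taken from the RDMessage constructor above, and the sample line is a shortened copy of the second input line.

```java
import java.util.Arrays;

// Hypothetical illustration class; not part of the original application.
final class InputLineLayout {

    // Number of leading fields before the measurement values start:
    // type, dataSourceName, t0, dt, endTimestamp.
    static final int HEADER_FIELDS = 5;

    // Parses every token after the five header fields as a double.
    // Assumes the line has at least five comma-separated fields.
    static double[] parseValues(String line) {
        String[] tokens = line.split(",");
        return Arrays.stream(tokens, HEADER_FIELDS, tokens.length)
                .mapToDouble(Double::parseDouble)
                .toArray();
    }

    public static void main(String[] args) {
        String line = "V1_rd_1,ds_2,1642442619.266,1.0,1642442639.266,1.00,2.00,3.00";
        String[] tokens = line.split(",");

        System.out.println("type           = " + tokens[0]);
        System.out.println("dataSourceName = " + tokens[1]);
        System.out.println("t0             = " + Double.parseDouble(tokens[2]));
        System.out.println("dt             = " + Double.parseDouble(tokens[3]));
        System.out.println("endTimestamp   = " + Double.parseDouble(tokens[4]));
        System.out.println("values         = " + Arrays.toString(parseValues(line)));
    }
}
```

If this prints the expected fields for a sample line, the parsing stage is probably not where the nulls come from, which narrows the search to the later map steps.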

Recommended answer

After refactoring the Java code and adding a debugging snippet, I was able to identify the error.

Here is the refactored version:

StreamingQuery query = dataStreamReader.load()
  .as(Encoders.STRING())
  .map((MapFunction<String, StringArray>) x -> new StringArray(x),
 stringArrayEncoder)
  .map((MapFunction<StringArray, RDMessage>)
 r -> new RDMessage(r), messageEncoder)
  .map((MapFunction<RDMessage, RDMeasurement>) e ->
 e.getRdMeasurement(), measurementEncoder)
  /*
  .map((MapFunction<RDMeasurement, String>) e -> {
if (e.getDataSourceName() != null) {
 System.out.println("•••> " + e);
}
return e.toString();
  }, Encoders.STRING())
  .map((MapFunction<String, RDMeasurement>) s -> new RDMeasurement(s),
 measurementEncoder) 
  */
  .writeStream()
  .outputMode("append")
  .format("console")
  .start();
query.awaitTermination();

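The commented-out code above is what let me identify the problem. The original pipeline mapped every RDMessage to new RDMeasurement(), an empty object whose fields keep their defaults (null and 0.0), instead of returning the parsed measurement with e.getRdMeasurement().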
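
The difference between the two map steps can be reproduced without Spark. The following is a minimal sketch using java.util.stream in place of Dataset.map. The classes Measurement and Message are simplified, hypothetical stand-ins for RDMeasurement and RDMessage (trimmed to two fields), and the method names buggy and fixed are invented for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo class; it only mimics the shape of the original pipeline.
final class NullOutputDemo {

    // Simplified stand-in for RDMeasurement.
    static final class Measurement {
        String dataSourceName = null;
        double t0 = 0.0;

        Measurement() { }

        Measurement(String dataSourceName, double t0) {
            this.dataSourceName = dataSourceName;
            this.t0 = t0;
        }
    }

    // Simplified stand-in for RDMessage.
    static final class Message {
        final String type;
        final Measurement measurement;

        Message(String type, Measurement measurement) {
            this.type = type;
            this.measurement = measurement;
        }

        Measurement getMeasurement() {
            return measurement;
        }
    }

    // Mirrors the original map step: the input message is ignored and a
    // blank object with default field values is returned for every record.
    static List<Measurement> buggy(List<Message> in) {
        return in.stream().map(m -> new Measurement()).collect(Collectors.toList());
    }

    // Mirrors the refactored map step: the parsed measurement is extracted.
    static List<Measurement> fixed(List<Message> in) {
        return in.stream().map(Message::getMeasurement).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Message> in = Arrays.asList(
                new Message("V1_rd_1", new Measurement("ds_2", 1642442619.266)));

        System.out.println(buggy(in).get(0).dataSourceName); // prints "null"
        System.out.println(fixed(in).get(0).dataSourceName); // prints "ds_2"
    }
}
```

Because the buggy lambda compiles and runs without any warning, the only visible symptom is a sink full of default values, which matches what the question describes.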
