Is there any way to flatten nested JSON in Spark Streaming?


Problem description

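I have written a Spark Dataset job (batch) that flattens the data, and it runs fine. But when I use the same code snippet in a Spark Streaming job, it throws the following error:
Queries with streaming sources must be executed with writeStream.start();

So, is there any way to flatten nested JSON in a streaming job?
Sample nested JSON input: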

{
  "name": " Akash",
  "age": 26,
  "watches": {
    "name": "Apple",
    "models": [
      "Apple Watch Series 5",
      "Apple Watch Nike"
    ]
  },
  "phones": [
    {
      "name": " Apple",
      "models": [
        "iphone X",
        "iphone XR",
        "iphone XS",
        "iphone 11",
        "iphone 11 Pro"
      ]
    },
    {
      "name": " Samsung",
      "models": [
        "Galaxy Note10",
        "Galaxy Note10+",
        "Galaxy S10e",
        "Galaxy S10",
        "Galaxy S10+"
      ]
    },
    {
      "name": " Google",
      "models": [
        "Pixel 3",
        "Pixel 3a"
      ]
    }
  ]
}

Expected output:
[Image: output after flattening]

Here is the code snippet:

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Recursively flattens one nested column per call until no ArrayType or
// StructType columns remain. Only transformations (selectExpr/select) are used.
private static Dataset<Row> flattenJSONdf(Dataset<Row> ds) {
  StructField[] fields = ds.schema().fields();
  List<String> fieldNames = new ArrayList<>();
  for (StructField f : fields) {
    fieldNames.add(f.name());
  }

  for (StructField field : fields) {
    DataType fieldType = field.dataType();
    String fieldName = field.name();

    if (fieldType instanceof ArrayType) {
      // Keep every other column and replace the array with one row per element.
      // explode_outer also keeps rows whose array is null or empty (emitting null).
      List<String> exprs = new ArrayList<>();
      for (String name : fieldNames) {
        if (!name.equals(fieldName)) {
          exprs.add(name);
        }
      }
      exprs.add(String.format("explode_outer(%s) as %s", fieldName, fieldName));
      Dataset<Row> exploded = ds.selectExpr(exprs.toArray(new String[0]));

      // Do not call exploded.show() here: on a streaming Dataset, an action such as
      // show() fails with "Queries with streaming sources must be executed with writeStream.start()".
      return flattenJSONdf(exploded);
    } else if (fieldType instanceof StructType) {
      // Replace the struct column with its children, renamed parent_child.
      List<Column> cols = new ArrayList<>();
      for (String name : fieldNames) {
        if (!name.equals(fieldName)) {
          cols.add(new Column(name));
        }
      }
      for (String child : ((StructType) fieldType).fieldNames()) {
        String path = fieldName + "." + child;
        cols.add(new Column(path).as(path.replace(".", "_")));
      }
      return flattenJSONdf(ds.select(cols.toArray(new Column[0])));
    }
  }
  return ds;
}
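Each call to flattenJSONdf rewrites exactly one nested column and then recurses, so the whole flattening consists of transformations and should be usable on a streaming Dataset as long as no action is called before writeStream()...start(). To make a single step easier to follow, here is a minimal stdlib-only Java sketch that builds the projection list for one step. It does not use Spark: the `FlattenStep` class, the `Kind` enum, the `Field` class and the `projectionFor` helper are hypothetical stand-ins for StructField/DataType. The original struct branch uses Column objects with .as(...); the sketch writes the equivalent SQL text instead.

```java
import java.util.ArrayList;
import java.util.List;

public class FlattenStep {
    // Hypothetical stand-in for Spark's DataType: only what one step needs.
    enum Kind { LEAF, ARRAY, STRUCT }

    // Hypothetical stand-in for StructField: a name, a kind, and child names for structs.
    static final class Field {
        final String name;
        final Kind kind;
        final List<String> children;

        Field(String name, Kind kind, List<String> children) {
            this.name = name;
            this.kind = kind;
            this.children = children;
        }
    }

    // Returns the SQL expressions for one flattening step, mirroring flattenJSONdf:
    // the first ARRAY or STRUCT column is rewritten and every other column is kept.
    // Returns an empty list when the schema is already flat.
    static List<String> projectionFor(List<Field> schema) {
        for (Field target : schema) {
            if (target.kind == Kind.LEAF) {
                continue;
            }
            List<String> exprs = new ArrayList<>();
            for (Field other : schema) {
                if (!other.name.equals(target.name)) {
                    exprs.add(other.name);
                }
            }
            if (target.kind == Kind.ARRAY) {
                // One output row per element.
                exprs.add(String.format("explode_outer(%s) as %s", target.name, target.name));
            } else {
                // Promote each child to a top-level column named parent_child.
                for (String child : target.children) {
                    exprs.add(target.name + "." + child + " as " + target.name + "_" + child);
                }
            }
            return exprs;
        }
        return new ArrayList<>();
    }

    public static void main(String[] args) {
        // Top-level schema of the sample input, in the field order of the answer's schema.
        List<Field> schema = List.of(
            new Field("age", Kind.LEAF, List.of()),
            new Field("name", Kind.LEAF, List.of()),
            new Field("phones", Kind.ARRAY, List.of()),
            new Field("watches", Kind.STRUCT, List.of("models", "name")));
        // prints [age, name, watches, explode_outer(phones) as phones]
        System.out.println(projectionFor(schema));
    }
}
```

Repeating the step on its own output (phones becomes a struct, then phones_models becomes an array, and so on) until projectionFor returns an empty list corresponds to the recursion in flattenJSONdf.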

Recommended answer

Note: the code is in Scala, and I used Spark Structured Streaming.

You can flatten array columns with the org.apache.spark.sql.functions.explode function. Check the code below.

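import org.apache.spark.sql.functions._ // explode
import org.apache.spark.sql.types._
import spark.implicits._ // $"col" syntax; assumes a SparkSession named spark (spark-shell imports these automatically)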

val schema = DataType.fromJson("""{"type":"struct","fields":[{"name":"age","type":"long","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"phones","type":{"type":"array","elementType":{"type":"struct","fields":[{"name":"models","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}}]},"containsNull":true},"nullable":true,"metadata":{}},{"name":"watches","type":{"type":"struct","fields":[{"name":"models","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]}""").asInstanceOf[StructType]
// schema: org.apache.spark.sql.types.StructType = StructType(StructField(age,LongType,true), StructField(name,StringType,true), StructField(phones,ArrayType(StructType(StructField(models,ArrayType(StringType,true),true), StructField(name,StringType,true)),true),true), StructField(watches,StructType(StructField(models,ArrayType(StringType,true),true), StructField(name,StringType,true)),true))

val streamDF = spark.readStream.format("json").schema(schema).load("/tmp/jdata")
// streamDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 2 more fields]

streamDF
.withColumn("watches_models",explode($"watches.models")).withColumn("watches_name",$"watches.name")
.withColumn("phones_models",explode($"phones.models")).withColumn("phones_models",explode($"phones_models"))
.withColumn("phones_name",explode($"phones.name"))
.drop("watches","phones")
.writeStream
.format("console")
.outputMode("append")
.start()
.awaitTermination()
-------------------------------------------
Batch: 0
-------------------------------------------
+---+------+--------------------+------------+--------------+-----------+
|age|  name|      watches_models|watches_name| phones_models|phones_name|
+---+------+--------------------+------------+--------------+-----------+
| 26| Akash|Apple Watch Series 5|       Apple|      iphone X|      Apple|
| 26| Akash|Apple Watch Series 5|       Apple|      iphone X|    Samsung|
| 26| Akash|Apple Watch Series 5|       Apple|      iphone X|     Google|
| 26| Akash|Apple Watch Series 5|       Apple|     iphone XR|      Apple|
| 26| Akash|Apple Watch Series 5|       Apple|     iphone XR|    Samsung|
| 26| Akash|Apple Watch Series 5|       Apple|     iphone XR|     Google|
| 26| Akash|Apple Watch Series 5|       Apple|     iphone XS|      Apple|
| 26| Akash|Apple Watch Series 5|       Apple|     iphone XS|    Samsung|
| 26| Akash|Apple Watch Series 5|       Apple|     iphone XS|     Google|
| 26| Akash|Apple Watch Series 5|       Apple|     iphone 11|      Apple|
| 26| Akash|Apple Watch Series 5|       Apple|     iphone 11|    Samsung|
| 26| Akash|Apple Watch Series 5|       Apple|     iphone 11|     Google|
| 26| Akash|Apple Watch Series 5|       Apple| iphone 11 Pro|      Apple|
| 26| Akash|Apple Watch Series 5|       Apple| iphone 11 Pro|    Samsung|
| 26| Akash|Apple Watch Series 5|       Apple| iphone 11 Pro|     Google|
| 26| Akash|Apple Watch Series 5|       Apple| Galaxy Note10|      Apple|
| 26| Akash|Apple Watch Series 5|       Apple| Galaxy Note10|    Samsung|
| 26| Akash|Apple Watch Series 5|       Apple| Galaxy Note10|     Google|
| 26| Akash|Apple Watch Series 5|       Apple|Galaxy Note10+|      Apple|
| 26| Akash|Apple Watch Series 5|       Apple|Galaxy Note10+|    Samsung|
+---+------+--------------------+------------+--------------+-----------+
only showing top 20 rows
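In the output above, each phone model is paired with every phone brand (for example iphone X with Samsung and Google). This happens because phones.name is exploded independently of phones.models, so the rows form a Cartesian product: 2 watch models × 12 phone models × 3 brands = 72 rows for the one input record. Exploding phones first and then each element's own models keeps brand and model aligned; in Spark that could look roughly like `.withColumn("phone", explode($"phones")).withColumn("phones_name", $"phone.name").withColumn("phones_models", explode($"phone.models"))` followed by dropping "phone" as well (a suggested adjustment, not part of the original answer). The stdlib-only Java sketch below simulates both orderings on the sample record's data to show the difference; the `ExplodeOrder` class and its methods are hypothetical illustration names, and no Spark API is involved.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ExplodeOrder {
    static final List<String> WATCH_MODELS = List.of("Apple Watch Series 5", "Apple Watch Nike");

    // Phone data from the sample record: brand -> models (brands keep the input's leading space).
    static Map<String, List<String>> phones() {
        Map<String, List<String>> p = new LinkedHashMap<>();
        p.put(" Apple", List.of("iphone X", "iphone XR", "iphone XS", "iphone 11", "iphone 11 Pro"));
        p.put(" Samsung", List.of("Galaxy Note10", "Galaxy Note10+", "Galaxy S10e", "Galaxy S10", "Galaxy S10+"));
        p.put(" Google", List.of("Pixel 3", "Pixel 3a"));
        return p;
    }

    // Like the answer: all phone models and all brand names are exploded independently,
    // so every model is combined with every brand. Rows are {watch, model, brand}.
    static List<String[]> independentExplodes(Map<String, List<String>> phones) {
        List<String[]> rows = new ArrayList<>();
        for (String watch : WATCH_MODELS) {
            for (List<String> models : phones.values()) {
                for (String model : models) {
                    for (String brand : phones.keySet()) {
                        rows.add(new String[] {watch, model, brand});
                    }
                }
            }
        }
        return rows;
    }

    // Explode each phone struct first, then only that phone's models: brand and model stay aligned.
    static List<String[]> structThenModels(Map<String, List<String>> phones) {
        List<String[]> rows = new ArrayList<>();
        for (String watch : WATCH_MODELS) {
            for (Map.Entry<String, List<String>> phone : phones.entrySet()) {
                for (String model : phone.getValue()) {
                    rows.add(new String[] {watch, model, phone.getKey()});
                }
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(independentExplodes(phones()).size()); // 72
        System.out.println(structThenModels(phones()).size());    // 24
    }
}
```

The remaining 2 × 12 combination between watch models and phone models is expected when two unrelated arrays are fully flattened into one table.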
