I'm using Spark 2.3.2.
I'm trying to take the values of some DataFrame columns and insert them into an existing JSON structure. Suppose I have this DataFrame:
val testDF = Seq(("""{"foo": "bar", "meta":{"app1":{"p":"2", "o":"100"}, "app2":{"p":"5", "o":"200"}}}""", "10", "1337")).toDF("key", "p", "o")
// used as key for nested json structure
val app = "appX"
Basically, I want to turn this value of the key column:
{
"foo": "bar",
"meta": {
"app1": {
"p": "2",
"o": "100"
},
"app2": {
"p": "5",
"o": "200"
}
}
}
into this:
{
"meta": {
"app1": {
"p": "2",
"o": "100"
},
"app2": {
"p": "5",
"o": "200"
},
"appX": {
"p": "10",
"o": "1337"
}
}
}
where the new appX entry is built from the p and o columns of the DataFrame.
Here is what I tried:
def process(inputDF: DataFrame, appName: String): DataFrame = {
val res = inputDF
.withColumn(appName, to_json(expr("(p, o)")))
.withColumn("meta", struct(get_json_object('key, "$.meta")))
.selectExpr(s"""struct(meta.*, ${appName} as ${appName}) as myStruct""")
.select(to_json('myStruct).as("newMeta"))
res.show(false)
res
}
val resultDF = process(testDF, app)
val resultString = resultDF.select("newMeta").collectAsList().get(0).getString(0)
StringContext.treatEscapes(resultString) must be ("""{"meta":{"app1":{"p":"2","o":"100"},"app2":{"p":"5","o":"200"},"appX":{"p":"10","o":"1337"}}}""")
But the assertion fails, because I can't get appX onto the same level as the other two apps. The test fails with the following message:
Expected :"{"[meta":{"app1":{"p":"2","o":"100"},"app2":{"p":"5","o":"200"},"appX":{"p":"10","o":"1337"}}]}"
Actual :"{"[col1":"{"app1":{"p":"2","o":"100"},"app2":{"p":"5","o":"200"}}","appX":"{"p":"10","o":"1337"}"]}"
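The Actual value shows where the attempt goes wrong: to_json(expr("(p, o)")) and get_json_object both return strings, not structs, so the final to_json writes them out as quoted JSON strings instead of nested objects (treatEscapes in the test strips the backslashes, which is why the quotes look unbalanced). The unnamed get_json_object result is also what struct(...) labels col1. The sketch below reproduces the double encoding in plain Scala; quote is a hypothetical, deliberately minimal escaper, not a Spark or library function.

```scala
object DoubleEncoding {
  // Hypothetical minimal JSON string escaper: handles only backslash and
  // double quote, which is enough for this illustration (not a full encoder).
  def quote(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  // Roughly what to_json(expr("(p, o)")) yields for the sample row: a plain string.
  val appXJson: String = """{"p":"10","o":"1337"}"""

  // Serialising a struct whose appX field holds that *string* quotes it again.
  val asStringField: String = "{\"appX\":" + quote(appXJson) + "}"

  // What the question wants: the object embedded as-is, one level deeper.
  val asObjectField: String = "{\"appX\":" + appXJson + "}"
}
```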
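Convert the meta content and the p, o columns to the map data type: map(lit(appX), struct($"p", $"o")) builds a one-entry map from appX to the p/o struct, and the map_concat function (available from Spark 2.4.0) concatenates it with the existing meta map. Check the code below.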
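Before the Spark code, here is the same idea on plain Scala collections. This is only a sketch: a Scala Map stands in for Spark's map column, PO mirrors the case class used further down, and addApp is an illustrative name. The p/o pair becomes a one-entry map keyed by the app name, and concatenating it with meta places the new app next to app1 and app2.

```scala
object MapConcatSketch {
  // Plain-Scala stand-in for the struct<p:string,o:string> values in meta.
  final case class PO(p: String, o: String)

  // The parsed meta value of the sample row.
  val meta: Map[String, PO] = Map("app1" -> PO("2", "100"), "app2" -> PO("5", "200"))

  // Analogue of map(lit(app), struct($"p", $"o")): a single-entry map.
  def single(app: String, p: String, o: String): Map[String, PO] = Map(app -> PO(p, o))

  // Analogue of map_concat(meta, single(...)): the new app sits beside app1/app2.
  def addApp(meta: Map[String, PO], app: String, p: String, o: String): Map[String, PO] =
    meta ++ single(app, p, o)
}
```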
scala> testDF.show(false)
+---------------------------------------------------------------------------------+---+----+
|key |p |o |
+---------------------------------------------------------------------------------+---+----+
|{"foo": "bar", "meta":{"app1":{"p":"2", "o":"100"}, "app2":{"p":"5", "o":"200"}}}|10 |1337|
+---------------------------------------------------------------------------------+---+----+
Create a schema so that from_json can parse the key string into a struct:
scala> import org.apache.spark.sql.functions._
scala> import org.apache.spark.sql.types._
scala> val schema = new StructType().add("foo",StringType).add("meta",MapType(StringType,new StructType().add("p",StringType).add("o",StringType)))
Print the schema:
scala> schema.printTreeString
root
|-- foo: string (nullable = true)
|-- meta: map (nullable = true)
| |-- key: string
| |-- value: struct (valueContainsNull = true)
| | |-- p: string (nullable = true)
| | |-- o: string (nullable = true)
val appX = "appX"
testDF
.withColumn("key",from_json($"key",schema)) // parse the JSON string into a struct using the predefined schema.
.withColumn(
"key",
struct(
$"key.foo", // foo value from key column.
map_concat(
$"key.meta", // extracting meta from key column.
map(
lit(appX), // Constant appX value
struct($"p",$"o") // wrapping p, o values into struct.
) // converting appX,p,o into map(appX -> (p,o))
)
.as("meta") // giving alias to match existing meta in key.
) // using struct to combine foo, meta columns.
)
.select(to_json(struct($"key")).as("json_data")) // converting key value into json format.
.show(false)
Final output:
+-----------------------------------------------------------------------------------------------------------------+
|json_data |
+-----------------------------------------------------------------------------------------------------------------+
|{"key":{"foo":"bar","meta":{"app1":{"p":"2","o":"100"},"app2":{"p":"5","o":"200"},"appX":{"p":"10","o":"1337"}}}}|
+-----------------------------------------------------------------------------------------------------------------+
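The outer "key" field in this output comes from wrapping the column in struct($"key") before calling to_json; calling to_json($"key") on the struct column directly should give just the {"foo":...,"meta":{...}} object, which is also what the UDF variant below prints. To make the nesting concrete, here is a hypothetical hand-written serialiser in plain Scala (not Spark's implementation; it assumes the values contain no characters that need escaping) that reproduces the string:

```scala
object ToJsonSketch {
  final case class PO(p: String, o: String)

  // Naive quoting: assumes no quotes or backslashes inside values (true here).
  private def q(s: String): String = "\"" + s + "\""

  def po(v: PO): String = s"""{"p":${q(v.p)},"o":${q(v.o)}}"""

  // A Seq of pairs keeps insertion order, matching the entry order shown above.
  def meta(entries: Seq[(String, PO)]): String =
    entries.map { case (k, v) => q(k) + ":" + po(v) }.mkString("{", ",", "}")

  // Serialises the struct(foo, meta) value, i.e. the analogue of to_json($"key").
  def key(foo: String, entries: Seq[(String, PO)]): String =
    s"""{"foo":${q(foo)},"meta":${meta(entries)}}"""
}
```

Wrapping the result as "{\"key\":" + json + "}" gives the extra level seen in the json_data column.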
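For Spark versions before 2.4.0 (such as 2.3.2 in the question), which do not have the built-in map_concat function, you can get the same result with the help of a UDF and a case class.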
Define a case class to hold the p and o column values:
scala> case class PO(p:String,o:String)
Define a UDF to merge the maps:
scala> val map_concat = udf((mp:Map[String,PO],mpa:Map[String,PO]) => mp ++ mpa)
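Because the UDF body is just mp ++ mpa, key clashes follow Scala's Map semantics: every key from both maps is kept, and if meta already contains the app name, the value from the right-hand map (the one built from p and o) wins. The function can be checked on its own, outside udf(...); UdfMergeSketch is an illustrative name.

```scala
object UdfMergeSketch {
  final case class PO(p: String, o: String)

  // Same body as the UDF above: Map ++ keeps all keys from both maps and,
  // for a key present in both, keeps the right-hand (mpa) value.
  val mapConcat: (Map[String, PO], Map[String, PO]) => Map[String, PO] =
    (mp, mpa) => mp ++ mpa
}
```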
scala> testDF
.withColumn("key",from_json($"key",schema))
.withColumn(
"newMap",
to_json(
struct(
$"key.foo",
map_concat(
$"key.meta",
map(
lit(app),
struct($"p",$"o")
)
).as("meta")
)
)
)
.show(false)
Final output:
+-------------------------------------------+---+----+---------------------------------------------------------------------------------------------------------+
|key |p |o |newMap |
+-------------------------------------------+---+----+---------------------------------------------------------------------------------------------------------+
|[bar,Map(app1 -> [2,100], app2 -> [5,200])]|10 |1337|{"foo":"bar","meta":{"app1":{"p":"2","o":"100"},"app2":{"p":"5","o":"200"},"appX":{"p":"10","o":"1337"}}}|
+-------------------------------------------+---+----+---------------------------------------------------------------------------------------------------------+