This article is reproduced from serverfault.com.

Add values into existing nested json in a Spark DataFrame column

Published 2020-11-27 16:05:11

Using Spark 2.3.2.

I'm trying to take the values of some columns of a DataFrame and put them into an existing JSON structure. Let's say I have this DataFrame:

val testDF = Seq(("""{"foo": "bar", "meta":{"app1":{"p":"2", "o":"100"}, "app2":{"p":"5", "o":"200"}}}""", "10", "1337")).toDF("key", "p", "o")

// used as key for nested json structure
val app = "appX"

Basically, I want to go from this content in the column:

{
  "foo": "bar",
  "meta": {
    "app1": {
      "p": "2",
      "o": "100"
    },
    "app2": {
      "p": "5",
      "o": "200"
    }
  }
}

to this:

{
  "meta": {
    "app1": {
      "p": "2",
      "o": "100"
    },
    "app2": {
      "p": "5",
      "o": "200"
    },
    "appX": {
      "p": "10",
      "o": "1337"
    }
  }
}

based on the columns p and o of the DataFrame.

Here is what I tried:

def process(inputDF: DataFrame, appName: String): DataFrame = {
  val res = inputDF
    .withColumn(appName, to_json(expr("(p, o)")))
    .withColumn("meta", struct(get_json_object('key, "$.meta")))
    .selectExpr(s"""struct(meta.*, ${appName} as ${appName}) as myStruct""")
    .select(to_json('myStruct).as("newMeta"))

  res.show(false)
  res
}

val resultDF = process(testDF, app)

val resultString = resultDF.select("newMeta").collectAsList().get(0).getString(0)

StringContext.treatEscapes(resultString) must be ("""{"meta":{"app1":{"p":"2","o":"100"},"app2":{"p":"5","o":"200"},"appX":{"p":"10","o":"1337"}}}""")

But this assertion does not match, because I can't

  • get the appX content to the same level as the other two apps,
  • figure out how to handle the quotes correctly, and
  • figure out how to rename "col1" to "meta".

The test fails with:

Expected :"{"[meta":{"app1":{"p":"2","o":"100"},"app2":{"p":"5","o":"200"},"appX":{"p":"10","o":"1337"}}]}"
Actual   :"{"[col1":"{"app1":{"p":"2","o":"100"},"app2":{"p":"5","o":"200"}}","appX":"{"p":"10","o":"1337"}"]}"
Asked by mike
Answered by Srinivas, 2020-12-03:
  1. Extract the meta content.
  2. Convert the p and o columns to map type: map(lit(appX), struct($"p", $"o")).
  3. Use the map_concat function to merge the maps.

Check the code below.

scala> testDF.show(false)
+---------------------------------------------------------------------------------+---+----+
|key                                                                              |p  |o   |
+---------------------------------------------------------------------------------+---+----+
|{"foo": "bar", "meta":{"app1":{"p":"2", "o":"100"}, "app2":{"p":"5", "o":"200"}}}|10 |1337|
+---------------------------------------------------------------------------------+---+----+

Create a schema to parse the json string:

scala> val schema = new StructType().add("foo",StringType).add("meta",MapType(StringType,new StructType().add("p",StringType).add("o",StringType)))

Print the schema:

scala> schema.printTreeString
root
 |-- foo: string (nullable = true)
 |-- meta: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- p: string (nullable = true)
 |    |    |-- o: string (nullable = true)
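As a side note, the same schema can also be expressed as a DDL string, which is a bit more compact. This is a sketch not present in the original answer, and it assumes `StructType.fromDDL` is available in your Spark version:

```scala
import org.apache.spark.sql.types.StructType

// Hypothetical alternative: build the same schema from a DDL string.
val ddlSchema = StructType.fromDDL(
  "foo STRING, meta MAP<STRING, STRUCT<p: STRING, o: STRING>>")
```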
val appX = "appX"

testDF
.withColumn("key",from_json($"key",schema)) // convert json string to json using predefined schema.
.withColumn(
    "key",
    struct(
        $"key.foo", // foo value from key column.
        map_concat(
            $"key.meta", // extracting meta from key column.
            map(
                lit(appX), // Constant appX value
                struct($"p",$"o") // wrapping p, o values into struct.
            ) // converting appX,p,o into map(appX -> (p,o))
        )
        .as("meta") // giving alias to match existing meta in key.
    ) // using struct to combine foo, meta columns.
)
.select(to_json(struct($"key")).as("json_data")) // converting key value into json format.
.show(false)

Final output:

+-----------------------------------------------------------------------------------------------------------------+
|json_data                                                                                                        |
+-----------------------------------------------------------------------------------------------------------------+
|{"key":{"foo":"bar","meta":{"app1":{"p":"2","o":"100"},"app2":{"p":"5","o":"200"},"appX":{"p":"10","o":"1337"}}}}|
+-----------------------------------------------------------------------------------------------------------------+
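Note that `to_json(struct($"key"))` nests everything under an extra "key" field, which is why the output above starts with `{"key":…}`. If the flat JSON from the question is wanted, the struct column can be serialized directly. A self-contained sketch (the local SparkSession boilerplate is an addition, not part of the original answer):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val schema = new StructType()
  .add("foo", StringType)
  .add("meta", MapType(StringType,
    new StructType().add("p", StringType).add("o", StringType)))

val testDF = Seq(("""{"foo": "bar", "meta":{"app1":{"p":"2", "o":"100"}, "app2":{"p":"5", "o":"200"}}}""", "10", "1337"))
  .toDF("key", "p", "o")

testDF
  .withColumn("key", from_json($"key", schema))
  .withColumn("key", struct($"key.foo",
    map_concat($"key.meta", map(lit("appX"), struct($"p", $"o"))).as("meta")))
  .select(to_json($"key").as("json_data"))  // serialize the struct directly: no extra "key" wrapper
  .show(false)
```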

For Spark versions below 2.4.0 (the built-in map_concat function was only added in 2.4, so it is not available on the asker's 2.3.2), the same merge can be done with the help of a UDF and a case class.

Define a case class to hold the p and o column values:

scala> case class PO(p:String,o:String)

Define a UDF to merge the maps:

scala> val map_concat = udf((mp:Map[String,PO],mpa:Map[String,PO]) => mp ++ mpa)
scala> testDF
.withColumn("key",from_json($"key",schema))
.withColumn(
    "newMap",
    to_json(
        struct(
            $"key.foo",
            map_concat(
                $"key.meta",
                map(
                    lit(app),
                    struct($"p",$"o")
                )
            ).as("meta")
        )
    )
)
.show(false)

Final output:

+-------------------------------------------+---+----+---------------------------------------------------------------------------------------------------------+
|key                                        |p  |o   |newMap                                                                                                   |
+-------------------------------------------+---+----+---------------------------------------------------------------------------------------------------------+
|[bar,Map(app1 -> [2,100], app2 -> [5,200])]|10 |1337|{"foo":"bar","meta":{"app1":{"p":"2","o":"100"},"app2":{"p":"5","o":"200"},"appX":{"p":"10","o":"1337"}}}|
+-------------------------------------------+---+----+---------------------------------------------------------------------------------------------------------+
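The UDF above relies on Scala's standard map concatenation: `++` keeps all keys from both maps, and on a key collision the right-hand operand wins. A minimal plain-Scala illustration of that behavior (no Spark needed; the value literals mirror the example data):

```scala
// Case class mirroring the p/o struct from the answer.
case class PO(p: String, o: String)

// Existing meta entries plus the new appX entry.
val existing = Map("app1" -> PO("2", "100"), "app2" -> PO("5", "200"))
val added    = Map("appX" -> PO("10", "1337"))

// ++ merges the maps; keys from the right-hand map override duplicates.
val merged = existing ++ added
println(merged.keys.toList.sorted)  // List(app1, app2, appX)
```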