Agregar parte de la columna del esquema principal al elemento secundario en JSON anidado en el marco de datos de chispa

Tengo debajo de xml que estoy tratando de cargar en el marco de datos de chispa.

   <?xml version="1.0"?>
<env:ContentEnvelope xsi:schemaLocation="http">
    <env:Header>
        <env:Info>
            <env:Id>urn:uuid:6d2af93bfbfc49da9805aebb6a38996d</env:Id>
            <env:TimeStamp>20171122T07:56:09+00:00</env:TimeStamp>
        </env:Info>
        <fun:OrgId>18227</fun:OrgId>
        <fun:DataPartitionId>1</fun:DataPartitionId>
    </env:Header>
    <env:Body minVers="0.0" majVers="1" contentSet="Fundamental">
        <env:ContentItem action="Overwrite">
            <env:Data xsi:type="sr:FinancialSourceDataItem">
                <sr:Source sourceId="344" organizationId="4295906830">
                    <sr:FilingDateTime>20171111T17:00:00+00:00</sr:FilingDateTime>
                    <sr:SourceTypeCode>10K</sr:SourceTypeCode>
                    <sr:StatementDate>20171030T00:00:00+00:00</sr:StatementDate>
                    <sr:IsFilingDateTimeEstimated>false</sr:IsFilingDateTimeEstimated>
                    <sr:ContainsPreliminaryData>false</sr:ContainsPreliminaryData>
                    <sr:CapitalChangeAdjustmentDate>20171030T00:00:00+00:00</sr:CapitalChangeAdjustmentDate>
                    <sr:CumulativeAdjustmentFactor>1.00000</sr:CumulativeAdjustmentFactor>
                    <sr:ContainsRestatement>false</sr:ContainsRestatement>
                    <sr:FilingDateTimeUTCOffset>300</sr:FilingDateTimeUTCOffset>
                    <sr:ThirdPartySourceCode>SS</sr:ThirdPartySourceCode>
                    <sr:ThirdPartySourcePriority>1</sr:ThirdPartySourcePriority>
                    <sr:Auditors>
                        <sr:Auditor auditorId="3541">
                            <sr:AuditorOpinionCode>UNQ</sr:AuditorOpinionCode>
                            <sr:IsPlayingAuditorRole>true</sr:IsPlayingAuditorRole>
                            <sr:IsPlayingTaxAdvisorRole>false</sr:IsPlayingTaxAdvisorRole>
                            <sr:AuditorEnumerationId>3024068</sr:AuditorEnumerationId>
                            <sr:AuditorOpinionId>3010546</sr:AuditorOpinionId>
                            <sr:IsPlayingCSRAuditorRole>false</sr:IsPlayingCSRAuditorRole>
                        </sr:Auditor>
                        <sr:Auditor auditorId="9574">
                            <sr:AuditorOpinionCode>UWE</sr:AuditorOpinionCode>
                            <sr:IsPlayingAuditorRole>true</sr:IsPlayingAuditorRole>
                            <sr:IsPlayingTaxAdvisorRole>false</sr:IsPlayingTaxAdvisorRole>
                            <sr:AuditorEnumerationId>3030421</sr:AuditorEnumerationId>
                            <sr:AuditorOpinionId>3010547</sr:AuditorOpinionId>
                            <sr:IsPlayingCSRAuditorRole>false</sr:IsPlayingCSRAuditorRole>
                        </sr:Auditor>
                    </sr:Auditors>
                    <sr:SourceTypeId>3011835</sr:SourceTypeId>
                    <sr:ThirdPartySourceCodeId>1000716240</sr:ThirdPartySourceCodeId>
                </sr:Source>
            </env:Data>
        </env:ContentItem>
    </env:Body>
</env:ContentEnvelope>

La etiqueta principal es<env:ContentEnvelope> Luego hay dos encabezados de la primera parte (<env:Header>) y otro es cuerpo (<env:Body)

Los detalles en el cuerpo como<fun:OrgId> y<fun:DataPartitionId> será igual para todas las filas en el<env:Body.

De esto quiero crear dos marcos de datos.

Uno para<sr:Source y el segundo para<sr:Auditor

Para ambos marcos de datosaction="Overwrite" será lo mismo que una columna común.

También porque<sr:Auditor es hijo de<sr:Source tan pocas columnas comosourceId="344" organizationId="4295906830" se repetirá en el<sr:Auditor marco de datos.

Esto es lo que he hecho hasta ahora para lograr esto

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    val dfContentEnvelope = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "env:ContentEnvelope").load("s3://trfsmallfffile/XML")

    val dfHeader = dfContentEnvelope.withColumn("Header", (dfContentEnvelope("env:Header"))).select("Header.*")
    val dfDataPartitionId =dfHeader.select("fun:DataPartitionId")
    //dfDataPartitionId.show()

   //val dfBody = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "env:Body").load("s3://trfsmallfffile/XML")
    val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select("column1.*")
    val dfType=dfContentItem.select("env:Data.*")
    //dfType.show()

      val srSource = dfType.withColumn("srSource", (dfType("sr:Source"))).select("srSource.*").drop("sr:Auditors").filter($"srSource".isNotNull)
     val srSourceAuditor = dfType.withColumn("srSource", explode(dfType("sr:Source.sr:Auditors.sr:Auditor"))).select("srSource.*")

Entonces, mi pregunta es cómo puedo obtener el marco de datos para padres<sr:Source y marco de datos hijo para<sr:Auditor con organizationId y sourceId desde el marco de datos padre a hijo?

Respuestas a la pregunta(1)

Su respuesta a la pregunta