Filtrar filas en función de los valores de las columnas en la escala de marco de datos de chispa

Tengo un marco de datos (chispa):

id  value 
3     0
3     1
3     0
4     1
4     0
4     0

Quiero crear un nuevo marco de datos:

3 0
3 1
4 1

Necesito eliminar todas las filas después de 1 (valor) para cada id. Intenté con funciones de ventana en spark dateframe (Scala). Pero no pude encontrar una solución. Parece ser que voy en una dirección equivocada.

Estoy buscando una solución en Scala. Gracias

Salida usando monotonically_increasing_id

 scala> val data = Seq((3,0),(3,1),(3,0),(4,1),(4,0),(4,0)).toDF("id", "value")
data: org.apache.spark.sql.DataFrame = [id: int, value: int]

scala> val minIdx = dataWithIndex.filter($"value" === 1).groupBy($"id").agg(min($"idx")).toDF("r_id", "min_idx")
minIdx: org.apache.spark.sql.DataFrame = [r_id: int, min_idx: bigint]

scala> dataWithIndex.join(minIdx,($"r_id" === $"id") && ($"idx" <= $"min_idx")).select($"id", $"value").show
+---+-----+
| id|value|
+---+-----+
|  3|    0|
|  3|    1|
|  4|    1|
+---+-----+

La solución no funcionaría si hiciéramos una transformación ordenada en el marco de datos original. Esa vez elmonotonically_increasing_id () se genera en función del DF original en lugar del DF ordenado. He omitido ese requisito antes.

Todas las sugerencias son bienvenidas.

Respuestas a la pregunta(4)

Su respuesta a la pregunta