¿Pliegue monádico con mónada estatal en espacio constante (montón y pila)?

¿Es posible realizar un pliegue en la mónada de estado en el espacio de pila y pila constante? ¿O es una técnica funcional diferente que se adapta mejor a mi problema?

Las siguientes secciones describen el problema y un caso de uso motivador. Estoy usando Scala, pero las soluciones en Haskell también son bienvenidas.

Doblar en elState Mónada llena el montón

Supongamos Scalaz 7. Considere un pliegue monádico en la mónada estatal. Para evitar el desbordamiento de la pila, repasaremos el pliegue.

import scalaz._
import Scalaz._
import scalaz.std.iterable._
import Free.Trampoline

type TrampolinedState[S, B] = StateT[Trampoline, S, B] // monad type constructor

type S = Int  // state is an integer
type M[B] = TrampolinedState[S, B] // our trampolined state monad

type R = Int  // or some other monoid

val col: Iterable[R] = largeIterableofRs() // defined elsewhere

val (count, sum): (S, R) = col.foldLeftM[M, R](Monoid[R].zero){ 
    (acc: R, x: R) => StateT[Trampoline, S, R] {
      s: S => Trampoline.done { 
        (s + 1, Monoid[R].append(acc, x))
      }
    }
} run 0 run

// In Scalaz 7, foldLeftM is implemented in terms of foldRight, which in turn
// is a reversed.foldLeft. This pulls the whole collection into memory and kills
// the heap.  Ignore this heap overflow. We could reimplement foldLeftM to avoid
// this overflow or use a foldRightM instead.
// Our real issue is the heap used by the unexecuted State mobits.

Para una gran colección.col, esto llenará el montón.

Creo que durante el pliegue, se crea un cierre (un mobit estatal) para cada valor en la colección (elx: R parámetro), llenando el montón. Ninguno de ellos puede ser evaluado hastarun 0 Se ejecuta, proporcionando el estado inicial.

¿Se puede evitar este uso de pila O (n)?

Más específicamente, ¿se puede proporcionar el estado inicial antes del pliegue para que la mónada estatal pueda ejecutarse durante cada enlace, en lugar de anidar los cierres para una evaluación posterior?

¿O puede construirse el pliegue de tal manera que se ejecute perezosamente después de que la mónada estatal searun? De esta manera, la próximax: R el cierre no se crearía hasta después de que los anteriores se hayan evaluado y hecho adecuados para la recolección de basura.

¿O hay un mejor paradigma funcional para este tipo de trabajo?

Ejemplo de aplicación

Pero quizás estoy usando la herramienta equivocada para el trabajo. A continuación se muestra la evolución de un caso de uso de ejemplo. ¿Estoy vagando por el camino equivocado aquí?

Considerarmuestreo de reservorio, es decir, recogiendo en una pasada un uniforme al azark Elementos de una colección demasiado grande para caber en la memoria. En Scala, tal función podría ser

def sample[A](col: TraversableOnce[A])(k: Int): Vector[A]

y si es acorralado en elTraversableOnce tipo podría ser utilizado como este

val tenRandomInts = (Int.Min to Int.Max) sample 10

El trabajo realizado porsample es esencialmente unfold:

def sample[A](col: Traversable[A])(k: Int): Vector[A] = {
    col.foldLeft(Vector()){update(k)(_: Vector[A], _: A)}
}

Sin embargo,update es un estado depende den, el número de elementos ya vistos. (También depende de un RNG, pero por simplicidad supongo que es global y con estado. Las técnicas utilizadas para manejarn Se extendería trivialmente). Entonces, ¿cómo manejar este estado?

La solución impura es simple y se ejecuta con una pila y un montón constantes.

/* Impure version of update function */
def update[A](k: Int) = new Function2[Vector[A], A, Vector[A]] {
    var n = 0
    def apply(sample: Vector[A], x: A): Vector[A] = {
        n += 1
        algorithmR(k, n, acc, x)
    }
}

def algorithmR(k: Int, n: Int, acc: Vector[A], x: A): Vector[A] = {
    if (sample.size < k) {
        sample :+ x // must keep first k elements
    } else {
        val r = rand.nextInt(n) + 1 // for simplicity, rand is global/stateful
        if (r <= k)
            sample.updated(r - 1, x) // sample is 0-index
        else
            sample
    }
}

Pero ¿qué pasa con una solución puramente funcional?update debe tomarn como un parámetro adicional y devuelva el nuevo valor junto con la muestra actualizada. Podríamos incluirn en el estado implícito, el acumulador de pliegue, por ejemplo,

(col.foldLeft ((0, Vector())) (update(k)(_: (Int, Vector[A]), _: A)))._2

Pero eso oscurece la intención; Sólo pretendemos realmente acumular el vector de muestra. Este problema parece estar listo para la mónada estatal y un pliegue de izquierda monádico. Intentemoslo de nuevo.

Usaremos Scalaz 7, con estas importaciones.

import scalaz._
import Scalaz._
import scalaz.std.iterable_

y operar sobre unaIterable[A], ya que Scalaz no soporta el plegado monádico de unTraversable.

sample ahora está definido

// sample using State monad
def sample[A](col: Iterable[A])(k: Int): Vector[A] = {       
    type M[B] = State[Int, B]

    // foldLeftM is implemented using foldRight, which must reverse `col`, blowing
    // the heap for large `col`.  Ignore this issue for now.
    // foldLeftM could be implemented differently or we could switch to
    // foldRightM, implemented using foldLeft.
    col.foldLeftM[M, Vector[A]](Vector())(update(k)(_: Vector[A], _: A)) eval 0
}

donde la actualización es

// update using State monad
def update(k: Int) = {
    (acc: Vector[A], x: A) => State[Int, Vector[A]] {
        n => (n + 1, algorithmR(k, n + 1, acc, x)) // algR same as impure solution
    }
}

Desafortunadamente, esto sopla la pila en una colección grande.

Así que vamos a replantearlo.sample es ahora

// sample using trampolined State monad
def sample[A](col: Iterable[A])(k: Int): Vector[A] = {
    import Free.Trampoline

    type TrampolinedState[S, B] = StateT[Trampoline, S, B]
    type M[B] = TrampolinedState[Int, B]

    // Same caveat about foldLeftM using foldRight and blowing the heap
    // applies here.  Ignore for now. This solution blows the heap anyway;
    // let's fix that issue first.
    col.foldLeftM[M, Vector[A]](Vector())(update(k)(_: Vector[A], _: A)) eval 0 run
}

donde la actualización es

// update using trampolined State monad
def update(k: Int) = {
    (acc: Vector[A], x: A) => StateT[Trampoline, Int, Vector[A]] {
        n => Trampoline.done { (n + 1, algorithmR(k, n + 1, acc, x) }
    }
}

Esto corrige el desbordamiento de pila, pero aún sopla el montón para colecciones muy grandes (o montones muy pequeños). Se crea una función anónima por valor en la colección durante el pliegue (creo que se cierra sobre cadax: A parámetro), consumiendo el montón antes de que el trampolín incluso se ejecute. (FWIW, la versión de estado también tiene este problema; el desbordamiento de pila simplemente aparece primero con colecciones más pequeñas).

Respuestas a la pregunta(2)

Su respuesta a la pregunta