Implementação do Akka-Stream mais lenta que a implementação de thread único


baseado em Roland Kuhn Awnser:

O Akka Streams está usando a passagem assíncrona de mensagens entre os atores para implementar estágios de processamento do fluxo. A passagem de dados através de um limite assíncrono tem uma sobrecarga que você está vendo aqui: seu cálculo parece demorar apenas cerca de 160ns (derivado da medição de thread único) enquanto a solução de streaming leva aproximadamente 1µs por elemento, que é dominado pela passagem de mensagens.

Outro equívoco é que dizer "fluxo" implica paralelismo: no seu código, toda a computação é executada seqüencialmente em um único ator (o estágio do mapa), de modo que nenhum benefício pode ser esperado sobre a solução primitiva de encadeamento único.

Para se beneficiar do paralelismo proporcionado pelo Akka Streams, é necessário ter vários estágios de processamento, cada um executando tarefas de

1µs por elemento, veja também os documentos.

Eu fiz algumas mudanças. Meu código agora se parece com:

object MultiThread {
  implicit val actorSystem = ActorSystem("Sys")
  implicit val materializer = ActorMaterializer()

  var counter = 0
  var oldProgess = 0

  //RunnableFlow: in -> flow -> sink
  val in = Source(() => Iterator.continually((1254785478l, "name", 48, 23.09f)))

  val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p)))

  val tupleToEvent = Flow[(Long, String, Int, Float)].map(SharedFunctions.transform)

  val eventToFactorial = Flow[Event].map(SharedFunctions.transform2)

  val eventChef: Flow[(Long, String, Int, Float), Int, Unit] = Flow() { implicit builder =>
    import FlowGraph.Implicits._

    val dispatchTuple = builder.add(Balance[(Long, String, Int, Float)](4))
    val mergeEvents = builder.add(Merge[Int](4))

    dispatchTuple.out(0) ~> tupleToEvent ~> eventToFactorial ~>
    dispatchTuple.out(1) ~> tupleToEvent ~> eventToFactorial ~>
    dispatchTuple.out(2) ~> tupleToEvent ~> eventToFactorial ~>
    dispatchTuple.out(3) ~> tupleToEvent ~> eventToFactorial ~>

    (, mergeEvents.out)

  val sink = Sink.foreach[Int]{
    v => counter += 1
    oldProgess = SharedFunctions.printProgress(oldProgess, SharedFunctions.maxEventCount, counter, - SharedFunctions.startTime.getMillis)
    if(counter == SharedFunctions.maxEventCount) endAkka()

  def endAkka() = {
    val duration = new Duration(SharedFunctions.startTime,
    println("Time: " + duration.getMillis + " || Data: " + counter)

  def main(args: Array[String]) {
    println("MultiThread started: " + SharedFunctions.startTime)
   // in.via(eventChef).runWith(sink)


Não tenho certeza se entendi algo totalmente errado, mas ainda assim minha implementação com akka-streams é muito mais lenta (agora ainda mais lenta como antes), mas o que descobri é: Se eu aumentar o trabalho, por exemplo, fazendo alguma divisão, a implementação com akka -streams fica mais rápido. Portanto, se eu acertar (me corrija de outra forma), parece haver muita sobrecarga no meu exemplo. Então você só se beneficia do akka-streams se o código precisar fazer um trabalho pesado?

Sou relativamente novo no scala e no akka-stream. Escrevi um pequeno projeto de teste que cria alguns eventos até que um contador atinja um número específico. Para cada evento, o fatorial de um campo do evento está sendo calculado. Eu implementei isso duas vezes. Uma vez com akka-stream e outra sem akka-stream (thread único) e comparou o tempo de execução.

Eu não esperava isso: Quando eu crio um único evento, o tempo de execução de ambos os programas é quase o mesmo. Mas se eu criar 70.000.000 de eventos, a implementação sem o akka-streams é muito mais rápida. Aqui estão meus resultados (os seguintes dados são baseados em 24 medições):

Evento único sem akka-streams: 403 (+ - 2) ms

Evento único com akka-streams: 444 (+ -13) ms

70Mio eventos sem akka-streams: 11778 (+ -70) ms

70Mio eventos com akka-steams: 75424 (+ - 2959) ms

Então, minha pergunta é: o que está acontecendo? Por que minha implementação com o akka-stream é mais lenta?

aqui meu código:

Implementação com Akka

object MultiThread {
  implicit val actorSystem = ActorSystem("Sys")
  implicit val materializer = ActorMaterializer()

  var counter = 0
  var oldProgess = 0

  //RunnableFlow: in -> flow -> sink
  val in = Source(() => Iterator.continually((1254785478l, "name", 48, 23.09f)))

  val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p)))

  val sink = Sink.foreach[Int]{
    v => counter += 1
    oldProgess = SharedFunctions.printProgress(oldProgess, SharedFunctions.maxEventCount, counter, - SharedFunctions.startTime.getMillis)
    if(counter == SharedFunctions.maxEventCount) endAkka()

  def endAkka() = {
    val duration = new Duration(SharedFunctions.startTime,
    println("Time: " + duration.getMillis + " || Data: " + counter)

  def main(args: Array[String]) {
    println("MultiThread started: " + SharedFunctions.startTime)
    in.via(flow).runWith(sink).onComplete(_ => endAkka())


Implementação sem Akka

objeto SingleThread {

  def main(args: Array[String]) {
    println("SingleThread started at: " + SharedFunctions.startTime)
    val i = createEvent(0)
    val duration = new Duration(SharedFunctions.startTime,;
    println("Time: " + duration.getMillis + " || Data: " + i)

  def createEventWorker(oldProgress: Int, count: Int, randDate: Long, name: String, age: Int, myFloat: Float): Int = {
    if (count == SharedFunctions.maxEventCount) count
    else {
      val e = SharedFunctions.transform((randDate, name, age, myFloat))
      val p = SharedFunctions.printProgress(oldProgress, SharedFunctions.maxEventCount, count, - SharedFunctions.startTime.getMillis)
      createEventWorker(p, count + 1, 1254785478l, "name", 48, 23.09f)

  def createEvent(count: Int): Int = {
    createEventWorker(0, count, 1254785478l, "name", 48, 23.09f)


object SharedFunctions {
  val maxEventCount = 70000000
  val startTime =

  def transform(t : (Long, String, Int, Float)) : Event = new Event(t._1 ,t._2,t._3,t._4)
  def transform2(e : Event) : Int = factorial(e.getAgeYrs)

  def calculatePercentage(totalValue: Long, currentValue: Long) = Math.round((currentValue * 100) / totalValue)
  def printProgress(oldProgress : Int, fileSize: Long, currentSize: Int, t: Long) = {
    val cProgress = calculatePercentage(fileSize, currentSize)
    if (oldProgress != cProgress) println(s"$oldProgress% | $t ms")

  private def factorialWorker(n1: Int, n2: Int): Int = {
    if (n1 == 0) n2
    else factorialWorker(n1 -1, n2*n1)
  def factorial (n : Int): Int = {
    factorialWorker(n, 1)

Evento de Implementação

 * Autogenerated by Avro

public class Event extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Event\",\"namespace\":\"week2P2\",\"fields\":[{\"name\":\"timestampMS\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ageYrs\",\"type\":\"int\"},{\"name\":\"sizeCm\",\"type\":\"float\"}]}");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
  @Deprecated public long timestampMS;
  @Deprecated public CharSequence name;
  @Deprecated public int ageYrs;
  @Deprecated public float sizeCm;

   * Default constructor.  Note that this does not initialize fields
   * to their default values from the schema.  If that is desired then
   * one should use <code>newBuilder()</code>. 
  public Event() {}

   * All-args constructor.
  public Event(Long timestampMS, CharSequence name, Integer ageYrs, Float sizeCm) {
    this.timestampMS = timestampMS; = name;
    this.ageYrs = ageYrs;
    this.sizeCm = sizeCm;

  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
  // Used by DatumWriter.  Applications should not call. 
  public Object get(int field$) {
    switch (field$) {
    case 0: return timestampMS;
    case 1: return name;
    case 2: return ageYrs;
    case 3: return sizeCm;
    defau,lt: throw new org.apache.avro.AvroRuntimeException("Bad index");
  // Used by DatumReader.  Applications should not call. 
  public void put(int field$, Object value$) {
    switch (field$) {
    case 0: timestampMS = (Long)value$; break;
    case 1: name = (CharSequence)value$; break;
    case 2: ageYrs = (Integer)value$; break;
    case 3: sizeCm = (Float)value$; break;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");

   * Gets the value of the 'timestampMS' field.
  public Long getTimestampMS() {
    return timestampMS;

   * Sets the value of the 'timestampMS' field.
   * @param value the value to set.
  public void setTimestampMS(Long value) {
    this.timestampMS = value;

   * Gets the value of the 'name' field.
  public CharSequence getName() {
    return name;

   * Sets the value of the 'name' field.
   * @param value the value to set.
  public void setName(CharSequence value) { = value;

   * Gets the value of the 'ageYrs' field.
  public Integer getAgeYrs() {
    return ageYrs;

   * Sets the value of the 'ageYrs' field.
   * @param value the value to set.
  public void setAgeYrs(Integer value) {
    this.ageYrs = value;

   * Gets the value of the 'sizeCm' field.
  public Float getSizeCm() {
    return sizeCm;

   * Sets the value of the 'sizeCm' field.
   * @param value the value to set.
  public void setSizeCm(Float value) {
    this.sizeCm = value;

  /** Creates a new Event RecordBuilder */
  public static Event.Builder newBuilder() {
    return new Event.Builder();

  /** Creates a new Event RecordBuilder by copying an existing Builder */
  public static Event.Builder newBuilder(Event.Builder other) {
    return new Event.Builder(other);

  /** Creates a new Event RecordBuilder by copying an existing Event instance */
  public static Event.Builder newBuilder(Event other) {
    return new Event.Builder(other);

   * RecordBuilder for Event instances.
  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Event>
    implements<Event> {

    private long timestampMS;
    private CharSequence name;
    private int ageYrs;
    private float sizeCm;

    /** Creates a new Builder */
    private Builder() {

    /** Creates a Builder by copying an existing Builder */
    private Builder(Event.Builder other) {
      if (isValidValue(fields()[0], other.timestampMS)) {
        this.timestampMS = data().deepCopy(fields()[0].schema(), other.timestampMS);
        fieldSetFlags()[0] = true;
      if (isValidValue(fields()[1], { = data().deepCopy(fields()[1].schema(),;
        fieldSetFlags()[1] = true;
      if (isValidValue(fields()[2], other.ageYrs)) {
        this.ageYrs = data().deepCopy(fields()[2].schema(), other.ageYrs);
        fieldSetFlags()[2] = true;
      if (isValidValue(fields()[3], other.sizeCm)) {
        this.sizeCm = data().deepCopy(fields()[3].schema(), other.sizeCm);
        fieldSetFlags()[3] = true;

    /** Creates a Builder by copying an existing Event instance */
    private Builder(Event other) {
      if (isValidValue(fields()[0], other.timestampMS)) {
        this.timestampMS = data().deepCopy(fields()[0].schema(), other.timestampMS);
        fieldSetFlags()[0] = true;
      if (isValidValue(fields()[1], { = data().deepCopy(fields()[1].schema(),;
        fieldSetFlags()[1] = true;
      if (isValidValue(fields()[2], other.ageYrs)) {
        this.ageYrs = data().deepCopy(fields()[2].schema(), other.ageYrs);
        fieldSetFlags()[2] = true;
      if (isValidValue(fields()[3], other.sizeCm)) {
        this.sizeCm = data().deepCopy(fields()[3].schema(), other.sizeCm);
        fieldSetFlags()[3] = true;

    /** Gets the value of the 'timestampMS' field */
    public Long getTimestampMS() {
      return timestampMS;

    /** Sets the value of the 'timestampMS' field */
    public Event.Builder setTimestampMS(long value) {
      validate(fields()[0], value);
      this.timestampMS = value;
      fieldSetFlags()[0] = true;
      return this; 

    /** Checks whether the 'timestampMS' field has been set */
    public boolean hasTimestampMS() {
      return fieldSetFlags()[0];

    /** Clears the value of the 'timestampMS' field */
    public Event.Builder clearTimestampMS() {
      fieldSetFlags()[0] = false;
      return this;

    /** Gets the value of the 'name' field */
    public CharSequence getName() {
      return name;

    /** Sets the value of the 'name' field */
    public Event.Builder setName(CharSequence value) {
      validate(fields()[1], value); = value;
      fieldSetFlags()[1] = true;
      return this; 

    /** Checks whether the 'name' field has been set */
    public boolean hasName() {
      return fieldSetFlags()[1];

    /** Clears the value of the 'name' field */
    public Event.Builder clearName() {
      name = null;
      fieldSetFlags()[1] = false;
      return this;

    /** Gets the value of the 'ageYrs' field */
    public Integer getAgeYrs() {
      return ageYrs;

    /** Sets the value of the 'ageYrs' field */
    public Event.Builder setAgeYrs(int value) {
      validate(fields()[2], value);
      this.ageYrs = value;
      fieldSetFlags()[2] = true;
      return this; 

    /** Checks whether the 'ageYrs' field has been set */
    public boolean hasAgeYrs() {
      return fieldSetFlags()[2];

    /** Clears the value of the 'ageYrs' field */
    public Event.Builder clearAgeYrs() {
      fieldSetFlags()[2] = false;
      return this;

    /** Gets the value of the 'sizeCm' field */
    public Float getSizeCm() {
      return sizeCm;

    /** Sets the value of the 'sizeCm' field */
    public Event.Builder setSizeCm(float value) {
      validate(fields()[3], value);
      this.sizeCm = value;
      fieldSetFlags()[3] = true;
      return this; 

    /** Checks whether the 'sizeCm' field has been set */
    public boolean hasSizeCm() {
      return fieldSetFlags()[3];

    /** Clears the value of the 'sizeCm' field */
    public Event.Builder clearSizeCm() {
      fieldSetFlags()[3] = false;
      return this;

    public Event build() {
      try {
        Event record = new Event();
        record.timestampMS = fieldSetFlags()[0] ? this.timestampMS : (Long) defaultValue(fields()[0]); = fieldSetFlags()[1] ? : (CharSequence) defaultValue(fields()[1]);
        record.ageYrs = fieldSetFlags()[2] ? this.ageYrs : (Integer) defaultValue(fields()[2]);
        record.sizeCm = fieldSetFlags()[3] ? this.sizeCm : (Float) defaultValue(fields()[3]);
        return record;
      } catch (Exception e) {
        throw new org.apache.avro.AvroRuntimeException(e);

