Akka-Stream implementation slower than single-threaded implementation
UPDATE 2015-10-30
Based on Roland Kuhn's answer:
Akka Streams uses asynchronous message passing between actors to implement stream processing stages. Passing data across an asynchronous boundary has a cost, and that is what you are seeing here: your computation seems to take only about 160ns (derived from the single-threaded measurement), while the streaming solution takes roughly 1µs per element, which is dominated by the message passing.
Another misconception is that saying "stream" implies parallelism: in your code all computation runs sequentially in a single actor (the map stage), so no benefit can be expected over the primitive single-threaded solution.
In order to benefit from the parallelism afforded by Akka Streams you need to have multiple processing stages that each perform tasks of more than 1µs per element, see also the docs.
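The per-element figures in the quoted answer can be checked against the totals reported in the original question (11778 ms and 75424 ms for 70,000,000 events). A minimal sketch of that arithmetic follows; `PerElementCost` and `nanosPerElement` are names made up for this illustration.

```scala
object PerElementCost {
  // Measured totals taken from this question (70,000,000 events each)
  val events = 70000000L
  val singleThreadMs = 11778L
  val akkaStreamsMs = 75424L

  // Convert a total runtime in milliseconds into nanoseconds per element
  def nanosPerElement(totalMs: Long, n: Long): Double = totalMs * 1e6 / n

  def main(args: Array[String]): Unit = {
    val single = nanosPerElement(singleThreadMs, events)
    val stream = nanosPerElement(akkaStreamsMs, events)
    println(f"single thread: $single%.0f ns per element")
    println(f"akka-streams:  $stream%.0f ns per element")
    println(f"slowdown:      ${stream / single}%.1fx")
  }
}
```

This gives roughly 170 ns per element for the single-threaded run and a little over 1µs per element for the streaming run, which is in line with the orders of magnitude quoted in the answer.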
I made some changes. My code now looks like this:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import org.joda.time.{DateTime, Duration} // assumption: DateTime/Duration come from Joda-Time

object MultiThread {
  implicit val actorSystem = ActorSystem("Sys")
  implicit val materializer = ActorMaterializer()

  var counter = 0
  var oldProgess = 0

  // RunnableFlow: in -> flow -> sink
  val in = Source(() => Iterator.continually((1254785478L, "name", 48, 23.09f)))

  // Both transformations combined in a single map stage
  val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p)))

  // The same transformations as two separate stages, used by eventChef
  val tupleToEvent = Flow[(Long, String, Int, Float)].map(SharedFunctions.transform)
  val eventToFactorial = Flow[Event].map(SharedFunctions.transform2)

  // Fan out over four branches with Balance, then fan in again with Merge
  val eventChef: Flow[(Long, String, Int, Float), Int, Unit] = Flow() { implicit builder =>
    import FlowGraph.Implicits._

    val dispatchTuple = builder.add(Balance[(Long, String, Int, Float)](4))
    val mergeEvents = builder.add(Merge[Int](4))

    dispatchTuple.out(0) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(0)
    dispatchTuple.out(1) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(1)
    dispatchTuple.out(2) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(2)
    dispatchTuple.out(3) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(3)

    (dispatchTuple.in, mergeEvents.out)
  }

  // Counts results, prints progress, and stops once maxEventCount is reached
  val sink = Sink.foreach[Int] { v =>
    counter += 1
    oldProgess = SharedFunctions.printProgress(oldProgess, SharedFunctions.maxEventCount, counter,
      DateTime.now.getMillis - SharedFunctions.startTime.getMillis)
    if (counter == SharedFunctions.maxEventCount) endAkka()
  }

  def endAkka() = {
    val duration = new Duration(SharedFunctions.startTime, DateTime.now)
    println("Time: " + duration.getMillis + " || Data: " + counter)
    actorSystem.shutdown
    actorSystem.awaitTermination
    System.exit(-1)
  }

  def main(args: Array[String]): Unit = {
    println("MultiThread started: " + SharedFunctions.startTime)
    in.via(flow).runWith(sink)
    // in.via(eventChef).runWith(sink)
  }
}
I'm not sure whether I got something totally wrong, but my implementation with akka-streams is still much slower (now even slower than before). What I did find out is this: if I increase the work, for example by doing some division, the implementation with akka-streams gets faster. So if I understand it correctly (correct me otherwise), there seems to be too much overhead in my example. Does that mean you only benefit from akka-streams if the code has to do heavy work?
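One way to reason about that question is a back-of-the-envelope throughput model. The sketch below is only an assumption for illustration, not a description of how Akka Streams actually schedules work: it supposes that crossing an asynchronous boundary costs a fixed `overheadNs` per element, that a dispatching stage pays this for every element, and that each of `branches` workers pays `overheadNs + workNs` for its share of the elements. `OverheadModel` and all of its names and numbers are hypothetical.

```scala
object OverheadModel {
  // Hypothetical model: cost per element when everything runs on one thread
  def sequentialNs(workNs: Double): Double = workNs

  // Hypothetical model: the slowest stage bounds throughput. The dispatcher
  // touches every element; each worker handles 1/branches of the elements.
  def streamedNs(workNs: Double, overheadNs: Double, branches: Int): Double =
    math.max(overheadNs, (overheadNs + workNs) / branches)

  def main(args: Array[String]): Unit = {
    val overheadNs = 1000.0 // assumption: roughly 1µs per asynchronous boundary
    for (workNs <- Seq(160.0, 1000.0, 10000.0)) {
      val seq = sequentialNs(workNs)
      val par = streamedNs(workNs, overheadNs, branches = 4)
      println(s"work=$workNs ns: sequential=$seq ns, streamed=$par ns")
    }
  }
}
```

Under these assumptions the streamed version only wins once the work per element exceeds the per-element overhead, which would be consistent with the observation that adding work makes the akka-streams version comparatively faster.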
I'm relatively new to Scala and akka-stream. I wrote a small test project that creates events until a counter reaches a specific number. For each event, the factorial of one of the event's fields is computed. I implemented this twice, once with akka-stream and once without akka-stream (single-threaded), and compared the runtimes.
I didn't expect this: when I create a single event, the runtime of both programs is almost the same. But if I create 70,000,000 events, the implementation without akka-streams is much faster. Here are my results (the following data is based on 24 measurements):
Single event without akka-streams: 403 (±2) ms
Single event with akka-streams: 444 (±13) ms
70 million events without akka-streams: 11778 (±70) ms
70 million events with akka-streams: 75424 (±2959) ms
So my question is: what is going on? Why is my implementation with akka-streams slower?
Here is my code:
Implementation with Akka
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import org.joda.time.{DateTime, Duration} // assumption: DateTime/Duration come from Joda-Time

object MultiThread {
  implicit val actorSystem = ActorSystem("Sys")
  implicit val materializer = ActorMaterializer()

  var counter = 0
  var oldProgess = 0

  // RunnableFlow: in -> flow -> sink
  val in = Source(() => Iterator.continually((1254785478L, "name", 48, 23.09f)))
  val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p)))

  // Counts results, prints progress, and stops once maxEventCount is reached
  val sink = Sink.foreach[Int] { v =>
    counter += 1
    oldProgess = SharedFunctions.printProgress(oldProgess, SharedFunctions.maxEventCount, counter,
      DateTime.now.getMillis - SharedFunctions.startTime.getMillis)
    if (counter == SharedFunctions.maxEventCount) endAkka()
  }

  def endAkka() = {
    val duration = new Duration(SharedFunctions.startTime, DateTime.now)
    println("Time: " + duration.getMillis + " || Data: " + counter)
    actorSystem.shutdown
    actorSystem.awaitTermination
    System.exit(-1)
  }

  def main(args: Array[String]): Unit = {
    import scala.concurrent.ExecutionContext.Implicits.global
    println("MultiThread started: " + SharedFunctions.startTime)
    in.via(flow).runWith(sink).onComplete(_ => endAkka())
  }
}
Implementation without Akka
import scala.annotation.tailrec
import org.joda.time.{DateTime, Duration} // assumption: DateTime/Duration come from Joda-Time

object SingleThread {
  def main(args: Array[String]): Unit = {
    println("SingleThread started at: " + SharedFunctions.startTime)
    println("0%")
    val i = createEvent(0)
    val duration = new Duration(SharedFunctions.startTime, DateTime.now())
    println("Time: " + duration.getMillis + " || Data: " + i)
  }

  // Tail-recursive loop: builds one event and computes its factorial per iteration
  @tailrec
  def createEventWorker(oldProgress: Int, count: Int, randDate: Long, name: String, age: Int, myFloat: Float): Int = {
    if (count == SharedFunctions.maxEventCount) count
    else {
      val e = SharedFunctions.transform((randDate, name, age, myFloat))
      SharedFunctions.transform2(e)
      val p = SharedFunctions.printProgress(oldProgress, SharedFunctions.maxEventCount, count,
        DateTime.now.getMillis - SharedFunctions.startTime.getMillis)
      createEventWorker(p, count + 1, 1254785478L, "name", 48, 23.09f)
    }
  }

  def createEvent(count: Int): Int = {
    createEventWorker(0, count, 1254785478L, "name", 48, 23.09f)
  }
}
SharedFunctions
import scala.annotation.tailrec
import org.joda.time.DateTime // assumption: DateTime comes from Joda-Time

object SharedFunctions {
  val maxEventCount = 70000000
  val startTime = DateTime.now

  // Builds an Avro Event from a tuple
  def transform(t: (Long, String, Int, Float)): Event = new Event(t._1, t._2, t._3, t._4)

  // Computes the factorial of the event's age field
  def transform2(e: Event): Int = factorial(e.getAgeYrs)

  def calculatePercentage(totalValue: Long, currentValue: Long) = Math.round((currentValue * 100) / totalValue)

  // Prints a line whenever the integer percentage changes and returns the new percentage
  def printProgress(oldProgress: Int, fileSize: Long, currentSize: Int, t: Long) = {
    val cProgress = calculatePercentage(fileSize, currentSize)
    if (oldProgress != cProgress) println(s"$oldProgress% | $t ms")
    cProgress
  }

  // Note: Int arithmetic wraps around silently for n > 12
  @tailrec
  private def factorialWorker(n1: Int, n2: Int): Int = {
    if (n1 == 0) n2
    else factorialWorker(n1 - 1, n2 * n1)
  }

  def factorial(n: Int): Int = {
    factorialWorker(n, 1)
  }
}
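One detail worth checking in `factorial`: the benchmark always passes 48, and 48! does not fit into an `Int`. The sketch below compares the wrapping `Int` computation with a `BigInt` variant. `FactorialCheck`, `factorialInt` and `factorialBig` are names introduced here for illustration and are not part of the original project. Because 48! contains more than 32 factors of two, the wrapped `Int` result is 0.

```scala
import scala.annotation.tailrec

object FactorialCheck {
  // Same algorithm as SharedFunctions.factorial, using wrapping Int arithmetic
  @tailrec
  def factorialInt(n: Int, acc: Int = 1): Int =
    if (n == 0) acc else factorialInt(n - 1, acc * n)

  // Illustrative variant with arbitrary precision
  @tailrec
  def factorialBig(n: Int, acc: BigInt = BigInt(1)): BigInt =
    if (n == 0) acc else factorialBig(n - 1, acc * n)

  def main(args: Array[String]): Unit = {
    println(factorialInt(12)) // 479001600, the largest factorial that fits in an Int
    println(factorialInt(48)) // 0, because the product wraps around modulo 2^32
    println(factorialBig(48)) // the exact value, far larger than Int.MaxValue
  }
}
```

This should not affect the timing comparison, since both implementations call the same function, but it does mean the computed values are not meaningful as factorials.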
Implementation of Event
/**
* Autogenerated by Avro
*
* DO NOT EDIT DIRECTLY
*/
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class Event extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Event\",\"namespace\":\"week2P2\",\"fields\":[{\"name\":\"timestampMS\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ageYrs\",\"type\":\"int\"},{\"name\":\"sizeCm\",\"type\":\"float\"}]}");
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
@Deprecated public long timestampMS;
@Deprecated public CharSequence name;
@Deprecated public int ageYrs;
@Deprecated public float sizeCm;
/**
* Default constructor. Note that this does not initialize fields
* to their default values from the schema. If that is desired then
* one should use <code>newBuilder()</code>.
*/
public Event() {}
/**
* All-args constructor.
*/
public Event(Long timestampMS, CharSequence name, Integer ageYrs, Float sizeCm) {
this.timestampMS = timestampMS;
this.name = name;
this.ageYrs = ageYrs;
this.sizeCm = sizeCm;
}
public org.apache.avro.Schema getSchema() { return SCHEMA$; }
// Used by DatumWriter. Applications should not call.
public Object get(int field$) {
switch (field$) {
case 0: return timestampMS;
case 1: return name;
case 2: return ageYrs;
case 3: return sizeCm;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
// Used by DatumReader. Applications should not call.
@SuppressWarnings(value="unchecked")
public void put(int field$, Object value$) {
switch (field$) {
case 0: timestampMS = (Long)value$; break;
case 1: name = (CharSequence)value$; break;
case 2: ageYrs = (Integer)value$; break;
case 3: sizeCm = (Float)value$; break;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
/**
* Gets the value of the 'timestampMS' field.
*/
public Long getTimestampMS() {
return timestampMS;
}
/**
* Sets the value of the 'timestampMS' field.
* @param value the value to set.
*/
public void setTimestampMS(Long value) {
this.timestampMS = value;
}
/**
* Gets the value of the 'name' field.
*/
public CharSequence getName() {
return name;
}
/**
* Sets the value of the 'name' field.
* @param value the value to set.
*/
public void setName(CharSequence value) {
this.name = value;
}
/**
* Gets the value of the 'ageYrs' field.
*/
public Integer getAgeYrs() {
return ageYrs;
}
/**
* Sets the value of the 'ageYrs' field.
* @param value the value to set.
*/
public void setAgeYrs(Integer value) {
this.ageYrs = value;
}
/**
* Gets the value of the 'sizeCm' field.
*/
public Float getSizeCm() {
return sizeCm;
}
/**
* Sets the value of the 'sizeCm' field.
* @param value the value to set.
*/
public void setSizeCm(Float value) {
this.sizeCm = value;
}
/** Creates a new Event RecordBuilder */
public static Event.Builder newBuilder() {
return new Event.Builder();
}
/** Creates a new Event RecordBuilder by copying an existing Builder */
public static Event.Builder newBuilder(Event.Builder other) {
return new Event.Builder(other);
}
/** Creates a new Event RecordBuilder by copying an existing Event instance */
public static Event.Builder newBuilder(Event other) {
return new Event.Builder(other);
}
/**
* RecordBuilder for Event instances.
*/
public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Event>
implements org.apache.avro.data.RecordBuilder<Event> {
private long timestampMS;
private CharSequence name;
private int ageYrs;
private float sizeCm;
/** Creates a new Builder */
private Builder() {
super(Event.SCHEMA$);
}
/** Creates a Builder by copying an existing Builder */
private Builder(Event.Builder other) {
super(other);
if (isValidValue(fields()[0], other.timestampMS)) {
this.timestampMS = data().deepCopy(fields()[0].schema(), other.timestampMS);
fieldSetFlags()[0] = true;
}
if (isValidValue(fields()[1], other.name)) {
this.name = data().deepCopy(fields()[1].schema(), other.name);
fieldSetFlags()[1] = true;
}
if (isValidValue(fields()[2], other.ageYrs)) {
this.ageYrs = data().deepCopy(fields()[2].schema(), other.ageYrs);
fieldSetFlags()[2] = true;
}
if (isValidValue(fields()[3], other.sizeCm)) {
this.sizeCm = data().deepCopy(fields()[3].schema(), other.sizeCm);
fieldSetFlags()[3] = true;
}
}
/** Creates a Builder by copying an existing Event instance */
private Builder(Event other) {
super(Event.SCHEMA$);
if (isValidValue(fields()[0], other.timestampMS)) {
this.timestampMS = data().deepCopy(fields()[0].schema(), other.timestampMS);
fieldSetFlags()[0] = true;
}
if (isValidValue(fields()[1], other.name)) {
this.name = data().deepCopy(fields()[1].schema(), other.name);
fieldSetFlags()[1] = true;
}
if (isValidValue(fields()[2], other.ageYrs)) {
this.ageYrs = data().deepCopy(fields()[2].schema(), other.ageYrs);
fieldSetFlags()[2] = true;
}
if (isValidValue(fields()[3], other.sizeCm)) {
this.sizeCm = data().deepCopy(fields()[3].schema(), other.sizeCm);
fieldSetFlags()[3] = true;
}
}
/** Gets the value of the 'timestampMS' field */
public Long getTimestampMS() {
return timestampMS;
}
/** Sets the value of the 'timestampMS' field */
public Event.Builder setTimestampMS(long value) {
validate(fields()[0], value);
this.timestampMS = value;
fieldSetFlags()[0] = true;
return this;
}
/** Checks whether the 'timestampMS' field has been set */
public boolean hasTimestampMS() {
return fieldSetFlags()[0];
}
/** Clears the value of the 'timestampMS' field */
public Event.Builder clearTimestampMS() {
fieldSetFlags()[0] = false;
return this;
}
/** Gets the value of the 'name' field */
public CharSequence getName() {
return name;
}
/** Sets the value of the 'name' field */
public Event.Builder setName(CharSequence value) {
validate(fields()[1], value);
this.name = value;
fieldSetFlags()[1] = true;
return this;
}
/** Checks whether the 'name' field has been set */
public boolean hasName() {
return fieldSetFlags()[1];
}
/** Clears the value of the 'name' field */
public Event.Builder clearName() {
name = null;
fieldSetFlags()[1] = false;
return this;
}
/** Gets the value of the 'ageYrs' field */
public Integer getAgeYrs() {
return ageYrs;
}
/** Sets the value of the 'ageYrs' field */
public Event.Builder setAgeYrs(int value) {
validate(fields()[2], value);
this.ageYrs = value;
fieldSetFlags()[2] = true;
return this;
}
/** Checks whether the 'ageYrs' field has been set */
public boolean hasAgeYrs() {
return fieldSetFlags()[2];
}
/** Clears the value of the 'ageYrs' field */
public Event.Builder clearAgeYrs() {
fieldSetFlags()[2] = false;
return this;
}
/** Gets the value of the 'sizeCm' field */
public Float getSizeCm() {
return sizeCm;
}
/** Sets the value of the 'sizeCm' field */
public Event.Builder setSizeCm(float value) {
validate(fields()[3], value);
this.sizeCm = value;
fieldSetFlags()[3] = true;
return this;
}
/** Checks whether the 'sizeCm' field has been set */
public boolean hasSizeCm() {
return fieldSetFlags()[3];
}
/** Clears the value of the 'sizeCm' field */
public Event.Builder clearSizeCm() {
fieldSetFlags()[3] = false;
return this;
}
@Override
public Event build() {
try {
Event record = new Event();
record.timestampMS = fieldSetFlags()[0] ? this.timestampMS : (Long) defaultValue(fields()[0]);
record.name = fieldSetFlags()[1] ? this.name : (CharSequence) defaultValue(fields()[1]);
record.ageYrs = fieldSetFlags()[2] ? this.ageYrs : (Integer) defaultValue(fields()[2]);
record.sizeCm = fieldSetFlags()[3] ? this.sizeCm : (Float) defaultValue(fields()[3]);
return record;
} catch (Exception e) {
throw new org.apache.avro.AvroRuntimeException(e);
}
}
}
}