Instruções de seleção da API Java do Spark Datastax
Estou seguindo um tutorial deste repositório do GitHub para executar o Spark sobre o Cassandra em um projeto Java Maven: https://github.com/datastax/spark-cassandra-connector.
Eu já descobri como usar instruções CQL diretas e fiz uma pergunta anterior sobre isso aqui: "Consultando dados no Cassandra via Spark em um projeto Java Maven".
No entanto, agora estou tentando usar a API Java da DataStax, com receio de que o código da minha pergunta original não funcione com a versão DataStax do Spark e do Cassandra. Por algum motivo estranho, ela não me deixa usar `.where`, mesmo que a documentação descreva que eu posso usar exatamente essa instrução. Aqui está o meu código:
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import java.io.Serializable;
import static com.datastax.spark.connector.CassandraJavaUtil.*;
/**
 * Demo application that reads the Cassandra table {@code tester.empbyrole} through the
 * DataStax Spark connector's Java API, maps every row to a {@link Person} bean and prints
 * the textual form of each bean on its own line.
 *
 * <p>Expected arguments: {@code <Spark Master URL> <Cassandra contact point>}.
 */
public class App implements Serializable
{
    /**
     * Bean that rows of {@code tester.empbyrole} are mapped to.
     * The accessor names are kept exactly as they were so existing callers keep working.
     */
    public static class Person implements Serializable {
        private Integer id;
        private String fname;
        private String lname;
        private String role;

        /** No-args constructor, required so the bean can be instantiated reflectively. */
        public Person() { }

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }

        public String getfname() { return fname; }
        public void setfname(String fname) { this.fname = fname; }

        public String getlname() { return lname; }
        public void setlname(String lname) { this.lname = lname; }

        public String getrole() { return role; }
        public void setrole(String role) { this.role = role; }

        /**
         * Returns a readable description of the bean. Without this override the mapper in
         * {@code createSchema} would emit the default {@code Object} identity string.
         */
        @Override
        public String toString() {
            return "Person{id=" + id
                    + ", fname=" + fname
                    + ", lname=" + lname
                    + ", role=" + role + "}";
        }
    }

    // Marked transient, so the configuration is left out when an App instance is serialized.
    private transient SparkConf conf;

    private App(SparkConf conf) {
        this.conf = conf;
    }

    /**
     * Creates the Spark context, runs the read job and always stops the context afterwards,
     * even when the job fails.
     */
    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            createSchema(sc);
        } finally {
            sc.stop();
        }
    }

    /**
     * Reads {@code tester.empbyrole} as {@link Person} beans, keeps the rows matching the
     * {@code where} predicate and prints one bean per line.
     *
     * @param sc the active Spark context used to build the Cassandra-backed RDD
     */
    private void createSchema(JavaSparkContext sc) {
        JavaRDD<String> rdd = javaFunctions(sc)
                .cassandraTable("tester", "empbyrole", Person.class)
                // NOTE(review): the error reported for this call shows the predicate being
                // appended to a token("role") range restriction in the generated SELECT, and
                // Cassandra rejecting the combination ("role cannot be restricted by more than
                // one relation if it includes an Equal"). Presumably this is because role is the
                // partition key of the table - confirm against the connector version in use.
                .where("role=?", "IT Engineer")
                .map(new Function<Person, String>() {
                    @Override
                    public String call(Person person) throws Exception {
                        return person.toString();
                    }
                });
        // Collection first, separator second: the reversed order picks the varargs overload of
        // StringUtils.join and prints the separator followed by the list's own toString().
        System.out.println("Data as Person beans: \n" + StringUtils.join(rdd.toArray(), "\n"));
    }

    /**
     * Entry point.
     *
     * @param args {@code args[0]} is the Spark master URL, {@code args[1]} is the Cassandra
     *             contact point
     */
    public static void main( String[] args )
    {
        if (args.length != 2) {
            // Report the class that is actually being launched rather than a copied demo name.
            System.err.println("Syntax: " + App.class.getName() + " <Spark Master URL> <Cassandra contact point>");
            System.exit(1);
        }
        SparkConf conf = new SparkConf();
        conf.setAppName("Java API demo");
        conf.setMaster(args[0]);
        conf.set("spark.cassandra.connection.host", args[1]);
        App app = new App(conf);
        app.run();
    }
}
Aqui está o erro:
14/09/23 13:46:53 ERROR executor.Executor: Exception in task ID 0
java.io.IOException: Exception during preparation of SELECT "role", "id", "fname", "lname" FROM "tester"."empbyrole" WHERE token("role") > -5709068081826432029 AND token("role") <= -5491279024053142424 AND role=? ALLOW FILTERING: role cannot be restricted by more than one relation if it includes an Equal
at com.datastax.spark.connector.rdd.CassandraRDD.createStatement(CassandraRDD.scala:310)
at com.datastax.spark.connector.rdd.CassandraRDD.com$datastax$spark$connector$rdd$CassandraRDD$fetchTokenRange(CassandraRDD.scala:317)
at com.datastax.spark.connector.rdd.CassandraRDD$anonfun$13.apply(CassandraRDD.scala:338)
at com.datastax.spark.connector.rdd.CassandraRDD$anonfun$13.apply(CassandraRDD.scala:338)
at scala.collection.Iterator$anon$13.hasNext(Iterator.scala:371)
at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:10)
at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$anonfun$4.apply(RDD.scala:608)
at org.apache.spark.rdd.RDD$anonfun$4.apply(RDD.scala:608)
at org.apache.spark.SparkContext$anonfun$runJob$4.apply(SparkContext.scala:884)
at org.apache.spark.SparkContext$anonfun$runJob$4.apply(SparkContext.scala:884)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:205)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: role cannot be restricted by more than one relation if it includes an Equal
at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:35)
at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:256)
at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:91)
at com.datastax.spark.connector.cql.PreparedStatementCache$.prepareStatement(PreparedStatementCache.scala:45)
at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:28)
at com.sun.proxy.$Proxy8.prepare(Unknown Source)
at com.datastax.spark.connector.rdd.CassandraRDD.createStatement(CassandraRDD.scala:293)
... 27 more
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: role cannot be restricted by more than one relation if it includes an Equal
at com.datastax.driver.core.Responses$Error.asException(Responses.java:97)
at com.datastax.driver.core.SessionManager$1.apply(SessionManager.java:156)
at com.datastax.driver.core.SessionManager$1.apply(SessionManager.java:131)
at com.google.common.util.concurrent.Futures$1.apply(Futures.java:711)
at com.google.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:849)
... 3 more
14/09/23 13:46:53 WARN scheduler.TaskSetManager: Lost TID 0 (task 0.0:0)
14/09/23 13:46:53 WARN scheduler.TaskSetManager: Loss was due to java.io.IOException
java.io.IOException: Exception during preparation of SELECT "role", "id", "fname", "lname" FROM "tester"."empbyrole" WHERE token("role") > -5709068081826432029 AND token("role") <= -5491279024053142424 AND role=? ALLOW FILTERING: role cannot be restricted by more than one relation if it includes an Equal
at com.datastax.spark.connector.rdd.CassandraRDD.createStatement(CassandraRDD.scala:310)
at com.datastax.spark.connector.rdd.CassandraRDD.com$datastax$spark$connector$rdd$CassandraRDD$fetchTokenRange(CassandraRDD.scala:317)
at com.datastax.spark.connector.rdd.CassandraRDD$anonfun$13.apply(CassandraRDD.scala:338)
at com.datastax.spark.connector.rdd.CassandraRDD$anonfun$13.apply(CassandraRDD.scala:338)
at scala.collection.Iterator$anon$13.hasNext(Iterator.scala:371)
at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:10)
at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$anonfun$4.apply(RDD.scala:608)
at org.apache.spark.rdd.RDD$anonfun$4.apply(RDD.scala:608)
at org.apache.spark.SparkContext$anonfun$runJob$4.apply(SparkContext.scala:884)
at org.apache.spark.SparkContext$anonfun$runJob$4.apply(SparkContext.scala:884)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:205)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
14/09/23 13:46:53 ERROR scheduler.TaskSetManager: Task 0.0:0 failed 1 times; aborting job
14/09/23 13:46:53 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/09/23 13:46:53 INFO scheduler.DAGScheduler: Failed to run toArray at App.java:65
Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task 0.0:0 failed 1 times (most recent failure: Exception failure: java.io.IOException: Exception during preparation of SELECT "role", "id", "fname", "lname" FROM "tester"."empbyrole" WHERE token("role") > -5709068081826432029 AND token("role") <= -5491279024053142424 AND role=? ALLOW FILTERING: role cannot be restricted by more than one relation if it includes an Equal)
at org.apache.spark.scheduler.DAGScheduler$anonfun$org$apache$spark$scheduler$DAGScheduler$abortStage$1.apply(DAGScheduler.scala:1020)
at org.apache.spark.scheduler.DAGScheduler$anonfun$org$apache$spark$scheduler$DAGScheduler$abortStage$1.apply(DAGScheduler.scala:1018)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$abortStage(DAGScheduler.scala:1018)
at org.apache.spark.scheduler.DAGScheduler$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
at org.apache.spark.scheduler.DAGScheduler$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
at org.apache.spark.scheduler.DAGScheduler$anonfun$start$1$anon$2$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/09/23 13:46:53 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster
Eu sei que meu erro está especificamente nesta seção:
// Mapper that turns each Person bean into its textual form.
Function<Person, String> personToText = new Function<Person, String>() {
    @Override
    public String call(Person row) throws Exception {
        return row.toString();
    }
};
// Read tester.empbyrole as Person beans, restrict by role, then map to text.
JavaRDD<String> rdd = javaFunctions(sc)
        .cassandraTable("tester", "empbyrole", Person.class)
        .where("role=?", "IT Engineer")
        .map(personToText);
Quando eu removo o `.where()`, funciona. Mas o GitHub diz especificamente que deveria ser possível usar as funções `.where` e `.map` em sequência. Alguém tem alguma explicação para isso? Ou uma solução? Obrigado.
Edição: o erro desaparece quando eu uso esta instrução:
// Mapper that turns each Person bean into its textual form.
Function<Person, String> personToText = new Function<Person, String>() {
    @Override
    public String call(Person row) throws Exception {
        return row.toString();
    }
};
// Same read as before, but restricted by id instead of role.
JavaRDD<String> rdd = javaFunctions(sc)
        .cassandraTable("tester", "empbyrole", Person.class)
        .where("id=?", "1")
        .map(personToText);
Não faço ideia de por que essa opção funciona, mas as minhas outras variações não. Aqui estão as instruções que executei em CQL, para que você saiba como é o meu keyspace:
session.execute("DROP KEYSPACE IF EXISTS tester");
session.execute("CREATE KEYSPACE tester WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}");
session.execute("CREATE TABLE tester.emp (id INT PRIMARY KEY, fname TEXT, lname TEXT, role TEXT)");
session.execute("CREATE TABLE tester.empByRole (id INT, fname TEXT, lname TEXT, role TEXT, PRIMARY KEY (role,id))");
session.execute("CREATE TABLE tester.dept (id INT PRIMARY KEY, dname TEXT)");
// Seed tester.emp with three employees.
session.execute("INSERT INTO tester.emp (id, fname, lname, role) VALUES (0001,'Angel','Pay','IT Engineer');");
session.execute("INSERT INTO tester.emp (id, fname, lname, role) VALUES (0002,'John','Doe','IT Engineer');");
session.execute("INSERT INTO tester.emp (id, fname, lname, role) VALUES (0003,'Jane','Doe','IT Analyst');");
// Seed tester.empByRole with the same three employees.
session.execute("INSERT INTO tester.empByRole (id, fname, lname, role) VALUES (0001,'Angel','Pay','IT Engineer');");
session.execute("INSERT INTO tester.empByRole (id, fname, lname, role) VALUES (0002,'John','Doe','IT Engineer');");
session.execute("INSERT INTO tester.empByRole (id, fname, lname, role) VALUES (0003,'Jane','Doe','IT Analyst');");
// Seed tester.dept with a single department.
session.execute("INSERT INTO tester.dept (id, dname) VALUES (1553,'Commerce');");