IllegalArgumentException com Spark collect () no Jupyter

Eu tenho uma configuração com o Jupyter 4.3.0, Python 3.6.3 (Anaconda) e PySpark 2.2.1.

O exemplo a seguir falhará ao executar o Jupyter:

sc = SparkContext.getOrCreate()

rdd = sc.parallelize(['A','B','C'])
rdd.collect()

Abaixo está o rastreamento completo da pilha:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-35-0d4a2ca9edf4> in <module>()
      2 
      3 rdd = sc.parallelize(['A','B','C'])
----> 4 rdd.collect()

/usr/local/Cellar/apache-spark/2.2.1/libexec/python/pyspark/rdd.py in collect(self)
    807         """
    808         with SCCallSiteSync(self.context) as css:
--> 809             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    810         return list(_load_from_socket(port, self._jrdd_deserializer))
    811 

/usr/local/Cellar/apache-spark/2.2.1/libexec/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/usr/local/Cellar/apache-spark/2.2.1/libexec/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/local/Cellar/apache-spark/2.2.1/libexec/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.lang.IllegalArgumentException
    at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
    at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
    at org.apache.spark.util.FieldAccessFinder$anonO mesmo exemplo é executado com sucesso usando o cliente pyspark. Também é executado com êxito (no Jupyter ou no cliente pyspark) ao usar$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:443)
    at org.apache.spark.util.FieldAccessFinder$anonO mesmo exemplo é executado com sucesso usando o cliente pyspark. Também é executado com êxito (no Jupyter ou no cliente pyspark) ao usar$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:426)
    at scala.collection.TraversableLike$WithFilter$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.mutable.HashMap$anonO exemplo a seguir falhará ao executar o Jupyter:$anonfun$foreach$2.apply(HashMap.scala:103)
    at scala.collection.mutable.HashMap$anonO exemplo a seguir falhará ao executar o Jupyter:$anonfun$foreach$2.apply(HashMap.scala:103)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap$anon$1.foreach(HashMap.scala:103)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at org.apache.spark.util.FieldAccessFinder$anon$3.visitMethodInsn(ClosureCleaner.scala:426)
    at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
    at org.apache.spark.util.ClosureCleaner$anonfun$org$apache$spark$util$ClosureCleaner$clean$14.apply(ClosureCleaner.scala:257)
    at org.apache.spark.util.ClosureCleaner$anonfun$org$apache$spark$util$ClosureCleaner$clean$14.apply(ClosureCleaner.scala:256)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$clean(ClosureCleaner.scala:256)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2068)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
    at org.apache.spark.rdd.RDD$anonfun$collect$1.apply(RDD.scala:936)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:467)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.base/java.lang.Thread.run(Thread.java:844)

O mesmo exemplo é executado com sucesso usando o cliente pyspark. Também é executado com êxito (no Jupyter ou no cliente pyspark) ao usartake() ao invés decollect().

Alguma idéia do que pode estar acontecendo?Esta postagem sugere que possa haver algum bug no Spark 2.2.1. Prefiro não fazer o downgrade para o Spark 2.2.0, conforme sugerido, se possível.

ATUALIZAR: Estou executando no macOS High Sierra (10.13.3). Aqui está a saída desc._conf.getAll():

[('spark.sql.catalogImplementation', 'hive'),
 ('spark.app.id', 'local-1517752276379'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.port', '55920'),
 ('spark.app.name', 'pyspark-shell'),
 ('spark.driver.host', '192.168.1.5')]

E aqui estão mais algumas configurações de integração Jupyter-PySpark:

~/.ipython/profile_pyspark/startup/00-pyspark-setup.py

import os
import sys
spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.4-src.zip'))
exec(open(os.path.join(spark_home, 'python/pyspark/shell.py')).read())

~/Library/Jupyter/kernels/pyspark/kernel.json

{
    "display_name": "PySpark (Spark 2.2.1)",
    "language": "python",
    "argv": [
        "/Users/rodrygo/anaconda3/bin/python3",
        "-m",
        "ipykernel",
        "--profile=pyspark",
        "-f",
        "{connection_file}"
    ],
    "env": {
        "CAPTURE_STANDARD_OUT": "true",
        "CAPTURE_STANDARD_ERR": "true",
        "SEND_EMPTY_OUTPUT": "false",
        "SPARK_HOME": "/usr/local/Cellar/apache-spark/2.2.1/libexec/"
    }
}

questionAnswers(2)

yourAnswerToTheQuestion