Aviso de causa UDF: CachedKafkaConsumer não está sendo executado no UninterruptibleThread (KAFKA-1894)

Em um habitualstruct_kafka_wordcount.py código,

Quando divido linhas em palavras porudf como abaixo,

my_split = udf(lambda x: x.split(' '), ArrayType(StringType()))

words = lines.select(
    explode(
        my_split(lines.value)
    )
)

o aviso continuará mostrando:

AVISO CachedKafkaConsumer: CachedKafkaConsumer não está sendo executado em UninterruptibleThread. Pode travar quando os métodos do CachedKafkaConsumerer são interrompidos devido ao KAFKA-1894

Por outro lado, quando eu divido as linhas em palavras porpyspark.sql.functions.split, tudo funciona bem.

words = lines.select(
    explode(
        split(lines.value, ' ') 
    ) 
)

Por que isso aconteceu e como corrigir o aviso?

Este é o código que estou tentando executar na prática:

pattern = "(.+) message repeated (\\d) times: \\[ (.+)\\]"
prog = re.compile(pattern)


def _unfold(x):
    ret = []
    result = prog.match(x)
    if result:
        log = " ".join((result.group(1), result.group(3)))
        times = result.group(2)
        for _ in range(int(times)):
            ret.append(log)
    else:
        ret.append(x)

    return ret

_udf = udf(lambda x: _unfold(x), ArrayType(StringType()))
lines = lines.withColumn('value', explode(_udf(lines['value'])))

questionAnswers(1)

yourAnswerToTheQuestion