Aviso de causa UDF: CachedKafkaConsumer não está sendo executado no UninterruptibleThread (KAFKA-1894)
Em um habitualstruct_kafka_wordcount.py código,
Quando divido linhas em palavras porudf
como abaixo,
my_split = udf(lambda x: x.split(' '), ArrayType(StringType()))
words = lines.select(
explode(
my_split(lines.value)
)
)
o aviso continuará mostrando:
AVISO CachedKafkaConsumer: CachedKafkaConsumer não está sendo executado em UninterruptibleThread. Pode travar quando os métodos do CachedKafkaConsumerer são interrompidos devido ao KAFKA-1894
Por outro lado, quando eu divido as linhas em palavras porpyspark.sql.functions.split
, tudo funciona bem.
words = lines.select(
explode(
split(lines.value, ' ')
)
)
Por que isso aconteceu e como corrigir o aviso?
Este é o código que estou tentando executar na prática:
pattern = "(.+) message repeated (\\d) times: \\[ (.+)\\]"
prog = re.compile(pattern)
def _unfold(x):
ret = []
result = prog.match(x)
if result:
log = " ".join((result.group(1), result.group(3)))
times = result.group(2)
for _ in range(int(times)):
ret.append(log)
else:
ret.append(x)
return ret
_udf = udf(lambda x: _unfold(x), ArrayType(StringType()))
lines = lines.withColumn('value', explode(_udf(lines['value'])))