Связанные проблемы на GitHub

аюсь установить длительную подписку Pull на тему Google Cloud PubSub. Я использую код, очень похожий на пример, приведенный в документацииВотт.е.

def receive_messages(project, subscription_name):
    """Receives messages from a pull subscription."""
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project, subscription_name)

    def callback(message):
        print('Received message: {}'.format(message))
        message.ack()

    subscriber.subscribe(subscription_path, callback=callback)

    # The subscriber is non-blocking, so we must keep the main thread from
    # exiting to allow it to process messages in the background.
    print('Listening for messages on {}'.format(subscription_path))
    while True:
        time.sleep(60)

Проблема в том, что иногда я получаю следующую трассировку:

Exception in thread Consumer helper: consume bidirectional stream:
Traceback (most recent call last):
  File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "/path/to/google/cloud/pubsub_v1/subscriber/_consumer.py", line 248, in _blocking_consume
    self._policy.on_exception(exc)
  File "/path/to/google/cloud/pubsub_v1/subscriber/policy/thread.py", line 135, in on_exception
    raise exception
  File "/path/to/google/cloud/pubsub_v1/subscriber/_consumer.py", line 234, in _blocking_consume
    for response in response_generator:
  File "/path/to/grpc/_channel.py", line 348, in __next__
    return self._next()
  File "/path/to/grpc/_channel.py", line 342, in _next
    raise self
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, The service was unable to fulfill your request. Please try again. [code=8a75])>

Я видел, что это упоминалось вДругой вопрос но здесь я спрашиваю, как правильно обращаться с этим в Python. Я пытался обернуть запрос в исключение, но, кажется, он работает в фоновом режиме, и я не могу повторить в случае этой ошибки.

Ответы на вопрос(1)

Решение Вопроса

который работает для меня, это обычайpolicy_class, По умолчанию один имеетon_exception функция, которая игнорируетDEADLINE_EXCEEDED, Вы можете создать класс, который наследует по умолчанию, а также игнорируетUNAVAILABLE, Моя выглядит так:

from google.cloud import pubsub
from google.cloud.pubsub_v1.subscriber.policy import thread
import grpc

class AvailablePolicy(thread.Policy):
    def on_exception(self, exception):
        """The parent ignores DEADLINE_EXCEEDED. Let's also ignore UNAVAILABLE.

        I'm not sure what triggers that error, but if you ignore it, your
        subscriber seems to work just fine. It's probably an intermittent
        thing and it reconnects later if you just give it a chance.
        """
        # If this is UNAVAILABLE, then we want to retry.
        # That entails just returning None.
        unavailable = grpc.StatusCode.UNAVAILABLE
        if getattr(exception, 'code', lambda: None)() == unavailable:
            return
        # For anything else, fallback on super.
        super(AvailablePolicy, self).on_exception(exception)

subscriber = pubsub.SubscriberClient(policy_class=AvailablePolicy)
# Continue to set up as normal.

Это очень похоже наоригинал on_exception просто игнорирует другую ошибку. Если вы хотите, вы можете добавить некоторую регистрацию всякий раз, когда генерируется исключение, и убедиться, что все по-прежнему работает. Будущие сообщения все еще будут поступать.

 Krista Davis24 окт. 2017 г., 20:36
FWIW, естьпроблема с процессором примерно через час. Я думаю, что проблема заключается в утечке потока каждый раз, когда вы игнорируете ошибку (другие получают ее послеDEADLINE_EXCEEDED), но я не мог найти решение. Я вернулся к использованию старого API и делал свое собственное периодическое вытягивание вместо использования потокового материала.Этот пример было полезно для настройки этого.
 Blackus27 окт. 2017 г., 18:16
 adrpino25 окт. 2017 г., 15:45
Спасибо за ответ ... кажется чрезмерным из-за чего-то, что должно быть очень легко. Для выполнения поставленной задачи я использовалgolang клиентская библиотека, которая работает как шарм

Ваш ответ на вопрос