Правильно ли использовать многопоточную очередь в python?
Я пытаюсь использовать Очередь в Python, который будет многопоточным. Я просто хотел знать, правильный ли подход я использую или нет. И если я делаю что-то излишнее или если есть лучший подход, который я должен использовать.
Я пытаюсь получить новые запросы из таблицы и запланировать их, используя некоторую логику для выполнения некоторой операции, такой как выполнение запроса.
Так что здесь из основного потока я породил отдельный поток для очереди.
if __name__=='__main__':
request_queue = SetQueue(maxsize=-1)
worker = Thread(target=request_queue.process_queue)
worker.setDaemon(True)
worker.start()
while True:
try:
#Connect to the database get all the new requests to be verified
db = Database(username_testschema, password_testschema, mother_host_testschema, mother_port_testschema, mother_sid_testschema, 0)
#Get new requests for verification
verify_these = db.query("SELECT JOB_ID FROM %s.table WHERE JOB_STATUS='%s' ORDER BY JOB_ID" %
(username_testschema, 'INITIATED'))
#If there are some requests to be verified, put them in the queue.
if len(verify_these) > 0:
for row in verify_these:
print "verifying : %s" % row[0]
verify_id = row[0]
request_queue.put(verify_id)
except Exception as e:
logger.exception(e)
finally:
time.sleep(10)
Теперь в классе Setqueue у меня есть функция process_queue, которая используется для обработки 2 верхних запросов при каждом запуске, которые были добавлены в очередь.
'''
Overridding the Queue class to use set as all_items instead of list to ensure unique items added and processed all the time,
'''
class SetQueue(Queue.Queue):
def _init(self, maxsize):
Queue.Queue._init(self, maxsize)
self.all_items = set()
def _put(self, item):
if item not in self.all_items:
Queue.Queue._put(self, item)
self.all_items.add(item)
'''
The Multi threaded queue for verification process. Take the top two items, verifies them in a separate thread and sleeps for 10 sec.
This way max two requests per run will be processed.
'''
def process_queue(self):
while True:
scheduler_obj = Scheduler()
try:
if self.qsize() > 0:
for i in range(2):
job_id = self.get()
t = Thread(target=scheduler_obj.verify_func, args=(job_id,))
t.start()
for i in range(2):
t.join(timeout=1)
self.task_done()
except Exception as e:
logger.exception(
"QUEUE EXCEPTION : Exception occured while processing requests in the VERIFICATION QUEUE")
finally:
time.sleep(10)
Я хочу посмотреть, правильное ли мое понимание и могут ли быть какие-то проблемы с ним.
Таким образом, основной поток, запущенный в то время, как True в основной функции соединяется с базой данных, получает новые запросы и помещает их в очередь. Рабочий поток (демон) для очереди продолжает получать новые запросы от очереди и потоков, не являющихся демонами, которые выполняют обработку, и, поскольку время ожидания для соединения равно 1, рабочий поток будет продолжать принимать новые запросы без блокировки, и его дочерний поток будет продолжать обработку в фоновом режиме. Правильный?
Таким образом, в случае выхода из основного процесса они не будут убиты, пока не завершат свою работу, но поток рабочего демона завершится. Сомнение: если родитель - это демон, а ребенок - не демон, и если родитель выходит, выходит ли ребенок?).
Я также читал здесь:Дэвид Бизли многопроцессорный
Дэвид Бизли использует пул в качестве потокового сопроцессора, где он пытается решить аналогичную проблему. Поэтому я должен следовать его шагам: - 1. Создать пул процессов. 2. Откройте поток, как я делаю для request_queue 3. В этой теме
def process_verification_queue(self):
while True:
try:
if self.qsize() > 0:
job_id = self.get()
pool.apply_async(Scheduler.verify_func, args=(job_id,))
except Exception as e:
logger.exception("QUEUE EXCEPTION : Exception occured while processing requests in the VERIFICATION QUEUE")
Используйте процесс из пула и запускайте verify_func параллельно. Это даст мне больше производительности?