), таймер будет работать бесконечно.

у постоянно выполнять функцию в Python каждые 60 секунд навсегда (так же, какNSTimer в Цель C). Этот код будет работать как демон, и он по сути похож на вызов сценария python каждую минуту с использованием cron, но без необходимости его настройки пользователем.

Вэтот вопрос о cron, реализованном в Pythonрешение, кажется, эффективно простоспать() на х секунд. Мне не нужны такие расширенные функциональные возможности, так что, возможно, что-то подобное будет работать

while True:
    # Code executed here
    time.sleep(60)

Есть ли предсказуемые проблемы с этим кодом?

 Banana27 янв. 2015 г., 19:39
Мне все еще интересноЕсть ли предсказуемые проблемы с этим кодом?
 Simon23 янв. 2009 г., 22:12
Педантичный момент, но может быть критическим, ваш код выше кода не выполняется каждые 60 секунд, это ставит 60-секундный промежуток между выполнениями. Это происходит каждые 60 секунд, если ваш исполняемый код вообще не требует времени.
 jfs19 мар. 2014 г., 08:25
такжеtime.sleep(60) может вернуться как раньше, так и позже
 James Brady23 янв. 2009 г., 23:14
 litepresence21 февр. 2017 г., 15:28
«Предвидимая проблема» заключается в том, что вы не можете ожидать 60 итераций в час, просто используя time.sleep (60). Так что, если вы добавляете один элемент за итерацию и сохраняете список заданной длины ... среднее значение этого списка не будет представлять согласованный «период» времени; поэтому такие функции, как «скользящее среднее», могут ссылаться на слишком старые точки данных, что искажает ваши показания.

Ответы на вопрос(15)

    ''' tracking number of times it prints'''
import threading

global timeInterval
count=0
def printit():
  threading.Timer(timeInterval, printit).start()
  print( "Hello, World!")
  global count
  count=count+1
  print(count)
printit

if __name__ == "__main__":
    timeInterval= int(input('Enter Time in Seconds:'))
    printit()
 raviGupta28 авг. 2018 г., 13:11
На основе пользовательского ввода он будет повторять этот метод на каждом интервале времени.
 raviGupta28 авг. 2018 г., 13:12
Он будет повторяться, пока мы не остановим его вручную
Решение Вопроса

ИспользоватьПлановое модуль, который реализует планировщик событий общего назначения.

import sched, time
s = sched.scheduler(time.time, time.sleep)
def do_something(sc): 
    print "Doing stuff..."
    # do your stuff
    s.enter(60, 1, do_something, (sc,))

s.enter(60, 1, do_something, (s,))
s.run()
 Baishampayan Ghose23 янв. 2009 г., 22:13
Модуль sched предназначен для планирования функций для запуска через некоторое время. Как вы используете его для повторения вызова функции каждые x секунд без использования time.sleep ()?
 nosklo23 янв. 2009 г., 22:18
@Baishampayan: просто запланируйте новый пробег.
 jfs25 янв. 2017 г., 16:42
@JavaSa: потому что"делай свое дело" не мгновенный и ошибки отtime.sleep может накапливаться здесь. «Выполнять каждые X секунд» и «Выполнять с задержкой ~ X секунд повторно» - это не одно и то же. Смотрите такжеэтот комментарий
 Daniel F27 янв. 2013 г., 21:06
Тогда апшедулер наpackages.python.org/APScheduler Также следует упомянуть об этом.
 jfs28 окт. 2014 г., 17:45
примечание: эта версия может дрейфовать. Вы могли бы использоватьenterabs() чтобы избежать этого. Вотне дрейфующая версия для сравнения.

й функции этот код:

1) добавить first_interval, используемый для запуска таймера в определенное время (вызывающему необходимо вычислить first_interval и передать)

2) решить условие гонки в оригинальном коде. В исходном коде, если поток управления не смог отменить запущенный таймер («Остановите таймер и отмените выполнение действия таймера. Это будет работать, только если таймер все еще находится в стадии ожидания».)https://docs.python.org/2/library/threading.html), таймер будет работать бесконечно.

class RepeatedTimer(object):
def __init__(self, first_interval, interval, func, *args, **kwargs):
    self.timer      = None
    self.first_interval = first_interval
    self.interval   = interval
    self.func   = func
    self.args       = args
    self.kwargs     = kwargs
    self.running = False
    self.is_started = False

def first_start(self):
    try:
        # no race-condition here because only control thread will call this method
        # if already started will not start again
        if not self.is_started:
            self.is_started = True
            self.timer = Timer(self.first_interval, self.run)
            self.running = True
            self.timer.start()
    except Exception as e:
        log_print(syslog.LOG_ERR, "timer first_start failed %s %s"%(e.message, traceback.format_exc()))
        raise

def run(self):
    # if not stopped start again
    if self.running:
        self.timer = Timer(self.interval, self.run)
        self.timer.start()
    self.func(*self.args, **self.kwargs)

def stop(self):
    # cancel current timer in case failed it's still OK
    # if already stopped doesn't matter to stop again
    if self.timer:
        self.timer.cancel()
    self.running = False

чтобы вызвать 60 событий в час с большинством событий, происходящих в то же самое количество секунд после целой минуты:

import math
import time
import random

TICK = 60 # one minute tick size
TICK_TIMING = 59 # execute on 59th second of the tick
TICK_MINIMUM = 30 # minimum catch up tick size when lagging

def set_timing():

    now = time.time()
    elapsed = now - info['begin']
    minutes = math.floor(elapsed/TICK)
    tick_elapsed = now - info['completion_time']
    if (info['tick']+1) > minutes:
        wait = max(0,(TICK_TIMING-(time.time() % TICK)))
        print ('standard wait: %.2f' % wait)
        time.sleep(wait)
    elif tick_elapsed < TICK_MINIMUM:
        wait = TICK_MINIMUM-tick_elapsed
        print ('minimum wait: %.2f' % wait)
        time.sleep(wait)
    else:
        print ('skip set_timing(); no wait')
    drift = ((time.time() - info['begin']) - info['tick']*TICK -
        TICK_TIMING + info['begin']%TICK)
    print ('drift: %.6f' % drift)

info['tick'] = 0
info['begin'] = time.time()
info['completion_time'] = info['begin'] - TICK

while 1:

    set_timing()

    print('hello world')

    #random real world event
    time.sleep(random.random()*TICK_MINIMUM)

    info['tick'] += 1
    info['completion_time'] = time.time()

В зависимости от реальных условий вы можете получить отметки длины:

60,60,62,58,60,60,120,30,30,60,60,60,60,60...etc.

но через 60 минут у вас будет 60 тиков; и большинство из них будет происходить с правильным смещением до минуты, которую вы предпочитаете.

В моей системе я получаю типичный дрейф <1/20 секунды, пока не возникнет необходимость в коррекции.

Преимущество этого метода - разрешение смещения часов; что может вызвать проблемы, если вы делаете такие вещи, как добавление одного элемента за тик и ожидаете, что 60 добавленных элементов в час. Отказ от учета дрейфа может привести к тому, что вторичные индикаторы, такие как скользящие средние, будут рассматривать данные слишком глубоко в прошлом, что приведет к ошибочному выводу.

Вы можете рассмотретьскрученный которая представляет собой сетевую библиотеку Python, которая реализуетОбразец реактора.

from twisted.internet import task, reactor

timeout = 60.0 # Sixty seconds

def doWork():
    #do work here
    pass

l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds

reactor.run()

Хотя «while True: sleep (60)», вероятно, будет работать, Twisted, вероятно, уже реализует многие функции, которые вам в конечном итоге понадобятся (демонизация, ведение журнала или обработка исключений, как указано в bobince), и, вероятно, будет более надежным решением.

 Baishampayan Ghose23 янв. 2009 г., 22:27
Я знал, что Твист может сделать это. Спасибо, что поделились примером кода!

чтобы неблокирующий способ выполнял вашу функцию периодически, вместо блокирующего бесконечного цикла, я бы использовал многопоточный таймер. Таким образом, ваш код может продолжать работать и выполнять другие задачи, при этом ваша функция вызывается каждые n секунд. Я часто использую эту технику для распечатки информации о проделанной работе в длинных задачах с ЦП, дисками и сетью

Вот код, который я разместил в похожем вопросе, с элементами start () и stop ():

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

Использование:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

Функции:

Только стандартная библиотека, без внешних зависимостейstart() а такжеstop() безопасно звонить несколько раз, даже если таймер уже запущен / остановленвызываемая функция может иметь позиционные и именованные аргументыТы можешь изменитьсяinterval в любое время, это будет эффективно после следующего запуска. То же самое дляargs, kwargs и дажеfunction!
 Rich Episcopo21 дек. 2016 г., 21:08
Вdef _run(self) Я пытаюсь обернуть голову, почему ты звонишьself.start() доself.function(), Можете ли вы уточнить? Я бы подумал позвонивstart() первыйself.is_running всегда будетFalse так что тогда мы всегда будем раскручивать новую нить.
 MestreLion06 февр. 2017 г., 16:12
Да, @RichieEpiscopo, призыв к.function() после.start() это запустить функцию при t = 0. И я не думаю, что это будет проблемой, еслиfunction занимает больше времени, чемinterval, но да, в коде могут быть некоторые гоночные условия.
 Rich Episcopo22 дек. 2016 г., 00:09
Я думаю, что я дошел до сути. Решение @ MestreLion запускает функцию каждыйx секунд (т. е. t = 0, t = 1x, t = 2x, t = 3x, ...), где в исходном коде примера кода эмблемы запускается функция сx второй интервал между Кроме того, это решение, я считаю, имеет ошибку, еслиinterval короче, чем времяfunction выполнить. В этом случае,self._timer будет перезаписано вstart функция.
 eraoul05 дек. 2016 г., 01:21
Это решение кажется дрейфующим со временем; Мне нужна была версия, которая призвана вызывать функцию каждые n секунд без дрейфа. Я выложу обновление в отдельном вопросе.
import time, traceback

def every(delay, task):
  next_time = time.time() + delay
  while True:
    time.sleep(max(0, next_time - time.time()))
    try:
      task()
    except Exception:
      traceback.print_exc()
      # in production code you might want to have this instead of course:
      # logger.exception("Problem while executing repetitive task.")
    # skip tasks if we are behind schedule:
    next_time += (time.time() - next_time) // delay * delay + delay

def foo():
  print("foo", time.time())

every(5, foo)

вы можете использовать его, чтобы запустить в своем собственном потоке:

import threading
threading.Thread(target=lambda: every(5, foo)).start()

Это решение сочетает в себе несколько функций, редко встречающихся в других решениях:

Обработка исключений: Насколько это возможно на этом уровне, исключения обрабатываются должным образом, т.е. е. войти в систему для целей отладки, не прерывая нашу программу.Нет цепочки: Общая цепочечная реализация (для планирования следующего события), которую вы найдете во многих ответах, является хрупкой в ​​том аспекте, что если что-то пойдет не так в механизме планирования (threading.Timer или что-то еще), это приведет к разрыву цепи. Тогда дальнейших казней не будет, даже если причина проблемы уже устранена. Простой цикл и ожидание с простымsleep() гораздо надежнее по сравнению.Нет дрейфа: Мое решение точно отслеживает время, в которое оно должно работать. Нет смещения в зависимости от времени выполнения (как и во многих других решениях).Пропуская: Мое решение будет пропускать задачи, если одно выполнение занимало слишком много времени (например, делать X каждые пять секунд, но X занимало 6 секунд). Это стандартное поведение cron (и не без причины). Многие другие решения затем просто выполняют задачу несколько раз подряд без какой-либо задержки. В большинстве случаев (например, задачи по очистке) это нежелательно. Если этоявляется хотел, просто используйтеnext_time += delay вместо.
 PirateApp21 февр. 2019 г., 10:34
спасибо, что поделился, единственное, что меня беспокоило, это то, что мне нужно было получить доступ к переменной для чтения, чтение переменной в 2-х потоках - плохая идея, нет, поэтому вопрос
 PirateApp21 февр. 2019 г., 08:52
upvoted! как вы делаете это без сна, у меня есть подписчик redis с входящими данными в реальном времени, и поэтому я не могу позволить себе спать, но мне нужно что-то запускать каждую минуту
 Sebastian Stark16 сент. 2018 г., 15:59
лучший ответ не дрейфовать.
 Alfe21 февр. 2019 г., 10:33
@PirateApp Я бы сделал это в другой теме. Вымог сделайте это в том же потоке, но затем вы запрограммируете свою собственную систему планирования, которая слишком сложна для комментариев.
 Alfe21 февр. 2019 г., 10:38
В Python, благодаря GIL, доступ к переменным в двух потоках совершенно безопасен. И простое чтение в двух потоках никогда не должно быть проблемой (также не в других многопоточных средах). Только запись из двух разных потоков в системе без GIL (например, в Java, C ++ и т. Д.) Требует некоторой явной синхронизации.

http://cronus.readthedocs.org может помочь?

Для v0.2 работает следующий фрагмент

import cronus.beat as beat

beat.set_rate(2) # 2 Hz
while beat.true():
    # do some time consuming work here
    beat.sleep() # total loop duration would be 0.5 sec

который не «крадет игру» (например,Плановое модуль, который был представлен ранее), т.е. он позволяет другим вещам работать параллельно:

import Tkinter

def do_something1():
  global n1
  n1 += 1
  if n1 == 6: # (Optional condition)
    print "* do_something1() is done *"; return
  # Do your stuff here
  # ...
  print "do_something1() "+str(n1)
  tk.after(1000, do_something1)

def do_something2(): 
  global n2
  n2 += 1
  if n2 == 6: # (Optional condition)
    print "* do_something2() is done *"; return
  # Do your stuff here
  # ...
  print "do_something2() "+str(n2)
  tk.after(500, do_something2)

tk = Tkinter.Tk(); 
n1 = 0; n2 = 0
do_something1()
do_something2()
tk.mainloop()

do_something1() а такжеdo_something2() может работать параллельно и с любым интервалом скорости. Здесь 2-й будет выполнен вдвое быстрее. Обратите внимание, что я использовал простой счетчик в качестве условия для завершения любой функции. Вы можете использовать любое другое условие, которое вам нравится, или нет, если вы хотите выполнить функцию до завершения программы (например, часы).

 nosklo15 мар. 2018 г., 20:39
@Apostolos все, что вам нужно сделать, это использоватьTkinter основной цикл вместоПлановое mainloop, поэтому он работает точно так же, но позволяет интерфейсам tkinter продолжать отвечать. Если вы не используете tkinter для других целей, это ничего не меняет в отношении решения sched. Вы можете использовать две или более запланированные функции с разными интервалами вsched решение, и оно будет работать точно так же, как у вас.
 Apostolos19 мар. 2018 г., 09:17
Нет, это не работает так же. Я объяснил это. Один «блокирует» программу (т.е. останавливает поток, вы больше ничего не можете сделать - даже не запускаете другую запланированную работу, как вы предлагаете), пока она не закончится, а другая освобождает ваши руки / освобождает (то есть вы можете сделать другие вещи после того, как он начался. Вам не нужно ждать, пока он не закончится. Это огромная разница. Если бы вы попробовали метод, который я представил, вы бы увидели сами. Я попробовал ваш. Почему бы вам не попробуй мой тоже?
 Bryan Oakley14 мар. 2018 г., 13:23
Будьте осторожны с вашей формулировкой:after не позволяет вещам работать параллельно. Tkinter является однопоточным и может делать только одно за раз. Если что-то запланированоafter работает, он не работает параллельно с остальным кодом. Если обаdo_something1 а такжеdo_something2 запланированы для запуска в то же время, они будут работать последовательно, а не параллельно.

которое позволяет избежать дрейфа с течением времени:

import threading 
import time

class RepeatedTimer(object):
  def __init__(self, interval, function, *args, **kwargs):
    self._timer = None
    self.interval = interval
    self.function = function
    self.args = args
    self.kwargs = kwargs
    self.is_running = False
    self.next_call = time.time()
    self.start()

  def _run(self):
    self.is_running = False
    self.start()
    self.function(*self.args, **self.kwargs)

  def start(self):
    if not self.is_running:
      self.next_call += self.interval
      self._timer = threading.Timer(self.next_call - time.time(), self._run)
      self._timer.start()
      self.is_running = True

  def stop(self):
    self._timer.cancel()
    self.is_running = False

что исключение убьет демона навсегда. Возможно, вы захотите заключить в ловушку исключений и регистратор.

Просто закрепите вашу петлю времени на системных часах. Легко.

import time
starttime=time.time()
while True:
  print "tick"
  time.sleep(60.0 - ((time.time() - starttime) % 60.0))
 TemporalWolf14 июл. 2016 г., 21:22
Работает фантастически. Там нет необходимости вычитатьstarttime если вы начнете с синхронизации до определенного времени:time.sleep(60 - time.time() % 60) работал нормально для меня. Я использовал это какtime.sleep(1200 - time.time() % 1200) и это дает мне логи на:00 :20 :40именно так, как я хотел.
 Will16 февр. 2016 г., 05:40
я предпочитаюfrom time import time, sleep из-за экзистенциальных последствий;)
 jfs18 сент. 2014 г., 09:09
+1. твой иtwisted ответ являются единственными ответами, которые запускают функцию каждыйx секунд. Остальные выполняют функцию с задержкойx секунд после каждого звонка.
 Mayhem01 янв. 2016 г., 23:48
Если вы добавите в этот код какой-нибудь код, который занял бы больше одной секунды ... Это выкинуло бы время ожидания и начало отставать. Принятый ответ в этом случае правильный ... Любой может зациклить простую команду печати и он запускается каждую секунду без задержки ...
 jfs01 авг. 2016 г., 22:51
@AntonSchigur, чтобы избежать дрейфа после нескольких итераций. Отдельная итерация может начаться немного раньше или позже в зависимости отsleep(), timer() точность и время выполнения тела цикла, но в среднем итерации всегда происходят на границах интервала (даже если некоторые из них пропущены):while keep_doing_it(): sleep(interval - timer() % interval), Сравните это простоwhile keep_doing_it(): sleep(interval) где ошибки могут накапливаться после нескольких итераций.

например, показать текущее местное время

import datetime
import glib
import logger

def get_local_time():
    current_time = datetime.datetime.now().strftime("%H:%M")
    logger.info("get_local_time(): %s",current_time)
    return str(current_time)

def display_local_time():
    logger.info("Current time is: %s", get_local_time())
    return True

# call every minute
glib.timeout_add(60*1000, display_local_time)

Один из возможных ответов:

import time
t=time.time()

while True:
    if time.time()-t>10:
        #run your task here
        t=time.time()
 Alfe12 апр. 2018 г., 17:57
Это занято ожиданием, поэтому очень плохо.
 Noel22 янв. 2019 г., 12:44
Хорошее решение для тех, кто ищет неблокирующий таймер.

Более простой способ, которым я верю:

import time

def executeSomething():
    #code here
    time.sleep(60)

while True:
    executeSomething()

Таким образом, ваш код выполняется, затем он ждет 60 секунд, затем он выполняется снова, ждет, выполняет и т.д. ... Не нужно усложнять вещи: D

 Sean Cain07 авг. 2013 г., 23:16
Ключевое слово True должно быть в верхнем регистре
 kommradHomer18 сент. 2013 г., 12:09
На самом деле это не ответ: time sleep () может использоваться только для ожидания X секунд после каждого выполнения. Например, если выполнение вашей функции занимает 0,5 секунды, и вы используете time.sleep (1), это означает, что ваша функция выполняется каждые 1,5 секунды, а не 1. Вы должны использовать другие модули и / или потоки, чтобы убедиться, что что-то работает в течение Y раз в каждой секунде X
 Leonard Lepadatu17 нояб. 2017 г., 14:09
На мой взгляд код должен позвонитьtime.sleep() вwhile True петля как:def executeSomething(): print('10 sec left') ; while True: executeSomething(); time.sleep(10)
 jfs21 сент. 2014 г., 06:56
@kommradHomer:Ответ Дэйва Роува демонстрирует, что выМожно использованиеtime.sleep() запускать что-то каждые X секунд
 fatuhoku01 сент. 2013 г., 23:37
На самом деле это самый подходящий ответ на вопрос!

Ваш ответ на вопрос