- Русские Блоги
- Модуль расписания библиотеки времени выполнения задач Python
- Введение
- Пример использования
- Связанные вопросы:
- 1. Как выполнять работу параллельно?
- 2. Используйте очереди?
- 3. Запустить задачу только один раз?
- 4. Отменить задание?
- 5. Запускать задачи через случайные промежутки времени?
- 6. Как вызвать исключение?
- Examples¶
- Run a job every x minute¶
- Use a decorator to schedule a job¶
- Pass arguments to a job¶
- Cancel a job¶
- Run a job once¶
- Get all jobs¶
- Cancel all jobs¶
- Get several jobs, filtered by tags¶
- Cancel several jobs, filtered by tags¶
- Run a job at random intervals¶
- Run a job until a certain time¶
- Time until the next execution¶
- Run all jobs now, regardless of their scheduling¶
Русские Блоги
Модуль расписания библиотеки времени выполнения задач Python
Введение
Легкая библиотека времени выполнения задач Python -Официальный документ
Пример использования
import schedule import time def job(): print("I'm working. ") def job1(name): # С параметрами print(name) schedule.every(10).minutes.do(job) # Выполнять каждые 10 минут schedule.every().hour.do(job) # Выполнять каждый час schedule.every().day.at("10:30").do(job) # Выполнять один раз в день в 10:30 schedule.every().monday.do(job) #Execute один раз в неделю в понедельник schedule.every().wednesday.at("13:15").do(job) #Execute один раз в неделю в среду schedule.every().wednesday.at("13:15").do(job1,'waiwen') # Входящие параметры while True: schedule.run_pending() time.sleep(1) Скопировать код
Связанные вопросы:
1. Как выполнять работу параллельно?
Если 50 задач запускаются каждые 10 минут, каждая задача занимает 1 минуту, поэтому, когда наступают следующие 10 минут, предыдущий раунд задач все еще выполняется, а затем начинается новый раунд задач. Способ решения этой проблемы — создать поток для каждой задачи и позволить задачам работать в потоке параллельно.
import threading import time import schedule def job(): print("I'm running on thread %s" % threading.current_thread()) def run_threaded(job_func): job_thread = threading.Thread(target=job_func) job_thread.start() schedule.every(10).seconds.do(run_threaded, job) schedule.every(10).seconds.do(run_threaded, job) while 1: schedule.run_pending() time.sleep(1) Скопировать код
2. Используйте очереди?
Используйте совместное использование очереди и несколько worker_threads.
import Queue import time import threading import schedule def job(): print("I'm working") def worker_main(): while 1: job_func = jobqueue.get() job_func() jobqueue.task_done() jobqueue = Queue.Queue() schedule.every(10).seconds.do(jobqueue.put, job) schedule.every(10).seconds.do(jobqueue.put, job) schedule.every(10).seconds.do(jobqueue.put, job) schedule.every(10).seconds.do(jobqueue.put, job) schedule.every(10).seconds.do(jobqueue.put, job) worker_thread = threading.Thread(target=worker_main) worker_thread.start() while 1: schedule.run_pending() time.sleep(1) Скопировать код
3. Запустить задачу только один раз?
def job_that_executes_once(): # Do some work . return schedule.CancelJob schedule.every().day.at('22:30').do(job_that_executes_once) Скопировать код
4. Отменить задание?
def greet(name): print('Hello <>'.format(name)) schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend') schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend') schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer') schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest') schedule.clear('daily-tasks') Скопировать код
5. Запускать задачи через случайные промежутки времени?
def my_job(): # This job will execute every 5 to 10 seconds. print('Foo') schedule.every(5).to(10).seconds.do(my_job) Скопировать код
6. Как вызвать исключение?
Модуль не будет активно генерировать исключения, но вы можете использовать декораторы для перехвата исключений целевой задачи.
import functools def catch_exceptions(job_func, cancel_on_failure=False): @functools.wraps(job_func) def wrapper(*args, **kwargs): try: return job_func(*args, **kwargs) except: import traceback print(traceback.format_exc()) if cancel_on_failure: return schedule.CancelJob return wrapper @catch_exceptions(cancel_on_failure=True) def bad_task(): return 1 / 0 schedule.every(5).minutes.do(bad_task) Скопировать код
Для более подробного использования посетитеОфициальный документ
Examples¶
Eager to get started? This page gives a good introduction to Schedule. It assumes you already have Schedule installed. If you do not, head over to Installation .
Run a job every x minute¶
import schedule import time def job(): print("I'm working. ") # Run job every 3 second/minute/hour/day/week, # Starting 3 second/minute/hour/day/week from now schedule.every(3).seconds.do(job) schedule.every(3).minutes.do(job) schedule.every(3).hours.do(job) schedule.every(3).days.do(job) schedule.every(3).weeks.do(job) # Run job every minute at the 23rd second schedule.every().minute.at(":23").do(job) # Run job every hour at the 42nd minute schedule.every().hour.at(":42").do(job) # Run jobs every 5th hour, 20 minutes and 30 seconds in. # If current time is 02:00, first execution is at 06:20:30 schedule.every(5).hours.at("20:30").do(job) # Run job every day at specific HH:MM and next HH:MM:SS schedule.every().day.at("10:30").do(job) schedule.every().day.at("10:30:42").do(job) schedule.every().day.at("12:42", "Europe/Amsterdam").do(job) # Run job on a specific day of the week schedule.every().monday.do(job) schedule.every().wednesday.at("13:15").do(job) schedule.every().minute.at(":17").do(job) while True: schedule.run_pending() time.sleep(1)
Use a decorator to schedule a job¶
Use the @repeat to schedule a function. Pass it an interval using the same syntax as above while omitting the .do() .
from schedule import every, repeat, run_pending import time @repeat(every(10).minutes) def job(): print("I am a scheduled job") while True: run_pending() time.sleep(1)
The @repeat decorator does not work on non-static class methods.
Pass arguments to a job¶
do() passes extra arguments to the job function
import schedule def greet(name): print('Hello', name) schedule.every(2).seconds.do(greet, name='Alice') schedule.every(4).seconds.do(greet, name='Bob') from schedule import every, repeat @repeat(every().second, "World") @repeat(every().day, "Mars") def hello(planet): print("Hello", planet)
Cancel a job¶
To remove a job from the scheduler, use the schedule.cancel_job(job) method
import schedule def some_task(): print('Hello world') job = schedule.every().day.at('22:30').do(some_task) schedule.cancel_job(job)
Run a job once¶
Return schedule.CancelJob from a job to remove it from the scheduler.
import schedule import time def job_that_executes_once(): # Do some work that only needs to happen once. return schedule.CancelJob schedule.every().day.at('22:30').do(job_that_executes_once) while True: schedule.run_pending() time.sleep(1)
Get all jobs¶
To retrieve all jobs from the scheduler, use schedule.get_jobs()
import schedule def hello(): print('Hello world') schedule.every().second.do(hello) all_jobs = schedule.get_jobs()
Cancel all jobs¶
To remove all jobs from the scheduler, use schedule.clear()
import schedule def greet(name): print('Hello <>'.format(name)) schedule.every().second.do(greet) schedule.clear()
Get several jobs, filtered by tags¶
You can retrieve a group of jobs from the scheduler, selecting them by a unique identifier.
import schedule def greet(name): print('Hello <>'.format(name)) schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend') schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend') schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer') schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest') friends = schedule.get_jobs('friend')
Will return a list of every job tagged as friend .
Cancel several jobs, filtered by tags¶
You can cancel the scheduling of a group of jobs selecting them by a unique identifier.
import schedule def greet(name): print('Hello <>'.format(name)) schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend') schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend') schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer') schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest') schedule.clear('daily-tasks')
Will prevent every job tagged as daily-tasks from running again.
Run a job at random intervals¶
def my_job(): print('Foo') # Run every 5 to 10 seconds. schedule.every(5).to(10).seconds.do(my_job)
every(A).to(B).seconds executes the job function every N seconds such that A
Run a job until a certain time¶
import schedule from datetime import datetime, timedelta, time def job(): print('Boo') # run job until a 18:30 today schedule.every(1).hours.until("18:30").do(job) # run job until a 2030-01-01 18:33 today schedule.every(1).hours.until("2030-01-01 18:33").do(job) # Schedule a job to run for the next 8 hours schedule.every(1).hours.until(timedelta(hours=8)).do(job) # Run my_job until today 11:33:42 schedule.every(1).hours.until(time(11, 33, 42)).do(job) # run job until a specific datetime schedule.every(1).hours.until(datetime(2020, 5, 17, 11, 36, 20)).do(job)
The until method sets the jobs deadline. The job will not run after the deadline.
Time until the next execution¶
Use schedule.idle_seconds() to get the number of seconds until the next job is scheduled to run. The returned value is negative if the next scheduled jobs was scheduled to run in the past. Returns None if no jobs are scheduled.
import schedule import time def job(): print('Hello') schedule.every(5).seconds.do(job) while 1: n = schedule.idle_seconds() if n is None: # no more jobs break elif n > 0: # sleep exactly the right amount of time time.sleep(n) schedule.run_pending()
Run all jobs now, regardless of their scheduling¶
To run all jobs regardless if they are scheduled to run or not, use schedule.run_all() . Jobs are re-scheduled after finishing, just like they would if they were executed using run_pending() .
import schedule def job_1(): print('Foo') def job_2(): print('Bar') schedule.every().monday.at("12:40").do(job_1) schedule.every().tuesday.at("16:40").do(job_2) schedule.run_all() # Add the delay_seconds argument to run the jobs with a number # of seconds delay in between. schedule.run_all(delay_seconds=10)