7. windwos linux 定时任务

windows 计划任务

# 计算机管理 compmgmt.msc ---> 计划任务 ---> 创建计划任务 ---> 名称-触发器-执行python.exe 参数xxx.py 起始目录 完成

# 测试脚本
import sys
import datetime

format_time = datetime.datetime.now()

with open(file="forpy.log",mode="a",encoding="utf-8") as lf:
    lf.write(str(format_time))
    lf.write("\n")
    lf.write(str(sys.argv))
    lf.write("\n")

Python 计划任务

# python执行计划任务 
# 利用while True: + sleep()实现定时任务
import datetime
import time
def time_printer():
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    print('do func time :', ts)
def loop_monitor():
    while True:
        time_printer()
        time.sleep(5)  # 暂停5秒
if __name__ == "__main__":
    loop_monitor()

# 使用Timeloop库运行定时任务
import time
from timeloop import Timeloop
from datetime import timedelta
tl = Timeloop()
@tl.job(interval=timedelta(seconds=2))
def sample_job_every_2s():
    print("2s job current time : {}".format(time.ctime()))
@tl.job(interval=timedelta(seconds=5))
def sample_job_every_5s():
    print("5s job current time : {}".format(time.ctime()))
@tl.job(interval=timedelta(seconds=10))
def sample_job_every_10s():
    print("10s job current time : {}".format(time.ctime()))

# 利用内置模块sched实现定时任务
import datetime
import time
import sched
def time_printer():
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    print('do func time :', ts)
    loop_monitor()
def loop_monitor():
    s = sched.scheduler(time.time, time.sleep)  # 生成调度器
    s.enter(5, 1, time_printer, ())
    s.run()
if __name__ == "__main__":
    loop_monitor()

# 利用调度模块schedule实现定时任务
import schedule
import time
def job():
    print("I'm working...")
schedule.every(10).seconds.do(job)
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)
while True:
    schedule.run_pending()
    time.sleep(1)

# 装饰器: 通过 @repeat() 装饰静态方法
import time
from schedule import every, repeat, run_pending
@repeat(every().second)
def job():
    print('working...')
while True:
    run_pending()
    time.sleep(1)
# 传递参数: 

import schedule
def greet(name):
    print('Hello', name)
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')
while True:
    schedule.run_pending()
# 装饰器同样能传递参数: 

from schedule import every, repeat, run_pending
@repeat(every().second, 'World')
@repeat(every().minute, 'Mars')
def hello(planet):
    print('Hello', planet)
while True:
    run_pending()
# 取消任务: 

import schedule
i = 0
def some_task():
    global i
    i += 1
    print(i)
    if i == 10:
        schedule.cancel_job(job)
        print('cancel job')
        exit(0)
job = schedule.every().second.do(some_task)
while True:
    schedule.run_pending()
# 运行一次任务: 

import time
import schedule
def job_that_executes_once():
    print('Hello')
    return schedule.CancelJob
schedule.every().minute.at(':34').do(job_that_executes_once)
while True:
    schedule.run_pending()
    time.sleep(1)
# 根据标签检索任务: 

# 检索所有任务: schedule.get_jobs()
import schedule
def greet(name):
    print('Hello {}'.format(name))
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')
friends = schedule.get_jobs('friend')
print(friends)
# 根据标签取消任务: 

# 取消所有任务: schedule.clear()
import schedule
def greet(name):
    print('Hello {}'.format(name))
    if name == 'Cancel':
        schedule.clear('second-tasks')
        print('cancel second-tasks')
schedule.every().second.do(greet, 'Andrea').tag('second-tasks', 'friend')
schedule.every().second.do(greet, 'John').tag('second-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks', 'guest')
while True:
    schedule.run_pending()
# 运行任务到某时间: 

import schedule
from datetime import datetime, timedelta, time
def job():
    print('working...')
schedule.every().second.until('23:59').do(job)  # 今天23:59停止
schedule.every().second.until('2030-01-01 18:30').do(job)  # 2030-01-01 18:30停止
schedule.every().second.until(timedelta(hours=8)).do(job)  # 8小时后停止
schedule.every().second.until(time(23, 59, 59)).do(job)  # 今天23:59:59停止
schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job)  # 2030-01-01 18:30停止
while True:
    schedule.run_pending()
# 马上运行所有任务(主要用于测试): 

import schedule
def job():
    print('working...')
def job1():
    print('Hello...')
schedule.every().monday.at('12:40').do(job)
schedule.every().tuesday.at('16:40').do(job1)
schedule.run_all()
schedule.run_all(delay_seconds=3)  # 任务间延迟3秒
# 并行运行: 使用 Python 内置队列实现: 

import threading
import time
import schedule
def job1():
    print("I'm running on thread %s" % threading.current_thread())
def job2():
    print("I'm running on thread %s" % threading.current_thread())
def job3():
    print("I'm running on thread %s" % threading.current_thread())
def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()
schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)
while True:
    schedule.run_pending()
    time.sleep(1)

    APScheduler中的重要概念
    Job 作业
    Job作为APScheduler最小执行单位。创建Job时指定执行的函数,函数中所需参数,Job执行时的一些设置信息。

    构建说明: 

    id: 指定作业的唯一ID
    name: 指定作业的名字
    trigger: apscheduler定义的触发器,用于确定Job的执行时间,根据设置的trigger规则,计算得到下次执行此job的时间, 满足时将会执行
    executor: apscheduler定义的执行器,job创建时设置执行器的名字,根据字符串你名字到scheduler获取到执行此job的 执行器,执行job指定的函数
    max_instances: 执行此job的最大实例数,executor执行job时,根据job的id来计算执行次数,根据设置的最大实例数来确定是否可执行
    next_run_time: Job下次的执行时间,创建Job时可以指定一个时间[datetime],不指定的话则默认根据trigger获取触发时间
    misfire_grace_time: Job的延迟执行时间,例如Job的计划执行时间是21:00:00,但因服务重启或其他原因导致21:00:31才执行,如果设置此key为40,则该job会继续执行,否则将会丢弃此job
    coalesce: Job是否合并执行,是一个bool值。例如scheduler停止20s后重启启动,而job的触发器设置为5s执行一次,因此此job错过了4个执行时间,如果设置为是,则会合并到一次执行,否则会逐个执行
    func: Job执行的函数
    args: Job执行函数需要的位置参数
    kwargs: Job执行函数需要的关键字参数
    Trigger 触发器
    Trigger绑定到Job,在scheduler调度筛选Job时,根据触发器的规则计算出Job的触发时间,然后与当前时间比较确定此Job是否会被执行,总之就是根据trigger规则计算出下一个执行时间。

    目前APScheduler支持触发器: 

    指定时间的DateTrigger
    指定间隔时间的IntervalTrigger
    像Linux的crontab一样的CronTrigger。
    触发器参数: date

    date定时,作业只执行一次。

    run_date (datetime|str) -- the date/time to run the job at
    timezone (datetime.tzinfo|str) -- time zone for run_date if it does not have one already
    sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])
    sched.add_job(my_job, 'date', run_date=datetime(2019, 7, 6, 16, 30, 5), args=['text'])
    触发器参数: interval

    interval间隔调度

    weeks (int) --  间隔几周
    days (int) --  间隔几天
    hours (int) --  间隔几小时
    minutes (int) --  间隔几分钟
    seconds (int) --  间隔多少秒
    start_date (datetime|str) --  开始日期
    end_date (datetime|str) --  结束日期
    timezone (datetime.tzinfo|str) --  时区
    sched.add_job(job_function, 'interval', hours=2)
    触发器参数: cron

    cron调度

    (int|str) 表示参数既可以是int类型,也可以是str类型
    (datetime | str) 表示参数既可以是datetime类型,也可以是str类型
    year (int|str) -- 4-digit year -(表示四位数的年份,如2008年)
    month (int|str) -- month (1-12) -(表示取值范围为1-12月)
    day (int|str) -- day of the (1-31) -(表示取值范围为1-31日)
    week (int|str) -- ISO week (1-53) -(格里历2006年12月31日可以写成2006年-W52-7(扩展形式)或2006W527(紧凑形式))
    day_of_week (int|str) -- number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) -- (表示一周中的第几天,既可以用0-6表示也可以用其英语缩写表示)
    hour (int|str) -- hour (0-23) -- (表示取值范围为0-23时)
    minute (int|str) -- minute (0-59) -- (表示取值范围为0-59分)
    second (int|str) -- second (0-59) -- (表示取值范围为0-59秒)
    start_date (datetime|str) -- earliest possible date/time to trigger on (inclusive) -- (表示开始时间)
    end_date (datetime|str) -- latest possible date/time to trigger on (inclusive) -- (表示结束时间)
    timezone (datetime.tzinfo|str) -- time zone to use for the date/time calculations (defaults to scheduler timezone) -(表示时区取值)
    CronTrigger可用的表达式: 

    表达式 参数类型    描述
    *   所有  通配符。例: minutes=*即每分钟触发
    * / a   所有  每隔时长a执行一次。例: minutes=”* / 3″ 即每隔3分钟执行一次
    a -- b  所有  a -- b的范围内触发。例: minutes=“2-5”。即2到5分钟内每分钟执行一次
    a -- b / c  所有  a -- b范围内,每隔时长c执行一次。
    xth y   日   第几个星期几触发。x为第几个,y为星期几
    last x  日   一个月中,最后一个星期的星期几触发
    last    日   一个月中的最后一天触发
    x, y, z 所有  组合表达式,可以组合确定值或上述表达式
    # 6-8,11-12月第三个周五 00:00, 01:00, 02:00, 03:00运行
    sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
    # 每周一到周五运行 直到2024-05-30 00:00:00
    sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2024-05-30'
    Executor 执行器
    Executor在scheduler中初始化,另外也可通过scheduler的add_executor动态添加Executor。每个executor都会绑定一个alias,这个作为唯一标识绑定到Job,在实际执行时会根据Job绑定的executor找到实际的执行器对象,然后根据执行器对象执行Job。

    Executor的种类会根据不同的调度来选择,如果选择AsyncIO作为调度的库,那么选择AsyncIOExecutor,如果选择tornado作为调度的库,选择TornadoExecutor,如果选择启动进程作为调度,选择ThreadPoolExecutor或者ProcessPoolExecutor都可以。

    Executor的选择需要根据实际的scheduler来选择不同的执行器。目前APScheduler支持的Executor: 

    executors.asyncio: 同步io,阻塞
    executors.gevent: io多路复用,非阻塞
    executors.pool: 线程ThreadPoolExecutor和进程ProcessPoolExecutor
    executors.twisted: 基于事件驱动
    Jobstore 作业存储
    Jobstore在scheduler中初始化,另外也可通过scheduler的add_jobstore动态添加Jobstore。每个jobstore都会绑定一个alias,scheduler在Add Job时,根据指定的jobstore在scheduler中找到相应的jobstore,并将job添加到jobstore中。作业存储器决定任务的保存方式, 默认存储在内存中(MemoryJobStore),重启后就没有了。APScheduler支持的任务存储器有: 

    jobstores.memory: 内存
    jobstores.mongodb: 存储在mongodb
    jobstores.redis: 存储在redis
    jobstores.rethinkdb: 存储在rethinkdb
    jobstores.sqlalchemy: 支持sqlalchemy的数据库如mysql,sqlite等
    jobstores.zookeeper: zookeeper
    不同的任务存储器可以在调度器的配置中进行配置(见调度器)

    Event 事件
    Event是APScheduler在进行某些操作时触发相应的事件,用户可以自定义一些函数来监听这些事件,当触发某些Event时,做一些具体的操作。常见的比如。Job执行异常事件 EVENT_JOB_ERROR。Job执行时间错过事件 EVENT_JOB_MISSED。

    目前APScheduler定义的Event: 

    EVENT_SCHEDULER_STARTED
    EVENT_SCHEDULER_START
    EVENT_SCHEDULER_SHUTDOWN
    EVENT_SCHEDULER_PAUSED
    EVENT_SCHEDULER_RESUMED
    EVENT_EXECUTOR_ADDED
    EVENT_EXECUTOR_REMOVED
    EVENT_JOBSTORE_ADDED
    EVENT_JOBSTORE_REMOVED
    EVENT_ALL_JOBS_REMOVED
    EVENT_JOB_ADDED
    EVENT_JOB_REMOVED
    EVENT_JOB_MODIFIED
    EVENT_JOB_EXECUTED
    EVENT_JOB_ERROR
    EVENT_JOB_MISSED
    EVENT_JOB_SUBMITTED
    EVENT_JOB_MAX_INSTANCES
    Listener表示用户自定义监听的一些Event,比如当Job触发了EVENT_JOB_MISSED事件时可以根据需求做一些其他处理。

    调度器
    Scheduler是APScheduler的核心,所有相关组件通过其定义。scheduler启动之后,将开始按照配置的任务进行调度。除了依据所有定义Job的trigger生成的将要调度时间唤醒调度之外。当发生Job信息变更时也会触发调度。

    APScheduler支持的调度器方式如下,比较常用的为BlockingScheduler和BackgroundScheduler

    BlockingScheduler: 适用于调度程序是进程中唯一运行的进程,调用start函数会阻塞当前线程,不能立即返回。
    BackgroundScheduler: 适用于调度程序在应用程序的后台运行,调用start后主线程不会阻塞。
    AsyncIOScheduler: 适用于使用了asyncio模块的应用程序。
    GeventScheduler: 适用于使用gevent模块的应用程序。
    TwistedScheduler: 适用于构建Twisted的应用程序。
    QtScheduler: 适用于构建Qt的应用程序。

Linux定时任务

# linux 计划任务
# 一般crontab文件里面的定时任务格式如下所示: 

# 59 23 * * * /home/oracle/scripts/alert_log_archive.sh >/dev/null 2>&1

# crontab 文件中每个条目中各个域的意义和格式: 

# 第一列 分钟:  1——59

# 第二列 小时:  1——23(0表示子夜)

# 第三列 日 :  1——31

# 第四列 月 :  1——12

# 第五列 星期:  星期0——6(0表示星期天,1表示星期一、以此类推)

# 第六列 要运行的命令

# 我们暂且用C1、C2、C3、C4、C5、C6代表这六列,前面五列通过组合方式来决定执行脚本的频率,最小频率为每分钟执行一次,其中Cn可以用 * ; */n ; T1-T2; a,b,c; 四种形式来表示: 

# 当 C1 为 * 时表示每分钟都要执行脚本,C2 为 * 时表示每小时都要执行程式,依次类推.....

# 当 C1 为 T1-T2 时表示从第 T1 分钟到第 T2 分钟这段时间内要执行,C2 为 T1-T2 时表示从第 T1 到第 T2 小时都要执行,依次类推....

# 当 C1 为 */n 时表示每 n 分钟的时间间隔执行一次,C2 为 */n 表示每隔n小时的间隔执行一次,依次类推.....

# 当 C1 为 a, b, c,... 时表示第 a, b, c,... 分钟要执行,C2 为 a, b, c,... 时表示第 a, b, c...个小时要执行,依次类推....

# 下面列举几个例子供大家参考

# 1:  59 23 * * * /home/oracle/scripts/alert_log_archive.sh >/dev/null 2>&1

# 表示每天23点59分执行脚本/home/oracle/scripts/alert_log_archive.sh

# 2: */5 * * * * /home/oracle/scripts/monitoring_alert_log.sh >/dev/null 2>&1

# 表示每5分钟执行一次脚本/home/oracle/scripts/monitoring_alert_log.sh

# 3:  0 20 * * 1-5 mail -s "**********" kerry@domain.name < /tmp/maildata

# 周一到周五每天下午 20:00 寄一封信给 kerry@domain.name

# ..............................................

# 关于 >/dev/null 2>&1 的解释: 

# 0表示键盘输入

# 1表示标准输出

# 2表示错误输出.

# 我们首先创建test.sh脚本如下: 

# #! /bin/sh

# echo "hello, everybody, now is " `date`

# date >> test.txt

# 然后添加作业

# * * * * * /home/oracle/test.sh >/home/oracle/log.txt & 默认值为1,即和下面命令一致

# * * * * * /home/oracle/test.sh 1>/home/oracle/log.txt &

# * * * * * /home/oracle/test.sh 2>/home/oracle/log.txt &

# * * * * * /home/oracle/test.sh 2>/home/oracle/log.txt 2>&1 &

# 1,2将tesh.sh 命令输出重定向到log.txt, 即输出内容不打印到屏幕上,而是输出到log.txt文件中。如果你需要追加而不是覆盖,可以用 >>代替>

# 2>&1 是将错误输出重定向到标准输出。 然后将标准输入重定向到文件log.txt。

# &1 表示的是文件描述1,表示标准输出,如果这里少了&就成了数字1,就表示重定向到文件1。