Ansible源码分析之corn模块
corntab是linux上被大家熟知的定时任务,而Ansible中使用corn模块来管理crontab和环境变量条目。
- 该模块允许创建环境变量和命名的crontab条目,更新或删除它们。
- ‘当管理crontab作业时:模块包括一行,其中crontab条目C(“#Ansible:”)的描述与传递给模块的“名称”相对应,供将来的ansible /模块使用调用以查找/检查状态。
- “名称”参数应该是唯一的,并且更改“名称”值将导致创建新的cron任务(或删除其他任务)。
- 在管理环境变量时,不添加任何注释行,但是,当模块需要查找/检查状态时,它将使用“名称”参数来查找环境变量定义行。
先看导入的模块。
import os
import platform
import pwd
import re
import sys
import tempfile
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.common.text.converters import to_bytes, to_native
from ansible.module_utils.six.moves import shlex_quote
class CronTabError(Exception):
pass
platform模块提供了很多方法去获取操作系统的信息,其他模块也都是常用到的。紧接着定义了CronTabError异常类。
class CronTab(object):
def __init__(self, module, user=None, cron_file=None):
self.module = module
self.user = user
self.root = (os.getuid() == 0)
self.lines = None
self.ansible = "#Ansible: "
self.n_existing = ''
self.cron_cmd = self.module.get_bin_path('crontab', required=True)
if cron_file:
if os.path.isabs(cron_file):
self.cron_file = cron_file
self.b_cron_file = to_bytes(cron_file, errors='surrogate_or_strict')
else:
self.cron_file = os.path.join('/etc/cron.d', cron_file)
self.b_cron_file = os.path.join(b'/etc/cron.d', to_bytes(cron_file, errors='surrogate_or_strict'))
else:
self.cron_file = None
self.read()
定义了CronTab这个类,CronTab对象写入基于时间的crontab文件。构造函数接收两个参数,user-crontab的用户(默认为当前用户)cron_file为/etc/cron.d下的cron文件,或绝对路径。然后调用read()方法从系统中读取crontab。
def read(self):
self.lines = []
if self.cron_file:
try:
f = open(self.b_cron_file, 'rb')
self.n_existing = to_native(f.read(), errors='surrogate_or_strict')
self.lines = self.n_existing.splitlines()
f.close()
except IOError:
return
except Exception:
raise CronTabError("Unexpected error:", sys.exc_info()[0])
else:
(rc, out, err) = self.module.run_command(self._read_user_execute(), use_unsafe_shell=True)
if rc != 0 and rc != 1:
raise CronTabError("Unable to read crontab")
self.n_existing = out
lines = out.splitlines()
count = 0
for l in lines:
if count > 2 or (not re.match(r'# DO NOT EDIT THIS FILE - edit the master and reinstall.', l) and
not re.match(r'# \(/tmp/.*installed on.*\)', l) and
not re.match(r'# \(.*version.*\)', l)):
self.lines.append(l)
else:
pattern = re.escape(l) + '[\r\n]?'
self.n_existing = re.sub(pattern, '', self.n_existing, 1)
count += 1
初始化lines为空列表,开始读取cronfile。抛出IOError异常时,也就是cron文件不存在时返回空。其他异常时则抛出上面定义的CronTabError。sys.exc_info()是用来获取异常信息。
在实际调试程序的过程中,有时只获得异常的类型是远远不够的,还需要借助更详细的异常信息才能解决问题。捕获异常时,有 2 种方式可获得更多的异常信息,分别是:
- 使用 sys 模块中的 exc_info 方法;
- 使用 traceback 模块中的相关函数。
exc_info() 方法会将当前的异常信息以元组的形式返回,该元组中包含 3 个元素,分别为 type、value 和 traceback,它们的含义分别是:
- type:异常类型的名称,它是 BaseException 的子类
- value:捕获到的异常实例。
- traceback:是一个 traceback 对象。
当cron_file为None时,通过命令执行的方式执行_read_user_execute()返回的命令行。这里对状态码做了判断,非0非1的情况,1表示没有工作。抛出CronTabError无法读取crontab。将命令执行的结果保存在n_existing中。
接着就是对corntab文件内容的处理,按行读取。当行数大于3时或者没有对文件中特定的注释行匹配到时,也就是crontab的任务内容。其他情况则直接用re.escape将改行转义,用re.sub将其替换为空。
上面代码中调用了_read_user_execute方法。
def _read_user_execute(self):
user = ''
if self.user:
if platform.system() == 'SunOS':
return "su %s -c '%s -l'" % (shlex_quote(self.user), shlex_quote(self.cron_cmd))
elif platform.system() == 'AIX':
return "%s -l %s" % (shlex_quote(self.cron_cmd), shlex_quote(self.user))
elif platform.system() == 'HP-UX':
return "%s %s %s" % (self.cron_cmd, '-l', shlex_quote(self.user))
elif pwd.getpwuid(os.getuid())[0] != self.user:
user = '-u %s' % shlex_quote(self.user)
return "%s %s %s" % (self.cron_cmd, user, '-l')
返回用于读取crontab的命令行,这里是针对一些不同的linux操作系统生成对应获取crontab的命令。
def is_empty(self):
if len(self.lines) == 0:
return True
else:
return False
这里是我觉得写得很不好的代码,一点都不专业,像新手写的代码。
def is_empty(self):
return len(self.lines) == 0
直接这样写不就好了 ???这两种写法完全是等价的!
def write(self, backup_file=None):
if backup_file:
fileh = open(backup_file, 'wb')
elif self.cron_file:
fileh = open(self.b_cron_file, 'wb')
else:
filed, path = tempfile.mkstemp(prefix='crontab')
os.chmod(path, int('0644', 8))
fileh = os.fdopen(filed, 'wb')
fileh.write(to_bytes(self.render()))
fileh.close()
if backup_file:
return
if not self.cron_file:
(rc, out, err) = self.module.run_command(self._write_execute(path), use_unsafe_shell=True)
os.unlink(path)
if rc != 0:
self.module.fail_json(msg=err)
if self.module.selinux_enabled() and self.cron_file:
self.module.set_default_selinux_context(self.cron_file, False)
将crontab写入系统,保存所有信息。都是文件IO操作。当没有指定cron_file时还是通过命令执行的方式调用_write_execute返回对应的操作系统的crontab命令。并检查selinux是否开启。
def _write_execute(self, path):
user = ''
if self.user:
if platform.system() in ['SunOS', 'HP-UX', 'AIX']:
return "chown %s %s ; su '%s' -c '%s %s'" % (
shlex_quote(self.user), shlex_quote(path), shlex_quote(self.user), self.cron_cmd, shlex_quote(path))
elif pwd.getpwuid(os.getuid())[0] != self.user:
user = '-u %s' % shlex_quote(self.user)
return "%s %s %s" % (self.cron_cmd, user, shlex_quote(path))
返回用于写入crontab的命令行。和之前的_read_user_execute也是类似的。
def render(self):
crons = []
for cron in self.lines:
crons.append(cron)
result = '\n'.join(crons)
if result:
result = result.rstrip('\r\n') + '\n'
return result
渲染此crontab,就像在crontab中一样。就是将上面的self.lines中每行的内容加上换行符。
def do_comment(self, name):
return "%s%s" % (self.ansible, name)
def add_job(self, name, job):
self.lines.append(self.do_comment(name))
self.lines.append("%s" % (job))
def update_job(self, name, job):
return self._update_job(name, job, self.do_add_job)
def do_add_job(self, lines, comment, job):
lines.append(comment)
lines.append("%s" % (job))
def remove_job(self, name):
return self._update_job(name, "", self.do_remove_job)
def do_remove_job(self, lines, comment, job):
return None
这几个方法都很简洁,对crontab的添加或删除操作以及添加注释,比如update_job和remove_job都是对_update_job的封装,只是传递不同的参数。
def add_env(self, decl, insertafter=None, insertbefore=None):
if not (insertafter or insertbefore):
self.lines.insert(0, decl)
return
if insertafter:
other_name = insertafter
elif insertbefore:
other_name = insertbefore
other_decl = self.find_env(other_name)
if len(other_decl) > 0:
if insertafter:
index = other_decl[0] + 1
elif insertbefore:
index = other_decl[0]
self.lines.insert(index, decl)
return
self.module.fail_json(msg="Variable named '%s' not found." % other_name)
def update_env(self, name, decl):
return self._update_env(name, decl, self.do_add_env)
def do_add_env(self, lines, decl):
lines.append(decl)
def remove_env(self, name):
return self._update_env(name, '', self.do_remove_env)
def do_remove_env(self, lines, decl):
return None
这个env是什么呢?先看一个标准的crontab文件是怎么写的。在文件开头制定了所使用的shell和PATH等。

add_env有insertafter和insertbefore两个插入的标记位置。都为None时直接在索引0的位置也就是行首插入。有值时赋值给other_name,并调用find_env来查找对应的env。而find_env在匹配到时是返回的二元列表,所以索引0就是匹配到的env的索引,根据是在之前或之后插入绝对是否+1并插入。
至于update_env和remove_env还是调用的_update_env传参。
def find_env(self, name):
for index, l in enumerate(self.lines):
if re.match(r'^%s=' % name, l):
return [index, l]
return []
def _update_job(self, name, job, addlinesfunction):
ansiblename = self.do_comment(name)
newlines = []
comment = None
for l in self.lines:
if comment is not None:
addlinesfunction(newlines, comment, job)
comment = None
elif l == ansiblename:
comment = l
else:
newlines.append(l)
self.lines = newlines
if len(newlines) == 0:
return True
else:
return False
find_env是遍历self.lines来正则匹配返回索引及内容的。再看_update_env的addenvfunction参数,从上面的调用可以知道这实际是传递的一个函数,也就是回调函数。
def remove_job_file(self):
try:
os.unlink(self.cron_file)
return True
except OSError:
return False
except Exception:
raise CronTabError("Unexpected error:", sys.exc_info()[0])
def find_job(self, name, job=None):
comment = None
for l in self.lines:
if comment is not None:
if comment == name:
return [comment, l]
else:
comment = None
elif re.match(r'%s' % self.ansible, l):
comment = re.sub(r'%s' % self.ansible, '', l)
if job:
for i, l in enumerate(self.lines):
if l == job:
if not re.match(r'%s' % self.ansible, self.lines[i - 1]):
self.lines.insert(i, self.do_comment(name))
return [self.lines[i], l, True]
elif name and self.lines[i - 1] == self.do_comment(None):
self.lines[i - 1] = self.do_comment(name)
return [self.lines[i - 1], l, True]
return []
remove_job_file顾名思义是删除crontab文件,cron文件不存在返回False。尝试通过“ Ansible:”开头的注释找到job。这里“Ansible:”的这种注释也是由ansible的cron模块生成的,相当于一种标记。匹配到ansible时,将其替换为空。
否则,尝试通过完全匹配找到工作。由于有些crontab可能不是cron来写入的,所以没有前导ansible标头,则插入一个。如果前导空白ansible标头并且job具有名称,则更新标头。
def get_cron_job(self, minute, hour, day, month, weekday, job, special, disabled):
# normalize any leading/trailing newlines (ansible/ansible-modules-core#3791)
job = job.strip('\r\n')
if disabled:
disable_prefix = '#'
else:
disable_prefix = ''
if special:
if self.cron_file:
return "%s@%s %s %s" % (disable_prefix, special, self.user, job)
else:
return "%s@%s %s" % (disable_prefix, special, job)
else:
if self.cron_file:
return "%s%s %s %s %s %s %s %s" % (disable_prefix, minute, hour, day, month, weekday, self.user, job)
else:
return "%s%s %s %s %s %s %s" % (disable_prefix, minute, hour, day, month, weekday, job)
def get_jobnames(self):
jobnames = []
for l in self.lines:
if re.match(r'%s' % self.ansible, l):
jobnames.append(re.sub(r'%s' % self.ansible, '', l))
return jobnames
def get_envnames(self):
envnames = []
for l in self.lines:
if re.match(r'^\S+=', l):
envnames.append(l.split('=')[0])
return envnames
在get_cron_job中,规范化任何前导/尾随的换行符。这里就是根据crontab 分时日月周 来格式化写入crontab。get_jobnames的作用就是获取job的名字,但这里用的(re.sub(r’%s’ % self.ansible, ”, l),就是用正则替换”#Ansible:为空后所剩下的就是job的名字。虽然我也不知道这里用sub替换有没有必要,直接字符串截取处理貌似也差不多?get_envnames和get_jobnames也是差不多的。
接着看main函数部分的代码。Ansible实例化模块对象以及参数赋值的部分就不提了。
changed = False
res_args = dict()
warnings = list()
if cron_file:
cron_file_basename = os.path.basename(cron_file)
if not re.search(r'^[A-Z0-9_-]+$', cron_file_basename, re.I):
warnings.append('Filename portion of cron_file ("%s") should consist' % cron_file_basename +
' solely of upper- and lower-case letters, digits, underscores, and hyphens')
os.umask(int('022', 8))
crontab = CronTab(module, user, cron_file)
module.debug('cron instantiated - name: "%s"' % name)
if not name:
module.deprecate(
msg="The 'name' parameter will be required in future releases.",
version='2.12', collection_name='ansible.builtin'
)
if reboot:
module.deprecate(
msg="The 'reboot' parameter will be removed in future releases. Use 'special_time' option instead.",
version='2.12', collection_name='ansible.builtin'
)
先获取了cron_file的文件名,正则判断是否符合规范。通过os.umask(int(‘022’, 8))确保生成的所有文件只能由拥有用户的用户写入,022就是644的权限,主要与cron_file选项有关。然后实例化crontab对象,以及判断参数。
- 将来的版本中将需要’name’参数。
- 在将来的版本中,“ reboot”参数将被删除。请改用“ special_time”选项。
if module._diff:
diff = dict()
diff['before'] = crontab.n_existing
if crontab.cron_file:
diff['before_header'] = crontab.cron_file
else:
if crontab.user:
diff['before_header'] = 'crontab for user "%s"' % crontab.user
else:
diff['before_header'] = 'crontab'
if env and not name:
module.fail_json(msg="You must specify 'name' while working with environment variables (env=yes)")
if (special_time or reboot) and \
(True in [(x != '*') for x in [minute, hour, day, month, weekday]]):
module.fail_json(msg="You must specify time and date fields or special time.")
if (special_time or reboot) and platform.system() == 'SunOS':
module.fail_json(msg="Solaris does not support special_time=... or @reboot")
if cron_file and do_install:
if not user:
module.fail_json(msg="To use cron_file=... parameter you must specify user=... as well")
if job is None and do_install:
module.fail_json(msg="You must specify 'job' to install a new cron job or variable")
if (insertafter or insertbefore) and not env and do_install:
module.fail_json(msg="Insertafter and insertbefore parameters are valid only with env=yes")
if reboot:
special_time = "reboot"
diff是一个字典保存的前后crontab的改变。然后对用户输入验证,比如无法在solaris上支持special_time。
if backup and not module.check_mode:
(backuph, backup_file) = tempfile.mkstemp(prefix='crontab')
crontab.write(backup_file)
if crontab.cron_file and not do_install:
if module._diff:
diff['after'] = ''
diff['after_header'] = '/dev/null'
else:
diff = dict()
if module.check_mode:
changed = os.path.isfile(crontab.cron_file)
else:
changed = crontab.remove_job_file()
module.exit_json(changed=changed, cron_file=cron_file, state=state, diff=diff)
if env:
if ' ' in name:
module.fail_json(msg="Invalid name for environment variable")
decl = '%s="%s"' % (name, job)
old_decl = crontab.find_env(name)
if do_install:
if len(old_decl) == 0:
crontab.add_env(decl, insertafter, insertbefore)
changed = True
if len(old_decl) > 0 and old_decl[1] != decl:
crontab.update_env(name, decl)
changed = True
else:
if len(old_decl) > 0:
crontab.remove_env(name)
changed = True
如果需要,请先进行备份,然后再进行更改。do_install参数是表示当前要执行的添加/删除操作,默认是present。还有就是对crontab对象方法的一些调用,实际上在上面类方法中已经分析过了。同时对每个job也有判断,job中不应包含换行符。
这里再补充一下\r,\n,\r\n的区别
在Windows中:
- ‘\r’ 回车,回到当前行的行首,而不会换到下一行,如果接着输出的话,本行以前的内容会被逐一覆盖。
- ‘\n’ 换行,换到当前位置的下一行,而不会回到行首。
Unix系统里,每行结尾只有“<换行>”,即”\n”;Windows系统里面,每行结尾是“<回车><换行>”,即“\r\n”;Mac系统里,每行结尾是“<回车>”,即”\r”;。一个直接后果是,Unix/Mac系统下的文件在Windows里打开的话,所有文字会变成一行;而Windows里的文件在Unix/Mac下打开的话,在每行的结尾可能会多出一个^M符号。
if not changed and crontab.n_existing != '':
if not (crontab.n_existing.endswith('\r') or crontab.n_existing.endswith('\n')):
changed = True
res_args = dict(
jobs=crontab.get_jobnames(),
envs=crontab.get_envnames(),
warnings=warnings,
changed=changed
)
if changed:
if not module.check_mode:
crontab.write()
if module._diff:
diff['after'] = crontab.render()
if crontab.cron_file:
diff['after_header'] = crontab.cron_file
else:
if crontab.user:
diff['after_header'] = 'crontab for user "%s"' % crontab.user
else:
diff['after_header'] = 'crontab'
res_args['diff'] = diff
if backup and not module.check_mode:
if changed:
res_args['backup_file'] = backup_file
else:
os.unlink(backup_file)
if cron_file:
res_args['cron_file'] = cron_file
module.exit_json(**res_args)
module.exit_json(msg="Unable to execute cron task.")
对env/job没有更改,但是现有的crontab需要终止换行符。仅在crontab或cron文件已更改时才保留备份。
到这里就结束了,不过说实话看这个cron模块的代码,我觉得代码质量并不高。总之在学习开源项目的代码时,应该抱有质疑的态度去学习。多思考为什么这样写或者自己有没有更好的思路,才会进步的更快。
赞赏
微信赞赏
支付宝赞赏
发表评论