ansible的find模块和linux上find命令非常相似,感觉是find命令的Python代码实现。文档的描述也很简单:根据特定条件返回文件列表。多个条件一起进行“与”运算。对于Windows目标,请改用 模块,不过实现上是差不多的。直接分析源码,先看用到的一些模块。
import fnmatch import grp import os import pwd import re import stat import time from ansible.module_utils._text import to_text, to_native from ansible.module_utils.basic import AnsibleModule
- fnmatch 模块主要用于文件名称的匹配,其能力比简单的字符串匹配更强大,但比使用正则表达式相比稍弱。如果在数据处理操作中,只需要使用简单的通配符就能完成文件名的匹配,则使用 fnmatch 模块是不错的选择。
- grp模块,提供了一个Unix 用户组/group(/etc/group)数据库的接口。
- pwd模块,提供了一个Unix 密码数据库(/etc/passwd)的接口,这个数据库包含本地机器用户账户信息。
- stat 模块定义了一些用于解析 os.stat(), os.fstat() 和 os.lstat() (如果它们存在) 输出结果的常量和函数。

def pfilter(f, patterns=None, excludes=None, use_regex=False): if not patterns and not excludes: return True if use_regex: if patterns and not excludes: for p in patterns: r = re.compile(p) if r.match(f): return True elif patterns and excludes: for p in patterns: r = re.compile(p) if r.match(f): for e in excludes: r = re.compile(e) if r.match(f): return False return True
else: if patterns and not excludes: for p in patterns: if fnmatch.fnmatch(f, p): return True elif patterns and excludes: for p in patterns: if fnmatch.fnmatch(f, p): for e in excludes: if fnmatch.fnmatch(f, e): return False return True return False
这里就是不使用正则的情况,所以这里匹配就用fnmatch.fnmatch来判断 filename 文件名,是否和指定 pattern 字符串匹配,不过和上面都是类似的。
def agefilter(st, now, age, timestamp): if age is None: return True elif age >= 0 and now - st.__getattribute__("st_%s" % timestamp) >= abs(age): return True elif age < 0 and now - st.__getattribute__("st_%s" % timestamp) <= abs(age): return True return False
agefilter传入的第一个参数是st,从上下文来看是 st = os.lstat(fsname),而os.lstat返回的是什么呢?

def __getattribute__(self, *args, **kwargs): """ Return getattr(self, name). """ pass
def sizefilter(st, size): if size is None: return True elif size >= 0 and st.st_size >= abs(size): return True elif size < 0 and st.st_size <= abs(size): return True return False
def contentfilter(fsname, pattern, read_whole_file=False): if pattern is None: return True prog = re.compile(pattern) try: with open(fsname) as f: if read_whole_file: return bool( for line in f: if prog.match(line): return True except Exception: pass return False
return bool(
if return True return False
def statinfo(st): pw_name = "" gr_name = "" try: # user data pw_name = pwd.getpwuid(st.st_uid).pw_name except Exception: pass try: # group data gr_name = grp.getgrgid(st.st_gid).gr_name except Exception: pass return { 'mode': "%04o" % stat.S_IMODE(st.st_mode), 'isdir': stat.S_ISDIR(st.st_mode), 'ischr': stat.S_ISCHR(st.st_mode), 'isblk': stat.S_ISBLK(st.st_mode), 'isreg': stat.S_ISREG(st.st_mode), 'isfifo': stat.S_ISFIFO(st.st_mode), 'islnk': stat.S_ISLNK(st.st_mode), 'issock': stat.S_ISSOCK(st.st_mode), 'uid': st.st_uid, 'gid': st.st_gid, 'size': st.st_size, 'inode': st.st_ino, 'dev': st.st_dev, 'nlink': st.st_nlink, 'atime': st.st_atime, 'mtime': st.st_mtime, 'ctime': st.st_ctime, 'gr_name': gr_name, 'pw_name': pw_name, 'wusr': bool(st.st_mode & stat.S_IWUSR), 'rusr': bool(st.st_mode & stat.S_IRUSR), 'xusr': bool(st.st_mode & stat.S_IXUSR), 'wgrp': bool(st.st_mode & stat.S_IWGRP), 'rgrp': bool(st.st_mode & stat.S_IRGRP), 'xgrp': bool(st.st_mode & stat.S_IXGRP), 'woth': bool(st.st_mode & stat.S_IWOTH), 'roth': bool(st.st_mode & stat.S_IROTH), 'xoth': bool(st.st_mode & stat.S_IXOTH), 'isuid': bool(st.st_mode & stat.S_ISUID), 'isgid': bool(st.st_mode & stat.S_ISGID), }

‘wusr’: bool(st.st_mode & stat.S_IWUSR)

def main(): module = AnsibleModule( argument_spec=dict( paths=dict(type='list', required=True, aliases=['name', 'path'], elements='str'), patterns=dict(type='list', default=['*'], aliases=['pattern'], elements='str'), excludes=dict(type='list', aliases=['exclude'], elements='str'), contains=dict(type='str'), read_whole_file=dict(type='bool', default=False), file_type=dict(type='str', default="file", choices=['any', 'directory', 'file', 'link']), age=dict(type='str'), age_stamp=dict(type='str', default="mtime", choices=['atime', 'ctime', 'mtime']), size=dict(type='str'), recurse=dict(type='bool', default=False), hidden=dict(type='bool', default=False), follow=dict(type='bool', default=False), get_checksum=dict(type='bool', default=False), use_regex=dict(type='bool', default=False), depth=dict(type='int'), ), supports_check_mode=True, ) params = module.params filelist = []
if params['age'] is None: age = None else: m = re.match(r"^(-?\d+)(s|m|h|d|w)?$", params['age'].lower()) seconds_per_unit = {"s": 1, "m": 60, "h": 3600, "d": 86400, "w": 604800} if m: age = int( * seconds_per_unit.get(, 1) else: module.fail_json(age=params['age'], msg="failed to process age") if params['size'] is None: size = None else: m = re.match(r"^(-?\d+)(b|k|m|g|t)?$", params['size'].lower()) bytes_per_unit = {"b": 1, "k": 1024, "m": 1024**2, "g": 1024**3, "t": 1024**4} if m: size = int( * bytes_per_unit.get(, 1) else: module.fail_json(size=params['size'], msg="failed to process size")
now = time.time() msg = '' looked = 0 for npath in params['paths']: npath = os.path.expanduser(os.path.expandvars(npath)) try: if not os.path.isdir(npath): raise Exception("'%s' is not a directory" % to_native(npath)) for root, dirs, files in os.walk(npath, followlinks=params['follow']): looked = looked + len(files) + len(dirs) for fsobj in (files + dirs): fsname = os.path.normpath(os.path.join(root, fsobj)) if params['depth']: wpath = npath.rstrip(os.path.sep) + os.path.sep depth = int(fsname.count(os.path.sep)) - int(wpath.count(os.path.sep)) + 1 if depth > params['depth']: continue if os.path.basename(fsname).startswith('.') and not params['hidden']: continue
- os.path.expanduser(path):把path中包含的”~”和”~user”转换成用户目录
- os.path.expandvars(path):根据环境变量的值替换path中包含的”$name”和”${name}”
- os.path.isdir(path):判断路径是否为目录
- os.path.normpath(path):规范path字符串形式
- os.path.basename(path):返回文件名
- os.path.sep:常量字符串“/”

wpath = npath.rstrip(os.path.sep) + os.path.sep depth = int(fsname.count(os.path.sep)) - int(wpath.count(os.path.sep)) + 1
乍一看rstrip(os.path.sep) + os.path.sep感觉不对劲,为什么要先去掉”/”,再加上”/”。其实rstrip没有匹配到文本就返回的本身,这里是为了不管路径最后有没有斜杠,最后都添加一个上去。因为通常我们可能会这样写,用切片的方式去判断。
if npath[-1] != "\\": npath += "\\"
try: st = os.lstat(fsname) except (IOError, OSError) as e: msg += "Skipped entry '%s' due to this access issue: %s\n" % (fsname, to_text(e)) continue r = {'path': fsname} if params['file_type'] == 'any': if pfilter(fsobj, params['patterns'], params['excludes'], params['use_regex']) and agefilter(st, now, age, params['age_stamp']): r.update(statinfo(st)) if stat.S_ISREG(st.st_mode) and params['get_checksum']: r['checksum'] = module.sha1(fsname) filelist.append(r) elif stat.S_ISDIR(st.st_mode) and params['file_type'] == 'directory': if pfilter(fsobj, params['patterns'], params['excludes'], params['use_regex']) and agefilter(st, now, age, params['age_stamp']): r.update(statinfo(st)) filelist.append(r)
我们前面知道statinfo函数返回的是一个包含文件所有信息的字典,用字典的update将信息合并,并对文件路径取哈希值,而目录是不取哈希的,也就是两个if的区别。从这里可知filelist就是一个 lsit[dict{}]所有文件信息字典的列表。
elif stat.S_ISREG(st.st_mode) and params['file_type'] == 'file': if pfilter(fsobj, params['patterns'], params['excludes'], params['use_regex']) and \ agefilter(st, now, age, params['age_stamp']) and \ sizefilter(st, size) and contentfilter(fsname, params['contains'], params['read_whole_file']): r.update(statinfo(st)) if params['get_checksum']: r['checksum'] = module.sha1(fsname) filelist.append(r) elif stat.S_ISLNK(st.st_mode) and params['file_type'] == 'link': if pfilter(fsobj, params['patterns'], params['excludes'], params['use_regex']) and agefilter(st, now, age, params['age_stamp']): r.update(statinfo(st)) filelist.append(r) if not params['recurse']: break except Exception as e: warn = "Skipped '%s' path due to this access issue: %s\n" % (npath, to_text(e)) module.warn(warn) msg += warn matched = len(filelist) module.exit_json(files=filelist, changed=False, msg=msg, matched=matched, examined=looked)
stat.S_ISREG(mode):如果 mode 来自一个常规文件则返回非零值。同时文件类型为file。这里应该是对常规文件的处理,也就多了一个sizefilter函数调用。同样为link链接时是不取哈希的,总的来说都是差不多的。最后还是对错误的一些处理,基本上每个模块相同的部分。