Ansible源码分析之kown_hosts模块

Ansible源码分析之kown_hosts模块

kown_hosts大家应该都比较熟悉,其作用如下:
ssh会把你每个访问过计算机的公钥(public key)都记录在~/.ssh/known_hosts。当下次访问相同计算机时,OpenSSH会核对公钥。如果公钥不同,OpenSSH会发出警告, 避免你受到DNS Hijack之类的攻击。

known_hosts 模块可让您从known_hosts文件中添加或删除主机密钥。

Ansible中kown_hosts.py模块的描述

模块的介绍非常简单,说明该模块的功能也不复杂,直接来看源码。

import base64
import errno
import hashlib
import hmac
import os
import os.path
import re
import tempfile

from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.common.file import FileLock
from ansible.module_utils._text import to_bytes, to_native

先看用到了哪些模块,常见的就不说了。

  • errno模块定义了一些符号错误码,如ENOENT(“找不到该目录”)以及EPERM(“没有权限”)。如果需要区分不同的错误代码,则可以使用符号名称。简单来说就是和异常处理有关的模块。
  • Hmac它通过一个标准算法,在计算哈希的过程中,把key混入计算过程中。hmac模块实现了HAMC算法,提供了相应的函数和方法,且与hashlib提供的api基本一致。
  • tempfile顾名思义用于创建临时文件。

对于module_utils的模块介绍会有单独的文章分析。下面先看模块的main函数。

def main():

    module = AnsibleModule(
        argument_spec=dict(
            name=dict(required=True, type='str', aliases=['host']),
            key=dict(required=False, type='str'),
            path=dict(default="~/.ssh/known_hosts", type='path'),
            hash_host=dict(required=False, type='bool', default=False),
            state=dict(default='present', choices=['absent', 'present']),
        ),
        supports_check_mode=True
    )

    results = enforce_state(module, module.params)
    module.exit_json(**results)

该模块的main函数相比其他模块非常简短,毕竟功能单一。先用AnsibleModule实例化moudle。argument_spec就是一个嵌套的map,其中的参数也就是我们在命令行写入或者playbook中要传入的参数。重点关注的就是required=True的,这些就是必须指定的。

接着调用了enforce_state函数,跟进该方法。

def enforce_state(module, params):
    """
    Add or remove key.
    """
    host = params["name"].lower()
    key = params.get("key", None)
    path = params.get("path")
    hash_host = params.get("hash_host")
    state = params.get("state")
    sshkeygen = module.get_bin_path("ssh-keygen", True)

    if not key and state != "absent":
        module.fail_json(msg="No key specified when adding a host")

    if key and hash_host:
        key = hash_host_key(host, key)

    if key and not key.endswith('\n'):
        key += '\n'

    sanity_check(module, host, key, sshkeygen)

    found, replace_or_add, found_line = search_for_host_key(module, host, key, path, sshkeygen)

    params['diff'] = compute_diff(path, found_line, replace_or_add, state, key)

    if module.check_mode:
        module.exit_json(changed=replace_or_add or (state == "present") != found,
                         diff=params['diff'])

    if found and not key and state == "absent":
        module.run_command([sshkeygen, '-R', host, '-f', path], check_rc=True)
        params['changed'] = True

enforce_state的代码有点长,我拆成两部分。注释中写了该函数的作用是添加和删除key。要求传入module和params。moudle也就是前面main中实例化的模块对象,以及模块的参数。

params是一个参数的字典,前几行都是赋值的操作。但是大家有没有发现用到了字典两种取值的方式。

host = params["name"].lower()
key = params.get("key", None)

用字典的get方法取值,如果键不存在不会报错而是返回指定的默认值,而dict[key]的形式,key不存在会抛异常。这里可以看出对于非必要参数的取值方式处理是不同的。然后是获取sshkeygen命令的绝对路径。

  • 当添加host但key没有赋值时向上层返回一个异常消息。
  • 当key存在且hash_host_key为真是,key的host进行hash处理。
  • 然后判断key是否以\n结尾,并添加上换行符。

这个很好理解,在kown_hosts文件中都是一个host一行,多个host不能写在一行。而进行hash处理是什么意思呢?这里我用ssh-keyscan来演示一下。

hash处理host

这里是获取主机上的公钥信息,而hash处理就是把我们的主机名给加密隐藏了,但并不会影响到密钥的使用。

  • 接着调用sanity_check函数检查提供的密钥是否合理。
  • search_for_host_key函数在known_hosts文件路径中查找主机和键类型;如果存在,则查看这些条目之一是否与键匹配。
  • compute_diff去比较添加或删除key后的差别

当判断check_mode开启的时候直接退出并返回参数信息。然后开始命令执行了,仅在找到整个主机且没有提供密钥的情况下将其删除。

接着看enforce_state函数的下半部分。

if replace_or_add or found != (state == "present"):
    try:
        inf = open(path, "r")
    except IOError as e:
        if e.errno == errno.ENOENT:
            inf = None
        else:
            module.fail_json(msg="Failed to read %s: %s" % (path, str(e)))
    try:
        with tempfile.NamedTemporaryFile(mode='w+', dir=os.path.dirname(path), delete=False) as outf:
            if inf is not None:
                for line_number, line in enumerate(inf):
                    if found_line == (line_number + 1) and (replace_or_add or state == 'absent'):
                        continue  # skip this line to replace its key
                    outf.write(line)
                inf.close()
            if state == 'present':
                outf.write(key)
    except (IOError, OSError) as e:
        module.fail_json(msg="Failed to write to file %s: %s" % (path, to_native(e)))
    else:
        module.atomic_move(outf.name, path)

    params['changed'] = True

return params

这里主要就是添加或删除的一个密钥了。这里有对文件的IO操作,读取path的文件。当异常为 errno.ENOENT,也就是是没有该文件或目录时,将inf的值为None,而不是向上层传递异常。

这里的path也就是我们指定的known_hosts路径。

path=dict(default="~/.ssh/known_hosts", type='path')

接着用tempfile模块在known_hosts所在的目录创建临时文件。

if found_line == (line_number + 1) and (replace_or_add or state == 'absent'):

这行代码会让人有些疑惑,后面分析search_for_host_key再讨论。其余的就读取的known_hosts写入临时文件,最后再用临时文件去替换原本的known_hosts文件完成增加/删除的操作。

在enforce_state中可以看到调用很多其他方法,我们逐步跟进分析。先看sanity_check的代码。

def sanity_check(module, host, key, sshkeygen):
    if not key:
        return

    if re.search(r'\S+(\s+)?,(\s+)?', host):
        module.fail_json(msg="Comma separated list of names is not supported. "
                             "Please pass a single name to lookup in the known_hosts file.")

    with tempfile.NamedTemporaryFile(mode='w+') as outf:
        try:
            outf.write(key)
            outf.flush()
        except IOError as e:
            module.fail_json(msg="Failed to write to temporary file %s: %s" %
                             (outf.name, to_native(e)))

        sshkeygen_command = [sshkeygen, '-F', host, '-f', outf.name]
        rc, stdout, stderr = module.run_command(sshkeygen_command)

    if stdout == '': 
        module.fail_json(msg="Host parameter does not match hashed host field in supplied key")

该方法用于检查提供的密钥是否合理。如果提供的主机与提供的密钥不一致,则该功能将退出,并向用户提供错误。

当传入的key为空时就直接返回了。然后用正则匹配host是否用逗号分隔。不支持以逗号分隔的名称列表。请在known_hosts文件中将一个名称传递给查找。

开发者在注释中写了这么一段话,同时代码也是这样实现的。

与其自己解析密钥,不如让ssh-keygen来做(这对于散列密钥是必不可少的,但在其他方面很有用,因为密钥问题是ssh-keygen是否认为密钥与主机匹配)。方法是将密钥写入临时文件,然后尝试在该文件中查找指定的主机。

def hash_host_key(host, key):
    hmac_key = os.urandom(20)
    hashed_host = hmac.new(hmac_key, to_bytes(host), hashlib.sha1).digest()
    parts = key.strip().split()
    i = 1 if parts[0][0] == '@' else 0
    parts[i] = '|1|%s|%s' % (to_native(base64.b64encode(hmac_key)), to_native(base64.b64encode(hashed_host)))
    return ' '.join(parts)

再来看hash_host_key的实现,用到了os.urandom。

os.urandom

os.urandom(n)函数在python官方文档中解释是:
返回一个有n个byte那么长的一个string,很适合用于加密。

接下来的也就是按照格式对字符串的替换操作。和上面的 hash处理host 图中的效果一样。

def search_for_host_key(module, host, key, path, sshkeygen):
    if os.path.exists(path) is False:
        return False, False, None

    sshkeygen_command = [sshkeygen, '-F', host, '-f', path]

    rc, stdout, stderr = module.run_command(sshkeygen_command, check_rc=False)
    if stdout == '' and stderr == '' and (rc == 0 or rc == 1):
        return False, False, None  
    if rc != 0: 
        module.fail_json(msg="ssh-keygen failed (rc=%d, stdout='%s',stderr='%s')" % (rc, stdout, stderr))

    if not key:
        return True, False, None

    lines = stdout.split('\n')
    new_key = normalize_known_hosts_key(key)

    for lnum, l in enumerate(lines):
        if l == '':
            continue
        elif l[0] == '#':  
            try:
                found_line = int(re.search(r'found: line (\d+)', l).group(1))
            except IndexError:
                module.fail_json(msg="failed to parse output of ssh-keygen for line number: '%s'" % l)
        else:
            found_key = normalize_known_hosts_key(l)
            if new_key['host'][:3] == '|1|' and found_key['host'][:3] == '|1|': 
                new_key['host'] = found_key['host']
            if new_key == found_key: 
                return True, False, found_line  
            elif new_key['type'] == found_key['type']:  
                return True, True, found_line

    return True, True, None

在known_hosts文件路径中查找主机和键类型;如果存在,则查看这些条目之一是否与键匹配。再看参数和返回值。

search_for_host_key(module,host,key,path,sshkeygen) -> (found,replace_or_add,found_line)

found和replace_or_add都是布尔值,found_line为int或None。通过run_command执行命令,这里check_rc为False表示状态码为非0的时候不向上传递异常。可以看到这里rc有三种状态:
分别为:0,1 和 非0。

调用normalize_known_hosts_key对我们传入的密钥做规范化处理。前面对执行的输出stdout分隔成了列表,然后进行遍历。其中通过正则来匹配获取到密钥所在的行号。

至少从OpenSSH 4.0起,此输出格式已在ssh-keygen中进行了硬编码。总是在找到的键之前输出非本地化的注释。接着判断如果已经哈希,则不要更改主机哈希。找到完全相同的密钥,就不替换。或者是为相同的密钥类型找到了不同的密钥也会替换。

再回头看之前的疑惑,found_line == (line_number + 1),found_line是用正则匹配出来的行号,而line_number是索引从0开始,故+1。

def normalize_known_hosts_key(key):
    key = key.strip()  
    k = key.split()
    d = dict()
    
    if k[0][0] == '@':
        d['options'] = k[0]
        d['host'] = k[1]
        d['type'] = k[2]
        d['key'] = k[3]
    else:
        d['host'] = k[0]
        d['type'] = k[1]
        d['key'] = k[2]
    return d

再看normalize_known_hosts_key的实现。这里就是字符串的key按照格式转化成了结构化的数据字典,方便对比原本的key和新的key。

最后还有compute_diff方法,同样是在enforce_state中被调用的。

def compute_diff(path, found_line, replace_or_add, state, key):
    diff = {
        'before_header': path,
        'after_header': path,
        'before': '',
        'after': '',
    }
    try:
        inf = open(path, "r")
    except IOError as e:
        if e.errno == errno.ENOENT:
            diff['before_header'] = '/dev/null'
    else:
        diff['before'] = inf.read()
        inf.close()
    lines = diff['before'].splitlines(1)
    if (replace_or_add or state == 'absent') and found_line is not None and 1 <= found_line <= len(lines):
        del lines[found_line - 1]
    if state == 'present' and (replace_or_add or found_line is None):
        lines.append(key)
    diff['after'] = ''.join(lines)
    return diff

先定义一个字典diff,包含前后的键。读取之前的known_hosts作为before的值,注意splitlines(1)是保留换行符的,是为了保证前后的格式不变。并增加/删除密钥后赋值给after。

以上就是kown_hosts模块源码的分析,一篇完整的源码分析花费了整整一天的时间,不过收获也非常大,后续还会陆续分析其他模块的源码。

模块源码链接:https://github.com/ansible/ansible/blob/devel/lib/ansible/modules/known_hosts.py

写这篇文章时的源码:

https://zgao.top/download/ansible/known_hosts.py

赞赏

微信赞赏支付宝赞赏

Zgao

愿有一日,安全圈的师傅们都能用上Zgao写的工具。