Ansible 高级功能:解锁强大自动化能力

Ansible 高级功能:解锁强大自动化能力

异步任务

对于长时间运行的任务,使用异步执行可以提高效率。

基本异步任务

---
- name: 异步执行任务
  hosts: all
  tasks:
    # Start the download in the background. With poll: 0 the task returns
    # immediately, and the registered result carries the job id that the
    # async_status task below needs.
    - name: 下载大文件
      get_url:
        url: http://example.com/large_file.iso
        dest: /tmp/large_file.iso
      async: 3600          # maximum time the job may run (seconds)
      poll: 0              # do not block here; the next task does the waiting
      register: async_result

    # Check the background job every 30 seconds, at most 120 times
    # (120 x 30 s matches the 3600 s async limit above).
    - name: 等待任务完成
      async_status:
        jid: "{{ async_result.ansible_job_id }}"
      register: job_result
      until: job_result.finished
      retries: 120
      delay: 30

fire and forget(不等待)

# Launch the script in the background and continue immediately.
- name: 启动后台进程
  register: async_result
  command: /usr/local/bin/long_running_task.sh
  poll: 0                 # fire and forget: never wait for completion
  async: 3600             # the job may run for up to 3600 seconds

# Print the job id that the background task returned.
- name: 记录任务 ID
  debug:
    msg: "任务 ID: {{ async_result.ansible_job_id }}"

批量异步任务

# Start the job on every data server without waiting (poll: 0). With a
# non-zero poll value each loop iteration would block until its host had
# finished, so the hosts would be processed one after another.
- name: 在多个主机上并行执行长任务
  shell: /usr/local/bin/data_processing.sh
  async: 7200
  poll: 0
  loop: "{{ groups['data_servers'] }}"
  delegate_to: "{{ item }}"
  register: batch_jobs

# Wait for each background job on the host where it was started.
# 120 retries x 60 s matches the 7200 s async limit above.
- name: 等待所有主机上的任务完成
  async_status:
    jid: "{{ item.ansible_job_id }}"
  loop: "{{ batch_jobs.results }}"
  loop_control:
    label: "{{ item.item }}"
  delegate_to: "{{ item.item }}"
  register: batch_status
  until: batch_status.finished
  retries: 120
  delay: 60

循环高级技巧

嵌套循环

---
- name: 嵌套循环示例
  hosts: all
  tasks:
    # One lineinfile call per (service, param, value) entry in the list.
    - name: 配置多个服务的多个参数
      loop:
        - service: nginx
          param: worker_processes
          value: '4'
        - service: nginx
          param: worker_connections
          value: '1024'
        - service: mysql
          param: max_connections
          value: '200'
        - service: mysql
          param: innodb_buffer_pool_size
          value: 256M
      lineinfile:
        line: "{{ item.param }} = {{ item.value }}"
        path: "/etc/{{ item.service }}/{{ item.service }}.conf"

循环字典

# dict2items turns the mapping into a list of {key, value} entries so that
# it can be iterated with loop.
- name: 遍历字典
  loop: "{{ my_dict | dict2items }}"
  debug:
    msg: "{{ item.key }}: {{ item.value }}"

# Dictionary definition (placed in the playbook's vars)
my_dict:
  nginx:
    port: 80
    worker_processes: auto
  mysql:
    port: 3306
    max_connections: 200

循环注册变量

# Probe every endpoint; failures are recorded instead of stopping the play.
- name: 循环并注册结果
  uri:
    method: GET
    url: "{{ item }}"
  register: health_checks
  ignore_errors: true
  loop:
    - http://api1.example.com/health
    - http://api2.example.com/health
    - http://api3.example.com/health

# Walk the per-item results and report only the failed ones.
- name: 显示失败的健康检查
  when: item.failed
  loop: "{{ health_checks.results }}"
  debug:
    msg: "{{ item.item }} 返回状态码 {{ item.status }}"

带索引的循环

# Create one directory per list entry. index_var exposes the zero-based
# position of the current item; it is shown in the per-item label while
# the directory itself is named after the item.
- name: 创建多个目录并显示索引
  file:
    path: "/opt/app/{{ item }}"
    state: directory
  loop:
    - data
    - logs
    - cache
    - backups
  loop_control:
    index_var: index
    label: "{{ index }}: {{ item }}"

复杂条件

嵌套条件

# Every entry of the list must be true for the task to run.
- name: 复杂嵌套条件
  when:
    - ansible_distribution == "Ubuntu"
    - ansible_distribution_version == "22.04"
    - (ansible_memtotal_mb | int) >= 4096
    - "'web' in group_names"
  debug:
    msg: "满足所有条件"

# Combining "or" and "and" in a single expression
- name: 混合条件
  when: >-
    ansible_distribution in ["Ubuntu", "Debian"]
    and (ansible_memtotal_mb | int) >= 2048
  debug:
    msg: "系统满足要求"

条件与过滤器

# Report low disk space when the root filesystem has less than
# 1073741824 bytes (1 GiB) available. The size note is kept out here
# because a "#" inside a folded block scalar is part of the expression
# text, not a YAML comment, and would break the Jinja2 condition.
- name: 使用过滤器的条件
  debug:
    msg: "磁盘空间不足"
  when: >-
    (ansible_mounts
    | selectattr('mount', 'equalto', '/')
    | map(attribute='size_available')
    | first
    | int) < 1073741824

多变量条件

- name: 检查多个条件
  block:
    - name: 执行部署
      debug:
        msg: "开始部署"

  # All list entries must be true. "| bool" converts string values such as
  # "false" (for example when passed with -e) into real booleans; without
  # it a non-empty string would count as true.
  when:
    - deployment_enabled | default(false) | bool
    - ansible_system == "Linux"
    - not (maintenance_mode | default(false) | bool)

错误处理

block-rescue-always

---
- name: 安全部署流程
  hosts: app_servers
  become: true
  vars:
    # Single definition of the backup location, so that the rescue section
    # restores from exactly the directory the backup task wrote to.
    backup_path: "{{ backup_dir }}/{{ ansible_date_time.epoch }}"

  tasks:
    - name: 部署块
      block:
        # argv avoids shell parsing of the templated paths.
        - name: 备份当前版本
          command:
            argv:
              - cp
              - "-r"
              - "{{ app_dir }}"
              - "{{ backup_path }}"
          register: backup_result

        - name: 部署新版本
          git:
            repo: "{{ app_repo }}"
            dest: "{{ app_dir }}"
            version: "{{ app_version }}"

        - name: 验证部署
          uri:
            url: http://localhost:8080/health
            status_code: 200

      rescue:
        # Restore only when the backup task actually ran and succeeded;
        # "<dir>/." copies the directory contents without a shell glob.
        - name: 部署失败,回滚
          command:
            argv:
              - cp
              - "-r"
              - "{{ backup_path }}/."
              - "{{ app_dir }}/"
          when:
            - backup_result is defined
            - backup_result is succeeded

        - name: 重启服务
          service:
            name: myapp
            state: restarted

        - name: 发送告警
          debug:
            msg: "部署失败,已回滚到备份版本"

      always:
        - name: 清理临时文件
          file:
            path: /tmp/deploy_temp
            state: absent

failed_when

# Custom failure condition: fail when the reported usage is above 90.
- name: 检查磁盘空间
  shell: df -h / | tail -1 | awk '{print $5}' | cut -d'%' -f1
  register: disk_usage
  failed_when: (disk_usage.stdout | int) > 90
  changed_when: false

# Failure on several conditions: both must hold for the task to fail.
- name: 检查应用状态
  shell: /usr/local/bin/check_app.sh
  register: app_status
  failed_when: "'ERROR' in app_status.stdout and app_status.rc != 0"

# Never mark this task as failed, whatever the command returns.
- name: 即使命令失败也继续
  failed_when: false
  command: /usr/local/bin/optional_check.sh

ignore_errors

# Try to stop the service; keep the result and continue even on failure.
- name: 尝试停止服务(可能不存在)
  register: stop_result
  ignore_errors: true
  service:
    state: stopped
    name: oldservice

# Report the outcome based on the registered result's "failed" flag.
- name: 检查服务状态
  debug:
    msg: "服务 {{ '已停止' if not stop_result.failed else '不存在' }}"

自定义模块

创建自定义模块

library/custom_module.py:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from ansible.module_utils.basic import AnsibleModule

def main():
    """Entry point of the example module.

    Declares the ``name`` (required), ``state`` (``present``/``absent``,
    default ``present``) and ``path`` (default ``/tmp``) arguments, then
    exits with ``changed=True``, a message that depends on ``state`` and the
    resource name. The actual resource handling is a placeholder.
    """
    arg_spec = dict(
        name=dict(required=True, type='str'),
        state=dict(default='present', choices=['present', 'absent']),
        path=dict(default='/tmp', type='path'),
    )
    module = AnsibleModule(argument_spec=arg_spec, supports_check_mode=True)

    params = module.params
    name = params['name']
    state = params['state']
    path = params['path']  # read but not used by the placeholder logic

    # Placeholder logic: both branches only build a message and report a
    # change; this is where the resource would be created or removed.
    if state != 'present':
        message = f"资源 {name} 已删除"
    else:
        message = f"资源 {name} 已创建"

    module.exit_json(changed=True, msg=message, resource=name)


if __name__ == '__main__':
    main()

使用自定义模块

---
- name: 使用自定义模块
  hosts: all
  tasks:
    # Call the module from library/custom_module.py and keep its result.
    - name: 使用 custom_module
      register: result
      custom_module:
        path: /opt/resources
        state: present
        name: my_resource

    # Dump the registered result.
    - name: 显示结果
      debug:
        var: result

自定义过滤器

创建过滤器插件

filter_plugins/custom_filters.py:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from ansible import errors

def to_gb(value):
    """Convert a byte count to gigabytes (1024**3 bytes), rounded to 2 places.

    ``value`` is anything ``int()`` accepts. If the conversion raises
    ``ValueError`` or ``TypeError``, an ``AnsibleFilterError`` is raised.
    """
    bytes_per_gb = 1024 ** 3
    try:
        byte_count = int(value)
        return round(byte_count / bytes_per_gb, 2)
    except (ValueError, TypeError):
        raise errors.AnsibleFilterError(f"Cannot convert {value} to GB")

def to_mb(value):
    """Convert a byte count to megabytes (1024**2 bytes), rounded to 2 places.

    ``value`` is anything ``int()`` accepts. If the conversion raises
    ``ValueError`` or ``TypeError``, an ``AnsibleFilterError`` is raised.
    """
    bytes_per_mb = 1024 ** 2
    try:
        byte_count = int(value)
        return round(byte_count / bytes_per_mb, 2)
    except (ValueError, TypeError):
        raise errors.AnsibleFilterError(f"Cannot convert {value} to MB")

def list_to_dict(items, key_field='name'):
    """Index a list of mappings by one of their fields.

    Returns a dict that maps ``entry[key_field]`` to ``entry`` for every
    entry of ``items``; later entries replace earlier ones with the same
    key. A ``KeyError`` or ``TypeError`` during the conversion is reported
    as an ``AnsibleFilterError``.
    """
    try:
        indexed = {}
        for entry in items:
            indexed[entry[key_field]] = entry
        return indexed
    except (KeyError, TypeError):
        raise errors.AnsibleFilterError(f"Cannot convert list to dict using key {key_field}")

def split_lines(text):
    """Split a string into its lines; any non-string input yields ``[]``."""
    if not isinstance(text, str):
        return []
    return text.splitlines()

class FilterModule(object):
    """Exposes this file's filter functions under their own names."""

    def filters(self):
        """Return the mapping of filter name to implementing function."""
        registry = {}
        # Each filter is registered under the name of its function.
        for func in (to_gb, to_mb, list_to_dict, split_lines):
            registry[func.__name__] = func
        return registry

使用自定义过滤器

---
- name: 使用自定义过滤器
  hosts: all
  tasks:
    - name: 获取磁盘信息
      setup:
        filter: ansible_mounts
      register: mounts_info

    # Select the entry whose mount point is "/" instead of taking the first
    # list element, which is not guaranteed to be the root filesystem.
    - name: 显示磁盘空间(GB)
      debug:
        msg: >-
          根分区可用空间:
          {{ (mounts_info.ansible_facts.ansible_mounts
          | selectattr('mount', 'equalto', '/')
          | first).size_available | to_gb }} GB

    - name: 使用列表转字典
      debug:
        msg: "{{ servers | list_to_dict }}"

    - name: 分割文本
      debug:
        msg: "{{ config_text | split_lines }}"

自定义回调插件

创建回调插件

callback_plugins/custom_callback.py:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from ansible.plugins.callback import CallbackBase
from datetime import datetime

class CallbackModule(CallbackBase):
    """Aggregate callback that prints playbook timing and per-host results.

    It records the start time when the playbook starts, prints the elapsed
    time and the ok/failed/changed counters per host at the end, and prints
    a line for every failed task and for every task that reported a change.

    Colors are passed to the display as plain color names ('green', 'cyan',
    'red', 'yellow'); names such as 'C_GREEN' are not known color names.
    """

    CALLBACK_VERSION = 2.0
    CALLBACK_TYPE = 'aggregate'
    CALLBACK_NAME = 'custom'

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Set in v2_playbook_on_start; stays None if that hook never fires.
        self.start_time = None

    def v2_playbook_on_start(self, playbook):
        """Remember and print the time the playbook started."""
        self.start_time = datetime.now()
        self._display.display(f"Playbook 开始时间: {self.start_time}", color='green')

    def v2_playbook_on_stats(self, stats):
        """Print the total duration and the per-host summary counters."""
        # Without a recorded start time no duration can be computed, so the
        # duration line is skipped instead of failing on None.
        if self.start_time is not None:
            duration = datetime.now() - self.start_time
            self._display.display(f"\nPlaybook 执行时间: {duration}", color='cyan')

        for host in sorted(stats.processed.keys()):
            stat = stats.summarize(host)
            self._display.display(f"\n主机: {host}")
            self._display.display(f"  成功: {stat['ok']}")
            self._display.display(f"  失败: {stat['failures']}")
            self._display.display(f"  变更: {stat['changed']}")

    def v2_runner_on_failed(self, result, ignore_errors=False):
        """Print the task name, host and (if present) message of a failure."""
        self._display.display(f"任务失败: {result._task.name}", color='red')
        self._display.display(f"主机: {result._host.name}")
        if 'msg' in result._result:
            self._display.display(f"错误信息: {result._result['msg']}")

    def v2_runner_on_ok(self, result):
        """Print a line for successful tasks that reported a change."""
        if result._result.get('changed', False):
            self._display.display(f"任务变更: {result._task.name}", color='yellow')

启用回调插件

ansible.cfg:

[defaults]
# Comma-separated list of callback plugins to enable. The setting name is
# plural ("callbacks_enabled"); older releases used "callback_whitelist".
callbacks_enabled = custom,timer,profile_tasks

策略插件

理解策略

# Default strategy (linear)
# Each task is run on all targeted hosts (in parallel, limited by forks) and
# has to finish on every host before the next task starts.

# free strategy
# Each host works through its tasks without waiting for the other hosts.
- name: 使用 free 策略
  hosts: all
  strategy: free

  tasks:
    - name: 独立任务
      debug:
        msg: "{{ inventory_hostname }}"

# debug strategy
# Runs like linear, but opens the interactive task debugger when a task fails.

缓存优化

Facts 缓存

ansible.cfg:

[defaults]
# Fact gathering mode and fact cache backend; with the jsonfile backend,
# fact_caching_connection is the directory that holds the cache files.
gathering = smart
fact_caching = jsonfile
fact_caching_connection = /tmp/ansible_facts
fact_caching_timeout = 86400

# Alternative: use Redis as the cache backend
# fact_caching = redis
# fact_caching_connection = localhost:6379:0
# fact_caching_timeout = 86400

动态 Inventory 高级用法

编写动态 Inventory 脚本

inventory/dynamic_inventory.py:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import json
import sys
import boto3

def get_ec2_instances():
    """Build an Ansible inventory dict from running production EC2 instances.

    Queries EC2 for instances tagged ``Environment=production`` that are in
    the ``running`` state. Every instance is added to the ``all`` group and,
    depending on its ``Role`` tag, to ``webservers`` or ``databases``. Host
    variables (``ansible_host``, ``instance_id``, ``instance_type``) are
    stored under ``_meta.hostvars``.

    Returns:
        dict: inventory in the JSON structure expected for ``--list``.
    """
    ec2 = boto3.client('ec2')
    # describe_instances returns results in pages; the paginator walks all
    # of them so that instances beyond the first page are not dropped.
    paginator = ec2.get_paginator('describe_instances')
    pages = paginator.paginate(
        Filters=[
            {'Name': 'tag:Environment', 'Values': ['production']},
            {'Name': 'instance-state-name', 'Values': ['running']}
        ]
    )

    # Value of the Role tag -> inventory group name.
    role_groups = {'webserver': 'webservers', 'database': 'databases'}

    inventory = {
        '_meta': {'hostvars': {}},
        'webservers': {'hosts': [], 'vars': {}},
        'databases': {'hosts': [], 'vars': {}},
        'all': {'hosts': [], 'vars': {}}
    }

    for page in pages:
        for reservation in page['Reservations']:
            for instance in reservation['Instances']:
                # Prefer the public address and fall back to the private
                # one; skip the instance if neither is present instead of
                # raising KeyError.
                address = instance.get('PublicIpAddress') or instance.get('PrivateIpAddress')
                if not address:
                    continue

                tags = {tag['Key']: tag['Value'] for tag in instance.get('Tags', [])}
                # A missing or empty Name tag falls back to the address.
                name = tags.get('Name') or address

                # Group by Role tag.
                group = role_groups.get(tags.get('Role'))
                if group is not None:
                    inventory[group]['hosts'].append(name)

                inventory['all']['hosts'].append(name)
                inventory['_meta']['hostvars'][name] = {
                    'ansible_host': address,
                    'instance_id': instance['InstanceId'],
                    'instance_type': instance['InstanceType'],
                }

    return inventory

if __name__ == '__main__':
    # The script is called either with --list or with --host <name>.
    if len(sys.argv) > 1 and sys.argv[1] == '--list':
        print(json.dumps(get_ec2_instances(), indent=2))
    elif len(sys.argv) > 2 and sys.argv[1] == '--host':
        # Host variables are already delivered through _meta.hostvars in
        # the --list output, so the per-host answer is an empty mapping.
        print(json.dumps({}))
    else:
        # Missing or unknown arguments (including --host without a name)
        # are reported instead of failing with IndexError or printing
        # nothing.
        sys.stderr.write(f"Usage: {sys.argv[0]} --list | --host <hostname>\n")
        sys.exit(1)

多环境部署

环境特定配置

# 目录结构
project/
├── inventory/
│   ├── production/
│   │   ├── hosts.ini
│   │   └── group_vars/
│   │       └── all.yml
│   ├── staging/
│   │   ├── hosts.ini
│   │   └── group_vars/
│   │       └── all.yml
│   └── development/
│       ├── hosts.ini
│       └── group_vars/
│           └── all.yml
├── playbooks/
│   └── site.yml
└── roles/
    └── ...

# inventory/production/group_vars/all.yml
# "environment" is a reserved Ansible keyword (it sets environment
# variables for tasks), so the deployment stage is stored in "deploy_env".
deploy_env: production
app_port: 80
log_level: error

# inventory/staging/group_vars/all.yml
deploy_env: staging
app_port: 8080
log_level: info

使用变量覆盖

---
- name: 根据环境部署
  hosts: all
  vars:
    # Default value
    database_host: localhost

  tasks:
    - name: 生产环境使用外部数据库
      set_fact:
        database_host: db.production.example.com
      when: deploy_env == "production"

    - name: 配置数据库连接
      template:
        src: database.conf.j2
        dest: /etc/myapp/database.conf

总结

通过本教程,你已经了解了 Ansible 的高级功能:

  • 异步任务执行
  • 高级循环技巧
  • 复杂条件判断
  • 错误处理机制
  • 自定义模块开发
  • 自定义过滤器和回调插件
  • 策略插件
  • 缓存优化策略
  • 动态 Inventory 高级用法
  • 多环境部署方案

发表回复

后才能评论