Exploring get_url: The Ultimate wget Replacement for Ansible

In the walkthrough of the command module, it was mentioned that get_url serves as a replacement for the curl and wget commands. Let’s take a look at this module’s documentation:

  • Downloads files from HTTP, HTTPS, or FTP to a remote server. The remote server must have direct access to the remote resource.
  • By default, if an environment variable C(<protocol>_proxy) is set on the target host, the request will be sent through that proxy. This behavior can be overridden by setting a variable for this task or by using the use_proxy option.
  • An HTTP redirect can redirect from HTTP to HTTPS, so you should ensure that the proxy environment for both protocols is correctly set.
  • Starting with Ansible 2.4, when run with C(--check), the module issues a HEAD request to validate the URL but does not download the entire file or verify it against a hash.
  • For Windows targets, use the win_get_url module instead.

import datetime
import os
import re
import shutil
import tempfile
import traceback

from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.six.moves.urllib.parse import urlsplit
from ansible.module_utils._text import to_native
from ansible.module_utils.urls import fetch_url, url_argument_spec

The imports are mostly standard-library modules. The shutil module provides a set of high-level operations on files and collections of files; in particular, it supplies functions for copying and removing files. The fetch_url function from module_utils.urls, a wrapper around urllib-style requests, deserves a separate look.

Let’s start by looking at the code from the main function.

def main():
    argument_spec = url_argument_spec()

    argument_spec['url_username']['aliases'] = ['username']
    argument_spec['url_password']['aliases'] = ['password']

    argument_spec.update(
        url=dict(type='str', required=True),
        dest=dict(type='path', required=True),
        backup=dict(type='bool', default=False),
        sha256sum=dict(type='str', default=''),
        checksum=dict(type='str', default=''),
        timeout=dict(type='int', default=10),
        headers=dict(type='dict'),
        tmp_dest=dict(type='path'),
    )

    module = AnsibleModule(
        argument_spec=argument_spec,
        add_file_common_args=True,
        supports_check_mode=True,
        mutually_exclusive=[['checksum', 'sha256sum']],
    )

Doesn’t this look different? In other modules, main usually starts by instantiating an AnsibleModule object directly. Here, the initial argument_spec comes from the urls module. Let’s follow the code.

def url_argument_spec():
    return dict(
        url=dict(type='str'),
        force=dict(type='bool', default=False, aliases=['thirsty'],
                   deprecated_aliases=[dict(name='thirsty', version='2.13', collection_name='ansible.builtin')]),
        http_agent=dict(type='str', default='ansible-httpget'),
        use_proxy=dict(type='bool', default=True),
        validate_certs=dict(type='bool', default=True),
        url_username=dict(type='str'),
        url_password=dict(type='str', no_log=True),
        force_basic_auth=dict(type='bool', default=False),
        client_cert=dict(type='path'),
        client_key=dict(type='path'),
        use_gssapi=dict(type='bool', default=False),
    )

url_argument_spec creates a base parameter specification that can be reused by any module that requests content via urllib. The get_url module’s argument_spec then adds or overrides parameters on top of this base, and the checksum and sha256sum parameters are declared mutually exclusive.
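As a small standalone illustration (plain dicts here, no Ansible involved), note that argument_spec.update() replaces whole entries rather than merging them; that is how get_url turns the optional url of the base spec into a required parameter:

# layering argument specs with dict.update(): the later entry wins wholesale
spec = dict(
    url=dict(type='str'),                   # optional in the base spec
    use_proxy=dict(type='bool', default=True),
)
spec.update(
    url=dict(type='str', required=True),    # get_url makes it required
    dest=dict(type='path', required=True),  # ...and adds new parameters
)
print(spec['url'])  # {'type': 'str', 'required': True}

Back in main, the deprecation handling comes next: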

    if module.params.get('thirsty'):
        module.deprecate('The alias "thirsty" has been deprecated and will be removed, use "force" instead',
                         version='2.13', collection_name='ansible.builtin')

    if module.params.get('sha256sum'):
        module.deprecate('The parameter "sha256sum" has been deprecated and will be removed, use "checksum" instead',
                         version='2.14', collection_name='ansible.builtin')

    url = module.params['url']
    dest = module.params['dest']
    backup = module.params['backup']
    force = module.params['force']
    sha256sum = module.params['sha256sum']
    checksum = module.params['checksum']
    use_proxy = module.params['use_proxy']
    timeout = module.params['timeout']
    headers = module.params['headers']
    tmp_dest = module.params['tmp_dest']

    result = dict(
        changed=False,
        checksum_dest=None,
        checksum_src=None,
        dest=dest,
        elapsed=0,
        url=url,
    )

When the module receives the thirsty or sha256sum parameter, it warns the user that the parameter is deprecated and should be replaced with force or checksum, respectively. The module parameters are then assigned to local variables, and a dictionary is created to hold the result information.

dest_is_dir = os.path.isdir(dest)
last_mod_time = None

if sha256sum:
    checksum = 'sha256:%s' % (sha256sum)

if checksum:
    try:
        algorithm, checksum = checksum.split(':', 1)
    except ValueError:
        module.fail_json(msg="The checksum parameter has to be in format :", **result)

    if is_url(checksum):
        checksum_url = checksum
        # download checksum file to checksum_tmpsrc
        checksum_tmpsrc, checksum_info = url_get(module, checksum_url, dest, use_proxy, last_mod_time, force, timeout, headers, tmp_dest)
        with open(checksum_tmpsrc) as f:
            lines = [line.rstrip('\n') for line in f]
        os.remove(checksum_tmpsrc)
        checksum_map = []
        for line in lines:
            parts = line.split(None, 1)
            if len(parts) == 2:
                checksum_map.append((parts[0], parts[1]))
        filename = url_filename(url)

        for cksum in (s for (s, f) in checksum_map if f.strip('./') == filename):
            checksum = cksum
            break
        else:
            checksum = None

        if checksum is None:
            module.fail_json(msg="Unable to find a checksum for file '%s' in '%s'" % (filename, checksum_url))
    checksum = re.sub(r'\W+', '', checksum).lower()
    try:
        int(checksum, 16)
    except ValueError:
        module.fail_json(msg='The checksum format is invalid', **result)

First, the code determines whether the provided dest parameter is a directory. The deprecated sha256sum parameter is handled by converting it into checksum format ('sha256:<value>'), and the checksum is then processed: the string is split into the algorithm and the checksum value.

Then is_url is called on the checksum value; if it is itself a URL, the checksum file is downloaded to checksum_tmpsrc and read line by line into a list. Note that line.split(None, 1) splits on runs of whitespace just like line.split(), but performs at most one split, so each line yields at most two parts.
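A quick interactive comparison of the two calls (the sample line is made up):

>>> "abc123  ./pkg.tar.gz extra".split(None, 1)
['abc123', './pkg.tar.gz extra']
>>> "abc123  ./pkg.tar.gz extra".split()
['abc123', './pkg.tar.gz', 'extra']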

for cksum in (s for (s, f) in checksum_map if f.strip('./') == filename):

In each line of the checksum file, the code looks for the hash corresponding to the filename from the URL and keeps the first match. The expression in parentheses is a generator expression. Many people are familiar with list comprehensions, but generator expressions (sometimes mislabeled “tuple comprehensions”) are less well known. A brief introduction:

A generator expression lazily produces the values that meet a specified condition from any iterable, such as a range, tuple, list, dictionary, or set.

Its syntax is: (expression for iteration_variable in iterable_object [if condition_expression])

For example, it looks like the following code should generate a tuple containing the numbers 1 to 9:

a = (x for x in range(1,10))
print(a)

The result of running this code is: <generator object <genexpr> at 0x0000020BAD136620>

As the output shows, the parenthesized comprehension does not produce a tuple but a generator object, which is the key difference from a list comprehension.
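The loop around the generator also relies on Python’s for/else construct: the else branch runs only when the loop completes without hitting break. A minimal self-contained sketch of the lookup (the sample data is made up):

checksum_map = [('abc123', './other.zip'), ('def456', './pkg.tar.gz')]
filename = 'pkg.tar.gz'

for cksum in (s for (s, f) in checksum_map if f.strip('./') == filename):
    checksum = cksum
    break            # first match wins
else:
    checksum = None  # loop exhausted without a break: no match found

print(checksum)      # def456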

Back to the code: after the checksum is obtained, all non-word characters, including the infamous Unicode zero-width space, are stripped out with a \W regex substitution. An int(checksum, 16) conversion then ensures that what remains is valid hexadecimal.
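A short illustration of this sanitizing step (the input value is contrived):

import re

raw = 'AB12\u200bCD34 '                  # digest polluted with a zero-width space
cleaned = re.sub(r'\W+', '', raw).lower()
print(cleaned)                           # ab12cd34
int(cleaned, 16)                         # raises ValueError if non-hex characters remain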

In the code above, other functions within the module are called, which we will look at one by one.

def is_url(checksum):
    supported_schemes = ('http', 'https', 'ftp', 'file')

    return urlsplit(checksum).scheme in supported_schemes

If the checksum value’s scheme is one of the supported URL protocols, it returns True; otherwise, it returns False. Now let’s look at the code of the url_get function.

def url_get(module, url, dest, use_proxy, last_mod_time, force, timeout=10, headers=None, tmp_dest='', method='GET'):
    start = datetime.datetime.utcnow()
    rsp, info = fetch_url(module, url, use_proxy=use_proxy, force=force, last_mod_time=last_mod_time, timeout=timeout, headers=headers, method=method)
    elapsed = (datetime.datetime.utcnow() - start).seconds

    if info['status'] == 304:
        module.exit_json(url=url, dest=dest, changed=False, msg=info.get('msg', ''), status_code=info['status'], elapsed=elapsed)

    if info['status'] == -1:
        module.fail_json(msg=info['msg'], url=url, dest=dest, elapsed=elapsed)

    if info['status'] != 200 and not url.startswith('file:/') and not (url.startswith('ftp:/') and info.get('msg', '').startswith('OK')):
        module.fail_json(msg="Request failed", status_code=info['status'], response=info['msg'], url=url, dest=dest, elapsed=elapsed)

    if tmp_dest:
        tmp_dest_is_dir = os.path.isdir(tmp_dest)
        if not tmp_dest_is_dir:
            if os.path.exists(tmp_dest):
                module.fail_json(msg="%s is a file but should be a directory." % tmp_dest, elapsed=elapsed)
            else:
                module.fail_json(msg="%s directory does not exist." % tmp_dest, elapsed=elapsed)
    else:
        tmp_dest = module.tmpdir

    fd, tempname = tempfile.mkstemp(dir=tmp_dest)

    f = os.fdopen(fd, 'wb')
    try:
        shutil.copyfileobj(rsp, f)
    except Exception as e:
        os.remove(tempname)
        module.fail_json(msg="failed to create temporary content file: %s" % to_native(e), elapsed=elapsed, exception=traceback.format_exc())
    f.close()
    rsp.close()
    return tempname, info

url_get downloads data from the URL into a temporary file and returns a (temporary file, request info) tuple. The elapsed time is computed by subtracting the start timestamp from the current time. The actual request is made by calling fetch_url, which returns a (response, info) tuple: response.read() yields the body, and info contains "status" and other metadata. When an HttpError (status >= 400) occurs, info['body'] contains the error response data.

The function then branches on the request status code and reports the corresponding errors:

  • 304 status code: the client sent a conditional GET and the request was allowed, but the document’s content has not changed (since the last access or according to the request’s conditions), so nothing needs to be downloaded.
  • -1 status: an exception occurred inside fetch_url; handling it ensures that an appropriate error is reported to the user in all cases.
  • Any other non-200 status fails the task, except for the file: and successfully completed ftp: special cases.

Next, the tmp_dest parameter is validated: if set, it must be an existing directory, otherwise the module fails; if unset, module.tmpdir is used instead. A temporary file is then created and the content copied into it, to be used later for the checksum-based replacement. tempfile.mkstemp() returns only a raw OS file descriptor (plus the path); you have to convert it to a real file object yourself, and you are also responsible for cleaning the file up.

That is why os.fdopen() is used here: it creates a file object from the file descriptor fd and returns it. This method is an alias of the built-in open() and accepts the same arguments; the only difference is that the first argument of fdopen() must be an integer. shutil.copyfileobj() copies the content of one file object into another. Finally, both file objects are closed.
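A minimal standalone sketch of the same mkstemp/fdopen/copyfileobj pattern, outside of Ansible (the source stream here is just this script itself, purely for illustration):

import os
import shutil
import tempfile

fd, tempname = tempfile.mkstemp()   # raw descriptor + path; cleanup is on us
src = open(__file__, 'rb')          # any readable binary stream will do
f = os.fdopen(fd, 'wb')             # wrap the descriptor in a file object
try:
    shutil.copyfileobj(src, f)      # stream-copy the content in chunks
finally:
    f.close()
    src.close()
    os.remove(tempname)             # mkstemp files are not deleted automatically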

Back to the main function code.

if not dest_is_dir and os.path.exists(dest):
    checksum_mismatch = False

    if not force and checksum != '':
        destination_checksum = module.digest_from_file(dest, algorithm)

        if checksum != destination_checksum:
            checksum_mismatch = True

    if not force and checksum and not checksum_mismatch:
        file_args = module.load_file_common_arguments(module.params, path=dest)
        result['changed'] = module.set_fs_attributes_if_different(file_args, False)
        if result['changed']:
            module.exit_json(msg="file already exists but file attributes changed", **result)
        module.exit_json(msg="file already exists", **result)

    mtime = os.path.getmtime(dest)
    last_mod_time = datetime.datetime.utcfromtimestamp(mtime)

    if checksum_mismatch:
        force = True

start = datetime.datetime.utcnow()
method = 'HEAD' if module.check_mode else 'GET'
tmpsrc, info = url_get(module, url, dest, use_proxy, last_mod_time, force, timeout, headers, tmp_dest, method)
result['elapsed'] = (datetime.datetime.utcnow() - start).seconds
result['src'] = tmpsrc

If the download is not forced and a checksum was provided, a matching checksum allows the download to be skipped. module.digest_from_file returns the hexadecimal digest of a local file, computed with the algorithm given by name; it returns None if the file does not exist.
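A simplified re-implementation of what digest_from_file does (the real helper lives on AnsibleModule; this standalone sketch only mirrors its observable behavior):

import hashlib
import os

def digest_from_file_sketch(path, algorithm):
    """Hex digest of a local file, or None if it does not exist."""
    if not os.path.exists(path):
        return None
    digest = hashlib.new(algorithm)          # e.g. 'sha256', 'md5'
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(64 * 1024), b''):
            digest.update(chunk)             # hash in chunks to bound memory use
    return digest.hexdigest()

print(digest_from_file_sketch(__file__, 'sha256'))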

In other words: a re-download is not forced unless the checksums differ, and as long as they match, only file attribute changes are applied. If the file already exists, its last modification time is recorded so that the server can answer the request with 304 Not Modified. A checksum mismatch, however, must force the download, because last_mod_time may well be newer than the remote file’s.

After this series of checks, the download to tmpsrc begins. If module.check_mode is true, a HEAD request is used instead of GET, and elapsed records the time spent on the request.

if dest_is_dir:
    filename = extract_filename_from_headers(info)
    if not filename:
        filename = url_filename(info['url'])
    dest = os.path.join(dest, filename)
    result['dest'] = dest

if not os.path.exists(tmpsrc):
    os.remove(tmpsrc)
    module.fail_json(msg="Request failed", status_code=info['status'], response=info['msg'], **result)
if not os.access(tmpsrc, os.R_OK):
    os.remove(tmpsrc)
    module.fail_json(msg="Source %s is not readable" % (tmpsrc), **result)
result['checksum_src'] = module.sha1(tmpsrc)

When dest is a directory, extract_filename_from_headers is called to parse the filename from the header fields returned by the earlier url_get request. If nothing is obtained, url_filename extracts the last component of the URL path as the filename. The filename is then joined with dest to produce the final file path.

Next, if the tmpsrc file does not exist, an error is raised. The os.access() method checks, using the real uid/gid, whether the process has the requested access to the path. Finally, the SHA-1 hash of tmpsrc is computed.

Here, the functions `extract_filename_from_headers` and `url_filename` are used. Let’s look at the code for each.

 
def extract_filename_from_headers(headers):
    cont_disp_regex = 'attachment; ?filename="?([^"]+)'
    res = None

    if 'content-disposition' in headers:
        cont_disp = headers['content-disposition']
        match = re.match(cont_disp_regex, cont_disp)
        if match:
            res = match.group(1)
            res = os.path.basename(res)
    return res

This function extracts the filename from the given headers dictionary. It looks for the content-disposition header and applies a regular expression to it; on success it returns the captured filename, otherwise None. The os.path.basename call guards against “interesting” values, such as path traversal sequences smuggled into the filename.
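A quick demonstration of the regex and the basename guard (the header values are made up):

import os
import re

cont_disp_regex = 'attachment; ?filename="?([^"]+)'

for cont_disp in ('attachment; filename="release.tar.gz"',
                  'attachment; filename="../../etc/passwd"'):
    match = re.match(cont_disp_regex, cont_disp)
    if match:
        print(os.path.basename(match.group(1)))
# release.tar.gz
# passwd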

 
def url_filename(url):
    fn = os.path.basename(urlsplit(url)[2])
    if fn == '':
        return 'index.html'
    return fn

This function takes the path component of the split URL, returns its basename, and falls back to ‘index.html’ when the path is empty. Very simple.
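Two illustrative calls, assuming the url_filename definition above is in scope:

>>> url_filename('https://example.com/downloads/pkg.tar.gz')
'pkg.tar.gz'
>>> url_filename('https://example.com/')
'index.html'

Now, let’s return to the code in the main function.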

     
if os.path.exists(dest):
    if not os.access(dest, os.W_OK):
        os.remove(tmpsrc)
        module.fail_json(msg="Destination %s is not writable" % (dest), **result)
    if not os.access(dest, os.R_OK):
        os.remove(tmpsrc)
        module.fail_json(msg="Destination %s is not readable" % (dest), **result)
    result['checksum_dest'] = module.sha1(dest)
else:
    if not os.path.exists(os.path.dirname(dest)):
        os.remove(tmpsrc)
        module.fail_json(msg="Destination %s does not exist" % (os.path.dirname(dest)), **result)
    if not os.access(os.path.dirname(dest), os.W_OK):
        os.remove(tmpsrc)
        module.fail_json(msg="Destination %s is not writable" % (os.path.dirname(dest)), **result)

This code mainly checks permissions and reports the corresponding errors: if `dest` exists, it must be both writable and readable, and its SHA-1 hash is stored in `checksum_dest`; if it does not exist, the parent directory must exist and be writable. Any violation removes `tmpsrc` and raises an error.

     
if module.check_mode:
    if os.path.exists(tmpsrc):
        os.remove(tmpsrc)
    result['changed'] = ('checksum_dest' not in result or
                         result['checksum_src'] != result['checksum_dest'])
    module.exit_json(msg=info.get('msg', ''), **result)

backup_file = None
if result['checksum_src'] != result['checksum_dest']:
    try:
        if backup:
            if os.path.exists(dest):
                backup_file = module.backup_local(dest)
        module.atomic_move(tmpsrc, dest, unsafe_writes=module.params['unsafe_writes'])
    except Exception as e:
        if os.path.exists(tmpsrc):
            os.remove(tmpsrc)
        module.fail_json(msg="failed to copy %s to %s: %s" % (tmpsrc, dest, to_native(e)),
                         exception=traceback.format_exc(), **result)
    result['changed'] = True
else:
    result['changed'] = False
    if os.path.exists(tmpsrc):
        os.remove(tmpsrc)

When `check_mode` is true, `tmpsrc` is deleted, and the value of `changed` is derived from whether `checksum_dest` is absent or `checksum_src` and `checksum_dest` differ.

When `checksum_src` and `checksum_dest` are not equal, the destination file is first backed up with a date stamp via `backup_local` (if `backup` is set), and `tmpsrc` is then moved into place with `atomic_move`.

There are many `os.remove(tmpsrc)` calls in the code above. Having the remove operation in every conditional branch seems redundant; it might be cleaner to consolidate the cleanup in one place at the end, for example in a `try`/`finally`.
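A toy illustration of that consolidation idea (purely hypothetical; this is not how the module is written):

import os
import tempfile

fd, tmpsrc = tempfile.mkstemp()
os.close(fd)

try:
    # ...all the permission checks and the atomic move would go here;
    # simulate one of the many early-failure branches:
    raise PermissionError("destination is not writable")
except PermissionError as e:
    print("failed: %s" % e)
finally:
    # a single cleanup point replaces the os.remove() in every branch
    if os.path.exists(tmpsrc):
        os.remove(tmpsrc)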

     
if checksum != '':
    destination_checksum = module.digest_from_file(dest, algorithm)

    if checksum != destination_checksum:
        os.remove(dest)
        module.fail_json(msg="The checksum for %s did not match %s; it was %s." % (dest, checksum, destination_checksum), **result)

file_args = module.load_file_common_arguments(module.params, path=dest)
result['changed'] = module.set_fs_attributes_if_different(file_args, result['changed'])

try:
    result['md5sum'] = module.md5(dest)
except ValueError:
    result['md5sum'] = None

if backup_file:
    result['backup_file'] = backup_file

# Mission complete
module.exit_json(msg=info.get('msg', ''), status_code=info.get('status', ''), **result)

It checks whether `checksum` and `destination_checksum` are the same; if not, the downloaded `dest` is removed and an error is reported.

The common file attributes are then applied via `set_fs_attributes_if_different`, which may flip `changed` again. The `md5sum` field exists only for backwards compatibility; the source comments note that it will be None on FIPS-enabled systems. I looked up what FIPS means:

The Federal Information Processing Standards (FIPS) are a set of US government standards describing data processing, encryption algorithms, and other information technology requirements for non-military government agencies and for the contractors and vendors working with them. OpenSSL-FIPS is a FIPS-compliant build of OpenSSL.

Anyway, the practical effect is simple: on FIPS-enabled systems MD5 is unavailable, `module.md5` raises a ValueError, and `md5sum` ends up as None.

After reading through the `get_url` code, it seems fairly ordinary, even a bit redundant in places. Still, when learning from other people’s code, it’s important to reflect, take the essence, and discard the dross. Maybe someday I’ll write an even better Ansible module, haha.