Carving Unit Tests

So far, we have always generated system input, i.e. data that the program as a whole obtains via its input channels. If we are interested in testing only a small set of functions, having to go through the system can be very inefficient. This chapter introduces a technique known as carving, which, given a system test, automatically extracts a set of unit tests that replicate the calls seen during the system test. The key idea is to record these calls so that we can replay them later, either as a whole or selectively. In addition, we explore how to synthesize API grammars from carved unit tests; this means that we can synthesize API tests without having to write a grammar at all.

Prerequisites

import APIFuzzer

Synopsis

To use the code provided in this chapter, write

>>> from fuzzingbook.Carver import <identifier>

and then make use of the following features.

This chapter provides means to record and replay function calls during a system test. Since individual function calls are much faster than a whole system run, such "carving" mechanisms have the potential to run tests much faster.

Recording Calls

The CallCarver class records all calls occurring while it is active. It is used in conjunction with a with clause:

>>> with CallCarver() as carver:
>>>     y = my_sqrt(2)
>>>     y = my_sqrt(4)

After execution, called_functions() lists the names of functions encountered:

>>> carver.called_functions()
['my_sqrt', '__exit__']

The arguments() method lists the arguments recorded for a function. It returns one list of arguments per recorded call; each argument is a pair (parameter name, value).

>>> carver.arguments('my_sqrt')
[[('x', 2)], [('x', 4)]]

Complex arguments are properly serialized, such that they can be easily restored.

Synthesizing Calls

While such recorded arguments could already be turned into calls directly, a much nicer alternative is to create a grammar for recorded calls. This allows synthesizing arbitrary combinations of arguments, and also offers a base for further customization of calls.

The CallGrammarMiner class turns a list of carved executions into a grammar.

>>> my_sqrt_miner = CallGrammarMiner(carver)
>>> my_sqrt_grammar = my_sqrt_miner.mine_call_grammar()
>>> my_sqrt_grammar
{'<start>': ['<call>'],
 '<call>': ['<my_sqrt>'],
 '<my_sqrt-x>': ['4', '2'],
 '<my_sqrt>': ['my_sqrt(<my_sqrt-x>)']}

This grammar can be used to synthesize calls.

>>> fuzzer = GrammarCoverageFuzzer(my_sqrt_grammar)
>>> fuzzer.fuzz()
'my_sqrt(2)'

These calls can be executed in isolation, effectively extracting unit tests from system tests:

>>> eval(fuzzer.fuzz())
2.0

System Tests vs Unit Tests

Remember the URL grammar introduced for grammar fuzzing? With such a grammar, we can happily test a Web browser again and again, checking how it reacts to arbitrary page requests.

Let us define a very simple "web browser" that goes and downloads the content given by the URL.

import urllib.parse
def webbrowser(url):
    """Download the http/https resource given by the URL"""
    import requests  # Only import if needed

    r = requests.get(url)
    return r.text

Let us apply this on fuzzingbook.org and measure the time, using the Timer class:

from Timer import Timer
with Timer() as webbrowser_timer:
    fuzzingbook_contents = webbrowser(
        "http://www.fuzzingbook.org/html/Fuzzer.html")

print("Downloaded %d bytes in %.2f seconds" %
      (len(fuzzingbook_contents), webbrowser_timer.elapsed_time()))
Downloaded 474685 bytes in 0.40 seconds
fuzzingbook_contents[:100]
'\n<!-- A html document -->\n<!-- \nwith standard nbconvert css layout\nwith standard nbconvert input/out'

A full web browser, of course, would also render the HTML content. We can achieve this using these commands (but we don't, as we do not want to replicate the entire Web page here):

from IPython.display import HTML, display
HTML(fuzzingbook_contents)

Having to start a whole browser (or having it render a Web page) again and again means lots of overhead, though, especially if we want to test only a subset of its functionality. After a change in the code, for instance, we would prefer to test only the subset of functions affected by the change, rather than running the well-tested functions again and again.

Let us assume we change the function that takes care of parsing the given URL and decomposing it into the individual elements – the scheme ("http"), the network location ("www.fuzzingbook.com"), or the path ("/html/Fuzzer.html"). This function is named urlparse():

from urllib.parse import urlparse
urlparse('https://www.fuzzingbook.com/html/Carver.html')
ParseResult(scheme='https', netloc='www.fuzzingbook.com', path='/html/Carver.html', params='', query='', fragment='')

You see how the individual elements of the URL, namely the scheme ("https"), the network location ("www.fuzzingbook.com"), and the path ("/html/Carver.html"), are all properly identified. Other elements (like params, query, or fragment) are empty, because they were not part of our input.

The interesting thing is that executing only urlparse() is orders of magnitude faster than running all of webbrowser(). Let us measure the factor:

runs = 1000
with Timer() as urlparse_timer:
    for i in range(runs):
        urlparse('https://www.fuzzingbook.com/html/Carver.html')

avg_urlparse_time = urlparse_timer.elapsed_time() / runs
avg_urlparse_time
1.8796670046867802e-06

Compare this to the time required by the web browser:

webbrowser_timer.elapsed_time()
0.39847187499981374

The difference in time is huge:

webbrowser_timer.elapsed_time() / avg_urlparse_time
211990.67388333147

Hence, in the time it takes to run webbrowser() once, we can have hundreds of thousands of executions of urlparse(), and this does not even take into account the time it takes the browser to render the downloaded HTML, to run the included scripts, and whatever else happens when a Web page is loaded. Strategies that allow us to test at the unit level are thus very promising, as they can save lots of overhead.

Carving Unit Tests

Testing methods and functions at the unit level requires a very good understanding of the individual units to be tested as well as their interplay with other units. Setting up an appropriate infrastructure and writing unit tests by hand thus is demanding, yet rewarding. There is, however, an interesting alternative to writing unit tests by hand. The technique of carving automatically converts system tests into unit tests by means of recording and replaying function calls:

  1. During a system test (given or generated), we record all calls into a function, including all arguments and other variables the function reads.
  2. From these, we synthesize a self-contained unit test that reconstructs the function call with all arguments.
  3. This unit test can be executed (replayed) at any time with high efficiency.
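
As a rough illustration of these three steps, the following self-contained sketch records calls with a decorator rather than with the tracer developed in this chapter. The names record, normalize, and system_under_test are hypothetical and serve only as a miniature example.

```python
import functools

recorded_calls = []  # one (function name, args, kwargs) entry per call


def record(func):
    """Hypothetical decorator: log every call, then run the function."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        recorded_calls.append((func.__name__, args, kwargs))
        return func(*args, **kwargs)
    return wrapper


@record
def normalize(path):
    """Toy unit under test: strip trailing slashes, keep the root."""
    return path.rstrip("/") or "/"


def system_under_test(paths):
    """Toy 'system' that calls the unit internally."""
    return [normalize(path) for path in paths]


# Step 1: run a system test and record the calls it makes
system_under_test(["/html/", "/", "/code/Carver.py"])

# Step 2: synthesize one self-contained call string per recorded call
unit_tests = []
for name, args, kwargs in recorded_calls:
    parts = [repr(a) for a in args]
    parts += [key + "=" + repr(value) for key, value in kwargs.items()]
    unit_tests.append(name + "(" + ", ".join(parts) + ")")

# Step 3: replay the calls in isolation, without the surrounding system
replay_results = [eval(test, {"normalize": normalize}) for test in unit_tests]
```

A decorator requires touching the functions to be recorded; the tracer-based approach of this chapter avoids that, at the price of recording every call made while tracing is active.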

In the remainder of this chapter, let us explore these steps.

Recording Calls

Our first challenge is to record function calls together with their arguments. (In the interest of simplicity, we restrict ourselves to arguments, ignoring any global variables or other non-arguments that are read by the function.) To record calls and arguments, we use the mechanism we introduced for coverage: By setting up a tracer function, we track all calls into individual functions, also saving their arguments. Just like Coverage objects, Carver objects are meant to be used in conjunction with the with statement, such that we can trace a particular code block:

with Carver() as carver:
    function_to_be_traced()
c = carver.calls()

The initial definition supports this construct:

import sys
class Carver:
    def __init__(self, log=False):
        self._log = log
        self.reset()

    def reset(self):
        self._calls = {}

    # Start of `with` block
    def __enter__(self):
        self.original_trace_function = sys.gettrace()
        sys.settrace(self.traceit)
        return self

    # End of `with` block
    def __exit__(self, exc_type, exc_value, tb):
        sys.settrace(self.original_trace_function)
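
To see the with protocol and the trace function interact in isolation, here is a small stand-alone sketch. MiniTracer, helper, and outer are hypothetical names; the class only collects function names and is not the Carver of this chapter.

```python
import sys


class MiniTracer:
    """Hypothetical stand-in: collect the names of all functions called."""

    def __init__(self):
        self.names = []

    def __enter__(self):
        # Remember whatever trace function was active before
        self.original_trace_function = sys.gettrace()
        sys.settrace(self.traceit)
        return self

    def __exit__(self, exc_type, exc_value, tb):
        # Restore the previous trace function
        sys.settrace(self.original_trace_function)

    def traceit(self, frame, event, arg):
        if event == "call":
            self.names.append(frame.f_code.co_name)
        return None  # no line-level tracing within the called function


def helper():
    return 42


def outer():
    return helper()


before = sys.gettrace()
with MiniTracer() as tracer:
    outer()
after = sys.gettrace()
```

Tracing is still active when the with block ends, so the call to __exit__() is recorded as well; the same effect shows up in the recorded calls later in this chapter.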

The actual work takes place in the traceit() method, which records all calls in the _calls attribute. First, we define two helper functions:

import inspect
def get_qualified_name(code):
    """Return the fully qualified name of the current function"""
    name = code.co_name
    module = inspect.getmodule(code)
    if module is not None:
        name = module.__name__ + "." + name
    return name
def get_arguments(frame):
    """Return call arguments in the given frame"""
    # When called, all arguments are local variables
    local_variables = frame.f_locals.copy()
    arguments = [(var, frame.f_locals[var])
                 for var in local_variables]
    arguments.reverse()  # Want same order as call
    return arguments
class CallCarver(Carver):
    def add_call(self, function_name, arguments):
        """Add given call to list of calls"""
        if function_name not in self._calls:
            self._calls[function_name] = []
        self._calls[function_name].append(arguments)

    # Tracking function: Record all calls and all args
    def traceit(self, frame, event, arg):
        if event != "call":
            return None

        code = frame.f_code
        function_name = code.co_name
        qualified_name = get_qualified_name(code)
        arguments = get_arguments(frame)

        self.add_call(function_name, arguments)
        if qualified_name != function_name:
            self.add_call(qualified_name, arguments)

        if self._log:
            print(simple_call_string(function_name, arguments))

        return None

Finally, we need some convenience functions to access the calls:

class CallCarver(CallCarver):
    def calls(self):
        """Return a dictionary of all calls traced."""
        return self._calls

    def arguments(self, function_name):
        """Return a list of all arguments of the given function
        as (VAR, VALUE) pairs.
        Raises an exception if the function was not traced."""
        return self._calls[function_name]

    def called_functions(self, qualified=False):
        """Return all functions called."""
        if qualified:
            return [function_name for function_name in self._calls.keys()
                    if function_name.find('.') >= 0]
        else:
            return [function_name for function_name in self._calls.keys()
                    if function_name.find('.') < 0]

Recording my_sqrt()

Let's try out our new Carver class – first on a very simple function:

from Intro_Testing import my_sqrt
with CallCarver() as sqrt_carver:
    my_sqrt(2)
    my_sqrt(4)

We can retrieve all calls seen...

sqrt_carver.calls()
{'my_sqrt': [[('x', 2)], [('x', 4)]],
 'Intro_Testing.my_sqrt': [[('x', 2)], [('x', 4)]],
 '__exit__': [[('tb', None),
   ('exc_value', None),
   ('exc_type', None),
   ('self', <__main__.CallCarver at 0x1182c85e0>)]]}
sqrt_carver.called_functions()
['my_sqrt', '__exit__']

... as well as the arguments of a particular function:

sqrt_carver.arguments("my_sqrt")
[[('x', 2)], [('x', 4)]]

We define a convenience function for nicer printing of these lists:

def simple_call_string(function_name, argument_list):
    """Return function_name(arg[0], arg[1], ...) as a string"""
    return function_name + "(" + \
        ", ".join([var + "=" + repr(value)
                   for (var, value) in argument_list]) + ")"
for function_name in sqrt_carver.called_functions():
    for argument_list in sqrt_carver.arguments(function_name):
        print(simple_call_string(function_name, argument_list))
my_sqrt(x=2)
my_sqrt(x=4)
__exit__(tb=None, exc_value=None, exc_type=None, self=<__main__.CallCarver object at 0x1182c85e0>)

This is a syntax we can directly use to invoke my_sqrt() again:

eval("my_sqrt(x=2)")
1.414213562373095

Carving urlparse()

What happens if we apply this to webbrowser()?

with CallCarver() as webbrowser_carver:
    webbrowser("https://www.fuzzingbook.org")

We see that retrieving a URL from the Web requires quite some functionality:

function_list = webbrowser_carver.called_functions(qualified=True)
len(function_list)
365
print(function_list[:50])
['requests.api.get', 'requests.api.request', 'requests.sessions.__init__', 'requests.utils.default_headers', 'requests.utils.default_user_agent', 'requests.structures.__init__', 'collections.abc.update', 'abc.__instancecheck__', 'requests.structures.__setitem__', 'requests.hooks.default_hooks', 'requests.hooks.<dictcomp>', 'requests.cookies.cookiejar_from_dict', 'http.cookiejar.__init__', 'threading.RLock', 'http.cookiejar.__iter__', 'requests.cookies.<listcomp>', 'http.cookiejar.deepvalues', 'http.cookiejar.vals_sorted_by_key', 'requests.adapters.__init__', 'urllib3.util.retry.__init__', 'urllib3.util.retry.<listcomp>', 'requests.adapters.init_poolmanager', 'urllib3.poolmanager.__init__', 'urllib3.request.__init__', 'urllib3._collections.__init__', 'requests.sessions.mount', 'requests.sessions.<listcomp>', 'requests.sessions.__enter__', 'requests.sessions.request', 'requests.models.__init__', 'requests.sessions.prepare_request', 'requests.cookies.merge_cookies', 'requests.cookies.update', 'requests.utils.get_netrc_auth', 'collections.abc.get', 'os.__getitem__', 'os.encode', 'requests.utils.<genexpr>', 'posixpath.expanduser', 'posixpath._get_sep', 'collections.abc.__contains__', 'os.decode', 'genericpath.exists', 'urllib.parse.urlparse', 'urllib.parse._coerce_args', 'urllib.parse.urlsplit', 'urllib.parse._splitnetloc', 'urllib.parse._checknetloc', 'urllib.parse._noop', 'netrc.__init__']

Among several other functions, we also have a call to urlparse():

urlparse_argument_list = webbrowser_carver.arguments("urllib.parse.urlparse")
urlparse_argument_list
[[('allow_fragments', True),
  ('scheme', ''),
  ('url', 'https://www.fuzzingbook.org')],
 [('allow_fragments', True),
  ('scheme', ''),
  ('url', 'https://www.fuzzingbook.org/')],
 [('allow_fragments', True),
  ('scheme', ''),
  ('url', 'https://www.fuzzingbook.org/')],
 [('allow_fragments', True),
  ('scheme', ''),
  ('url', 'https://www.fuzzingbook.org/')],
 [('allow_fragments', True),
  ('scheme', ''),
  ('url', 'https://www.fuzzingbook.org/')],
 [('allow_fragments', True),
  ('scheme', ''),
  ('url', 'https://www.fuzzingbook.org/')],
 [('allow_fragments', True),
  ('scheme', ''),
  ('url', 'https://www.fuzzingbook.org/')],
 [('allow_fragments', True),
  ('scheme', ''),
  ('url', 'https://www.fuzzingbook.org/')],
 [('allow_fragments', True),
  ('scheme', ''),
  ('url', 'https://www.fuzzingbook.org/')],
 [('allow_fragments', True),
  ('scheme', ''),
  ('url', 'https://www.fuzzingbook.org/')],
 [('allow_fragments', True),
  ('scheme', ''),
  ('url', 'https://www.fuzzingbook.org/')]]

Again, we can convert this into a well-formatted call:

urlparse_call = simple_call_string("urlparse", urlparse_argument_list[0])
urlparse_call
"urlparse(allow_fragments=True, scheme='', url='https://www.fuzzingbook.org')"

Again, we can re-execute this call:

eval(urlparse_call)
ParseResult(scheme='https', netloc='www.fuzzingbook.org', path='', params='', query='', fragment='')

We now have successfully carved the call to urlparse() out of the webbrowser() execution.
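
A carved call turns into a regression test once the value it returned is stored along with it. The sketch below is one possible way to do this; as_regression_test is a hypothetical helper that is not part of the chapter's code, and it assumes that the result has a repr() that can be evaluated again, as is the case for ParseResult.

```python
from urllib.parse import urlparse, ParseResult

# Arguments as carved above, as (parameter name, value) pairs
carved_arguments = [('allow_fragments', True),
                    ('scheme', ''),
                    ('url', 'https://www.fuzzingbook.org')]


def as_regression_test(function_name, argument_list, namespace):
    """Hypothetical helper: return an assert statement that pins down
    the current result of the carved call."""
    call = function_name + "(" + ", ".join(
        var + "=" + repr(value) for (var, value) in argument_list) + ")"
    expected = eval(call, namespace)
    return "assert " + call + " == " + repr(expected)


namespace = {"urlparse": urlparse, "ParseResult": ParseResult}
test_line = as_regression_test("urlparse", carved_arguments, namespace)

# Replaying the generated test raises AssertionError if behavior changes
exec(test_line, namespace)
```

The generated line can be pasted into a test file as is; it fails as soon as a change to urlparse() alters the result for this input.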

Replaying Calls

Replaying calls in their entirety and in all generality is tricky, as there are several challenges to be addressed. These include:

  1. We need to be able to access individual functions. If we access a function by name, the name must be in scope. If the name is not visible (for instance, because it is a name internal to the module), we must make it visible.

  2. Any resources accessed outside of arguments must be recorded and reconstructed for replay as well. This can be difficult if variables refer to external resources such as files or network resources.

  3. Complex objects must be reconstructed as well.

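These constraints make carving hard or even impossible if the function to be tested interacts heavily with its environment. To illustrate these issues, consider the email.parser.Parser.parse() method that is invoked in webbrowser(). Since the carver builds qualified names from the module name and the function name only, it is recorded as email.parser.parse: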

email_parse_argument_list = webbrowser_carver.arguments("email.parser.parse")

Calls to this method look like this:

email_parse_call = simple_call_string(
    "email.parser.Parser.parse",
    email_parse_argument_list[0])
email_parse_call
'email.parser.Parser.parse(headersonly=False, fp=<_io.StringIO object at 0x13f099090>, self=<email.parser.Parser object at 0x1082adde0>)'

We see that email.parser.Parser.parse() is part of an email.parser.Parser object (self) and it gets a StringIO object (fp). Both are non-primitive values. How could we possibly reconstruct them?
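
The following sketch illustrates why repr() alone is not enough for such values. It checks for a few sample values whether evaluating their repr() gives back an equal value; survives_repr is a hypothetical helper name.

```python
import io

samples = [2, 'https://www.fuzzingbook.org', [1, 2], io.StringIO("header")]


def survives_repr(value):
    """Check whether eval(repr(value)) gives back an equal value."""
    try:
        return eval(repr(value)) == value
    except SyntaxError:
        # repr() of many objects looks like <... object at 0x...>,
        # which is not valid Python
        return False


round_trips = [survives_repr(value) for value in samples]
```

Numbers, strings, and lists of such values survive the round trip; the StringIO object does not.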

Serializing Objects

The answer to the problem of complex objects lies in creating a persistent representation that can be reconstructed at later points in time. This process is known as serialization; in Python, it is also known as pickling. The pickle module provides means to create a serialized representation of an object. Let us apply this on the email.parser.Parser object we just found:

import pickle
email_parse_argument_list
[[('headersonly', False),
  ('fp', <_io.StringIO at 0x13f099090>),
  ('self', <email.parser.Parser at 0x1082adde0>)]]
parser_object = email_parse_argument_list[0][2][1]
parser_object
<email.parser.Parser at 0x1082adde0>
pickled = pickle.dumps(parser_object)
pickled
b'\x80\x04\x95w\x00\x00\x00\x00\x00\x00\x00\x8c\x0cemail.parser\x94\x8c\x06Parser\x94\x93\x94)\x81\x94}\x94(\x8c\x06_class\x94\x8c\x0bhttp.client\x94\x8c\x0bHTTPMessage\x94\x93\x94\x8c\x06policy\x94\x8c\x11email._policybase\x94\x8c\x08Compat32\x94\x93\x94)\x81\x94ub.'

From this byte string representing the serialized email.parser.Parser object, we can recreate the Parser object at any time:

unpickled_parser_object = pickle.loads(pickled)
unpickled_parser_object
<email.parser.Parser at 0x13f1c4490>
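
The round trip and its limits can be checked on a small example. The sketch below pickles a freshly created Parser (rather than the carved one) and then tries the same with a generator; numbers is a hypothetical generator function.

```python
import pickle
from email.parser import Parser

original = Parser()
restored = pickle.loads(pickle.dumps(original))

same_type = type(restored) is type(original)   # an equivalent Parser ...
same_object = restored is original             # ... but a new object


def numbers():
    yield 1


# Some objects cannot be pickled at all, generators among them
try:
    pickle.dumps(numbers())
    pickling_error = None
except TypeError as exc:
    pickling_error = exc
```

Unpickling thus yields a copy of the original object, not the object itself, which is what replaying a call at a later time needs.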

The serialization mechanism allows us to produce a representation for all objects passed as parameters (assuming they can be pickled, that is). We can now extend the simple_call_string() function such that it automatically pickles objects. Additionally, we set it up such that if the first parameter is named self (i.e., the function is a method), we make it a method call on the self object.

def call_value(value):
    value_as_string = repr(value)
    if value_as_string.find('<') >= 0:
        # Complex object
        value_as_string = "pickle.loads(" + repr(pickle.dumps(value)) + ")"
    return value_as_string
def call_string(function_name, argument_list):
    """Return function_name(arg[0], arg[1], ...) as a string, pickling complex objects"""
    if len(argument_list) > 0:
        (first_var, first_value) = argument_list[0]
        if first_var == "self":
            # Make this a method call
            method_name = function_name.split(".")[-1]
            function_name = call_value(first_value) + "." + method_name
            argument_list = argument_list[1:]

    return function_name + "(" + \
        ", ".join([var + "=" + call_value(value)
                   for (var, value) in argument_list]) + ")"

Let us apply the extended call_string() function to create a call for email.parser.Parser.parse(), including pickled objects:

call = call_string("email.parser.Parser.parse", email_parse_argument_list[0])
print(call)
email.parser.Parser.parse(headersonly=False, fp=pickle.loads(b'\x80\x04\x95\xc3\x02\x00\x00\x00\x00\x00\x00\x8c\x03_io\x94\x8c\x08StringIO\x94\x93\x94)\x81\x94(X\x9a\x02\x00\x00Connection: keep-alive\r\nContent-Length: 51094\r\nServer: GitHub.com\r\nContent-Type: text/html; charset=utf-8\r\nLast-Modified: Thu, 18 Jan 2024 17:02:01 GMT\r\nAccess-Control-Allow-Origin: *\r\nETag: W/"65a95989-46011"\r\nexpires: Thu, 18 Jan 2024 17:16:06 GMT\r\nCache-Control: max-age=600\r\nContent-Encoding: gzip\r\nx-proxy-cache: MISS\r\nX-GitHub-Request-Id: 8B62:2C0453:3D672C8:3E5495D:65A95A7E\r\nAccept-Ranges: bytes\r\nDate: Thu, 18 Jan 2024 17:21:16 GMT\r\nVia: 1.1 varnish\r\nAge: 80\r\nX-Served-By: cache-fra-eddf8230058-FRA\r\nX-Cache: HIT\r\nX-Cache-Hits: 1\r\nX-Timer: S1705598476.364686,VS0,VE12\r\nVary: Accept-Encoding\r\nX-Fastly-Request-ID: bdf0b0f9552b918b24bfe3625029c32b77e1cea2\r\n\r\n\x94\x8c\x01\n\x94M\x9a\x02Nt\x94b.'), self=pickle.loads(b'\x80\x04\x95w\x00\x00\x00\x00\x00\x00\x00\x8c\x0cemail.parser\x94\x8c\x06Parser\x94\x93\x94)\x81\x94}\x94(\x8c\x06_class\x94\x8c\x0bhttp.client\x94\x8c\x0bHTTPMessage\x94\x93\x94\x8c\x06policy\x94\x8c\x11email._policybase\x94\x8c\x08Compat32\x94\x93\x94)\x81\x94ub.'))

With this call involving the pickled object, we can now re-run the original call and obtain a valid result:

import email
eval(call)
<http.client.HTTPMessage at 0x13f1c5810>
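
The effect of embedding a pickled value in a call string can also be observed on a single argument. The sketch below re-creates the heuristic of call_value() under the hypothetical name call_value_sketch and applies it to a small StringIO object with made-up content.

```python
import io
import pickle


def call_value_sketch(value):
    """Same heuristic as call_value(): pickle anything whose repr()
    contains '<'; use the plain repr() otherwise."""
    text = repr(value)
    if "<" in text:
        text = "pickle.loads(" + repr(pickle.dumps(value)) + ")"
    return text


buffer = io.StringIO("Server: GitHub.com\r\n")
expression = call_value_sketch(buffer)

# Evaluating the expression yields a fresh object with the same content
restored = eval(expression, {"pickle": pickle})
```

Note that the heuristic also pickles plain strings that happen to contain a "<" character; this is harmless, as the unpickled string equals the original.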

All Calls

So far, we have replayed only individual calls made during webbrowser(). How many of the calls within webbrowser() can we actually carve and replay? Let us try this out and compute the numbers.

import traceback
import enum
import socket
all_functions = set(webbrowser_carver.called_functions(qualified=True))
call_success = set()
run_success = set()
exceptions_seen = set()

for function_name in webbrowser_carver.called_functions(qualified=True):
    for argument_list in webbrowser_carver.arguments(function_name):
        try:
            call = call_string(function_name, argument_list)
            call_success.add(function_name)

            result = eval(call)
            run_success.add(function_name)

        except Exception as exc:
            exceptions_seen.add(repr(exc))
            # print("->", call, file=sys.stderr)
            # traceback.print_exc()
            # print("", file=sys.stderr)
            continue
print("%d/%d calls (%.2f%%) successfully created and %d/%d calls (%.2f%%) successfully ran" % (
    len(call_success), len(all_functions), len(
        call_success) * 100 / len(all_functions),
    len(run_success), len(all_functions), len(run_success) * 100 / len(all_functions)))
253/365 calls (69.32%) successfully created and 51/365 calls (13.97%) successfully ran

About one in seven of the carved functions (14%) can be replayed successfully. Let us take a look into some of the error messages we get:

for i in range(10):
    print(list(exceptions_seen)[i])
SyntaxError('invalid syntax', ('<string>', 1, 18, "urllib3.util.url.<genexpr>(x='u', .0=pickle.loads(b'\\x80\\x04\\x95\\x1c\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x8c\\x08builtins\\x94\\x8c\\x04iter\\x94\\x93\\x94\\x8c\\x00\\x94\\x85\\x94R\\x94.'))", 1, 19))
SyntaxError('invalid syntax', ('<string>', 1, 16, "requests.utils.<genexpr>(f='.netrc', .0=pickle.loads(b'\\x80\\x04\\x950\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x8c\\x08builtins\\x94\\x8c\\x04iter\\x94\\x93\\x94\\x8c\\x06.netrc\\x94\\x8c\\x06_netrc\\x94\\x86\\x94\\x85\\x94R\\x94K\\x01b.'))", 1, 17))
TypeError("'object' object is not callable")
PicklingError('__reduce__ must return a string or tuple')
SyntaxError('invalid syntax', ('<string>', 1, 21, "urllib3.poolmanager.<lambda>(p=pickle.loads(b'\\x80\\x04\\x95\\xc7\\x03\\x00\\x00\\x00\\x00\\x00\\x00\\x8c\\x16urllib3.connectionpool\\x94\\x8c\\x13HTTPSConnectionPool\\x94\\x93\\x94)\\x81\\x94}\\x94(\\x8c\\x04host\\x94\\x8c\\x13www.fuzzingbook.org\\x94\\x8c\\x0b_proxy_host\\x94\\x8c\\x13www.fuzzingbook.org\\x94\\x8c\\x04port\\x94M\\xbb\\x01\\x8c\\x07headers\\x94}\\x94\\x8c\\x06strict\\x94\\x88\\x8c\\x07timeout\\x94\\x8c\\x14urllib3.util.timeout\\x94\\x8c\\x07Timeout\\x94\\x93\\x94)\\x81\\x94}\\x94(\\x8c\\x08_connect\\x94\\x8c\\x08builtins\\x94\\x8c\\x06object\\x94\\x93\\x94)\\x81\\x94\\x8c\\x05_read\\x94h\\x17\\x8c\\x05total\\x94N\\x8c\\x0e_start_connect\\x94Nub\\x8c\\x07retries\\x94\\x8c\\x12urllib3.util.retry\\x94\\x8c\\x05Retry\\x94\\x93\\x94)\\x81\\x94}\\x94(h\\x19K\\x03\\x8c\\x07connect\\x94N\\x8c\\x04read\\x94N\\x8c\\x06status\\x94N\\x8c\\x05other\\x94N\\x8c\\x08redirect\\x94N\\x8c\\x10status_forcelist\\x94\\x8f\\x94\\x8c\\x0fallowed_methods\\x94(\\x8c\\x03GET\\x94\\x8c\\x07OPTIONS\\x94\\x8c\\x06DELETE\\x94\\x8c\\x05TRACE\\x94\\x8c\\x04HEAD\\x94\\x8c\\x03PUT\\x94\\x91\\x94\\x8c\\x0ebackoff_factor\\x94K\\x00\\x8c\\x11raise_on_redirect\\x94\\x88\\x8c\\x0fraise_on_status\\x94\\x88\\x8c\\x07history\\x94)\\x8c\\x1arespect_retry_after_header\\x94\\x88\\x8c\\x1aremove_headers_on_redirect\\x94(\\x8c\\rauthorization\\x94\\x91\\x94ub\\x8c\\x04pool\\x94N\\x8c\\x05block\\x94\\x89\\x8c\\x05proxy\\x94N\\x8c\\rproxy_headers\\x94}\\x94\\x8c\\x0cproxy_config\\x94N\\x8c\\x0fnum_connections\\x94K\\x01\\x8c\\x0cnum_requests\\x94K\\x01\\x8c\\x07conn_kw\\x94}\\x94\\x8c\\x08key_file\\x94N\\x8c\\tcert_file\\x94N\\x8c\\tcert_reqs\\x94\\x8c\\rCERT_REQUIRED\\x94\\x8c\\x0ckey_password\\x94N\\x8c\\x08ca_certs\\x94\\x8cT/Users/zeller/.pyenv/versions/3.10.2/lib/python3.10/site-packages/certifi/cacert.pem\\x94\\x8c\\x0bca_cert_dir\\x94N\\x8c\\x0bssl_version\\x94N\\x8c\\x0fassert_hostname\\x94N\\x8c\\x12assert_fingerprint\\x94Nub.'))", 1, 
22))
SyntaxError('invalid syntax', ('<string>', 1, 21, "requests.structures.<genexpr>(mappedvalue='python-requests/2.28.1', casedkey='User-Agent', .0=pickle.loads(b'\\x80\\x04\\x95\\x1b\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x8c\\x08builtins\\x94\\x8c\\x04iter\\x94\\x93\\x94]\\x94\\x85\\x94R\\x94.'))", 1, 22))
SyntaxError('invalid syntax', ('<string>', 1, 21, "requests.structures.<genexpr>(mappedvalue='keep-alive', casedkey='Connection', .0=pickle.loads(b'\\x80\\x04\\x95\\x1b\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x8c\\x08builtins\\x94\\x8c\\x04iter\\x94\\x93\\x94]\\x94\\x85\\x94R\\x94.'))", 1, 22))
TypeError("cannot pickle 'generator' object")
SyntaxError('invalid syntax', ('<string>', 1, 18, "urllib3.util.url.<genexpr>(x='n', .0=pickle.loads(b'\\x80\\x04\\x95\\x1c\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x8c\\x08builtins\\x94\\x8c\\x04iter\\x94\\x93\\x94\\x8c\\x00\\x94\\x85\\x94R\\x94.'))", 1, 19))
SyntaxError('invalid syntax', ('<string>', 1, 21, "requests.structures.<genexpr>(.0=pickle.loads(b'\\x80\\x04\\x95n\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x8c\\x08builtins\\x94\\x8c\\x04iter\\x94\\x93\\x94]\\x94(\\x8c\\x0fAccept-Encoding\\x94\\x8c\\rgzip, deflate\\x94\\x86\\x94\\x8c\\x06Accept\\x94\\x8c\\x03*/*\\x94\\x86\\x94\\x8c\\nConnection\\x94\\x8c\\nkeep-alive\\x94\\x86\\x94e\\x85\\x94R\\x94.'))", 1, 22))

We see that:

  • About two thirds of the calls could be converted into call strings. Where this is not the case, it is mostly because the objects being passed cannot be serialized.
  • Only about one in seven of the calls could be executed. The error messages for the failing runs are varied; the most frequent one is that some internal name is invoked that is not in scope.

Our carving mechanism should be taken with a grain of salt: We still do not cover the situation where external variables and values (such as global variables) are being accessed, and the serialization mechanism cannot recreate external resources. Still, if the function of interest falls among those that can be carved and replayed, we can very effectively re-run its calls with their original arguments.
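
For names that are not in scope, one option is to resolve the qualified name at replay time. The sketch below is an assumption about how this could be done, not part of the chapter's code; resolve is a hypothetical helper that imports the longest importable module prefix and then looks up the remaining parts as attributes.

```python
import importlib
import urllib.parse


def resolve(qualified_name):
    """Hypothetical helper: map a dotted name such as
    'urllib.parse.urlparse' to the object it denotes."""
    parts = qualified_name.split(".")
    # Try the longest module prefix first, then shorter ones
    for split in range(len(parts) - 1, 0, -1):
        module_name = ".".join(parts[:split])
        try:
            obj = importlib.import_module(module_name)
        except ImportError:
            continue
        for attribute in parts[split:]:
            obj = getattr(obj, attribute)
        return obj
    raise NameError(qualified_name)


function = resolve("urllib.parse.urlparse")
result = function("https://www.fuzzingbook.org")
```

This works for module-level functions only. Since the carver records methods as module name plus function name, without the class in between, a name such as email.parser.parse cannot be resolved this way.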

Mining API Grammars from Carved Calls

So far, we have used carved calls to replay exactly the same invocations as originally encountered. However, we can also mutate carved calls to effectively fuzz APIs with previously recorded arguments.

The general idea is as follows:

  1. First, we record all calls of a specific function from a given execution of the program.
  2. Second, we create a grammar that incorporates all these calls, with separate rules for each argument and alternatives for each value found; this allows us to produce calls that arbitrarily recombine these arguments.

Let us explore these steps in the following sections.

From Calls to Grammars

Let us start with an example. The power(x, y) function returns $x^y$; it is but a wrapper around the equivalent math.pow() function. (Since power() is defined in Python, we can trace it – in contrast to math.pow(), which is implemented in C.)

import math
def power(x, y):
    return math.pow(x, y)

Let us invoke power() while recording its arguments:

with CallCarver() as power_carver:
    z = power(1, 2)
    z = power(3, 4)
power_carver.arguments("power")
[[('y', 2), ('x', 1)], [('y', 4), ('x', 3)]]

From this list of recorded arguments, we could now create a grammar for the power() call, with x and y expanding into the values seen:

from Grammars import START_SYMBOL, is_valid_grammar, new_symbol
from Grammars import extend_grammar, Grammar
POWER_GRAMMAR: Grammar = {
    "<start>": ["power(<x>, <y>)"],
    "<x>": ["1", "3"],
    "<y>": ["2", "4"]
}

assert is_valid_grammar(POWER_GRAMMAR)

When fuzzing with this grammar, we then get arbitrary combinations of x and y; aiming for coverage will ensure that all values are actually tested at least once:

from GrammarCoverageFuzzer import GrammarCoverageFuzzer
power_fuzzer = GrammarCoverageFuzzer(POWER_GRAMMAR)
[power_fuzzer.fuzz() for i in range(5)]
['power(1, 2)', 'power(3, 4)', 'power(1, 2)', 'power(3, 4)', 'power(3, 4)']
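
Since the rules for x and y are independent, the grammar describes more calls than were recorded. The following sketch, which uses itertools.product instead of a fuzzer, enumerates all calls the grammar can produce and runs each of them.

```python
import itertools
import math


def power(x, y):
    return math.pow(x, y)


grammar = {
    "<start>": ["power(<x>, <y>)"],
    "<x>": ["1", "3"],
    "<y>": ["2", "4"]
}

# Every combination of a recorded x with a recorded y
template = grammar["<start>"][0]
calls = [template.replace("<x>", x).replace("<y>", y)
         for x, y in itertools.product(grammar["<x>"], grammar["<y>"])]

# Run each synthesized call in isolation
results = {call: eval(call, {"power": power}) for call in calls}
```

Of the four calls, power(1, 4) and power(3, 2) never occurred in the recorded run.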

What we need is a method to automatically convert the arguments as seen in power_carver to the grammar as seen in POWER_GRAMMAR. This is what we define in the next section.

A Grammar Miner for Calls

We introduce a class CallGrammarMiner, which, given a Carver, automatically produces a grammar from the calls seen. To initialize, we pass the carver object:

class CallGrammarMiner:
    def __init__(self, carver, log=False):
        self.carver = carver
        self.log = log

Initial Grammar

The initial grammar produces a single call. The possible <call> expansions are to be constructed later:

import copy 
class CallGrammarMiner(CallGrammarMiner):
    CALL_SYMBOL = "<call>"

    def initial_grammar(self):
        return extend_grammar(
            {START_SYMBOL: [self.CALL_SYMBOL],
                self.CALL_SYMBOL: []
             })
m = CallGrammarMiner(power_carver)
initial_grammar = m.initial_grammar()
initial_grammar
{'<start>': ['<call>'], '<call>': []}

A Grammar from Arguments

Let us start by creating a grammar from a list of arguments. The method mine_arguments_grammar() creates a grammar for the arguments seen during carving, such as these:

arguments = power_carver.arguments("power")
arguments
[[('y', 2), ('x', 1)], [('y', 4), ('x', 3)]]

The mine_arguments_grammar() method iterates through the variables seen and creates a mapping, variables, from each variable name to the set of values seen (as strings, going through call_value()). In a second step, it creates a grammar with a rule for each variable name, expanding into the values seen.

class CallGrammarMiner(CallGrammarMiner):
    def var_symbol(self, function_name, var, grammar):
        return new_symbol(grammar, "<" + function_name + "-" + var + ">")

    def mine_arguments_grammar(self, function_name, arguments, grammar):
        var_grammar = {}

        variables = {}
        for argument_list in arguments:
            for (var, value) in argument_list:
                value_string = call_value(value)
                if self.log:
                    print(var, "=", value_string)

                if value_string.find("<") >= 0:
                    var_grammar["<langle>"] = ["<"]
                    value_string = value_string.replace("<", "<langle>")

                if var not in variables:
                    variables[var] = set()
                variables[var].add(value_string)

        var_symbols = []
        for var in variables:
            var_symbol = self.var_symbol(function_name, var, grammar)
            var_symbols.append(var_symbol)
            var_grammar[var_symbol] = list(variables[var])

        return var_grammar, var_symbols
m = CallGrammarMiner(power_carver)
var_grammar, var_symbols = m.mine_arguments_grammar(
    "power", arguments, initial_grammar)
var_grammar
{'<power-y>': ['4', '2'], '<power-x>': ['3', '1']}

The additional return value var_symbols is a list of argument symbols in the call:

var_symbols
['<power-y>', '<power-x>']
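
The replacement of "<" by <langle> in mine_arguments_grammar() guards against recorded values being mistaken for grammar symbols. The sketch below illustrates the issue; the regular expression is an assumption about how nonterminals are recognized (angle brackets around text without spaces or further brackets).

```python
import re

# Assumed pattern for nonterminals: "<", no spaces or brackets, then ">"
RE_NONTERMINAL = re.compile(r'(<[^<> ]*>)')

value_string = repr('<html>')  # a recorded string argument
misread = RE_NONTERMINAL.findall(value_string)

# Escape every "<" such that only <langle> looks like a symbol
escaped = value_string.replace("<", "<langle>")
symbols = RE_NONTERMINAL.findall(escaped)

# Expanding <langle> with its only alternative restores the original text
grammar = {"<langle>": ["<"]}
expanded = escaped.replace("<langle>", grammar["<langle>"][0])
```

Without escaping, the argument would be taken as a reference to an undefined symbol <html>; with escaping, the only symbol found is <langle>, which expands back into "<".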

A Grammar from Calls

To get the grammar for a single function (mine_function_grammar()), we add a call to the function:

class CallGrammarMiner(CallGrammarMiner):
    def function_symbol(self, function_name, grammar):
        return new_symbol(grammar, "<" + function_name + ">")

    def mine_function_grammar(self, function_name, grammar):
        arguments = self.carver.arguments(function_name)

        if self.log:
            print(function_name, arguments)

        var_grammar, var_symbols = self.mine_arguments_grammar(
            function_name, arguments, grammar)

        function_grammar = var_grammar
        function_symbol = self.function_symbol(function_name, grammar)

        if len(var_symbols) > 0 and var_symbols[0].find("-self") >= 0:
            # Method call
            function_grammar[function_symbol] = [
                var_symbols[0] + "." + function_name + "(" + ", ".join(var_symbols[1:]) + ")"]
        else:
            function_grammar[function_symbol] = [
                function_name + "(" + ", ".join(var_symbols) + ")"]

        if self.log:
            print(function_symbol, "::=", function_grammar[function_symbol])

        return function_grammar, function_symbol
m = CallGrammarMiner(power_carver)
function_grammar, function_symbol = m.mine_function_grammar(
    "power", initial_grammar)
function_grammar
{'<power-y>': ['4', '2'],
 '<power-x>': ['3', '1'],
 '<power>': ['power(<power-y>, <power-x>)']}

The additionally returned function_symbol holds the symbol of the function call rule just added:

function_symbol
'<power>'
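The power() example only exercises the plain function branch of mine_function_grammar(). To show the method branch as well, here is a standalone sketch of how the two rule shapes are put together. The helper call_rule() and the symbols for an append() method are hypothetical and serve only as illustration.

```python
def call_rule(function_name, var_symbols):
    """Build the expansion for a call, mirroring the two branches above."""
    if var_symbols and "-self" in var_symbols[0]:
        # Method call: the first symbol is the receiver object
        receiver, rest = var_symbols[0], var_symbols[1:]
        return receiver + "." + function_name + "(" + ", ".join(rest) + ")"
    # Plain function call: all symbols become positional arguments
    return function_name + "(" + ", ".join(var_symbols) + ")"


print(call_rule("power", ["<power-y>", "<power-x>"]))
print(call_rule("append", ["<append-self>", "<append-item>"]))
```

Note that this heuristic only recognizes a method call if the symbol for self comes first in var_symbols.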

A Grammar from all Calls

Let us now repeat the above for all function calls seen during carving. To this end, we simply iterate over all function calls seen:

power_carver.called_functions()
['power', '__exit__']
class CallGrammarMiner(CallGrammarMiner):
    def mine_call_grammar(self, function_list=None, qualified=False):
        grammar = self.initial_grammar()
        fn_list = function_list
        if function_list is None:
            fn_list = self.carver.called_functions(qualified=qualified)

        for function_name in fn_list:
            if function_list is None and (function_name.startswith("_") or function_name.startswith("<")):
                continue  # Internal function

            # Ignore errors with mined functions
            try:
                function_grammar, function_symbol = self.mine_function_grammar(
                    function_name, grammar)
            except Exception:
                if function_list is not None:
                    raise
                continue  # Skip functions whose grammar could not be mined

            if function_symbol not in grammar[self.CALL_SYMBOL]:
                grammar[self.CALL_SYMBOL].append(function_symbol)
            grammar.update(function_grammar)

        assert is_valid_grammar(grammar)
        return grammar

The method mine_call_grammar() is the one that clients can and should use – first for mining...

m = CallGrammarMiner(power_carver)
power_grammar = m.mine_call_grammar()
power_grammar
{'<start>': ['<call>'],
 '<call>': ['<power>'],
 '<power-y>': ['4', '2'],
 '<power-x>': ['3', '1'],
 '<power>': ['power(<power-y>, <power-x>)']}

...and then for fuzzing:

power_fuzzer = GrammarCoverageFuzzer(power_grammar)
[power_fuzzer.fuzz() for i in range(5)]
['power(2, 3)', 'power(4, 1)', 'power(2, 3)', 'power(2, 3)', 'power(4, 3)']

With this, we have successfully extracted a grammar from a recorded execution; in contrast to "simple" carving, our grammar allows us to recombine arguments and thus to fuzz at the API level.
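To make the recombination concrete, the following sketch enumerates every call the mined power grammar can produce and separates the calls seen during carving from the new ones. It works on a copy of the grammar shown above and does not need a fuzzer. The set recorded is derived from the argument lists printed earlier, written in the argument order the grammar uses.

```python
from itertools import product

# Copy of the mined grammar shown above (values sorted for a stable order)
power_grammar = {
    "<start>": ["<call>"],
    "<call>": ["<power>"],
    "<power-y>": ["2", "4"],
    "<power-x>": ["1", "3"],
    "<power>": ["power(<power-y>, <power-x>)"],
}

# The two calls recorded during carving, in the grammar's argument order
recorded = {"power(2, 1)", "power(4, 3)"}

# Every combination of the argument values the grammar allows
calls = [
    f"power({y}, {x})"
    for y, x in product(power_grammar["<power-y>"], power_grammar["<power-x>"])
]
new_calls = [call for call in calls if call not in recorded]

print(calls)      # all four combinations
print(new_calls)  # ['power(2, 3)', 'power(4, 1)']
```

Two of the four combinations never occurred in the recorded run; these are the calls that plain replay could not have produced.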

Fuzzing Web Functions

Let us now apply our grammar miner to a larger API: the calls made by webbrowser(), including the urlparse() function we already encountered during carving.

with CallCarver() as webbrowser_carver:
    webbrowser("https://www.fuzzingbook.org")

We can mine a grammar from the calls encountered:

m = CallGrammarMiner(webbrowser_carver)
webbrowser_grammar = m.mine_call_grammar()

This is a rather large grammar:

call_list = webbrowser_grammar['<call>']
len(call_list)
141
print(call_list[:20])
['<webbrowser>', '<default_headers>', '<default_user_agent>', '<update>', '<default_hooks>', '<cookiejar_from_dict>', '<RLock>', '<deepvalues>', '<vals_sorted_by_key>', '<init_poolmanager>', '<mount>', '<prepare_request>', '<merge_cookies>', '<get_netrc_auth>', '<encode>', '<expanduser>', '<decode>', '<exists>', '<urlparse>', '<urlsplit>']

Here's the rule for the urlparse() function:

webbrowser_grammar["<urlparse>"]
['urlparse(<urlparse-allow_fragments>, <urlparse-scheme>, <urlparse-url>)']

Here are the arguments.

webbrowser_grammar["<urlparse-url>"]
["'https://www.fuzzingbook.org'", "'https://www.fuzzingbook.org/'"]

If we now apply a fuzzer on these rules, we systematically cover all variations of arguments seen, including, of course, combinations not seen during carving. Again, we are fuzzing at the API level here.

urlparse_fuzzer = GrammarCoverageFuzzer(
    webbrowser_grammar, start_symbol="<urlparse>")
for i in range(5):
    print(urlparse_fuzzer.fuzz())
urlparse(True, '', 'https://www.fuzzingbook.org')
urlparse(True, '', 'https://www.fuzzingbook.org/')
urlparse(True, '', 'https://www.fuzzingbook.org')
urlparse(True, '', 'https://www.fuzzingbook.org')
urlparse(True, '', 'https://www.fuzzingbook.org')

Just as seen with carving, running tests at the API level is orders of magnitude faster than executing system tests. Hence, this calls for means to fuzz at the method level:

from urllib.parse import urlsplit
from Timer import Timer
with Timer() as urlsplit_timer:
    urlsplit('http://www.fuzzingbook.org/', 'http', True)
urlsplit_timer.elapsed_time()
1.0957999620586634e-05
with Timer() as webbrowser_timer:
    webbrowser("http://www.fuzzingbook.org")
webbrowser_timer.elapsed_time()
0.8124162079911912
webbrowser_timer.elapsed_time() / urlsplit_timer.elapsed_time()
74139.09802158752

But then again, the caveats encountered during carving apply, notably the requirement to recreate the original function environment. If we also alter or recombine arguments, we get the additional risk of violating an implicit precondition – that is, invoking a function with arguments the function was never designed for. Such false alarms, resulting from incorrect invocations rather than incorrect implementations, must then be identified (typically manually) and weeded out (for instance, by altering or constraining the grammar). The huge speed gains at the API level, however, may well justify this additional investment.
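One way to support this manual triage is to run the generated calls and collect every exception together with the call that raised it. The sketch below assumes a hypothetical function ratio() whose implicit precondition is b != 0, and a hypothetical helper run_calls(). Neither is part of the chapter's code.

```python
def ratio(a, b):
    # Hypothetical function under test; it implicitly requires b != 0
    return a / b


def run_calls(calls, namespace):
    """Evaluate each call string and collect exceptions for later triage."""
    failures = {}
    for call in calls:
        try:
            eval(call, namespace)
        except Exception as exc:
            # Record the exception type; a human decides if it is a false alarm
            failures[call] = type(exc).__name__
    return failures


# Assume the recorded runs were ratio(1, 2) and ratio(4, 0 + 2);
# recombining arguments could then yield a call with b == 0
calls = ["ratio(1, 2)", "ratio(4, 0)", "ratio(4, 2)"]
failures = run_calls(calls, {"ratio": ratio})
print(failures)  # {'ratio(4, 0)': 'ZeroDivisionError'}
```

Whether ratio(4, 0) points to a bug or to an invalid invocation cannot be decided automatically. If it is the latter, removing 0 from the rule for b in the grammar avoids the false alarm in future runs.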

Lessons Learned

  • Carving allows for effective replay of function calls recorded during a system test.
  • A function call can be orders of magnitude faster than a system invocation.
  • Serialization allows creating persistent representations of complex objects.
  • Functions that heavily interact with their environment and/or access external resources are difficult to carve.
  • From carved calls, one can produce API grammars that arbitrarily combine carved arguments.

Next Steps

In the next chapter, we will discuss how to reduce failure-inducing inputs.

Background

Carving was invented by Elbaum et al. [Elbaum et al, 2006] and originally implemented for Java. In this chapter, we follow several of their design choices (including recording and serializing method arguments only).

The combination of carving and fuzzing at the API level is described in [Kampmann et al, 2018].

Exercises

Exercise 1: Carving for Regression Testing

So far, during carving, we have only looked into reproducing calls, but not into actually checking the results of these calls. This is important for regression testing – i.e. checking that a change to the code does not break existing functionality. We can build this by recording not only calls, but also return values – and then later compare whether the same calls result in the same values. This may not work on all occasions; values that depend on time, randomness, or other external factors may be different. Still, for functionality that abstracts from these details, checking that nothing has changed is an important part of testing.

Our aim is to design a class ResultCarver that extends CallCarver by recording both calls and return values.

In a first step, extend the traceit() method such that it also tracks return values. When a function returns, traceit() is invoked with the event type "return", and the arg parameter holds the returned value. Here is a prototype that only prints out the returned values:

class ResultCarver(CallCarver):
    def traceit(self, frame, event, arg):
        if event == "return":
            if self._log:
                print("Result:", arg)

        super().traceit(frame, event, arg)
        # Need to return traceit function such that it is invoked for return
        # events
        return self.traceit
with ResultCarver(log=True) as result_carver:
    my_sqrt(2)
my_sqrt(x=2)
Result: 1.414213562373095
__exit__(tb=None, exc_value=None, exc_type=None, self=<__main__.ResultCarver object at 0x13f1c6aa0>)

Part 1: Store function results

Extend the above code such that results are stored in a way that associates them with the currently returning function (or method). To this end, you need to keep track of the current stack of called functions.
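As a possible starting point (not a reference solution), here is a dependency-free sketch of the stack idea using sys.settrace() directly. The class ReturnRecorder and its attributes are hypothetical stand-ins for ResultCarver; a real solution would extend CallCarver and reuse its argument handling.

```python
import sys


class ReturnRecorder:
    """Hypothetical stand-in for ResultCarver: stores results per call."""

    def __init__(self):
        self.results = {}  # function name -> list of (arguments, result)
        self._stack = []   # (function name, arguments) of active calls

    def _trace(self, frame, event, arg):
        if event == "call":
            code = frame.f_code
            names = code.co_varnames[:code.co_argcount]
            args = tuple((name, frame.f_locals[name]) for name in names)
            self._stack.append((code.co_name, args))
        elif event == "return" and self._stack:
            # The innermost active call is the one returning
            name, args = self._stack.pop()
            self.results.setdefault(name, []).append((args, arg))
        # Return the trace function so that "return" events are reported
        return self._trace

    def __enter__(self):
        self._old_trace = sys.gettrace()
        sys.settrace(self._trace)
        return self

    def __exit__(self, exc_type, exc_value, tb):
        sys.settrace(self._old_trace)


def square(x):
    return x * x


with ReturnRecorder() as recorder:
    square(3)
    square(4)

print(recorder.results["square"])
```

The sketch stores arguments as tuples of (name, value) pairs next to each result. It ignores keyword-only and variadic arguments, and a function that exits via an exception is recorded with the result None.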

Part 2: Access results

Give it a method result() that returns the value recorded for a particular function name and argument:

class ResultCarver(CallCarver):
    def result(self, function_name, argument):
        """Returns the result recorded for function_name(argument)"""

Part 3: Produce assertions

For the functions called during webbrowser() execution, create a set of assertions that check whether the result returned is still the same. Test this for urllib.parse.urlparse().
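For illustration, an assertion can be produced as a string from a recorded call and its result, relying on repr() to yield an evaluable representation. The helper make_assertion() below is hypothetical. The recorded result is obtained here by calling urlparse() directly rather than through a carver.

```python
from urllib.parse import urlparse, ParseResult  # ParseResult is needed by exec()


def make_assertion(function_name, args, result):
    """Turn a recorded call and its result into an assert statement."""
    arg_string = ", ".join(f"{name}={value!r}" for name, value in args)
    return f"assert {function_name}({arg_string}) == {result!r}"


url = "https://www.fuzzingbook.org"
recorded_result = urlparse(url)  # stands in for a carved return value

assertion = make_assertion("urlparse", [("url", url)], recorded_result)
print(assertion)

# Replaying the assertion checks that the result is still the same
exec(assertion)
```

This only works for results whose repr() can be evaluated back into an equal object; other results would need serialization, as discussed for arguments earlier in this chapter.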

Exercise 2: Abstracting Arguments

When mining an API grammar from executions, set up an abstraction scheme to widen the range of arguments to be used during testing. If the values for an argument all conform to some type T, abstract them into <T>. For instance, if calls to foo(1), foo(2), foo(3) have been seen, the grammar should abstract its calls into foo(<int>), with <int> being appropriately defined.

Do this for a number of common types: integers, positive numbers, floating-point numbers, host names, URLs, mail addresses, and more.
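As a hint for the integer case only, the following sketch replaces every rule whose expansions are all integer literals by <int>. The rules in INT_RULES and the helpers is_int_literal() and abstract_ints() are assumptions made for this illustration. The <int> rules produce only non-negative integers, so negative values seen during carving are abstracted away as well.

```python
import ast

# Assumed rules for non-negative integer literals without leading zeros
INT_RULES = {
    "<int>": ["0", "<leaddigit><digits>"],
    "<leaddigit>": list("123456789"),
    "<digits>": ["", "<digit><digits>"],
    "<digit>": list("0123456789"),
}


def is_int_literal(value_string):
    """Check whether value_string is a Python integer literal (not a bool)."""
    try:
        value = ast.literal_eval(value_string)
    except (ValueError, SyntaxError):
        return False
    return isinstance(value, int) and not isinstance(value, bool)


def abstract_ints(grammar):
    """Return a copy of grammar in which all-integer rules expand into <int>."""
    abstracted = dict(grammar)
    for symbol, expansions in grammar.items():
        if expansions and all(is_int_literal(e) for e in expansions):
            abstracted[symbol] = ["<int>"]
            abstracted.update(INT_RULES)
    return abstracted


foo_grammar = {
    "<foo>": ["foo(<foo-x>)"],
    "<foo-x>": ["1", "2", "3"],
}
print(abstract_ints(foo_grammar)["<foo-x>"])  # ['<int>']
```

Other types can follow the same pattern: a predicate that recognizes all seen values, plus a set of rules for the abstract symbol.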

The content of this project is licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License. The source code that is part of the content, as well as the source code used to format and display that content is licensed under the MIT License. Last change: 2023-11-11 18:18:05+01:00