源码解析angr的模拟执行

阅读量165119

|

发布时间 : 2021-09-01 14:30:09

 

前言

angr是很有名的二进制符号执行工具,网上有许多关于angr的源码解析的文章。但是好像还没有关于angr模拟执行模块的解析。而模拟执行部分也是angr中相当重要的一个部分。因此,本文将解析angr模拟执行部分的源码,来帮助大家了解angr模拟执行的基本原理。

 

概述

当我们用angr去符号执行的时候,最基本的几个操作如下面代码所示:导入代码(第1行)、导入二进制(第2行)、确定初始状态(第3行)、构建simulation_manager对象(第4行)、模拟执行(第5行)。而到底angr是怎么符号执行的呢?因此就需要深入simulation_manager的源码(sim_manager.py)去一探究竟了。

import angr
p = angr.Project("xxxx")
entry_state = p.factory.entry_state()
simgr = p.factory.simgr(entry_state)#simgr是simulation_manager的别名
simgr.explore(find=xxxx)

simulation_manager这个类位于angr/sim_manager.py文件里。

simulation_manager是angr中模拟执行管理器。主要的操作对象是程序的状态对象(sim_state)。状态都被放在stash里,可以往前执行、过滤、合并或者移到别的stash里。stash里可以理解为是放状态的一个列表,stash有这么几种,分别表示状态的状态:

(1) active:保存接下来要执行的状态
(2) deadended:由于某些原因不能再继续执行下去,比如没有合法的指令、下个节点的状态不可解,或者有一个非法的指令指针。
(3) pruned:当使用lazy_sovles的策略时,只有在必要的时候才去检查状态是否可解。当发现一个不可求解的节点后,将其后面的节点都优化掉,放在pruned里。
(4) unconstrained:比如PC被用户数据或者其他类型的符号变量所控制,导致不知道执行哪个指令。
(5) unsat:不可求解的状态。比如,输入同时为AAAA和BBBB。

接下来看看源码,源码中提示我们看simulation_manager的三个重要方法:step、explore、use_technique。

use_technique

angr里有自带很多启发式的路径探索方法。这个函数就是让simulation_manager能够调用外部写好的启发式路径搜索方法。官方给出的几个样例里,除了经典的深度优先搜索、也有检测内存使用情况、CMU论文里的Veritest(合并循环的状态)等等策略。

代码首先先判断tech是否属于ExplorationTechnique这个类。然后setup方法开始初始化。然后把tech防到techniques列表中去,这也意味着可以使用多种策略。这里的hookset暂时没有看懂。

 def use_technique(self, tech):
        """
        Use an exploration technique with this SimulationManager.
        Techniques can be found in :mod:`angr.exploration_techniques`.
        :param tech:    An ExplorationTechnique object that contains code to modify
                        this SimulationManager's behavior.
        :type tech:     ExplorationTechnique
        :return:        The technique that was added, for convenience
        """
        if not isinstance(tech, ExplorationTechnique):
            raise SimulationManagerError

        # XXX: as promised
        tech.project = self._project
        tech.setup(self)

        HookSet.install_hooks(self, **tech._get_hooks())
        self._techniques.append(tech)
        return tech

explore

先来看看看explore函数的参数,有stash,n,find,avoid等参数。explore函数的功能是从某个类型的stash,比如active,开始寻找满足find条件的,需要避免avoid条件的状态,直到找了n次,或者找到了num_find个状态。然后找到的状态都会塞到find_stash里,筛选的状态都会放在avoid_stash里。

其中find和avoid参数可以是一个地址,或者一堆地址的集合或者列表,甚至可以是一个函数,以状态为输入,输出True 或者False,来表示该状态是否是要寻找的状态。如果angr的CFG作为cfg的参数并且find是一个地址或者一个列表或者集合,那么到达不了目标状态的状态就会先把提前筛选掉。

def explore(self, stash='active', n=None, find=None, avoid=None, find_stash='found', avoid_stash='avoid', cfg=None,
                num_find=1, **kwargs):
        """
        Tick stash "stash" forward (up to "n" times or until "num_find" states are found), looking for condition "find",
        avoiding condition "avoid". Stores found states into "find_stash' and avoided states into "avoid_stash".
        The "find" and "avoid" parameters may be any of:
        - An address to find
        - A set or list of addresses to find
        - A function that takes a state and returns whether or not it matches.
        If an angr CFG is passed in as the "cfg" parameter and "find" is either a number or a list or a set, then
        any states which cannot possibly reach a success state without going through a failure state will be
        preemptively avoided.
        """
        num_find += len(self._stashes[find_stash]) if find_stash in self._stashes else 0
        tech = self.use_technique(Explorer(find, avoid, find_stash, avoid_stash, cfg, num_find))

        # Modify first Veritesting so that they can work together.
        deviation_filter_saved = None
        for t in self._techniques:
            if isinstance(t,Veritesting):
                deviation_filter_saved = t.options.get("deviation_filter",None)
                if deviation_filter_saved is not None:
                    t.options["deviation_filter"] = lambda s: tech.find(s) or tech.avoid(s) or deviation_filter_saved(s)
                else:
                    t.options["deviation_filter"] = lambda s: tech.find(s) or tech.avoid(s)
                break

        try:
            self.run(stash=stash, n=n, **kwargs)
        finally:
            self.remove_technique(tech)

        for t in self._techniques:
            if isinstance(t,Veritesting):
                if deviation_filter_saved is None:
                    del t.options["deviation_filter"]
                else:
                    t.options["deviation_filter"] = deviation_filter_saved
                break

        return self

宏观来看explore函数分为三部分:初始化,兼容veritest策略,探索(run)。兼容veritest策略的代码占了很多,对于理解veritest策略与其他策略的关系很有帮助,但是对我们理解符号执行帮助较小,这里就不赘述了。

首先,更新num_find的参数为设定的num_find参数加上找到的状态。接着,用传入的参数find,avoid等生成Explorer对象,然后再用use_technique方法生成一个tech对象。这里为什么要生成Explore对象,然后再用use_technique方法?

Explorer对象继承了ExplorationTechnique类,所以他也是一种探索策略,并且是一种最基础的策略。

而符号执行过程中,可以使用多种策略,那么如何综合这些策略呢?angr是把他们都放在了simulationmanager里的.techniques列表里,而use_technique方法的作用正是把策略对象放进这个techniques列表里。

 num_find += len(self._stashes[find_stash]) if find_stash in self._stashes else 0
 tech = self.use_technique(Explorer(find, avoid, find_stash, avoid_stash, cfg, num_find))

初始化后,接下来就是去探索状态部分。简单的一个try,finally语句。不论run的结果如何,最后都把基础探索策略移出_techniques列表里。

 try:
     self.run(stash=stash, n=n, **kwargs)
 finally:
     self.remove_technique(tech)

run函数的代码如下,思路很简单,根据当前的探索策略,一直探索,直到到达一个完整的状态。如果策略里没定义完整的策略,那就把stash里的状态都跑完。run里涉及到了后面会讲的step函数,这里可以先简单理解为单步符号执行。

def run(self, stash='active', n=None, until=None, **kwargs):
        """
        Run until the SimulationManager has reached a completed state, according to
        the current exploration techniques. If no exploration techniques that define a completion
        state are being used, run until there is nothing left to run.
        :param stash:       Operate on this stash
        :param n:           Step at most this many times
        :param until:       If provided, should be a function that takes a SimulationManager and
                            returns True or False. Stepping will terminate when it is True.
        :return:            The simulation manager, for chaining.
        :rtype:             SimulationManager
        """
        for _ in (itertools.count() if n is None else range(0, n)):
            if not self.complete() and self._stashes[stash]:
                self.step(stash=stash, **kwargs)
                if not (until and until(self)):
                    continue
            break
        return self

step

最后就是这个比较复杂的step函数了,可以看作是符号执行的基本单元了。相比explore函数的参数多了selector_func,step_func,successor_func,filter_func,until。这些参数的意思代码注释写得比较清楚了,就简单翻译一下。这些参数都是一个以状态为输入,返回各种东西(比如bool值,后继节点等)的一个函数,类似下面的代码。

def fun(state):
    if state.addr == xxxx:
        return True
    else:
        return False
  1. selector_func:如果为True,将会继续步进,反之会被保留。
  2. successor_func:返回的是后继节点,后面将会使用这些后继节点去符号执行。反之,则是使用project.factory.successors的后继节点。
  3. filter_func:返回的是stash的名字。filter_func的主要作用是给状态分类,分到各个stash里去。
  4. step_func:与前面参数不同,输入是为simulation_manger对象,并返回simulation_manager对象。这个函数会在simulation_manager对象每次step的时候被调用。
 def step(self, stash='active', n=None, selector_func=None, step_func=None,
             successor_func=None, until=None, filter_func=None, **run_args):
        """
        Step a stash of states forward and categorize the successors appropriately.
        The parameters to this function allow you to control everything about the stepping and
        categorization process.
        :param stash:           The name of the stash to step (default: 'active')
        :param selector_func:   If provided, should be a function that takes a state and returns a
                                boolean. If True, the state will be stepped. Otherwise, it will be
                                kept as-is.
        :param step_func:       If provided, should be a function that takes a SimulationManager and
                                returns a SimulationManager. Will be called with the SimulationManager
                                at every step. Note that this function should not actually perform any
                                stepping - it is meant to be a maintenance function called after each step.
        :param successor_func:  If provided, should be a function that takes a state and return its successors.
                                Otherwise, project.factory.successors will be used.
        :param filter_func:     If provided, should be a function that takes a state and return the name
                                of the stash, to which the state should be moved.
        :param until:           (DEPRECATED) If provided, should be a function that takes a SimulationManager and
                                returns True or False. Stepping will terminate when it is True.
        :param n:               (DEPRECATED) The number of times to step (default: 1 if "until" is not provided)
        Additionally, you can pass in any of the following keyword args for project.factory.successors:
        :param jumpkind:        The jumpkind of the previous exit
        :param addr:            An address to execute at instead of the state's ip.
        :param stmt_whitelist:  A list of stmt indexes to which to confine execution.
        :param last_stmt:       A statement index at which to stop execution.
        :param thumb:           Whether the block should be lifted in ARM's THUMB mode.
        :param backup_state:    A state to read bytes from instead of using project memory.
        :param opt_level:       The VEX optimization level to use.
        :param insn_bytes:      A string of bytes to use for the block instead of the project.
        :param size:            The maximum size of the block, in bytes.
        :param num_inst:        The maximum number of instructions.
        :param traceflags:      traceflags to be passed to VEX. Default: 0
        :returns:           The simulation manager, for chaining.
        :rtype:             SimulationManager
        """
        l.info("Stepping %s of %s", stash, self)
        # 8<----------------- Compatibility layer -----------------
        if n is not None or until is not None:
            if once('simgr_step_n_until'):
                print("\x1b[31;1mDeprecation warning: the use of `n` and `until` arguments is deprecated. "
                      "Consider using simgr.run() with the same arguments if you want to specify "
                      "a number of steps or an additional condition on when to stop the execution.\x1b[0m")
            return self.run(stash, n, until, selector_func=selector_func, step_func=step_func,
                            successor_func=successor_func, filter_func=filter_func, **run_args)
        # ------------------ Compatibility layer ---------------->8
        bucket = defaultdict(list)

        for state in self._fetch_states(stash=stash):

            goto = self.filter(state, filter_func=filter_func)
            if isinstance(goto, tuple):
                goto, state = goto

            if goto not in (None, stash):
                bucket[goto].append(state)
                continue

            if not self.selector(state, selector_func=selector_func):
                bucket[stash].append(state)
                continue

            pre_errored = len(self._errored)

            successors = self.step_state(state, successor_func=successor_func, **run_args)
            # handle degenerate stepping cases here. desired behavior:
            # if a step produced only unsat states, always add them to the unsat stash since this usually indicates a bug
            # if a step produced sat states and save_unsat is False, drop the unsats
            # if a step produced no successors, period, add the original state to deadended

            # first check if anything happened besides unsat. that gates all this behavior
            if not any(v for k, v in successors.items() if k != 'unsat') and len(self._errored) == pre_errored:
                # then check if there were some unsats
                if successors.get('unsat', []):
                    # only unsats. current setup is acceptable.
                    pass
                else:
                    # no unsats. we've deadended.
                    bucket['deadended'].append(state)
                    continue
            else:
                # there were sat states. it's okay to drop the unsat ones if the user said so.
                if not self._save_unsat:
                    successors.pop('unsat', None)

            for to_stash, successor_states in successors.items():
                bucket[to_stash or stash].extend(successor_states)

        self._clear_states(stash=stash)
        for to_stash, states in bucket.items():
            self._store_states(to_stash or stash, states)

        if step_func is not None:
            return step_func(self)
        return self

首先,从stash里取出一个状态,调用filter函数看下该状态最后要去哪个stash里,如果不是当前的stash,则把该状态塞到应该放的stash的地方,然后取下一个状态。调用selector函数,选择要保留的状态。

bucket = defaultdict(list)
# 依次从stash里取出状态
for state in self._fetch_states(stash=stash): 
    goto = self.filter(state, filter_func=filter_func) # 返回的是个元组,(状态该去的stash,状态)
    if isinstance(goto, tuple):
        goto, state = goto
    #如果要去的stash不是当前的stash,也不是None,
    if goto not in (None, stash):
        # 那么把他放进该去的stash里,就不管他了。也就筛选掉了。
        bucket[goto].append(state) # 
        continue
    # 如果selector函数返回False,则需要保留该状态到当前的stash
    if not self.selector(state, selector_func=selector_func):
        # 保留状态
        bucket[stash].append(state)
        continue

如果没有触发selector或者filter,就去找后继节点。这里调用了step_state函数。

for state in self._fetch_states(stash=stash):
    ...
    successors = self.step_state(state, successor_func=successor_func, **run_args)

step_state函数如下所示,这个函数主要是处理后继节点的状态。将不可解的状态,无约束的状态都放在相应的stash里。

def step_state(self, state, successor_func=None, **run_args):
    """
        Don't use this function manually - it is meant to interface with exploration techniques.
        """
    try:
        successors = self.successors(state, successor_func=successor_func, **run_args)
        stashes = {None: successors.flat_successors,
                   'unsat': successors.unsat_successors,
                   'unconstrained': successors.unconstrained_successors}    
    except:
    ...
    return stashes

由于step_state函数可能会发生很多错误,因此后续的代码是去做后继节点错误状态的处理。

for state in self._fetch_states(stash=stash): 
    ...
    #如果有后继节点有任何一个unsat状态或者发生了新的错误
    if not any(v for k, v in successors.items() if k != 'unsat') and len(self._errored) == pre_errored:
        #对于unsat状态,就先不管他
        if successors.get('unsat', []):
            # only unsats. current setup is acceptable.
            pass
        else:
            #如果不是unsat,那说明遇到某些原因终止了,把该状态加到deadended的stash里去。
            bucket['deadended'].append(state)
            continue
    else:
    # 如果没有设置保留unsat状态,就把后继节点的unsat状态丢出去。
        if not self._save_unsat:
            successors.pop('unsat', None)

接下来就把后继节点加到bucket的to_stash或者stash里去。自此,这个for循环就结束了。

for state in self._fetch_states(stash=stash): 
...
    for to_stash, successor_states in successors.items():
        bucket[to_stash or stash].extend(successor_states)

剩下就是一些收尾工作,清空当前stash里的状态,然后再把bucket的内容存到simulation_manager对象的stash里去。

self._clear_states(stash=stash)
for to_stash, states in bucket.items():
    self._store_states(to_stash or stash, states)

如果有设置step_func,就去调用step_func。由此也能看到step_func是在step函数最后调用的。

if step_func is not None:
    return step_func(self)

 

总结

angr模拟执行部分的主要代码就解析到这里了,希望大家能够对angr的模拟执行有更深的理解。在理解了angr这部分的内容之后,应该能够比较容易地扩展angr的探索策略。

本文只涉及了sim_manager.py中的几个重要地方,如果要熟练使用simulation_manager的各种功能的话,比如split,merge等等,还需要再看看源码。

参考资料https://github.com/angr/angr/tree/master/angr

本文由iskindar原创发布

转载,请参考转载声明,注明出处: https://www.anquanke.com/post/id/251983

安全客 - 有思想的安全新媒体

分享到:微信
+12赞
收藏
iskindar
分享到:微信

发表评论

iskindar

一个菜鸡,联系:iskindar1997@126.com

  • 文章
  • 9
  • 粉丝
  • 32

热门推荐

内容需知
  • 投稿须知
  • 转载须知
  • 官网QQ群8:819797106
  • 官网QQ群3:830462644(已满)
  • 官网QQ群2:814450983(已满)
  • 官网QQ群1:702511263(已满)
合作单位
  • 安全客
  • 安全客
Copyright © 北京奇虎科技有限公司 360网络攻防实验室 安全客 All Rights Reserved 京ICP备08010314号-66