Package IDAscope :: Package idascope :: Package core :: Module CryptoIdentifier
[hide private]
[frames | no frames]

Source Code for Module IDAscope.idascope.core.CryptoIdentifier

  1  #!/usr/bin/python 
  2  ######################################################################## 
  3  # Copyright (c) 2012 
  4  # Daniel Plohmann <daniel.plohmann<at>gmail<dot>com> 
  5  # Alexander Hanel <alexander.hanel<at>gmail<dot>com> 
  6  # All rights reserved. 
  7  ######################################################################## 
  8  # 
  9  #  This file is part of IDAscope 
 10  # 
 11  #  IDAscope is free software: you can redistribute it and/or modify it 
 12  #  under the terms of the GNU General Public License as published by 
 13  #  the Free Software Foundation, either version 3 of the License, or 
 14  #  (at your option) any later version. 
 15  # 
 16  #  This program is distributed in the hope that it will be useful, but 
 17  #  WITHOUT ANY WARRANTY; without even the implied warranty of 
 18  #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 19  #  General Public License for more details. 
 20  # 
 21  #  You should have received a copy of the GNU General Public License 
 22  #  along with this program.  If not, see 
 23  #  <http://www.gnu.org/licenses/>. 
 24  # 
 25  ######################################################################## 
 26   
 27   
 28  import time 
 29  import re 
 30   
 31  from IdaProxy import IdaProxy 
 32  from PatternManager import PatternManager 
 33   
 34  from idascope.core.structures.Segment import Segment 
 35  from idascope.core.structures.AritlogBasicBlock import AritlogBasicBlock 
 36  from idascope.core.structures.CryptoSignatureHit import CryptoSignatureHit 
 37   
 38   
39 -class CryptoIdentifier():
40 """ 41 This class contains the logic to perform Crypto identification. 42 Two techniques are currently supported: 43 1. A heuristic approach that identifies functions and basic blocks 44 based on the ratio of arithmetic/logic instructions to all instructions 45 2. A signature-based approach, using the signatures defined in PatternManager 46 """ 47
def __init__(self):
    """
    Prepare a fresh identifier: keep handles to the helper modules and data
    classes on the instance, load the signature patterns, set the default
    heuristic thresholds and start with empty result lists.
    """
    self.name = "CryptoIdentifier"
    print ("loading CryptoIdentifier")

    # modules and data classes are reached through the instance everywhere else
    self.time, self.re = time, re
    self.CryptoSignatureHit = CryptoSignatureHit
    self.AritlogBasicBlock = AritlogBasicBlock
    self.Segment = Segment
    self.pm = PatternManager()

    # bounds for the arithmetic/logic rating of a basic block
    self.low_rating_threshold, self.high_rating_threshold = 0.4, 1.0

    # bounds for the instruction count of a basic block
    self.low_instruction_threshold, self.high_instruction_threshold = 8, 100
    # update_thresholds() treats a maximum at or above this value as "no upper bound"
    self.max_instruction_threshold = 100

    # bounds for the number of calls in the function containing a basic block
    self.low_call_threshold, self.high_call_threshold = 0, 1
    # update_thresholds() treats a maximum at or above this value as "no upper bound"
    self.max_call_threshold = 10

    # a location only counts as a signature hit if at least this fraction of
    # the signature's length has been matched consecutively
    self.match_filter_factor = 0.5

    self.aritlog_blocks = []
    self.signature_hits = []
    self.ida_proxy = IdaProxy()
74
def scan(self):
    """
    Run every available technique over the whole IDB: the arithmetic/logic
    heuristic first, the signature scan second. The results of both scans
    are kept on the instance; nothing is returned.
    """
    for run_technique in (self.scan_aritlog, self.scan_crypto_patterns):
        run_technique()
81 82 ################################################################################ 83 # Aritlog scanning 84 ################################################################################ 85
def scan_aritlog(self):
    """
    Run the arithmetic/logic heuristic.
    Every basic block of every function is wrapped in an AritlogBasicBlock that is
    fed the mnemonic of each code instruction together with a flag telling whether
    the values of operand 0 and operand 1 are equal. Each block additionally learns
    how many "call" instructions its function contains. All blocks are stored in
    self.aritlog_blocks, replacing the result of any earlier scan.
    @return: a list of AritlogBasicBlock data objects that fulfill the thresholds
             currently configured on this instance
    """
    print ("Starting aritlog heuristic analysis.")
    proxy = self.ida_proxy
    self.aritlog_blocks = []
    started_at = self.time.time()
    for function_ea in proxy.Functions():
        blocks_of_function = []
        call_count = 0
        for chart_block in proxy.FlowChart(proxy.get_func(function_ea)):
            block = self.AritlogBasicBlock(chart_block.startEA, chart_block.endEA)
            for head in proxy.Heads(block.start_ea, block.end_ea):
                # heads that are not code (e.g. inline data) do not count as instructions
                if not proxy.isCode(proxy.GetFlags(head)):
                    continue
                mnemonic = proxy.GetMnem(head)
                operands_match = proxy.GetOperandValue(head, 0) == proxy.GetOperandValue(head, 1)
                block.update_instruction_count(mnemonic, operands_match)
                if mnemonic == "call":
                    call_count += 1
            blocks_of_function.append(block)
        # the call count is a per-function figure, so it is only final once
        # all blocks of the function have been visited
        for block in blocks_of_function:
            block.num_calls_in_function = call_count
        self.aritlog_blocks.extend(blocks_of_function)
    print ("Analysis took %3.2f seconds" % (self.time.time() - started_at))

    return self.get_aritlog_blocks(
        self.low_rating_threshold, self.high_rating_threshold,
        self.low_instruction_threshold, self.high_instruction_threshold,
        self.low_call_threshold, self.high_call_threshold,
        False)
118
def update_thresholds(self, min_rating, max_rating, min_instr, max_instr, min_call, max_call):
    """
    Store all six threshold bounds on the instance.
    Ratings are clamped to the range [0.0, 1.0] and the lower instruction/call
    bounds to non-negative values. An upper instruction or call bound that
    reaches self.max_instruction_threshold / self.max_call_threshold is
    understood as "no upper bound".
    @param min_rating: the minimum arit/log ratio a basic block must have
    @type min_rating: float
    @param max_rating: the maximum arit/log ratio a basic block can have
    @type max_rating: float
    @param min_instr: the minimum number of instructions a basic block must have
    @type min_instr: int
    @param max_instr: the maximum number of instructions a basic block can have
    @type max_instr: int
    @param min_call: the minimum number of calls for a basic block to be accepted
    @type min_call: int
    @param max_call: the maximum number of calls for a basic block to be accepted
    @type max_call: int
    """
    # stand-in for "infinite": no block is assumed to ever reach this count
    unbounded = 1000000

    self.low_rating_threshold = max(0.0, min_rating)
    self.high_rating_threshold = min(1.0, max_rating)

    self.low_instruction_threshold = max(0, min_instr)
    self.high_instruction_threshold = unbounded if max_instr >= self.max_instruction_threshold else max_instr

    self.low_call_threshold = max(0, min_call)
    self.high_call_threshold = unbounded if max_call >= self.max_call_threshold else max_call
149
def get_aritlog_blocks(self, min_rating, max_rating, min_instr, max_instr, min_api, max_api, is_nonzero):
    """
    Store the given bounds via update_thresholds() and collect every analyzed
    block that lies within all of them (all bounds are inclusive).
    The first six parameters are the same as in function "update_thresholds".
    @param is_nonzero: defines whether zeroing instructions (like xor eax, eax)
                       shall be counted or not; handed to the block's rating.
    @type is_nonzero: boolean
    @return: a list of AritlogBasicBlock data objects, according to the parameters
    """
    self.update_thresholds(min_rating, max_rating, min_instr, max_instr, min_api, max_api)
    accepted = []
    for block in self.aritlog_blocks:
        rating = block.get_aritlog_rating(is_nonzero)
        if not (self.high_rating_threshold >= rating >= self.low_rating_threshold):
            continue
        if not (self.high_instruction_threshold >= block.num_instructions >= self.low_instruction_threshold):
            continue
        if not (self.high_call_threshold >= block.num_calls_in_function >= self.low_call_threshold):
            continue
        accepted.append(block)
    return accepted
163
165 """ 166 returns the number of basic blocks that have been analyzed. 167 @return: (int) number of basic blocks 168 """ 169 return len(self.aritlog_blocks)
170 171 ################################################################################ 172 # Signature scanning 173 ################################################################################ 174
def get_segment_data(self):
    """
    returns the raw bytes of the segments as stored by IDA.
    A segment whose boundaries, name or bytes cannot be obtained is reported
    on the console and left out of the result; the remaining segments are
    still returned.
    @return: a list of Segment data objects.
    """
    # xrange only exists on Python 2; Python 3's range is equally lazy
    try:
        address_range = xrange
    except NameError:
        address_range = range

    proxy = self.ida_proxy
    segments = []
    for segment_ea in proxy.Segments():
        try:
            end_ea = proxy.SegEnd(segment_ea)
            segment = self.Segment()
            segment.start_ea = segment_ea
            segment.end_ea = end_ea
            segment.name = proxy.SegName(segment_ea)
            # join once instead of growing a string byte by byte
            segment.data = "".join(chr(proxy.get_byte(ea)) for ea in address_range(segment_ea, end_ea))
            segments.append(segment)
        except Exception:
            # best effort: skip the unreadable segment, but let KeyboardInterrupt
            # and SystemExit pass through (a bare "except:" would swallow them)
            print ("Tried to access invalid segment data. An error has occurred while address conversion")
    return segments
195
def scan_crypto_patterns(self, pattern_size=32):
    """
    perform a scan for signatures. For matching, the standard python re module is used.
    Every chunk delivered by the PatternManager is searched literally in the data of
    every segment; each occurrence becomes one CryptoSignatureHit. The hits are also
    stored in self.signature_hits.
    @param pattern_size: chunk size (in bytes) handed to PatternManager.get_tokenized_signatures
    @type pattern_size: int
    @return: A list of CryptoSignatureHit data objects
    """
    crypt_results = []
    # this message used to announce "aritlog function enumeration" (copy/paste slip)
    print ("Starting crypto signature scanning.")
    time_before_matching = self.time.time()
    segments = self.get_segment_data()
    print ("Segments under analysis: ")
    for segment in segments:
        print (segment)
    print ("PatternManager initialized, number of signatures: %d" % len(self.pm.signatures))
    keywords = self.pm.get_tokenized_signatures(pattern_size)
    print ("PatternManager tokenized patterns into %d chunks of %d bytes" % (len(keywords), pattern_size))
    for keyword, signature_names in keywords.items():
        # the chunk is raw data, not a regular expression: escape it, and
        # compile it once per chunk instead of once per segment
        pattern = self.re.compile(self.re.escape(keyword))
        for segment in segments:
            for match in pattern.finditer(segment.data):
                crypt_results.append(
                    self.CryptoSignatureHit(segment.start_ea + match.start(), signature_names, keyword))
    print ("Full matching took %3.2f seconds and resulted in %d hits" % (
        self.time.time() - time_before_matching, len(crypt_results)))
    self.signature_hits = crypt_results
    return crypt_results
219
def get_signature_length(self, signature_name):
    """
    Look up how long a signature is, given its name.
    The PatternManager's signatures map the signature data to the signature
    name, so the mapping is searched by value.
    @param signature_name: name for a signature, e.g. "ADLER 32"
    @type signature_name: str
    @return: (int) length of the first signature carrying that name,
             0 if the name is unknown.
    """
    matching_lengths = (
        len(signature)
        for signature, name in self.pm.signatures.items()
        if name == signature_name)
    return next(matching_lengths, 0)
231
def get_xrefs_to_address(self, address):
    """
    get all references to a certain address.
    These are no xrefs in IDA sense but references to the crypto signatures.
    If the signature points to an instruction, e.g. if a constant is moved to a register, the return is flagged as
    "True", meaning it is an in-code reference. Code locations delivered by XrefsTo are flagged as "False".
    @param address: an arbitrary address
    @type address: int
    @return: a list of tuples (int, boolean)
    """
    # "no head found" is reported as BADADDR, which is all ones in the address width
    # of the database: 0xFFFFFFFF for 32-bit, 0xFFFFFFFFFFFFFFFF for 64-bit IDA.
    # Checking only the 32-bit value would query flags for an invalid address.
    invalid_addresses = (0xFFFFFFFF, 0xFFFFFFFFFFFFFFFF)
    proxy = self.ida_proxy
    xrefs = []

    # look for an item head within the 14 bytes in front of the address; if it
    # is code, the address lies inside (is an operand of) that instruction
    head_to_address = proxy.PrevHead(address, address - 14)
    if head_to_address not in invalid_addresses and proxy.isCode(proxy.GetFlags(head_to_address)):
        xrefs.append((head_to_address, True))

    for x in proxy.XrefsTo(address):
        if proxy.isCode(proxy.GetFlags(x.frm)):
            xrefs.append((x.frm, False))
    return xrefs
253
def get_signature_hits(self):
    """
    Condense the raw hits of the last signature scan and group them by signature name.
    Three stages are applied:
      1. sorted hits that share a signature name with their predecessor and start exactly
         where the preceding unified hit ends are merged into that hit,
      2. only unified hits whose length is at least match_filter_factor times the length
         of each signature they are attributed to are kept; these receive their
         code references,
      3. the remaining hits are grouped by signature name.
    @return: a dictionary with key/value entries of the following form: ("signature name", [CryptoSignatureHit])
    """
    make_hit = self.CryptoSignatureHit

    # stage 1: merge consecutive chunks of the same signature(s)
    merged_hits = []
    active_names = []
    for hit in sorted(self.signature_hits):
        shared_names = [name for name in hit.signature_names if name in active_names]
        if shared_names:
            active_names = shared_names
            tail = merged_hits[-1]
            if hit.start_address == tail.start_address + len(tail.matched_signature):
                # directly adjacent: extend the previous hit instead of adding a new one
                tail.matched_signature += hit.matched_signature
                tail.signature_names = shared_names
                continue
        else:
            active_names = hit.signature_names
        # work on copies so the raw scan results stay untouched
        merged_hits.append(make_hit(hit.start_address, hit.signature_names, hit.matched_signature))

    # stage 2: drop hits that cover too little of their signature(s)
    long_enough_hits = []
    for hit in merged_hits:
        required_length = max([self.match_filter_factor * self.get_signature_length(name)
                               for name in hit.signature_names])
        if len(hit.matched_signature) >= required_length:
            hit.code_refs_to = self.get_xrefs_to_address(hit.start_address)
            long_enough_hits.append(hit)

    # stage 3: one list of hits per signature name
    hits_by_name = {}
    for hit in long_enough_hits:
        for name in hit.signature_names:
            hits_by_name.setdefault(name, []).append(hit)

    return hits_by_name
297