1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 import time
29 import re
30
31 from IdaProxy import IdaProxy
32 from PatternManager import PatternManager
33
34 from idascope.core.structures.Segment import Segment
35 from idascope.core.structures.AritlogBasicBlock import AritlogBasicBlock
36 from idascope.core.structures.CryptoSignatureHit import CryptoSignatureHit
37
38
40 """
41 This class contains the logic to perform Crypto identification.
42 Two techniques are currently supported:
43 1. A heuristic approach that identifies functions and basic blocks
44 based on the ratio of arithmetic/logic instructions to all instructions
45 2. A signature-based approach, using the signatures defined in PatternManager
46 """
47
49 self.name = "CryptoIdentifier"
50 print ("loading CryptoIdentifier")
51 self.time = time
52 self.re = re
53 self.CryptoSignatureHit = CryptoSignatureHit
54 self.AritlogBasicBlock = AritlogBasicBlock
55 self.Segment = Segment
56 self.pm = PatternManager()
57 self.low_rating_threshold = 0.4
58 self.high_rating_threshold = 1.0
59 self.low_instruction_threshold = 8
60 self.high_instruction_threshold = 100
61
62 self.max_instruction_threshold = 100
63 self.low_call_threshold = 0
64 self.high_call_threshold = 1
65
66 self.max_call_threshold = 10
67
68
69 self.match_filter_factor = 0.5
70 self.aritlog_blocks = []
71 self.signature_hits = []
72 self.ida_proxy = IdaProxy()
73 return
74
81
82
83
84
85
87 """
88 scan with the arithmetic/logic heuristic
89 @return: a list of AritlogBasicBlock data objects that fulfill the parameters as specified
90 """
91 print ("Starting aritlog heuristic analysis.")
92 self.aritlog_blocks = []
93 time_before = self.time.time()
94 for function_ea in self.ida_proxy.Functions():
95 function_chart = self.ida_proxy.FlowChart(self.ida_proxy.get_func(function_ea))
96 calls_in_function = 0
97 function_blocks = []
98 for current_block in function_chart:
99 block = self.AritlogBasicBlock(current_block.startEA, current_block.endEA)
100 for instruction in self.ida_proxy.Heads(block.start_ea, block.end_ea):
101 if self.ida_proxy.isCode(self.ida_proxy.GetFlags(instruction)):
102 mnemonic = self.ida_proxy.GetMnem(instruction)
103 has_identical_operands = self.ida_proxy.GetOperandValue(instruction, 0) == \
104 self.ida_proxy.GetOperandValue(instruction, 1)
105 block.update_instruction_count(mnemonic, has_identical_operands)
106 if mnemonic == "call":
107 calls_in_function += 1
108 function_blocks.append(block)
109 for block in function_blocks:
110 block.num_calls_in_function = calls_in_function
111 self.aritlog_blocks.extend(function_blocks)
112 print ("Analysis took %3.2f seconds" % (self.time.time() - time_before))
113
114 return self.get_aritlog_blocks(self.low_rating_threshold, self.high_rating_threshold,
115 self.low_instruction_threshold, self.high_instruction_threshold,
116 self.low_call_threshold, self.high_call_threshold,
117 False)
118
def update_thresholds(self, min_rating, max_rating, min_instr, max_instr, min_call, max_call):
    """
    store the six filter bounds on this object, clamping them where needed.
    @param min_rating: lower bound for the arit/log rating, clamped to be at least 0.0
    @type min_rating: float
    @param max_rating: upper bound for the arit/log rating, clamped to be at most 1.0
    @type max_rating: float
    @param min_instr: lower bound for the instruction count, clamped to be at least 0
    @type min_instr: int
    @param max_instr: upper bound for the instruction count; a value at or above
                      self.max_instruction_threshold is stored as 1000000 instead
    @type max_instr: int
    @param min_call: lower bound for the call count, clamped to be at least 0
    @type min_call: int
    @param max_call: upper bound for the call count; a value at or above
                     self.max_call_threshold is stored as 1000000 instead
    @type max_call: int
    """
    # stand-in for "no upper limit" once a maximum reaches its configured cap
    unbounded = 1000000

    # rating bounds are kept inside [0.0, 1.0]
    self.low_rating_threshold = max(0.0, min_rating)
    self.high_rating_threshold = min(1.0, max_rating)

    # instruction count bounds
    self.low_instruction_threshold = max(0, min_instr)
    self.high_instruction_threshold = (
        unbounded if max_instr >= self.max_instruction_threshold else max_instr)

    # call count bounds
    self.low_call_threshold = max(0, min_call)
    self.high_call_threshold = (
        unbounded if max_call >= self.max_call_threshold else max_call)
149
def get_aritlog_blocks(self, min_rating, max_rating, min_instr, max_instr, min_api, max_api, is_nonzero):
    """
    collect the analyzed blocks that lie within all three heuristic ranges.
    The six bounds are first handed to "update_thresholds" (same meaning and order),
    then every entry of self.aritlog_blocks is checked against the stored thresholds.
    @param is_nonzero: forwarded to each block's get_aritlog_rating(); defines whether
                       zeroing instructions (like xor eax, eax) shall be counted or not.
    @type is_nonzero: boolean
    @return: a list of the blocks from self.aritlog_blocks whose rating, instruction count
             and number of calls in the surrounding function are all inside their ranges
    """
    self.update_thresholds(min_rating, max_rating, min_instr, max_instr, min_api, max_api)

    selected = []
    for candidate in self.aritlog_blocks:
        # checks run in the order rating -> instruction count -> call count
        rating = candidate.get_aritlog_rating(is_nonzero)
        if not (self.high_rating_threshold >= rating >= self.low_rating_threshold):
            continue
        instruction_count = candidate.num_instructions
        if not (self.high_instruction_threshold >= instruction_count >= self.low_instruction_threshold):
            continue
        call_count = candidate.num_calls_in_function
        if not (self.high_call_threshold >= call_count >= self.low_call_threshold):
            continue
        selected.append(candidate)
    return selected
163
165 """
166 returns the number of basic blocks that have been analyzed.
167 @return: (int) number of basic blocks
168 """
169 return len(self.aritlog_blocks)
170
171
172
173
174
def get_segment_data(self):
    """
    returns the raw bytes of the segments as stored by IDA
    Every segment start reported by ida_proxy.Segments() is turned into a Segment object
    carrying start_ea, end_ea, name and data (one character per byte, built with chr()).
    A segment whose data cannot be read is reported on stdout and left out of the result.
    @return: a list of Segment data objects.
    """
    # xrange only exists on Python 2; fall back to the (equally lazy) Python 3 range
    try:
        address_range = xrange
    except NameError:
        address_range = range

    segments = []
    for segment_ea in self.ida_proxy.Segments():
        try:
            segment = self.Segment()
            end_ea = self.ida_proxy.SegEnd(segment_ea)
            segment.start_ea = segment_ea
            segment.end_ea = end_ea
            segment.name = self.ida_proxy.SegName(segment_ea)
            # join once instead of growing a string byte by byte
            segment.data = "".join(
                chr(self.ida_proxy.get_byte(ea)) for ea in address_range(segment_ea, end_ea))
            segments.append(segment)
        except Exception:
            # best effort: skip the unreadable segment, but let KeyboardInterrupt / SystemExit through
            print ("Tried to access invalid segment data. An error has occurred while address conversion")
    return segments
195
197 """
198 perform a scan for signatures. For matching, the standard Python re module is used.
199 @return: A list of CryptoSignatureHit data objects
200 """
201 crypt_results = []
202 print ("Starting aritlog function enumeration.")
203 time_before_matching = self.time.time()
204 segments = self.get_segment_data()
205 print ("Segments under analysis: ")
206 for segment in segments:
207 print (segment)
208 print ("PatternManager initialized, number of signatures: %d" % len(self.pm.signatures))
209 keywords = self.pm.get_tokenized_signatures(pattern_size)
210 print ("PatternManager tokenized patterns into %d chunks of %d bytes" % (len(keywords.keys()), pattern_size))
211 for keyword in keywords.keys():
212 for segment in segments:
213 crypt_results.extend([self.CryptoSignatureHit(segment.start_ea + match.start(), \
214 keywords[keyword], keyword) for match in self.re.finditer(self.re.escape(keyword), segment.data)])
215 print ("Full matching took %3.2f seconds and resulted in %d hits" % (self.time.time() - time_before_matching, \
216 len(crypt_results)))
217 self.signature_hits = crypt_results
218 return crypt_results
219
def get_signature_length(self, signature_name):
    """
    look up how long a signature is, given its name.
    self.pm.signatures is searched for the first entry whose value equals the name;
    the length of that entry's key is the result.
    @param signature_name: name for a signature, e.g. "ADLER 32"
    @type signature_name: str
    @return: (int) length of the signature, or 0 if no signature carries that name.
    """
    matching_lengths = (
        len(pattern)
        for pattern, name in self.pm.signatures.items()
        if name == signature_name
    )
    # the first match wins; 0 signals an unknown name
    return next(matching_lengths, 0)
231
def get_xrefs_to_address(self, address):
    """
    get all references to a certain address.
    These are no xrefs in IDA sense but references to the crypto signatures.
    If the signature points to an instruction, e.g. if a constant is moved to a register, the return is flagged as
    "True", meaning it is an in-code reference.
    Regular IDA xrefs to the address that originate from code are appended afterwards, flagged as "False".
    @param address: an arbitrary address
    @type address: int
    @return: a list of tuples (int, boolean)
    """
    # "no head found" sentinel (BADADDR): 0xFFFFFFFF on 32-bit IDA, 0xFFFFFFFFFFFFFFFF on 64-bit IDA.
    # Accepting both keeps the 64-bit sentinel from being reported as an in-code reference.
    bad_addresses = (0xFFFFFFFF, 0xFFFFFFFFFFFFFFFF)

    xrefs = []
    # look for a head starting at most 14 bytes before the address
    # NOTE(review): 14 presumably covers the longest instruction that can contain the constant - confirm
    head_to_address = self.ida_proxy.PrevHead(address, address - 14)
    if head_to_address not in bad_addresses:
        flags = self.ida_proxy.GetFlags(head_to_address)
        if self.ida_proxy.isCode(flags):
            xrefs.append((head_to_address, True))
    for xref in self.ida_proxy.XrefsTo(address):
        flags = self.ida_proxy.GetFlags(xref.frm)
        if self.ida_proxy.isCode(flags):
            xrefs.append((xref.frm, False))
    return xrefs
253
255 """
256 Get all signature hits whose matched length is at least match_filter_factor times
257 the length of the longest signature they triggered (match_filter_factor is a fraction, e.g. 0.5, not a percentage).
258 Hits are grouped by signature names.
259 @return: a dictionary with key/value entries of the following form: ("signature name", [CryptoSignatureHit])
260 """
261 sorted_hits = sorted(self.signature_hits)
262 unified_hits = []
263
264 previous_signature_names = []
265 for hit in sorted_hits:
266 hit_intersection = [element for element in hit.signature_names if element in previous_signature_names]
267 if len(hit_intersection) == 0:
268 previous_signature_names = hit.signature_names
269 unified_hits.append(self.CryptoSignatureHit(hit.start_address, hit.signature_names, \
270 hit.matched_signature))
271 else:
272 previous_signature_names = hit_intersection
273 previous_hit = unified_hits[-1]
274 if hit.start_address == previous_hit.start_address + len(previous_hit.matched_signature):
275 previous_hit.matched_signature += hit.matched_signature
276 previous_hit.signature_names = hit_intersection
277 else:
278 unified_hits.append(self.CryptoSignatureHit(hit.start_address, hit.signature_names, \
279 hit.matched_signature))
280
281 filtered_hits = []
282 for hit in unified_hits:
283 if len(hit.matched_signature) >= max([self.match_filter_factor * \
284 self.get_signature_length(name) for name in hit.signature_names]):
285 hit.code_refs_to = self.get_xrefs_to_address(hit.start_address)
286 filtered_hits.append(hit)
287
288 grouped_hits = {}
289 for hit in filtered_hits:
290 for name in hit.signature_names:
291 if name not in grouped_hits:
292 grouped_hits[name] = [hit]
293 else:
294 grouped_hits[name].append(hit)
295
296 return grouped_hits
297