Reference Guide
A comprehensive reference for log analysis patterns, techniques, and decision-making.
Parsing Techniques
1. String Manipulation (split, find, slice)
When to use: Simple, predictable log formats with clear delimiters
Example: Parsing Apache/Nginx logs
Sample log format:

```
192.168.1.10 - - [15/Jan/2025:10:23:45 +0000] "GET /api/checkout HTTP/1.1" 200 1523 2.45
192.168.1.25 - - [15/Jan/2025:10:24:12 +0000] "POST /api/orders HTTP/1.1" 201 847 1.82
192.168.1.10 - - [15/Jan/2025:10:25:03 +0000] "GET /api/products HTTP/1.1" 500 0 0.15
```

Parsing code:

```python
data = line.strip().split()
ip = data[0]
status = int(data[-3])
bytes_size = int(data[-2])
latency = float(data[-1])
# Extract bracketed content
start_ts = line.find('[')
end_ts = line.find(']')
timestamp = line[start_ts + 1:end_ts]
# Extract quoted content
req_start = line.find('"')
req_end = line.find('"', req_start + 1)
request = line[req_start + 1:req_end].split()
method = request[0]
endpoint = request[1]
```

Pros: Fast, no dependencies, easy to understand
Cons: Fragile with varying formats, manual index management
2. Regular Expressions
When to use: Complex patterns, need validation, multiple formats
Example: Robust log parsing with named groups
Sample log format (same as above):

```
192.168.1.10 - - [15/Jan/2025:10:23:45 +0000] "GET /api/checkout HTTP/1.1" 200 1523 2.45
192.168.1.25 - - [15/Jan/2025:10:24:12 +0000] "POST /api/orders HTTP/1.1" 201 847 1.82
192.168.1.10 - - [15/Jan/2025:10:25:03 +0000] "GET /api/products HTTP/1.1" 500 0 0.15
```

Parsing code with validation:

```python
import re
LOG_PATTERN = r'(?P<IP>\d+\.\d+\.\d+\.\d+)\s-\s-\s\[(?P<TIMESTAMP>.*?)\]\s"(?P<REQUEST>.*?)"\s(?P<STATUS>\d{3})\s(?P<BYTES>\d+)\s(?P<LATENCY>\d+\.\d+)'
with open('file.txt', 'r') as f:
    for line in f:
        match = re.fullmatch(LOG_PATTERN, line.strip())
        if match:
            log_data = match.groupdict()
            ip = log_data['IP']
            status = int(log_data['STATUS'])
            # ... process data
        else:
            print(f"Skipping unparseable line: {line.strip()}")
```

Common patterns:

- IP address: `\d+\.\d+\.\d+\.\d+`
- Timestamp in brackets: `\[(?P<TIMESTAMP>.*?)\]`
- Quoted content: `"(?P<REQUEST>.*?)"`
- Word boundaries: `\b\w+\b` (useful for word extraction)
- Log levels: `\[(INFO|WARN|ERROR|DEBUG)\]` (see the sketch below)
Pros: Validation built-in, handles variations, self-documenting with named groups
Cons: Slower than string methods, requires regex knowledge
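As a quick demonstration, here is a minimal sketch applying the log-level pattern from the list above; the sample lines are invented for illustration:

```python
import re

# Compile the log-level pattern from the list above
LEVEL_PATTERN = re.compile(r'\[(INFO|WARN|ERROR|DEBUG)\]')

sample_lines = [
    "[ERROR] payment service timed out",
    "[INFO] request completed in 120ms",
    "no level on this line",
]
for line in sample_lines:
    match = LEVEL_PATTERN.search(line)
    level = match.group(1) if match else "UNKNOWN"
    print(level)  # ERROR, INFO, UNKNOWN
```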
3. JSON Parsing (Line-delimited JSON)
When to use: Structured logs (JSON lines format - one JSON object per line)
Example: Kubernetes events, structured application logs
Sample log format (each line is a separate JSON object):

```
{"type": "Normal", "reason": "Scheduled", "involvedObject": {"kind": "Pod", "name": "nginx-abc123", "namespace": "default"}, "timestamp": "2025-01-15T10:23:45Z"}
{"type": "Warning", "reason": "BackOff", "involvedObject": {"kind": "Pod", "name": "api-xyz789", "namespace": "production"}, "timestamp": "2025-01-15T10:24:12Z"}
{"type": "Warning", "reason": "FailedMount", "involvedObject": {"kind": "Pod", "name": "db-pod-001", "namespace": "default"}, "timestamp": "2025-01-15T10:25:03Z"}
```

Parsing code:

```python
import json
with open("events.json", "r") as f:
    for line in f:
        event = json.loads(line)
        event_type = event.get("type", "Unknown")
        obj = event.get("involvedObject", {})
        kind = obj.get("kind")
        if kind == "Pod" and event_type == "Warning":
            # Process warning events
            pass
```

Key techniques:
- Use `.get()` with defaults to handle missing fields safely
- Navigate nested structures with chained `.get()` calls, e.g. `pod_name = event.get("involvedObject", {}).get("name", "unknown")`
- Check for `None` explicitly: `if data.get("field") is not None:`
Pros: Handles complex nested data, type-safe, widely supported
Cons: Requires valid JSON, memory overhead for large objects (see the sketch below)
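Since the main con is that every line must be valid JSON, here is a minimal sketch (filename illustrative) that skips malformed lines instead of crashing:

```python
import json

with open("events.json", "r") as f:
    for line_number, line in enumerate(f, start=1):
        if not line.strip():
            continue  # Skip blank lines
        try:
            event = json.loads(line)
        except json.JSONDecodeError as exc:
            # Report and skip lines that are not valid JSON
            print(f"Skipping malformed line {line_number}: {exc}")
            continue
        # ... process event as shown above
```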
4. Delimited Values (CSV-like formats)
When to use: /etc/passwd, CSV files, simple delimited configs
Example: Parse /etc/passwd
```python
# Format: username:password:UID:GID:GECOS:home_dir:login_shell
with open("passwd.txt", "r") as f:
    for line in f:
        # Skip empty lines and comments
        cleaned_line = line.strip()
        if not cleaned_line or cleaned_line.startswith("#"):
            continue
        data = cleaned_line.split(":")
        username = data[0]
        uid = data[2]
        gid = data[3]
```

Best practices:
- Always strip whitespace before checking for empty lines
- Handle comments early (skip lines starting with `#`)
Data Structures
Decision Matrix: Which data structure to use?
| Need | Use | Example |
|---|---|---|
| Count occurrences | dict with .get() OR defaultdict(int) OR Counter | Word frequency, IP counts |
| Track unique items | set | Unique pod names, distinct users |
| Maintain order | list | Parsed logs, time-series data |
| Group by key | dict with list values OR defaultdict(list) | Events per pod, requests per IP |
| Top N items | Counter.most_common(N) OR manual sort | Most common errors, top endpoints |
| First occurrence only | dict (check if key not in dict) | Session start times |
1. Dictionary Patterns
Basic Counting
```python
# Method 1: Manual with .get()
count = {}
for item in items:
    count[item] = count.get(item, 0) + 1

# Method 2: Check existence
count = {}
for item in items:
    if item in count:
        count[item] += 1
    else:
        count[item] = 1

# Method 3: setdefault (concise)
count = {}
for item in items:
    count.setdefault(item, 0)
    count[item] += 1
```

When to use each:
- `.get()`: Most Pythonic, recommended for simple cases
- Check existence: More explicit, easier for beginners
- `.setdefault()`: Useful when setting complex defaults
Tracking First Occurrence Only
Parsing code:
```python
# Pattern: Session tracking, first event timestamp
active_sessions = {}
for event in events:
    user_id = event['user_id']
    action = event['action']
    # Only record first login
    if action == "login" and user_id not in active_sessions:
        active_sessions[user_id] = event['timestamp']
    # Process logout only if we have a login
    if action == "logout" and user_id in active_sessions:
        # Calculate duration
        duration = event['timestamp'] - active_sessions[user_id]
        del active_sessions[user_id]  # Clean up completed session
```

Key insight: Check `not in` the dict before adding to ensure first-only behavior.
2. defaultdict - Cleaner Counting & Grouping
When to use: Multiple counters, grouping data, avoid repetitive .get() calls
```python
from collections import defaultdict

# Counting (auto-initializes to 0)
ip_counts = defaultdict(int)
for log in logs:
    ip_counts[log['ip']] += 1  # No need for .get() or checking

# Grouping (auto-initializes to empty list)
events_by_pod = defaultdict(list)
for event in events:
    pod_name = event['pod']
    events_by_pod[pod_name].append(event)
```

Comparison:
```python
# Counting without defaultdict
ip_counts = {}
for log in logs:
    ip = log['ip']
    if ip not in ip_counts:
        ip_counts[ip] = 0
    ip_counts[ip] += 1

# Grouping without defaultdict
events_by_pod = {}
for event in events:
    pod_name = event['pod']
    if pod_name not in events_by_pod:
        events_by_pod[pod_name] = []
    events_by_pod[pod_name].append(event)
```

Pros: Cleaner code, no initialization boilerplate
Cons: Slightly less explicit, creates keys on access (see the sketch below)
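The "creates keys on access" caveat is worth seeing concretely: merely reading a missing key inserts it with the default value. A minimal sketch:

```python
from collections import defaultdict

ip_counts = defaultdict(int)
ip_counts['10.0.0.1'] += 1

# Reading a missing key silently inserts it
_ = ip_counts['10.0.0.2']
print(dict(ip_counts))  # {'10.0.0.1': 1, '10.0.0.2': 0}

# Membership tests do not insert, so prefer `in` for pure lookups
print('10.0.0.3' in ip_counts)  # False, and no key created
```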
3. Counter - Advanced Counting
When to use: Need most_common(), combining counts, or multiple count operations
```python
from collections import Counter

# Direct initialization
word_counts = Counter(word_list)

# Incremental updates
log_levels = Counter()
for line in logs:
    level = extract_level(line)
    log_levels[level] += 1

# Update with iterable
reasons = []
for event in events:
    if event['type'] == "Warning":
        reasons.append(event['reason'])
reason_counts = Counter(reasons)

# Get top N
for reason, count in reason_counts.most_common(3):
    print(f"{reason}: {count}")
```

Comparison:
```python
# Direct initialization - manual counting from list
word_counts = {}
for word in word_list:
    word_counts[word] = word_counts.get(word, 0) + 1

# Incremental updates - manual counting
log_levels = {}
for line in logs:
    level = extract_level(line)
    log_levels[level] = log_levels.get(level, 0) + 1

# Get top N - manual sorting
reasons = []
for event in events:
    if event['type'] == "Warning":
        reasons.append(event['reason'])
reason_counts = {}
for reason in reasons:
    reason_counts[reason] = reason_counts.get(reason, 0) + 1
sorted_reasons = sorted(reason_counts.items(), key=lambda x: x[1], reverse=True)
for reason, count in sorted_reasons[:3]:
    print(f"{reason}: {count}")
```

Counter-specific features:
```python
# Combining counters
count1 = Counter(['a', 'b', 'c'])
count2 = Counter(['b', 'c', 'd'])
combined = count1 + count2  # Counter({'b': 2, 'c': 2, 'a': 1, 'd': 1})

# Update (add to existing)
count1.update(count2)

# Most common without limit
all_sorted = word_counts.most_common()  # All items, sorted by count
```

Pros: Built-in `most_common()`, arithmetic operations, readable
Cons: Overkill for simple counting, slight memory overhead
4. Sets - Uniqueness Tracking
When to use: Deduplicate, membership testing, finding unique items
```python
unique_pods = set()
for event in events:
    pod_name = f"{event['namespace']}/{event['name']}"
    unique_pods.add(pod_name)
print(f"Total unique pods: {len(unique_pods)}")

# Conditional uniqueness
pods_with_warnings = set()
for event in events:
    if event['type'] == "Warning":
        pods_with_warnings.add(event['pod_name'])
```

Set operations:
```python
set1 = {1, 2, 3}
set2 = {2, 3, 4}
intersection = set1 & set2  # {2, 3}
union = set1 | set2         # {1, 2, 3, 4}
difference = set1 - set2    # {1}
```

Key insight: Sets automatically deduplicate and give O(1) membership testing.
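These operations map naturally onto log questions. A minimal sketch with hypothetical pod sets (built from event streams as shown above):

```python
# Hypothetical sets, as produced by the loops above
pods_with_warnings = {"default/api-xyz789", "default/db-pod-001"}
pods_with_errors = {"default/db-pod-001"}

# Difference: pods that warned but never errored
warning_only = pods_with_warnings - pods_with_errors
print(warning_only)  # {'default/api-xyz789'}

# Intersection: pods that both warned and errored
print(pods_with_warnings & pods_with_errors)  # {'default/db-pod-001'}
```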
Analysis Patterns
1. Aggregation & Statistics
Counting by Category
```python
success_count = 0
failure_count = 0
for log in parsed_logs:
    status = log['status']
    if 200 <= status < 300:
        success_count += 1
    else:
        failure_count += 1
success_rate = round((success_count / (success_count + failure_count)) * 100, 2)
```

Running Totals (for averages)
```python
total_bytes = 0
total_latency = 0
request_count = 0
for log in logs:
    total_bytes += log['bytes']
    if 200 <= log['status'] < 300:
        total_latency += log['latency']
        request_count += 1
avg_bytes = round(total_bytes / len(logs), 2)
avg_latency = round(total_latency / request_count, 2) if request_count > 0 else 0
```

Best practice: Always check for division by zero.
Collecting for Percentiles
```python
import numpy as np

checkout_latencies = []
for log in logs:
    if log['endpoint'] == "/api/checkout":
        checkout_latencies.append(log['latency'])

if len(checkout_latencies) > 0:
    p95 = round(np.percentile(checkout_latencies, 95), 2)
    p99 = round(np.percentile(checkout_latencies, 99), 2)
    print(f"P95: {p95}, P99: {p99}")
else:
    print("No data for percentile calculation")
```

Why collect first: Percentiles need the full (effectively sorted) distribution, so it is easier to collect all values, then calculate.
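If numpy is unavailable, a rough percentile can be computed by hand. A minimal sketch using the nearest-rank method (one of several percentile definitions, so results may differ slightly from `np.percentile`):

```python
def percentile_nearest_rank(values, pct):
    """Smallest value with at least pct% of the data at or below it."""
    if not values:
        raise ValueError("no data")
    ordered = sorted(values)
    rank = max(1, -(-len(ordered) * pct // 100))  # ceil(n * pct / 100)
    return ordered[int(rank) - 1]

latencies = [0.15, 1.82, 2.45, 0.90, 3.10]
print(percentile_nearest_rank(latencies, 95))  # 3.1
```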
2. Sorting & Ranking
Top N Pattern
```python
# Method 1: Sort items by count
endpoint_counts = defaultdict(int)
for log in logs:
    endpoint_counts[log['endpoint']] += 1

# Sort by value (count), descending
sorted_endpoints = sorted(endpoint_counts.items(), key=lambda x: x[1], reverse=True)

# Get top 3
for endpoint, count in sorted_endpoints[:3]:
    print(f"{endpoint}: {count}")

# Method 2: Using Counter
from collections import Counter
endpoints = [log['endpoint'] for log in logs]
endpoint_counts = Counter(endpoints)
for endpoint, count in endpoint_counts.most_common(3):
    print(f"{endpoint}: {count}")
```

Key insight: `sorted()` with `key=lambda` is versatile for any sorting need.
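That versatility extends to multi-key sorts: a tuple key can sort by count descending, then by name ascending to break ties deterministically. A minimal sketch:

```python
endpoint_counts = {"/api/orders": 12, "/api/checkout": 12, "/api/products": 7}

# Negate the count for descending order; the name ascending breaks ties
ranked = sorted(endpoint_counts.items(), key=lambda x: (-x[1], x[0]))
for endpoint, count in ranked:
    print(f"{endpoint}: {count}")
# /api/checkout: 12
# /api/orders: 12
# /api/products: 7
```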
Finding Extremes
```python
# Find maximum by attribute
slowest_request = max(parsed_logs, key=lambda x: x['latency'])
print(f"Slowest: {slowest_request}")

# Find minimum
fastest_request = min(parsed_logs, key=lambda x: x['latency'])

# Multiple criteria (e.g., slowest error)
slowest_error = max(
    [log for log in logs if log['status'] >= 400],
    key=lambda x: x['latency'],
    default=None
)
```

Best practice: Use `default=None` with `max()`/`min()` on filtered lists to avoid errors (see the check below).
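The `default=None` guard only helps if the result is checked before use. A minimal self-contained sketch (the two log entries are invented for the demo):

```python
logs = [
    {'endpoint': '/api/orders', 'status': 201, 'latency': 1.82},
    {'endpoint': '/api/products', 'status': 500, 'latency': 0.15},
]

# Without default=None, max() raises ValueError on an empty sequence
slowest_error = max(
    (log for log in logs if log['status'] >= 400),
    key=lambda x: x['latency'],
    default=None,
)
if slowest_error is not None:
    print(f"Slowest error: {slowest_error['endpoint']} ({slowest_error['latency']}s)")
else:
    print("No errors found")
```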
3. Filtering & Grouping
Multi-condition Filtering
```python
# Method 1: Chain conditions with 'and'
pods_with_warnings = set()
for event in events:
    obj = event.get("involvedObject", {})
    kind = obj.get("kind")
    namespace = obj.get("namespace")
    event_type = event.get("type")
    if kind == "Pod" and event_type == "Warning" and namespace == "default":
        pod_name = f"{namespace}/{obj.get('name')}"
        pods_with_warnings.add(pod_name)

# Method 2: Early continue for readability
pods_with_warnings = set()
for event in events:
    obj = event.get("involvedObject", {})
    # Skip non-Pod events early
    if obj.get("kind") != "Pod":
        continue
    # Skip non-Warning events
    if event.get("type") != "Warning":
        continue
    # Skip non-default namespace
    if obj.get("namespace") != "default":
        continue
    pod_name = f"{obj.get('namespace')}/{obj.get('name')}"
    pods_with_warnings.add(pod_name)
```

Pattern: Chain conditions with `and` for compact code, or use early `continue` for better readability with many conditions.
Grouping by Key
```python
# Group events by pod
pod_events = defaultdict(list)
for event in events:
    pod_name = f"{event['namespace']}/{event['name']}"
    pod_events[pod_name].append(event)

# Process grouped data
for pod_name, events in pod_events.items():
    if len(events) > 10:
        print(f"{pod_name} has many events: {len(events)}")
```

Nested Grouping (Dictionary of Dictionaries)
```python
# Track first "Scheduled" and "Killing" per pod
pod_timestamps = {}
for event in events:
    pod_name = event['pod']
    # Initialize pod entry if not exists
    if pod_name not in pod_timestamps:
        pod_timestamps[pod_name] = {"Scheduled": None, "Killing": None}
    # Record first occurrence only
    reason = event['reason']
    if reason == "Scheduled" and pod_timestamps[pod_name]["Scheduled"] is None:
        pod_timestamps[pod_name]["Scheduled"] = event['timestamp']
    if reason == "Killing" and pod_timestamps[pod_name]["Killing"] is None:
        pod_timestamps[pod_name]["Killing"] = event['timestamp']
```

Alternative with setdefault:
```python
pod_events = {}
for event in events:
    pod_name = event['pod']
    pod_events.setdefault(pod_name, {"Scheduled": None, "Killing": None})
    if event['reason'] == "Scheduled" and pod_events[pod_name]["Scheduled"] is None:
        pod_events[pod_name]["Scheduled"] = event['timestamp']
```

4. Two-Pass Processing Pattern
When to use: Need to process data, then fix/correct it
Example: Find duplicate UIDs and reassign
```python
# Pass 1: Identify duplicates and collect all UIDs
dup_uids = {}
uid_list = set()
with open("passwd.txt", "r") as f:
    for line in f:
        cleaned_line = line.strip()
        if not cleaned_line or cleaned_line.startswith("#"):
            continue
        data = cleaned_line.split(":")
        user = data[0]
        uid = data[2]
        if uid in dup_uids:
            dup_uids[uid].append(user)
        else:
            dup_uids[uid] = [user]
        uid_list.add(int(uid))

# Identify which users need reassignment (all but first)
movers_list = []
for uid, users in dup_uids.items():
    if len(users) > 1:
        print(f"Duplicate UID: {uid} | {users}")
        movers_list.extend(users[1:])  # Keep first, move rest

# Find available UIDs
new_uid = 1001
new_available_uids = []
while len(new_available_uids) < len(movers_list):
    if new_uid not in uid_list:
        new_available_uids.append(new_uid)
    new_uid += 1

# Create reassignment mapping
assignments = {user: uid for user, uid in zip(movers_list, new_available_uids)}

# Pass 2: Rewrite file with corrections
corrected_lines = []
with open("passwd.txt", "r") as f:
    for line in f:
        cleaned_line = line.strip()
        if not cleaned_line or cleaned_line.startswith("#"):
            corrected_lines.append(line)
            continue
        data = cleaned_line.split(":")
        user = data[0]
        if user in assignments:
            data[2] = str(assignments[user])
        corrected_lines.append(":".join(data) + "\n")

# Write back
with open("passwd.txt", "w") as f:
    f.writelines(corrected_lines)
```

Key pattern:
- First pass: Analyze and identify what needs fixing
- Build data structures for corrections
- Second pass: Apply corrections
Time & Date Handling
1. Parsing ISO Format Timestamps
```python
from datetime import datetime, timedelta

# Parse ISO format (most common in logs)
timestamp_str = "2025-01-15T10:23:45+00:00"
dt_object = datetime.fromisoformat(timestamp_str)
```

Common timestamp formats:
- ISO 8601: `2025-01-15T10:23:45+00:00` → `datetime.fromisoformat()`
- Custom format: `15/Jan/2025:10:23:45 +0000` → `datetime.strptime(ts, "%d/%b/%Y:%H:%M:%S %z")` (see the sketch below)
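A minimal sketch parsing the bracketed Apache-style timestamp from the sample logs in the parsing section:

```python
from datetime import datetime

line = '192.168.1.10 - - [15/Jan/2025:10:23:45 +0000] "GET /api/checkout HTTP/1.1" 200 1523 2.45'

# Extract the bracketed portion, then parse with an explicit format string
raw_ts = line[line.index('[') + 1:line.index(']')]
dt = datetime.strptime(raw_ts, "%d/%b/%Y:%H:%M:%S %z")
print(dt.isoformat())  # 2025-01-15T10:23:45+00:00
```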
2. Calculating Durations
```python
# Duration between two timestamps
start_time = datetime.fromisoformat(event1['timestamp'])
end_time = datetime.fromisoformat(event2['timestamp'])
duration = end_time - start_time  # Returns timedelta object
print(duration)  # e.g., "0:05:23.451234"
```

3. Averaging Time Durations
```python
from datetime import timedelta

total_lifecycle = timedelta(0)
pod_count = 0
for pod_name, events in pod_events.items():
    # Values may still be None if the pod never completed its lifecycle
    if events.get("Scheduled") is not None and events.get("Killing") is not None:
        scheduled_time = datetime.fromisoformat(events["Scheduled"])
        killing_time = datetime.fromisoformat(events["Killing"])
        lifecycle = killing_time - scheduled_time
        total_lifecycle += lifecycle
        pod_count += 1

if pod_count > 0:
    average_lifecycle = total_lifecycle / pod_count
    print(f"Average pod lifecycle: {average_lifecycle}")
```

Key insights:
- `timedelta` objects (durations) can be added together; `datetime` objects (points in time) cannot
- Subtracting two `datetime` objects produces a `timedelta`: `dt2 - dt1 = timedelta`
- Adding a `timedelta` to a `datetime` produces a new `datetime`: `dt + timedelta = new_dt`
- Dividing a `timedelta` by an integer gives an average `timedelta`
Operations summary:
```python
# ✅ Valid operations
duration1 + duration2           # timedelta + timedelta = timedelta
datetime2 - datetime1           # datetime - datetime = timedelta
datetime1 + timedelta(hours=5)  # datetime + timedelta = datetime

# ❌ Invalid operation
datetime1 + datetime2  # datetime + datetime = TypeError (conceptually meaningless)
```

4. Sorting by Timestamp (Pre-processing)
When to use: Logs are out of order, need chronological processing
```python
# Read all logs into memory first
session_logs = []
with open("session.json", "r") as f:
    for line in f:
        if line.strip():
            log = json.loads(line)
            session_logs.append(log)

# Sort by timestamp
sorted_sessions = sorted(
    session_logs,
    key=lambda x: datetime.fromisoformat(x["timestamp"])
)

# Now process in chronological order
for session in sorted_sessions:
    # Process sessions sequentially
    pass
```

Trade-off: Requires loading all data into memory, but ensures correctness.
Common Pitfalls & Solutions
1. Division by Zero
Problem: Calculating averages when no data exists
```python
# BAD
average = total / count  # Crashes if count == 0

# GOOD
if count > 0:
    average = total / count
else:
    print("No data available")
    average = 0
```

2. Skipping Empty Lines & Comments
Problem: Parsing errors on blank lines or commented lines
```python
# GOOD pattern
with open("file.txt", "r") as f:
    for line in f:
        # Always strip and check
        cleaned_line = line.strip()
        if not cleaned_line or cleaned_line.startswith("#"):
            continue
        # Now safe to parse
        data = cleaned_line.split(":")
```

Key insight: Strip BEFORE checking emptiness, so whitespace-only lines are treated as empty.
3. Handling Missing JSON Fields
Problem: KeyError when accessing nested JSON
```python
# BAD
kind = event["involvedObject"]["kind"]  # Crashes if key missing

# GOOD
obj = event.get("involvedObject", {})
kind = obj.get("kind")

# Check for None explicitly if needed
if kind is not None:
    # Process
    pass
```

4. String Index Errors
Problem: find() returns -1 if not found, causing slice errors
```python
# BAD
start = line.find('[')
end = line.find(']')
timestamp = line[start+1:end]  # Silently wrong slice if '[' or ']' not found (find returns -1)

# GOOD (inside a line-processing loop)
try:
    start = line.index('[')
    end = line.index(']')
    timestamp = line[start+1:end]
except ValueError:
    # Handle lines without brackets
    continue
```

Difference:

- `find()` returns -1 if not found (still usable, but be careful)
- `index()` raises `ValueError` if not found (clearer error handling)
5. Modifying Lists While Iterating
Problem: Changing a list while looping over it
```python
# BAD
for item in my_list:
    if condition:
        my_list.remove(item)  # Can skip items or crash

# GOOD - Create new list
filtered_list = [item for item in my_list if not condition]

# GOOD - Iterate over copy
for item in my_list[:]:  # List slicing [:] creates a copy
    if condition:
        my_list.remove(item)
```

6. Case Sensitivity in Matching
Problem: Missing matches due to case differences
```python
# For case-insensitive matching
word = word.lower()
text = text.lower()

# When parsing log levels
level = match.upper()  # Normalize to uppercase
```

7. Punctuation in Text Analysis
Problem: Words with punctuation counted separately (“hello,” vs “hello”)
```python
# Method 1: Using string.punctuation
import string
punctuation = string.punctuation
for punc in punctuation:
    text = text.replace(punc, " ")
words = text.lower().split()

# Method 2: Using regex
import re
words = re.findall(r'\b\w+\b', text.lower())
```

Quick Decision Guide
“Which parsing method should I use?”
```
Is the format simple and consistent with clear delimiters?
└─ YES → String manipulation (split, find, slice)
└─ NO → Do you need to:
        - Validate format (e.g., check if IP address is valid)?
        - Handle variations (e.g., optional fields, different formats)?
        - Extract complex patterns (e.g., email addresses, URLs)?
        └─ YES → Regular expressions
        └─ NO → Is it JSON?
                └─ YES → json.loads()
                └─ NO → Is it colon/comma separated?
                        └─ YES → split(":")
```

"Which counting method should I use?"
```
Just counting one thing?
└─ YES → dict with .get(key, 0) + 1
└─ NO → Multiple counters needed?
        └─ YES → defaultdict(int)
        └─ NO → Need most_common() or arithmetic?
                └─ YES → Counter
```

"How do I get top N items?"
```
Already using Counter?
└─ YES → Use .most_common(N)
└─ NO → Use sorted(dict.items(), key=lambda x: x[1], reverse=True)[:N]
```

"Should I process line-by-line or load all into memory?"
```
Do I need to:
- Sort by timestamp or other field?
- Match entries across the file (e.g., pair login with logout)?
- Look ahead/back to other lines (e.g., find max before processing)?
└─ YES → Load all into memory (list), then sort/process
└─ NO → Can I calculate everything in a single pass?
        └─ YES → Process line-by-line (memory efficient)
        └─ NO → Need two passes (e.g., identify issues, then fix them)?
                └─ YES → Load into memory or read file twice
```

"When should I use sets?"
```
Do I need to:
- Track unique items only? → YES → Use set
- Check if item already seen? → YES → Use set (O(1) lookup)
- Count occurrences? → NO → Use dict/Counter instead
```

Performance Tips
- Line-by-line processing is more memory-efficient than loading the entire file
- Compile regex patterns outside loops: `pattern = re.compile(r'...')` (see the sketch after this list)
- Use sets for membership testing instead of lists (O(1) vs O(n))
- Avoid repeated `.get()` calls on the same dict; cache the result: `obj = event.get("involvedObject", {})`
- List comprehensions are faster than for-loops with `append`
- `defaultdict`/`Counter` are faster than manual dict initialization
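A minimal sketch of the compiled-pattern tip, reusing the access-log pattern from the parsing section:

```python
import re

# Compile once, outside the loop, so the pattern is not re-parsed per line
LOG_PATTERN = re.compile(
    r'(?P<IP>\d+\.\d+\.\d+\.\d+)\s-\s-\s\[(?P<TIMESTAMP>.*?)\]\s'
    r'"(?P<REQUEST>.*?)"\s(?P<STATUS>\d{3})\s(?P<BYTES>\d+)\s(?P<LATENCY>\d+\.\d+)'
)

lines = ['192.168.1.10 - - [15/Jan/2025:10:23:45 +0000] "GET /api/checkout HTTP/1.1" 200 1523 2.45']
for line in lines:
    match = LOG_PATTERN.fullmatch(line.strip())
    if match:
        print(match.group('IP'), match.group('STATUS'))  # 192.168.1.10 200
```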
Common Imports Cheatsheet
```python
import json                                    # JSON parsing
import re                                      # Regular expressions
import numpy as np                             # Percentile calculations
from datetime import datetime, timedelta       # Time handling
from collections import defaultdict, Counter   # Advanced dicts
import string                                  # string.punctuation
import pprint                                  # Pretty printing (debugging)
```

Debugging Tips
```python
# Pretty print nested structures
import pprint
pprint.pprint(data)

# Print formatted JSON
print(json.dumps(event, indent=4))

# Check what regex matched
match = re.match(pattern, line)
if match:
    print(match.groupdict())
```