Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Download

📚 The CoCalc Library - books, templates and other resources

132923 views
License: OTHER
1
2
import re
import time

from mrjob.job import MRJob
from mrjob.protocol import RawValueProtocol, ReprProtocol
from mrjob.step import MRStep
6
7
8
class MrS3LogParser(MRJob):
    """Parses the logs from S3 based on the S3 logging format:
    http://docs.aws.amazon.com/AmazonS3/latest/dev/LogFormat.html

    Aggregates a user's daily requests by user agent and operation

    Outputs date_time, requester, user_agent, operation, count
    """

    # Exactly ONE capturing group per log field, in the same order as the
    # S3_LOG_* constants below. Quoted fields ("..." or -) deliberately have
    # no inner capturing group: an extra group shifts the numbering of every
    # later field (the old pattern made "user agent" read the referer).
    # The pattern is applied with search() and is not anchored at the end,
    # so any fields after the user agent are ignored.
    LOGPATS = (r'(\S+) (\S+) \[(.*?)\] (\S+) (\S+) '
               r'(\S+) (\S+) (\S+) ("[^"]+"|-) '
               r'(\S+) (\S+) (\S+) (\S+) (\S+) (\S+) '
               r'("[^"]+"|-) ("[^"]+"|-)')
    NUM_ENTRIES_PER_LINE = 17
    logpat = re.compile(LOGPATS)

    # Zero-based field positions; they index match.groups() directly.
    (S3_LOG_BUCKET_OWNER,
     S3_LOG_BUCKET,
     S3_LOG_DATE_TIME,
     S3_LOG_IP,
     S3_LOG_REQUESTER_ID,
     S3_LOG_REQUEST_ID,
     S3_LOG_OPERATION,
     S3_LOG_KEY,
     S3_LOG_HTTP_METHOD,
     S3_LOG_HTTP_STATUS,
     S3_LOG_S3_ERROR,
     S3_LOG_BYTES_SENT,
     S3_LOG_OBJECT_SIZE,
     S3_LOG_TOTAL_TIME,
     S3_LOG_TURN_AROUND_TIME,
     S3_LOG_REFERER,
     S3_LOG_USER_AGENT) = range(NUM_ENTRIES_PER_LINE)

    DELIMITER = '\t'

    # We use RawValueProtocol for input to be format agnostic
    # and avoid any type of parsing errors
    INPUT_PROTOCOL = RawValueProtocol

    # We use RawValueProtocol for output so we can output raw lines
    # instead of (k, v) pairs
    OUTPUT_PROTOCOL = RawValueProtocol

    # Encode the intermediate records using repr() instead of JSON, so the
    # record doesn't get Unicode-encoded
    INTERNAL_PROTOCOL = ReprProtocol

    def clean_date_time_zone(self, raw_date_time_zone):
        """Convert an S3 timestamp such as '22/Jul/2013:21:04:17 +0000'.

        Note: requires the chars "[ ]" to be stripped prior to input.

        Returns a 3-tuple ``(date, date_time, time_zone)``:
          * ``date``      -- 'YYYY-MM-DD'
          * ``date_time`` -- 'YYYY-MM-DD HH:MM:SS', which is more suitable for
                             loading into a database such as Redshift or RDS
          * ``time_zone`` -- the raw offset text after the first space,
                             e.g. '+0000' or '-0700'

        Raises ValueError if the date or time portion cannot be parsed; the
        mapper turns that into an error record.

        TODO: Needs to combine timezone with date as one field
        """
        # Split "DD/Mon/YYYY:HH:MM:SS" from the offset at the first space.
        # Unlike searching for '+', this also copes with negative offsets.
        stamp, _, time_zone = raw_date_time_zone.strip().partition(" ")
        # Parse date and time together so a malformed time is rejected too.
        parsed = time.strptime(stamp, "%d/%b/%Y:%H:%M:%S")
        date = time.strftime("%Y-%m-%d", parsed)
        date_time = time.strftime("%Y-%m-%d %H:%M:%S", parsed)
        return date, date_time, time_zone

    @staticmethod
    def _error_key(line):
        """Build the single-field reducer key used to count a bad line."""
        return ("Error while parsing line: %s" % line,)

    def mapper(self, _, line):
        """Emit ``((date_time, requester, user_agent, operation), 1)``.

        ``date_time`` is truncated to midnight so requests are counted per
        day. ``requester`` and ``operation`` are the raw field text, and
        ``user_agent`` keeps its surrounding quotes (or is '-').
        A line that does not match LOGPATS, or whose timestamp cannot be
        parsed, is emitted under an error key so it is counted rather than
        silently dropped.
        """
        line = line.strip()
        match = self.logpat.search(line)
        if match is None:
            yield (self._error_key(line), 1)
            return

        fields = match.groups()
        try:
            date = self.clean_date_time_zone(
                fields[self.S3_LOG_DATE_TIME])[0]
        except ValueError:
            yield (self._error_key(line), 1)
            return

        # Aggregate by date: drop the time-of-day component. Use the second
        # element of clean_date_time_zone() instead to aggregate per second.
        date_time = date + " 00:00:00"

        yield ((date_time,
                fields[self.S3_LOG_REQUESTER_ID],
                fields[self.S3_LOG_USER_AGENT],
                fields[self.S3_LOG_OPERATION]), 1)

    def reducer(self, key, values):
        """Sum the counts for one key and emit a single delimited line.

        The line is the key fields followed by the total, joined with
        DELIMITER. The output key is None because OUTPUT_PROTOCOL writes
        raw lines rather than (k, v) pairs.
        """
        columns = list(key)
        columns.append(str(sum(values)))
        yield None, self.DELIMITER.join(columns)

    def steps(self):
        """Run a single map/reduce step using ``mapper`` and ``reducer``."""
        return [MRStep(mapper=self.mapper, reducer=self.reducer)]
def _main():
    """Script entry point: hand control to mrjob's command-line runner."""
    MrS3LogParser.run()


if __name__ == '__main__':
    _main()