📚 The CoCalc Library - books, templates and other resources
License: OTHER
import re
import time

from mrjob.job import MRJob
from mrjob.protocol import RawValueProtocol, ReprProtocol


class MrS3LogParser(MRJob):
    """Parses the logs from S3 based on the S3 logging format:
    http://docs.aws.amazon.com/AmazonS3/latest/dev/LogFormat.html

    Aggregates a user's daily requests by user agent and operation

    Outputs date_time, requester, user_agent, operation, count
    """

    # One capture group per log field, so that match.group(N + 1) is the
    # field whose S3_LOG_* index is N.  Quoted fields are captured as a
    # single group (quotes included, or a bare "-"); nesting a second group
    # inside them would shift the numbering of every later field.
    LOGPATS = r'(\S+) (\S+) \[(.*?)\] (\S+) (\S+) ' \
              r'(\S+) (\S+) (\S+) ("[^"]+"|-) ' \
              r'(\S+) (\S+) (\S+) (\S+) (\S+) (\S+) ' \
              r'("[^"]+"|-) ("[^"]+"|-)'
    NUM_ENTRIES_PER_LINE = 17
    logpat = re.compile(LOGPATS)

    # Zero-based position of each field within a log line.
    (S3_LOG_BUCKET_OWNER,
     S3_LOG_BUCKET,
     S3_LOG_DATE_TIME,
     S3_LOG_IP,
     S3_LOG_REQUESTER_ID,
     S3_LOG_REQUEST_ID,
     S3_LOG_OPERATION,
     S3_LOG_KEY,
     S3_LOG_HTTP_METHOD,
     S3_LOG_HTTP_STATUS,
     S3_LOG_S3_ERROR,
     S3_LOG_BYTES_SENT,
     S3_LOG_OBJECT_SIZE,
     S3_LOG_TOTAL_TIME,
     S3_LOG_TURN_AROUND_TIME,
     S3_LOG_REFERER,
     S3_LOG_USER_AGENT) = range(NUM_ENTRIES_PER_LINE)

    DELIMITER = '\t'

    # We use RawValueProtocol for input to be format agnostic
    # and avoid any type of parsing errors
    INPUT_PROTOCOL = RawValueProtocol

    # We use RawValueProtocol for output so we can output raw lines
    # instead of (k, v) pairs
    OUTPUT_PROTOCOL = RawValueProtocol

    # Encode the intermediate records using repr() instead of JSON, so the
    # record doesn't get Unicode-encoded
    INTERNAL_PROTOCOL = ReprProtocol

    def clean_date_time_zone(self, raw_date_time_zone):
        """Converts entry 22/Jul/2013:21:04:17 +0000 to the format
        'YYYY-MM-DD HH:MM:SS' which is more suitable for loading into
        a database such as Redshift or RDS

        Note: requires the chars "[ ]" to be stripped prior to input

        Returns a tuple (date, date_time, time_zone):
            date       -- 'YYYY-MM-DD'
            date_time  -- 'YYYY-MM-DD HH:MM:SS'
            time_zone  -- the offset exactly as logged, e.g. '+0000' or
                          '-0700' (empty string if the entry has none)

        Raises ValueError if the date/time part is not in the expected
        format; the caller is expected to catch it.

        TODO: Needs to combine timezone with date as one field
        """
        # The offset is separated from the timestamp by a single space, so
        # split there instead of searching for "+", which is absent for
        # negative offsets.
        stamp, _sep, time_zone_parsed = \
            raw_date_time_zone.strip().partition(" ")

        # strptime validates both the date and the time of day and raises
        # ValueError on malformed input.
        parsed = time.strptime(stamp, "%d/%b/%Y:%H:%M:%S")
        converted_date = time.strftime("%Y-%m-%d", parsed)
        date_time = time.strftime("%Y-%m-%d %H:%M:%S", parsed)

        return converted_date, date_time, time_zone_parsed

    def mapper(self, _, line):
        """Emits ((date_time, requester, user_agent, operation), 1) for a
        log line, with date_time truncated to midnight so that requests
        are aggregated per day.

        A line that does not match the log format, or whose timestamp
        cannot be parsed, is emitted under a one-element error key so it
        is counted instead of aborting the job.
        """
        line = line.strip()
        match = self.logpat.search(line)

        try:
            if match is None:
                raise ValueError("line does not match the S3 log format")
            date, _date_time, _time_zone = self.clean_date_time_zone(
                match.group(self.S3_LOG_DATE_TIME + 1))
        except ValueError:
            # Keep the key a tuple of strings: the reducer joins the key's
            # elements with DELIMITER.
            yield (("Error while parsing line: %s" % line,), 1)
        else:
            # Replace the time of day with midnight to aggregate by date;
            # use _date_time instead to keep per-second resolution.
            date_time = date + " 00:00:00"
            requester = match.group(self.S3_LOG_REQUESTER_ID + 1)
            user_agent = match.group(self.S3_LOG_USER_AGENT + 1)
            operation = match.group(self.S3_LOG_OPERATION + 1)
            yield ((date_time, requester, user_agent, operation), 1)

    def reducer(self, key, values):
        """Sums the counts for a key and emits a single raw output line:
        the key's elements followed by the total, joined by DELIMITER.
        """
        columns = list(key)
        columns.append(str(sum(values)))

        yield None, self.DELIMITER.join(columns)

    def steps(self):
        """Returns the single map/reduce step of this job."""
        try:
            # Local import: only needed here, and absent from old mrjob
            # releases that still provide MRJob.mr().
            from mrjob.step import MRStep
        except ImportError:
            return [
                self.mr(mapper=self.mapper,
                        reducer=self.reducer)
            ]
        return [
            MRStep(mapper=self.mapper,
                   reducer=self.reducer)
        ]


if __name__ == '__main__':
    MrS3LogParser.run()