#!/usr/bin/env python # -*- coding: utf-8 -*- from optparse import OptionParser from colors import * import re import math from pprint import pprint from PIL import Image, ImageDraw, ImageFont class LogEntry (object): def __init__(self, name, frames, certain): self.name = name self.frames = frames self.certain = certain def __str__(self): return "[%s - %s] %s %s" % (frame2time(self.frames[0]), frame2time(self.frames[1]), self.name, "t" if self.certain else "nil") class Hatchery (object): def __init__(self, id, frame): self.id = id self.available = frame self.last_known_existance = frame self.log = [] def inject(self, frame): self.log.append(LogEntry("Inject", (frame, frame + 40*64.053), True)) class Gateway (object): build_time_frames = 65 * 64.053 def __init__(self, frame): self.build_start = frame def finished(self, frame): return frame >= self.build_start + Gateway.build_time_frames class Warpgate (object): Cooldowns = { 'Zealot': 28 * 64.053, 'Stalker': 32 * 64.053, 'Sentry': 32 * 64.053, 'High Templar': 45 * 64.053, 'Dark Templar': 45 * 64.053 } def __init__(self, gateway, gateway_id, frame): self.next_available = 0 self.previous_warpin = 0 self.chrono_boost = 0 self.id = gateway_id self.gateway_available = gateway.build_start + Gateway.build_time_frames self.first_available = frame + 10 * 64.053 self.log = [] def available(self, frame): return frame >= self.next_available def warp(self, frame, unit, certain): self.next_available = frame + Warpgate.Cooldowns[unit] self.previous_warpin = frame self.log.append(LogEntry("%s" % unit, (frame, self.next_available), certain)) class WarpgateResearch (object): build_time = 140 * 64.053 chronoboost_duration = 20 * 64.053 def __init__(self): self.started = None self.log = [] def start(self, frame): if not self.started: self.started = frame def chronoboost(self, frame): self.log.append(LogEntry("Chronoboost", (frame, frame + 20*64.053), True)) def finished(self): # Start by merging all overlapping chrono boost during the # research and normalize to the start time boosts = [] for entry in self.log: if entry.frames[1] > self.started: extended = False if len(boosts) > 0: # Extend the previous boost if we're overlapping previous_end = boosts[-1][1] if previous_end > entry.frames[0]-self.started: previous_end = entry.frames[1]-self.started extended = True if not extended: boosts.append((entry.frames[0]-self.started, entry.frames[1]-self.started)) boosted_build_speed = 1.5 build_time_remaining = WarpgateResearch.build_time time = 0 for boost in boosts: # Regular build speed regular_time_reduction = boost[0]-time if regular_time_reduction > build_time_remaining: # It's going to complete before it's boosted again time += build_time_remaining build_time_remaining = 0 break else: time = boost[0] build_time_remaining -= regular_time_reduction # Boosted build speed boosted_time_reduction = (boost[1]-boost[0]) * boosted_build_speed if boosted_time_reduction > build_time_remaining: # It's going to be boosted until it's finished time += build_time_remaining/boosted_build_speed build_time_remaining = 0 break else: build_time_remaining -= boosted_time_reduction time = boost[1] return self.started + time # ---- # ---- # --- # # def frame2time(frame, digit_only=False): # 64.053fps seconds = float(frame)/64.053 minutes = math.floor(seconds/60) seconds = seconds - 60*minutes if seconds < 10: if digit_only: time = "%i:0%i" % (minutes, seconds) else: time = "%im0%is" % (minutes, seconds) else: if digit_only: time = "%i:%i" % (minutes, seconds) else: time = "%im%is" % (minutes, seconds) return time def draw_warpgates(warpgates, gateways, warpgate_research, last_action_frame, filename, playername): ppf = 1/64.053 end_buffer = ppf * Warpgate.Cooldowns['Dark Templar'] # The longest cooldown width = int(math.ceil(ppf * last_action_frame + end_buffer)) if width < 300: width = 300 # Remove those unupgraded gateways that won't have finished building by the time our graph stops l = gateways[:] for gw in l: if width < ppf*(gw.build_start+Gateway.build_time_frames): gateways.remove(gw) ppwg = 20 spacing = 2 nr_wgs = len(warpgates) nr_gws = len(gateways) legend_height = 20 title_height = 25 height = int(math.ceil(title_height + 2*legend_height + ppwg*(nr_wgs+nr_gws) + spacing * (nr_wgs+nr_gws+1))) img = Image.new('RGB', (width, height)) draw = ImageDraw.Draw(img) font = ImageFont.truetype("/usr/share/fonts/truetype/freefont/FreeSans.ttf", 17) unit_font = ImageFont.truetype("/usr/share/fonts/truetype/freefont/FreeSans.ttf", 16) used_color = "rgb(63,190,20)" available_color = "rgb(70,107,212)" unknown_color = "rgb(21,62,117)" background_color = "rgb(23,32,44)" white_color = "rgb(255,255,255)" gateway_color = "rgb(163,135,88)" research_color = "rgb(190,45,45)" boost_color = "rgb(184,242,243)" unit_name_conversion = { 'Zealot': 'Z', 'Stalker': 'S', 'Sentry': 'E', 'High Templar': 'T', 'Dark Templar': 'D' } # Draw the title x = 10 y = 2 draw.text((x, y), "%s's Warp Gates" % playername, font=font, fill=white_color) y_next = y + title_height # Draw unupgraded gateways gw_cnt = 0 gateways.sort(key=lambda x: x.build_start, reverse=True) y_current = y_next for gw in gateways: y1 = y_current + ppwg * gw_cnt + (spacing*gw_cnt) y2 = y_current + ppwg * (gw_cnt+1) + (spacing*gw_cnt) y_next = y2 # Draw the background for the entire x-axis draw.rectangle(((0, y1), (width, y2)), fill=background_color) # Draw the background for when it's a gateway x_gateway_available = ppf * (gw.build_start + Gateway.build_time_frames) draw.rectangle(((x_gateway_available, y1), (width, y2)), fill=gateway_color) # Write the gateway number draw.text((10, y1), "Unupgraded Gateway %i" % (len(gateways)-gw_cnt), font=font, fill=gateway_color) gw_cnt += 1 # Draw the warp gates in reverse order since the coordinate system is (0,0) in the top left corner wg_cnt = 0 warpgates.sort(key=lambda x: x.gateway_available, reverse=True) y_current = y_next + spacing for wg in warpgates: y1 = y_current + ppwg * wg_cnt + (spacing * wg_cnt) y2 = y_current + ppwg * (wg_cnt+1) + (spacing * wg_cnt) y_next = y2 # Draw the background for the entire x-axis draw.rectangle(((0, y1), (width, y2)), fill=background_color) # Draw the background for when it's a gateway x_gateway_available = ppf*wg.gateway_available draw.rectangle(((x_gateway_available, y1), (width, y2)), fill=gateway_color) # If this is the bottom warp gate, draw the warp gate research if wg_cnt == nr_wgs-1: x_research_start = ppf*warpgate_research.started x_research_finish = ppf*warpgate_research.finished() draw.rectangle(((x_research_start, y1), (x_research_finish, y2-ppwg+4)), fill=research_color) for entry in warpgate_research.log: x_boost_start = ppf*entry.frames[0] x_boost_start = max(x_boost_start, x_research_start) x_boost_end = ppf*entry.frames[1] x_boost_end = min(x_boost_end, x_research_finish) draw.rectangle(((x_boost_start, y1+ppwg-4), (x_boost_end, y2)), fill=boost_color) # Draw the background for when the warpgate is available x_available = ppf * wg.first_available draw.rectangle(((x_available, y1), (width, y2)), fill=available_color) x_unknown_state = x_available # Draw for when the gateway is used for entry in wg.log: x1 = int(math.ceil(ppf * entry.frames[0])) x2 = int(math.floor(ppf * entry.frames[1])) draw.rectangle(((x1, y1), (x2, y2)), fill=used_color) x_unknown_state = x2 # Write the unit name draw.text((x1+4, y1), unit_name_conversion[entry.name], font=unit_font, fill=white_color) # Write the Warp Gate number draw.text((10, y1), "Warp Gate %i" % (len(warpgates)-wg_cnt), font=font, fill=available_color) wg_cnt += 1 # Draw the part where it's unknown if the warp gate still exists draw.rectangle(((x_unknown_state+1, y1), (width, y2)), fill=unknown_color) y_current = y_next + spacing # Draw the color legend x = 10 y = y_current y_next = y_current + legend_height draw.text((x, y), "On cooldown", font=font, fill=used_color) x += 110 draw.text((x, y), "Available", font=font, fill=available_color) x += 80 draw.text((x, y), "Unit built", font=font, fill=white_color) x += 90 current_frame = x/ppf # Draw the time legend frames_per_minute = 60*64.053 # Start at the next even minute current_minute = math.ceil(current_frame/frames_per_minute) while current_minute < last_action_frame/frames_per_minute: current_frame = current_minute * frames_per_minute x = ppf * current_frame draw.text((x, y), frame2time(current_frame, digit_only=True), font=font, fill=white_color) current_minute += 1 y_current = y_next y = y_current - 4 x = 10 draw.text((x, y), "Warpgate Research", font=font, fill=research_color) x += 154 draw.text((x, y), "Chrono Boost", font=font, fill=boost_color) img.save(filename) print "Wrote",fg.GREEN+filename, fg.RESET def draw_hatcheries(hatcheries, last_action_frame, filename, playername): ppf = 1/64.053 end_buffer = 40 pph = 20 spacing = 2 nr_hatcheries = len(hatcheries) legend_height = 25 title_height = 25 width = int(math.ceil(ppf * last_action_frame + end_buffer)) if width < 300: width = 300 height = int(math.ceil(title_height + legend_height + pph * nr_hatcheries + spacing * (nr_hatcheries) + 1)) img = Image.new('RGB', (width, height)) draw = ImageDraw.Draw(img) font = ImageFont.truetype("/usr/share/fonts/truetype/freefont/FreeSans.ttf", 17) unit_font = ImageFont.truetype("/usr/share/fonts/truetype/freefont/FreeSans.ttf", 16) used_color = "rgb(255,182,123)" available_color = "rgb(80,59,42)" unknown_color = "rgb(49,36,35)" background_color = "rgb(17,12,9)" white_color = "rgb(255,255,255)" # Draw the title x = 10 y = 2 draw.text((x, y), "%s's Hatcheries" % playername, font=font, fill=white_color) y_next = y + title_height # Draw the color legend y = height - legend_height draw.text((x, y), "Available", font=font, fill=available_color) x += 80 draw.text((x, y), "Larvae Spawning", font=font, fill=used_color) x += 130 current_frame = x/ppf # Draw the time legend frames_per_minute = 60*64.053 # Start at the next even minute current_minute = math.ceil(current_frame/frames_per_minute) while current_minute < last_action_frame/frames_per_minute: current_frame = current_minute * frames_per_minute x = ppf * current_frame draw.text((x, y), frame2time(current_frame, digit_only=True), font=font, fill=white_color) current_minute += 1 # Draw the hatcheries in reverse order since the coordinate system is (0,0) in the top left corner hatch_cnt = 0 hatcheries.sort(key=lambda x: x.available, reverse=True) y_current = y_next for hatch in hatcheries: y1 = y_current + pph * hatch_cnt + (spacing*hatch_cnt) y2 = y_current +pph * (hatch_cnt+1) + (spacing*hatch_cnt) y_next = y2 # Draw the background for the entire x-axis draw.rectangle(((0, y1), (width, y2)), fill=background_color) # Draw the background for when the hatchery is available x_available = ppf * hatch.available draw.rectangle(((x_available, y1), (width, y2)), fill=available_color) x_unknown_state = x_available # Draw for when the gateway is used for entry in hatch.log: x1 = int(math.ceil(ppf * entry.frames[0])) x2 = int(math.floor(ppf * entry.frames[1])) draw.rectangle(((x1, y1), (x2, y2)), fill=used_color) x_unknown_state = x2 # Write the unit name # draw.text((x1+4, y2-20), unit_name_conversion[entry.name], font=unit_font, fill=white_color) # Draw the part where it's unknown if the hatch still exists x_unknown_state = max(x_unknown_state, ppf*hatch.last_known_existance) draw.rectangle(((x_unknown_state, y1), (width, y2)), fill=unknown_color) # Write the Hatchery number draw.text((10, y1-1), "Hatchery %i" % (len(hatcheries)-hatch_cnt), font=font, fill=used_color) hatch_cnt += 1 img.save(filename) print "Wrote",fg.GREEN+filename, fg.RESET def main(*args): usage = "usage: %prog [options] -n Playername actionsfile1 [actionsfile2...]" parser = OptionParser(usage) parser.add_option("-n", "--name", dest="name", help="player name to analyze for") parser.add_option("-v", "--verbose", dest="verbose", action="store_true", help="verbose output", default=False) parser.add_option("-d", "--debug", dest="debug", action="store_true", help="debug level output", default=False) (options, args) = parser.parse_args() if len(args) < 1: parser.error("incorrect number of arguments") if not options.name: parser.error("name required") for filename in args: process_file(filename, options) def process_file(filename, options): actionsfile = open(filename) if options.verbose: print "Parsing", filename # Misc time_and_name_re = re.compile(r'^\s*(\d+)\s+([^\s]+) ') build_gw_re = re.compile(r'Build Gateway;') build_hatch_re = re.compile(r'Build Hatchery;') train_wg_re = re.compile(r'Train (.+) \(Warp gate\);') upgrade_wg_re = re.compile(r'Upgrade to Warp Gate') research_wg_re = re.compile(r'Research Protoss Warp Gate') chronoboost_wg_re = re.compile(r'Chrono Boost \(Nexus\); target: Warp Gate \((\w+)\)') chronoboost_cc_re = re.compile(r'Chrono Boost \(Nexus\); target: Cybernetics Core \((\w+)\)') spawn_larva_re = re.compile(r'Spawn Larva \(Queen\); target: (\w+) \((\w+)\)') # Selects select_only_gw_re = re.compile(r'Select .*Gateway \((\w+)\), Deselect .+') select_additional_gw_re = re.compile(r'Select .*Gateway \((\w+)\)$') select_only_multiple_gw_re = re.compile(r'Select .*Gateway x\d+ \((.+)\), Deselect .+') select_additional_multiple_gw_re = re.compile(r'Select .*Gateway x\d+ \((.+)\)$') select_hatch_re = re.compile(r'Select .*Hatchery \((\w+)\)') # Hotkeys hotkey_select_re = re.compile(r'Hotkey Select (\d+)') hotkey_assign_re = re.compile(r'Hotkey Assign (\d+)$') hotkey_assign_add_re = re.compile(r'Hotkey Assign (\d+) \(add selection\)') # States tracked for the duration of the replay hatcheries = [] hatcheries_wo_id = [Hatchery(None, 0)] # The starting hatchery is without id to begin with warpgates = [] warpgate_research = WarpgateResearch() gateways = [] selected_gw = [] hotkeys = {} previous_warpin = (None, 0) dtime_cutoff = 50 last_action_frame = 0 for line in actionsfile: name = None time = None frame = None match = time_and_name_re.search(line) if match: frame = float(match.group(1)) name = match.group(2) line = re.sub(time_and_name_re, '', line) line = line.rstrip() else: continue if not name == options.name: continue if frame > last_action_frame: last_action_frame = frame time = frame2time(frame) match = spawn_larva_re.search(line) if match: building = match.group(1) id = match.group(2) if options.verbose: print time, "Spawn Larva on %s id:%s" % (building, id) found = False # If it's a known hatchery just inject it for hatch in hatcheries: if hatch.id == id: hatch.inject(frame) found = True break # If it's unknown we also create it if not found: if options.verbose: print time, "Found and injected previously unkown hatchery id%s" % id new_hatch = Hatchery(id, frame) new_hatch.inject(frame) hatcheries.append(new_hatch) continue match = build_gw_re.search(line) if match: if options.verbose: print time, "Build Gateway" gateways.append(Gateway(frame)) continue match = build_hatch_re.search(line) if match: if options.verbose: print time, "Build Hatchery" hatcheries_wo_id.append(Hatchery(None, frame + 100*64.053)) continue # We keep track of currently selected gateways so that we can # upgrade the right ones to warp gates when that command comes match = select_only_gw_re.search(line) if match: select = match.group(1) if options.verbose: print time, "Selected only Gateway id:%s" % select selected_gw = [select] continue match = select_additional_gw_re.search(line) if match: select = match.group(1) if options.verbose: print time, "Selected additional Gateway id:%s" % select selected_gw.append(select) continue match = select_only_multiple_gw_re.search(line) if match: select = match.group(1) if options.verbose: print time, "Selected multiple only Gateways id:%s" % select selected_gw = [] for gw in select.rsplit(','): selected_gw.append(gw) continue match = select_additional_multiple_gw_re.search(line) if match: select = match.group(1) if options.verbose: print time, "Selected additional Gateways id:%s" % select for gw in select.rsplit(','): selected_gw.append(gw) continue match = select_hatch_re.search(line) if match: select = match.group(1) if options.verbose: print time, "Selected Hatchery id:%s" % select # Check if this hatchery id is already known existing = False for hatch in hatcheries: if select == hatch.id: existing = True hatch.last_known_existance = frame break # If it's new we use it to identify the most recently # added unidentified hatchery if not existing: if len(hatcheries_wo_id) > 0: new_hatchery = hatcheries_wo_id[-1] new_hatchery.id = select new_hatchery.last_known_existance = frame del(hatcheries_wo_id[-1]) hatcheries.append(new_hatchery) if options.verbose: print time, "Identified Hatchery id:%s from previously unidentified" % select # If we don't have any unidentified hatcheries and # select one we haven't seen before it's one of the # opponent's and is ignored continue # We keep track of gateways stored in hotkeys so that they # also count as selected properly when the upgrade to warp # gate command comes # Note: gateways will bleed into other hotkeys as well, but # this is ok since we don't use them for anything match = hotkey_assign_re.search(line) if match: select = match.group(1) if options.debug: print time, "Assigned hotkey %s" % select if len(selected_gw) > 0: hotkeys[select] = list(selected_gw) if options.debug: pprint(hotkeys[select]) continue match = hotkey_assign_add_re.search(line) if match: select = match.group(1) if options.debug: print time, "Assign added hotkey %s" % select if len(selected_gw) > 0: currently_hotkeyed = [] if select in hotkeys: currently_hotkeyed = hotkeys[select] for gw in selected_gw: if not gw in currently_hotkeyed: currently_hotkeyed.append(gw) hotkeys[select] = currently_hotkeyed if options.debug: pprint(hotkeys[select]) continue match = hotkey_select_re.search(line) if match: select = match.group(1) if options.debug: print time, "Selected hotkey %s" % select if select in hotkeys: selected_gw = list(hotkeys[select]) if options.debug: pprint(hotkeys[select]) continue match = train_wg_re.search(line) if match: unit = match.group(1) if options.verbose: print time, "Train %s" % unit dtime = frame - previous_warpin[1] done = False # Find a Warp gate with a cooldown available (in order) for wg in warpgates: if wg.available(frame): certain = dtime > dtime_cutoff wg.warp(frame, unit, certain) previous_warpin = (unit, frame) done = True if options.verbose: print fg.GREEN if certain else fg.YELLOW, dtime, frame, fg.RESET break if not done: # If we are certain this is a real warp-in we find the # most recent uncertain warp-in and replace it # Note: needs to check back for the duration of the # longest warp-in cooldown until it finds a uncertain # warp-in to remove. Only checks the most recent at # the moment if dtime > dtime_cutoff: for wg in warpgates: if wg.previous_warpin == previous_warpin[1] and wg.log[-1].certain == False: if options.verbose: print fg.RED+"Removing %s" % wg.log[-1], fg.RESET del wg.log[-1] certain = True wg.warp(frame, unit, certain) previous_warpin = (unit, frame) if options.verbose: print fg.GREEN if certain else fg.YELLOW, dtime, frame, fg.RESET break else: # We don't replace uncertain warp-ins with another uncertain one if options.verbose: print fg.RED+ time, dtime, previous_warpin[1], "Ignoring uncertain %s warp-in" % unit, fg.RESET # Replacing uncertain warp-ins like this will handle the # case that the game registered too many warp-in commands # when getting cooldown blocked, but not when it was # resource or supply blocked. continue match = chronoboost_wg_re.search(line) if match: target_id = match.group(1) if options.verbose: print time, "Chrono Boosting Warp Gate id:%s" % target_id continue match = chronoboost_cc_re.search(line) if match: target_id = match.group(1) if options.verbose: print time, "Chrono Boosting Cybernetics Core id:%s" % target_id warpgate_research.chronoboost(frame) continue match = research_wg_re.search(line) if match: if options.verbose: print time, "Researching Warp Gate" warpgate_research.start(frame) continue # Upgrade a selected and finished gateway to a warp gate. Each # upgraded gateway requires a command line of its own even if # several eligable gateways are selected match = upgrade_wg_re.search(line) if match: if len(selected_gw) > 0: if len(gateways) > 0 and gateways[0].finished(frame): gateway_id = selected_gw[0] gateway = gateways[0] if options.verbose: print time, "Upgrading Warpgate id:%s" % gateway_id warpgates.append(Warpgate(gateway, gateway_id, frame)) selected_gw.remove(gateway_id) gateways.remove(gateway) continue if options.verbose: silent = True for wg in warpgates: print fg.LIGHTBLUE+"----Warp Gate %s-----" % wg.id, fg.RESET for entry in wg.log: print entry silent = False for hatch in hatcheries: print fg.LIGHTBLUE+"----Hatchery %s-----" % hatch.id, fg.RESET for entry in hatch.log: print entry silent = False if not silent: print fg.LIGHTBLUE+"------------------------", fg.RESET if len(warpgates) > 0: draw_warpgates(warpgates, gateways, warpgate_research, last_action_frame, "%s_%s_warpgates.png" % (filename, options.name), options.name) if len(hatcheries) > 0: draw_hatcheries(hatcheries, last_action_frame, "%s_%s_injects.png" % (filename, options.name), options.name) if __name__ == '__main__': main() # TODO chrono boost # plotta warpgate upgrade på första warp-gaten, samt visa när den chrono boostas!! # Potential bugs # Chrono boost på warpgate-uppgraderingen kan ge upphov till warp-ins som inte hinns med