Terminal/scripts/contributions/watch_replay.py
2021-06-01 10:50:27 -05:00

1346 lines
50 KiB
Python

#!/usr/bin/env python
'''
------------------------------------------------------------------------------------------------
Author: @Isaac
Last Updated: 14 Nov 2018
Contact: Message @Isaac at https://forum.c1games.com/
Copyright: CC0 - completely open to edit, share, etc
Short Description:
This is a python script to visualize replay files in a similar format to the online version.
You can also run games and watch them as they play (this is slow).
Lastly, it allows you to save these visualizations in video format.
------------------------------------------------------------------------------------------------
README:
This program assumes this file is in the contributions/scripts directory
This script takes an input of a replay file and displays it visually.
Alternatively, it can run a match and you can visualize it as the game engine runs.
There are a variety of ways you can tell it how to run; I will give an example of each.
The default is (runs latest replay):
>py scripts/contributions/watch_replay.py
----------------------------------------------------------------------------------------
-f: Run a specific replay file
You can specify which file you would like to run by:
>py scripts/contributions/watch_replay.py -f [REPLAY_FILE].replay
where REPLAY_FILE is the file you'd like to look at. You can list more than one, but it will
NOT display more than one replay.
----------------------------------------------------------------------------------------
-b: Blitting
You can specify whether to use blitting with this flag:
>py scripts/contributions/watch_replay.py -b
Blitting will improve the performance of the graphing, but comes at the cost of several features:
- You will not be able to use or see the slider
- You may notice small graphical glitches with text updating (will not show if you do not use the keyboard inputs)
- If you run the program in real-time the player names will not update (the winner name will be displayed correctly).
----------------------------------------------------------------------------------------
-run: Real-time watching
This means you will run and watch a game in real time.
The program will start a match like you normally would, but will also open the visualizer when possible.
You can specify the algos you would like to run like normal:
>py scripts/contributions/watch_replay.py -run algos/my-bot1 algos/my-bot2
You cannot save a game if you watch it in real-time.
The slider will not appear until the game has completed (you can still use the keyboard commands).
If you use blitting, the player names will not update.
----------------------------------------------------------------------------------------
-s: Save
You can specify if you would like to save the replay.
This saves a direct copy of what you would see if you ran the program normally.
You must specify the name the file should be saved as. For example:
>py scripts/contributions/watch_replay.py -s awesome_video
This may take a while and seem to hang, but it has to go through the entire replay as if you were watching it.
There are three possible file formats to save the replay in: .mp4, .gif, .html
Two of these formats (.mp4, .gif) have dependencies outside of this script.
This script will automatically detect the type of encoder (writer) depending on the extension you give unless
you specify otherwise.
.mp4:
>py scripts/contributions/watch_replay.py -s awesome_video.mp4
This will run the script and save the replay in .mp4 format, but require the installation of 3rd party software, ffmpeg.
This runs on ffmpeg, which can be downloaded here: https://www.ffmpeg.org/download.html
As long as you have ffmpeg installed and in your PATH, this format should work.
This script will let you know if it cannot use it.
.gif:
>py scripts/contributions/watch_replay.py -s awesome_video.gif
This will run the script and save the replay in .gif format, but requires the python module Pillow.
If you ask for a .gif file this program will ask if you want to install Pillow.
.html:
>py scripts/contributions/watch_replay.py -s awesome_video.html
This one is pretty cool, it will generate a folder (awesome_video_frames) with every single frame
stored as an image and then generate an html file to view the files in video format.
If you want the individual frames, use this option.
This option also does not have any dependencies (as far as I know).
There are two arguments specific to -s that you can use to specify more with the save option
-------------
-w: Writers
This specifies the writer to use (which results in the output file format).
You can specify more than one, and it will create a save for each of the parameters you provide.
For example (these are the same):
>py scripts/contributions/watch_replay.py -s awesome_video -w pillow
>py scripts/contributions/watch_replay.py -s awesome_video.gif
But if you wanted to also save it as a .mp4:
>py scripts/contributions/watch_replay.py -s awesome_video -w pillow ffmpeg
Valid inputs for this are:
Writer | File Type
ffmpeg | .mp4
pillow | .gif
html | .html
You can still supply an extension name when you do this, but it will be overwritten (you will be notified):
For example:
>py scripts/contributions/watch_replay.py -s awesome_video.mp4 -w pillow ffmpeg
would output (assuming dependencies installed):
>This may take a little while and seem to hang
>
>You used extension .mp4, but .gif is the valid type for pillow. Using .gif:
>Saving file awesome_video.gif
>Done saving file: awesome_video.gif
>
>Saving file awesome_video.mp4
>Done saving file: awesome_video.mp4
-------------
-kt: Keep Trying
This flag changes the behavior of the script if a dependency fails when saving.
By default, if ffmpeg is not accessible the program simply skips it.
However, if you add the -kt flag it will continue to try the other avalible options.
For example, assuming ffmpeg is not installed:
>py scripts/contributions/watch_replay.py -s awesome_video.mp4
output:
>This may take a little while and seem to hang
>
>ffmpeg not installed or in PATH, skipping
But with -kt:
>py scripts/contributions/watch_replay.py -s awesome_video.mp4 -kt
output:
>This may take a little while and seem to hang
>
>ffmpeg not installed or in PATH, skipping
>
>You used extension .mp4, but .gif is the valid type for pillow. Using .gif:
>Saving file awesome_video.gif
>Done saving file: awesome_video.gif
The default priority order for checking next is ffmpeg, then pillow, and html last.
If you want to change the order just:
1. Ctrl-Find in this script: this is the default order of priority for running a save
2. Change the order of the list to be the priority you want
----------------------------------------------------------------------------------------
I cannot stress enough that this program is slow and unoptimized. Expect slowness :).
I have included the same speeds on the normal visualizer in case people have better machines
than me, but it doesn't appear to do much past a point.
A much faster way of viewing replays (other than skipping) is using the fast-fwd option
(holding down Ctrl). This is something I did not program; I believe (100% guessing) that
matplotlib skips frames resulting in the speed-up, which is great :).
If you have suggestions on improving speeds please let me know.
Please forgive the ugliness as this is my first large project with matplotlib - suggestions are loved and appreciated :).
If you have any questions just ask me on the forums - @Isaac
'''
try:
import os
import sys
import time
import json
import glob
import random
import warnings
import argparse
import subprocess
import multiprocessing as mp
except ImportError as e:
sys.stderr.write("WARNING: Module not found, full error:\n\n")
sys.stderr.write(e)
try:
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from matplotlib.patches import Circle, Wedge, Polygon
from matplotlib.collections import PatchCollection
from matplotlib.widgets import Slider
except ImportError:
usr_in = input('Matplotlib not found.\nWould you like this program to try and install matplotlib? (y/n) ')
if usr_in.lower() == 'y' or usr_in.lower() == 'yes':
subprocess.run(['python', '-m', 'pip', 'install', 'matplotlib'])
try:
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from matplotlib.patches import Circle, Wedge, Polygon
from matplotlib.collections import PatchCollection
from matplotlib.widgets import Slider
sys.stderr.write('\n\n')
except ImportError as e:
sys.stderr.write('\n\n{}\n\n'.format(str(e)))
sys.stderr.write('Failed: Check to make sure you have all core dependencies installed (usually TkAgg).\n\n')
sys.exit()
global FILTER, ENCRYPTOR, DESTRUCTOR, PING, EMP, SCRAMBLER, MAX_HP, GET_VERTS, SPEED, BLIT
FILTER = 0
ENCRYPTOR = 1
DESTRUCTOR = 2
PING = 3
EMP = 4
SCRAMBLER = 5
MAX_HP = {FILTER:60, ENCRYPTOR:30, DESTRUCTOR:75, PING:15, EMP:5, SCRAMBLER:40}
SPEED = {'1':.25, '2':.5, '3':1, '4':2, '5':4, '6':8} # speed versions, key is user input (number)
# returns a rotated angle (created to make health deplete from vertical angle)
def rotate(angle, theta=90):
return angle + theta
# returns the vertice points around a center point x,y for a ping
def ping_verts(x, y):
p1 = 0
p2 = .2
p3 = .06
verts = [(-p2,p1),(-p3,p3),(p1,p2),(p3,p3),(p2,p1),(p3,-p3),(p1,-p2),(-p3,-p3)]
return [(a+x, b+y) for (a,b) in verts]
# returns the vertice points around a center point x,y for an emp
def emp_verts(x,y):
p1 = 0
p2 = .5
p3 = .12
verts = [(-p3,p1),(-p2,p2),(p1,p3),(p2,p2),(p3,p1),(p2,-p2),(p1,-p3),(-p2,-p2)]
return [(a+x, b+y) for (a,b) in verts]
# returns the vertice points around a center point x,y for a scambler
def scrambler_verts(x,y):
p1 = 0
p2 = .35
p3 = .25
verts = [(-p3,p1),(-p2,p2),(p1,p3),(p2,p2),(p3,p1),(p2,-p2),(p1,-p3),(-p2,-p2)]
return [(a+x, b+y) for (a,b) in verts]
GET_VERTS = {PING:ping_verts, EMP:emp_verts, SCRAMBLER:scrambler_verts}
# handles all the arguments
def parse_args():
ap = argparse.ArgumentParser(add_help=False, formatter_class=argparse.RawTextHelpFormatter)
ap.add_argument('-h', '--help', action='help', help='show this help message and exit\n\n')
ap.add_argument(
'-f', '--file',
nargs='*',
default=[],
help="specify a replay file you'd like to watch\n\n")
ap.add_argument(
'-b', '--blit',
action='store_true',
help="will tell the program to use blit - will improve performance, but you will not be able to see or use the slider and there will be minor text glitches when fast forwarding, etc (you can still use all the keyboard commands)\n\n")
ap.add_argument(
'-run', '--run_match',
nargs='+',
default=['empty'],
help="specify whether to watch a replay in real-time (runs a match) - you supply algo names as arguments just like when normally running a match\n\n")
ap.add_argument(
'-s', '--save',
default='',
help="specify whether to save replay as an .mp4 file - you must supply a file name (it may take a little while)\n\n")
ap.add_argument(
'-w', '--writers',
nargs='+',
default=['empty'],
help="specify the type of writer(s) to use (video save format) - flag only works if you are saving a replay\n\nValid Options:\n\t- ffmpeg\n\t- pillow\n\t- html\n\n")
ap.add_argument(
'-kt', '--keep_trying',
action='store_true',
help="forces the save file to keep trying different writers until one works - flag only works if you are saving a replay\n\n")
return vars(ap.parse_args())
# stores all information for a single unit on the graph
class Unit:
def __init__(self, t, x, y, hp, p, ID, count, ax):
self.x = x # x position on the graph
self.y = y # y position on the graph
self.unit_type = t # the type of unit (a constant declare in global)
self.stability = hp # stability of unit
self.p_index = p # player index
self.ID = ID # unique id for unit (from engine.java)
self.owner = True if p == 1 else False # boolean to keep track of owner
self.polygons = [] # all polygons that make up that unit
self.patches = [] # these are what get drawn to the animation (contain polygons themselves)
self.color = {True:'C0', False:'r'} # constants for player color
self.create_patches(count, ax) # adds patches to the graph based on type
self.set_color() # sets the color and alpha based on type and health
# equality is determined by ID
def __eq__(self, other):
if type(other) == str: return self.ID == other
return self.ID == other.ID
def __repr__(self):
return "'{}'".format(self.ID)
# creates and adds all units to the graph
def create_patches(self, count, ax):
if self.unit_type == FILTER:
wedge = Wedge((self.x,self.y), .2, rotate(0), rotate(360), width=0.07)
self.polygons.append(wedge)
self.patches.append(ax.add_patch(wedge))
self.set_wedge_shape()
elif self.unit_type == ENCRYPTOR:
inner_wedge = Wedge((self.x,self.y), .12, rotate(0), rotate(360), width=0.03)
outer_wedge = Wedge((self.x,self.y), .37, rotate(0), rotate(360), width=0.15)
self.polygons.append(inner_wedge)
self.polygons.append(outer_wedge)
self.patches.append(ax.add_patch(inner_wedge))
self.patches.append(ax.add_patch(outer_wedge))
self.set_wedge_shape()
elif self.unit_type == DESTRUCTOR:
inner_wedge = Wedge((self.x,self.y), .2, rotate(0), rotate(360), width=0.07)
outer_wedge = Wedge((self.x,self.y), .45, rotate(0), rotate(360), width=0.01)
self.polygons.append(inner_wedge)
self.polygons.append(outer_wedge)
self.patches.append(ax.add_patch(inner_wedge))
self.patches.append(ax.add_patch(outer_wedge))
self.set_wedge_shape()
elif self.unit_type == PING or \
self.unit_type == EMP or \
self.unit_type == SCRAMBLER:
verts = GET_VERTS[self.unit_type](self.x, self.y)
polygon = Polygon(verts, True)
self.polygons.append(polygon)
self.patches.append(ax.add_patch(polygon))
self.check_stability(count, ax)
# returns a radius based upon the stability and max stability of a mobile unit
def hp_to_r(self, stability):
return min((stability - MAX_HP[self.unit_type])/50, .5)
# returns an angle based upon the stability and max stability of a static unit
def hp_to_angle(self, stability):
return int(rotate(360 * (stability / MAX_HP[self.unit_type])))
# sets the wedge angle based upon a static unit's stability
def set_wedge_shape(self):
angle = self.hp_to_angle(self.stability)
for patch in self.patches:
patch.set_theta2(angle)
# sets the postition of a unit on the graph
def set_pos(self, x, y):
self.x = x
self.y = y
for patch in self.patches:
if type(patch) == Polygon: patch.set_xy([(x, y) for (x,y) in GET_VERTS[self.unit_type](x, y)])
elif type(patch) == Circle: patch.center = x, y
# sets the color of a unit based on type
def set_color(self):
for patch in self.patches:
patch.set_color(self.color[self.owner])
patch.set_antialiased(True)
if self.unit_type == EMP or \
self.unit_type == SCRAMBLER:
patch.set_fill(False)
if self.unit_type == ENCRYPTOR:
self.patches[1].set_alpha(0.3)
if self.stability > MAX_HP[self.unit_type]:
self.patches[1].set_fill(False)
self.patches[1].set_alpha(0.5)
# checks the stability and if greater than max for a mobile unit adds a new Circle
def check_stability(self, count, ax):
if self.stability > MAX_HP[self.unit_type]:
if len(self.patches) == 1:
extra_wedge = Circle((self.x,self.y), self.hp_to_r(self.stability), linewidth=4)
self.patches.append(ax.add_patch(extra_wedge))
self.polygons.append(extra_wedge)
else:
self.patches[1].set_radius(self.hp_to_r(self.stability))
if count != 1:
self.patches[1].set_visible(False)
else:
if len(self.patches) > 1:
self.polygons[1].set_visible(False)
# updates all the values of a unit (count is necessary otherwise alpha is messed up)
def update(self, x, y, stability, p_index, ID, count, ax):
self.stability = stability
if self.unit_type == FILTER or \
self.unit_type == ENCRYPTOR or \
self.unit_type == DESTRUCTOR:
self.set_wedge_shape()
elif self.unit_type == PING or \
self.unit_type == EMP or \
self.unit_type == SCRAMBLER:
self.set_pos(x,y)
self.check_stability(count, ax)
self.set_color()
# removes the unit from the graph
def remove(self):
for polygon in self.polygons:
polygon.remove()
# holds all units (patches) and handles their creation/destruction on the board
class PatchWrapper:
def __init__(self):
self.units = {} # stores every unit currently on the board with each ID as the key
self.loc = {} # stores the number of units at a location with each location tuple (x,y) as the key
self.lbls = [] # stores the text labels if a location has more than 1 unit
# creates a unit and stores it in self.units
def create_unit(self, unit_type, pos, stability, p_index, ID, count, ax):
x,y = pos
self.units[ID] = Unit(unit_type, x, y, stability, p_index, ID, count, ax)
# removes a unit by ID from both the board and self.units
def remove_unit(self, ID):
for key, val in self.units.items():
if val == ID:
self.units[ID].remove()
self.units.pop(ID, None)
break
# clears the entire board - not used anymore (very inefficient mode of updating)
def clear_board(self):
self.loc = {}
for ID in self.units:
self.units[ID].remove()
self.units = {}
self.remove_lbls()
# removes all number labels from the board and from self.lbls
def remove_lbls(self):
for lbl in self.lbls:
lbl.remove()
self.lbls = []
# removes all previous labels and creates new ones at locations with more than one unit, then resets self.loc
def update_lbls(self, ax):
self.remove_lbls()
for pos, val in self.loc.items():
if val > 1:
self.plot_text(val, pos, ax)
self.loc = {}
# updates all units
def update_units(self, units, ax):
ids = [u[4] for u in units]
# get all units that are no longer on the board
# (you cannot use the engines remove since using the slider does not remove them)
remove = []
for unit in self.units.values():
if unit not in ids:
remove.append(unit.ID)
# remove all the units (cannot do it from the above loop - RuntimeError)
for ID in remove:
self.remove_unit(ID)
# loop through all the units given by the engine
for unit_raw in units:
unit_type, (x, y), stability, p_index, ID = unit_raw
# update the board locations count of units
try:
self.loc[(x,y)] += 1
except KeyError:
self.loc[(x,y)] = 1
# if a unit already exists, update it. If not, create it
try:
self.units[ID].update(x, y, stability, p_index, ID, self.loc[(x,y)], ax)
except KeyError:
self.create_unit(unit_type, (x,y), stability, p_index, ID, self.loc[(x,y)], ax)
# adds the count lable to a position on the board
def plot_text(self, txt, pos, ax):
x,y = pos
self.lbls.append(ax.text(x+.4, y-.4, str(txt), fontsize=10))
# return all the patches that need to be updated every animation
def values(self):
return [patch for unit in self.units.values() for patch in unit.patches]
# this class is for the right side (information side) except for the plot (see Plot class)
class Info:
def __init__(self, endStats, ax, slider_exists=False):
self.lbls = [] # holds every text object that needs to be updated
self.color = {True:'C0', False:'r', 1:'C0', 2:'r'} # color reference based on player index
self.ax = ax # reference to the right plt axes
# if in runtime mode, endStats don't exist, so we don't know a winner yet
if endStats != None:
self.winner = endStats['winner']
self.winner_name = endStats['player1']['name'] if self.winner == 1 else endStats['player2']['name']
# posisition references to make moving around easier
y_init = .95
offset = .05
list_pos = [y_init - ((x+1)*offset) for x in range(4)]
self.y_pos = { 'health' : list_pos[0],
'cores' : list_pos[1],
'bits' : list_pos[2],
'time' : list_pos[3],
1 : y_init,
2 : y_init
}
self.x_pos = { 1 : .06,
2 : .506,
}
self.hide_graph() # remove everything from the ax
self.disp_reference(slider_exists) # display the keyboard reference
self.disp_static(endStats) # display text that won't change
# adds a data value to the information page (health, cores, etc)
def add_data(self, d_dype, p_index, data, fontsize=14):
self.lbls.append(self.ax.text(self.x_pos[p_index]+.15, self.y_pos[d_dype], str(data), fontsize=fontsize, verticalalignment='bottom', horizontalalignment='left'))
# clear all dynamic text
def clear_info(self):
for lbl in self.lbls:
lbl.remove()
self.lbls = []
# display all text that won't change
def disp_static(self, endStats):
# if endStats don't exist use default values, otherwise use endStates for names
if endStats != None:
self.ax.text(self.x_pos[1], self.y_pos[1], endStats['player1']['name'], fontsize=18, verticalalignment='bottom', horizontalalignment='left', color=self.color[1])
self.ax.text(self.x_pos[2], self.y_pos[2], endStats['player2']['name'], fontsize=18, verticalalignment='bottom', horizontalalignment='left', color=self.color[2])
else:
self.ax.text(self.x_pos[1], self.y_pos[1], 'player1', fontsize=18, verticalalignment='bottom', horizontalalignment='left', color=self.color[1])
self.ax.text(self.x_pos[2], self.y_pos[2], 'player2', fontsize=18, verticalalignment='bottom', horizontalalignment='left', color=self.color[2])
# add all data labels (health, bits, cores, etc)
for lbl, pos in self.y_pos.items():
if type(lbl) == str:
self.ax.text(self.x_pos[1], self.y_pos[lbl], '{}:'.format(lbl), fontsize=14, verticalalignment='bottom', horizontalalignment='left')
self.ax.text(self.x_pos[2], self.y_pos[lbl], '{}:'.format(lbl), fontsize=14, verticalalignment='bottom', horizontalalignment='left')
# removes the axis from the ax
def hide_graph(self):
self.ax.axis('off')
# display the keyboard reference
def disp_reference(self, slider_exists):
self.ax.text(.5, .215, 'Keyboard Commands:', fontsize=13, verticalalignment='bottom', horizontalalignment='center')
self.ax.text(0.2, .075, '\nSpace/Enter:\nArrow Keys:\nCtrl+Arrow Keys:\n<,> and numbers 1-6:\nHold Ctrl', fontsize=12, verticalalignment='bottom', horizontalalignment='left')
self.ax.text(.8, .075, 'Pause and Play\nNext/Previous Frame\nNext/Previous Turn\nChange Speed\nFast-Fwd', fontsize=12, verticalalignment='bottom', horizontalalignment='right')
# if the slider doesn't exist, don't show the prompt for it
if not BLIT and slider_exists:
self.ax.text(.5, .03, 'You can also scrub the turn slider with your mouse', fontsize=12, verticalalignment='bottom', horizontalalignment='center')
# remove previous information and add all new data
def update(self, p1Stats, p2Stats):
self.clear_info()
self.add_data('health', 1, int(p1Stats[0]))
self.add_data('cores', 1, p1Stats[1])
self.add_data('bits', 1, p1Stats[2])
self.add_data('time', 1, p1Stats[3])
self.add_data('health', 2, int(p2Stats[0]))
self.add_data('cores', 2, p2Stats[1])
self.add_data('bits', 2, p2Stats[2])
self.add_data('time', 2, p2Stats[3])
# if the end of game is reached, show the winner
def show_winner(self):
try:
self.lbls.append(self.ax.text(.5, .67, '{} wins!'.format(self.winner_name), color=self.color[self.winner], verticalalignment='bottom', horizontalalignment='center', fontsize=24))
except TypeError:
print ('tried and failed to show winner - no endStats')
# this contains all data for the health plot on the right side
class Plot:
def __init__(self, data, ax, frame=0):
ax.clear() # clear the plot
self.ax = ax # reference to the ax containing the plot
self.data = data # all known health data, tuple containing two lists, player1 and player2 healths
self.lines = [] # the lines of the plot
self.ax.set_ylabel('Health') # set the y_axis label
self.ax.set_xlabel('Frame') # set the x_axis label
line, = self.ax.plot(self.data[0][:0], color='C0') # create and get a line reference from the plot
self.lines.append(line)
line, = self.ax.plot(self.data[1][:0], color='r') # create and get a line reference from the plot
self.lines.append(line)
self.ax.set_yticks(list(range(0, 36, 5))) # set the y_labels and scale
self.lines[0].set_xdata(list(range(0, 100))) # set the x_range for the line to always be 100
self.lines[1].set_xdata(list(range(0, 100))) # set the x_range for the line to always be 100
self.ax.set_xlim(0, 100) # set the x_range for the plot to always be 100
self.update(frame) # update the plot
# sets the lines for the plot based upone the current frame
def update(self, frame, data=None):
# handles initial values
frame = int(frame) if frame > 0 else 0
frames = frame if frame > 100 else 100
x_0 = frame - 100 if frame > 100 else 0
nulls = [None for x in range(100 - frame)] # extra values for beginning, size must match
# if we specify data, use that, otherwise use the cached values.
# this is used if running in real-time and not all data is known
if data == None:
line1 = nulls + self.data[0][x_0:frame]
line2 = nulls + self.data[1][x_0:frame]
else:
self.data = data
line1 = nulls + self.data[0][x_0:frame]
line2 = nulls + self.data[1][x_0:frame]
self.ax.set_xticklabels(list(range(x_0, frames+1, 20))) # updates the x_labels
self.lines[0].set_ydata(line1) # set the data for line1
self.lines[1].set_ydata(line2) # set the data for line2
# this class contains all information regarding the entire window
class Graph:
def __init__(self, data, frames_in_turn, healths, writers, keep_trying, save='', fh=None):
# pretty clear, if no data, raise an Error
if len(data) < 1:
raise RuntimeError('no data')
self.fh = fh # reference to file handler
self.real_time = False if self.fh == None else True # tracks whether real-time
plt.style.use('dark_background') # sets black background
plt.rcParams["figure.figsize"] = (16,8) # resizes the window
self.fig, ax = plt.subplots(nrows=1, ncols=2) # splits the plot into two halves and gets the references
self.board_ax, self.info_ax = ax # assign left and right side references
self.plot_ax = self.fig.add_subplot(324) # add the plot and assing it's reference
self.general_init(data, frames_in_turn, healths) # handles general initialization (called again if in real-time)
self.head = (0,-1) # tracks the current turn, frame pair
self.end_of_game = False # end of game flag
self.is_manual = False # stores whether the user is manually moving the slider, keyboard, etc
self.single_advance = False # true when user is scrubbing, but still want to move forward one frame
self.stop_slider_evt = False # stop the slider event from triggereing when the code changes it
self.patches = PatchWrapper() # creates the PatchWrapper object
self.stream = self.data_stream() # gets a data_reference - this passes all data to the animation
self.setup_board() # initialize static parts of the board
self.fig.canvas.mpl_connect('key_press_event', self.keyboard_input) # connect keyboard events to the keyboard_input function
# if in real-time, use a generator function to update number of frames, otherwise frames is static
if not self.real_time:
self.anim = animation.FuncAnimation(self.fig, self.update, init_func=self.init, frames=self.num_frames, interval=100, blit=BLIT, repeat=False)
else:
self.frame_generator = self.gen_frames()
self.anim = animation.FuncAnimation(self.fig, self.update, init_func=self.init, frames=self.frame_generator, interval=100, blit=BLIT, repeat=False)
self.change_play_speed('3') # initialize the playback speed ('3' is default)
# if you don't save, show the plot. Otherwise save the animation (nothing is shown)
if save == '':
self.show()
else:
self.save_animation(save, writers, keep_trying)
# saves all animations passed from the command line
def save_animation(self, save_name, writers, keep_trying):
print ('This may take a little while and seem to hang')
# reference dictionaries to converte values based on input
ex_to_writer = {'gif':'pillow', 'mp4':'ffmpeg', 'html':'html'}
check_writer = {'ffmpeg':self.check_ffmpeg, 'pillow':self.check_pillow, 'html':lambda:True}
# seperate the file name from the extension
try: name, given_ext = save_name.split('.')
except ValueError: name, given_ext = save_name, ''
default = ['ffmpeg', 'pillow', 'html'] # this is the default order of priority for running a save
# when this block finishes, attempts is a list of possible attempts in order of priority
attempts = [ex_to_writer[given_ext]] if given_ext != '' and given_ext in ex_to_writer.keys() else default
attempts = attempts if 'empty' in writers else writers
attempts = attempts + [w for w in default if w not in attempts] if keep_trying else attempts
# loop through and save all attempts until input is complete
complete = 0
for i, writer in enumerate(attempts):
try:
# if we can create that type of writer, make it
print ()
if check_writer[writer]():
self.create_animation(writer, name, given_ext)
complete += 1
# check and make sure we haven't finished (based on whether we keep trying or not)
if len(writers) <= complete: keep_trying = False
else: keep_trying = True
if not keep_trying: break
except KeyError:
print ('{} is not a valid writer. Options are:\n\t- ffmpeg (for .mp4 videos)\n\t- pillow (for gifs)\n\t- html (for browser view and individual frames)'.format(writer))
# creates an animation of a writer type
def create_animation(self, writer, name, given_ext):
extension = self.get_extension(writer, given_ext)
print ('Saving file {}.{}'.format(name, extension))
try:
self.anim.save('{}.{}'.format(name, extension), writer=writer)
print ('Done saving file: {}.{}'.format(name, extension))
except Exception as e:
print ('Unknown error. Full Output:')
print (str(e))
# returns the extension that must be used for the appropriate writer
def get_extension(self, writer, given_ext):
extensions = {'pillow':'gif', 'ffmpeg':'mp4', 'html':'html'}
if given_ext != extensions[writer] and given_ext != '':
print ('You used extension .{0}, but .{2} is the valid type for {1}. Using .{2}:'.format(given_ext, writer, extensions[writer]))
return extensions[writer]
# confirms whether ffmpeg is avaliable as a writer
def check_ffmpeg(self):
ffmpeg_process = subprocess.Popen('ffmpeg', shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
_,err = ffmpeg_process.communicate()
err = str(err).replace("b'",'').replace('b"','').replace('"','').replace("'",'').replace('\\n','').replace('\\r','').replace('\\','').replace('"','')
# covers both Ubuntu and Windows users (fingers-crossed)
# if anyone knows a better way to check for ffmpeg let me know, or add it here :)
if err == 'ffmpeg is not recognized as an internal or external command,operable program or batch file.':
print ('ffmpeg not installed or in PATH, skipping')
return False
elif err.find('Command ffmpeg not found') != -1:
print ('ffmpeg not installed or in PATH, skipping')
return False
return True
# confirms whether pillow is avaliable as a writer and offers to import it if not
def check_pillow(self):
try:
import PIL
return True
except ImportError:
usr_in = input('Pillow not found.\nWould you like this program to try and install Pillow? (y/n) ')
if usr_in.lower() == 'y' or usr_in.lower() == 'yes':
subprocess.run(['python', '-m', 'pip', 'install', 'Pillow'])
try:
import PIL
print ()
return True
except ImportError as e:
sys.stderr.write('\n\n{}\n\n'.format(str(e)))
sys.stderr.write('Failed to import Pillow.\n\n')
return False
return False
# extension of __init__(), called every frame when real-time
def general_init(self, data, frames_in_turn, healths):
self.data = data # dict with keys of (turn, frame) tuple and values of a Frame object
self.frames_in_turn = frames_in_turn # dict with keys of turn and values of number of frames in that turn
self.healths = healths # all known health data, tuple containing two lists, player1 and player2 healths
self.num_frames = len(self.data) # the number of total frames
self.slider_exists = False # begin by assuming the slider does not exist
self.plot = Plot(self.healths, self.plot_ax) # create the Plot object (plots the health)
# try and get endStats, if not then file is still being created by engine (game is still running)
try:
last_frame = max(self.data, key=lambda f: (f[0], f[1])) # the last frame of the entire match (single number)
endStats = self.data[last_frame].data['endStats'] # here is where the error would be thrown - if endStats exists
# From here on we know we have all data for entire game - endStats exists
# if blit is on, do not create the slider
if not BLIT:
self.slider = Slider(self.fig.add_axes([0.6, 0.03, 0.3, 0.03]), 'Turn Slider', 0, self.num_frames, valfmt='%1i', valstep=1, color='w')
self.slider.on_changed(self.slider_active)
self.slider_exists = True # tracks whether the slider exists
self.info = Info(endStats, self.info_ax, True) # create the Info (right side) with endStates information
self.real_time = False # not longer running in real-time
except KeyError as e:
self.info = Info(None, self.info_ax) # endStats doesn't exist, create Info with default values
# change the interval speed between frames
def change_play_speed(self, speed):
self.speed = speed
self.anim.event_source.interval = 100./SPEED[self.speed]
# setup all initial static values for the board (axis and board points)
def setup_board(self):
self.board_ax.set_xticks(range(-1, 29))
self.board_ax.set_yticks(range(-1, 29))
self.board_ax.tick_params(axis=u'both', which=u'both',length=0)
self.board_ax.set_xticklabels(['']+list(range(28)))
self.board_ax.set_yticklabels(['']+list(range(28)))
[spine.set_visible(False) for n, spine in self.board_ax.spines.items()]
self.board_ax.set_title('Local Match Visualizer')
self.plot_references()
# matplotlib throws a warning for tight_layout because it doesn't work for
# specific situations - we can ignore this warning
with warnings.catch_warnings():
warnings.simplefilter("ignore")
plt.tight_layout()
# handles all keyboard input - pretty self explanatory
# this also handles changing the head based on the keyboard input
# for example, 'right' means advance the head by 1 (hence self.single_advance = True)
def keyboard_input(self, evt):
if evt.key == ' ' or evt.key == 'enter':
self.is_manual = False if self.is_manual else True
elif evt.key == 'ctrl+right':
self.is_manual = True
if self.head[0]+1 in self.frames_in_turn:
self.head = self.head[0]+1, 0
else:
self.head = self.head[0], self.frames_in_turn[self.head[0]]-3
elif evt.key == 'ctrl+left':
self.is_manual = True
if self.head[1] == 0:
if self.head[0] != 0:
self.head = self.head[0]-1, 0
else:
self.head = self.head[0], 0
elif evt.key == 'right':
self.is_manual = True
self.single_advance = True
elif evt.key == 'left':
self.is_manual = True
self.single_advance = True
self.backwards()
elif evt.key in ['1','2','3','4','5','6']:
self.change_play_speed(evt.key)
elif evt.key == '<' or evt.key == ',':
speed = str(int(self.speed)-1) if int(self.speed) > 1 else self.speed
self.change_play_speed(speed)
elif evt.key == '>' or evt.key == '.':
speed = str(int(self.speed)+1) if int(self.speed) < 6 else self.speed
self.change_play_speed(speed)
# only update the slider if it exists
if not BLIT and self.slider_exists:
self.update_slider(self.head)
self.update() # something was changed, so update the board
# converts a global frame value to a turn, frame pair
def val_to_frame_turn(self, val):
turn = 0
frame = 0
for i in self.frames_in_turn.values():
if val < i: break
val -= i
turn += 1
frame = val - 1
try:
self.data[(int(turn), int(frame))]
except KeyError:
if frame > 0:
frame -= 1
return (int(turn), int(frame))
# converts a turn, frame pair into a global frame value
def frame_turn_to_val(self, turns, frames):
val = 0
for i in self.frames_in_turn.values():
if turns < 1: break
val += i
turns -= 1
val += frames
return int(val)
# this is triggered every time the slider value changes for any reason
def slider_active(self, val):
if self.stop_slider_evt: # don't update again if value was changed by code and not user
return
self.head = self.val_to_frame_turn(val) # update the head based on the slider value
self.is_manual = True # user changed something
self.update(val) # update the board
# this sets the value of the slider every time the board updates, but suppresses the event (user did not select slider)
def update_slider(self, pair):
turns, frames = pair
self.stop_slider_evt = True
if self.end_of_game: self.slider.set_val(int(self.frame_turn_to_val(turns, frames))+2)
else: self.slider.set_val(int(self.frame_turn_to_val(turns, frames)))
self.stop_slider_evt = False
# move backwards a single frame
def backwards(self):
# don't update - the user has paused the game
if self.is_manual and not self.single_advance:
return
val = self.frame_turn_to_val(self.head[0], self.head[1])
self.head = self.val_to_frame_turn(val)
if self.head[0] < 0: self.head = 0, self.head[1]
if self.head[1] < 0: self.head = self.head[0], -1
# only update the slider if it exits
if not BLIT and self.slider_exists:
self.update_slider(self.head)
self.single_advance = False # reset the single advance (used with arrow keys)
# this is the main function that moves the head forward
def advance(self):
# don't update - the user has paused the game
if self.is_manual and not self.single_advance:
return
# while you can, increment the frame by 1
try:
self.data[self.head[0],self.head[1]+1]
self.head = self.head[0], self.head[1]+1
except KeyError:
# outside of frames for that turn, try incrementing turn by 1
try:
self.data[self.head[0]+1,-1]
self.head = self.head[0]+1, -1
except KeyError as e:
# outside both turns and frames - must be the end of game
self.end_of_game = True
# only update the slider if it exits
if not BLIT and self.slider_exists:
self.update_slider(self.head)
self.single_advance = False # reset the single advance (used with arrow keys)
# this is the main function that supplies data to the matplotlib animation object
def data_stream(self):
while True:
# everything in this block (real-time) is extrememly inefficient, becuase it has to constantly load new data
if self.real_time:
self.info_ax.clear() # clear the inforation side
self.fh.load_files(1,False,args['file']) # load latest replay (again)
replay = self.fh.get_last_replay() # get latest replay (again)
# user paused game, don't advance
if not self.is_manual:
self.advance()
self.general_init(replay.frames, replay.frames_in_turn, replay.healths) # call general initialization
# this is for the first call - cannot send before yield is reached (function called)
try:
self.frame_generator.send(self.num_frames) # send the inverval generator the number of frames loaded
except TypeError:
pass
# get the data
p1Units = self.data[self.head]['p1Units']
p2Units = self.data[self.head]['p2Units']
p1Stats = self.data[self.head]['p1Stats']
p2Stats = self.data[self.head]['p2Stats']
units = self.cache_units(p1Units, 1) + self.cache_units(p2Units, 2) # format the unit data into how it is passed to my functions
self.patches.update_units(units, self.board_ax) # update all the units
self.patches.update_lbls(self.board_ax) # update all the unit count labels
self.info.update(p1Stats, p2Stats) # update the information board
self.plot.update(self.frame_turn_to_val(self.head[0], self.head[1])) # update the health plot
self.advance() # move the head forward 1
self.check_end_of_game() # if end of game, display winner
yield self.patches.values() + self.patches.lbls + self.info.lbls + self.plot.lines # send all dynamic data to the matplotlib animator
# called by the animator everytime it's interval finishes
def update(self, i=0):
# self.patches.clear_board() # inefficient, no longer used
return next(self.stream) # sends the data to the animator
# if blit is used, animator requires an init function to get intial graph values
def init(self):
return next(self.stream)
# generator function to give the animator the number of total frames when running real-time
def gen_frames(self):
while True:
num = yield
yield num
# format all of the raw unit data into how my functions recieve it
def cache_units(self, units, p_index):
filters, encryptors, destructors, pings, emps, scramblers, removes = units
units_new = []
for unit in filters: units_new.append((FILTER, (unit[0], unit[1]), unit[2], p_index, unit[3]))
for unit in encryptors: units_new.append((ENCRYPTOR, (unit[0], unit[1]), unit[2], p_index, unit[3]))
for unit in destructors: units_new.append((DESTRUCTOR, (unit[0], unit[1]), unit[2], p_index, unit[3]))
for unit in pings: units_new.append((PING, (unit[0], unit[1]), unit[2], p_index, unit[3]))
for unit in emps: units_new.append((EMP, (unit[0], unit[1]), unit[2], p_index, unit[3]))
for unit in scramblers: units_new.append((SCRAMBLER, (unit[0], unit[1]), unit[2], p_index, unit[3]))
return units_new
# checks if reached the final frame - if so, display winner
def check_end_of_game(self):
self.end_of_game = False
try:
self.data[self.head[0]+1,-1]
except KeyError as e: # outside of turn limit
try:
self.data[self.head[0],self.head[1]+1]
except KeyError as e: # outside of frame limit
self.end_of_game = True # must be end of game
if self.end_of_game: self.info.show_winner() # show the winner if it is the end of game
# show the matplotlib window
def show(self):
plt.show()
# plots all the little green dots for position references
def plot_references(self):
refs = []
for pos in self.__empty_grid():
refs.append(Circle(pos, .02))
colors = [100*random.random() for x in range(len(refs))]
p = PatchCollection(refs, color='lightgreen')
self.board_ax.add_collection(p)
# should look familiar... :)
def in_arena_bounds(self, location):
"""Checks if the given location is inside the diamond shaped game board.
Args:
* location: A map location
Returns:
True if the location is on the board, False otherwise
"""
x, y = location
full_board = 28
half_board = 14
row_size = y + 1
startx = half_board - row_size
endx = startx + (2 * row_size) - 1
top_half_check = (y < half_board and x >= startx and x <= endx)
row_size = (full_board - 1 - y) + 1
startx = half_board - row_size
endx = startx + (2 * row_size) - 1
bottom_half_check = (y >= half_board and x >= startx and x <= endx)
return bottom_half_check or top_half_check
# should look familiar... :)
def __empty_grid(self):
grid = []
for x in range(28):
for y in range(28):
if self.in_arena_bounds((x,y)):
grid.append((x,y))
return grid
# a simple data storage class to hold the data for a single frame
class Frame:
def __init__(self, t, f, data):
self.turn = t # the turn for this frame
self.frame = f # the local frame for this frame
self.data = data # the data for this frame
def __repr__(self):
return ('({}, {})'.format(self.turn, self.frame))
def __getitem__(self, key):
return self.data[key]
# Stores data from a single replay
class Replay:
def __init__(self, f_name):
self.fname = f_name # the file name of the replay
self.ref = None # stores the raw dict data as a reference
self.frames = {} # dict containing all data, keys are turn, frame tuple with Frame objects as values
self.frames_in_turn = {} # number of frames in each turn
self.healths = ([], []) # contains the healths for player1 and player2
self.load_data() # handles loading all the data from file into python variables
def __eq__(self, other):
return self.fname == other.fname
def __string(self):
return self.fname
def __str__(self):
return self.__string()
def __repr__(self):
return self.__string()
# loads all data from a replay into the python variables
def load_data(self):
with open(self.fname) as f:
i = 0
for line in f:
line = line.replace("\n", "")
line = line.replace("\t", "")
if (line != ''):
data = json.loads(line)
try:
data['debug']
self.ref = data
except:
turn_num = data['turnInfo'][1]
frame_num = data['turnInfo'][2]
self.frames[(turn_num, frame_num)] = Frame(turn_num, frame_num, data)
self.healths[0].append(data['p1Stats'][0])
self.healths[1].append(data['p2Stats'][0])
try:
self.frames_in_turn[turn_num] += 1
except KeyError:
self.frames_in_turn[turn_num] = 1
# handles opening multiple games (replays)
class FileHandler:
def __init__(self):
self.replays = [] # all of the replays loaded
def get_replays(self):
return self.replays
def get_last_replay(self):
return self.replays[0] if len(self.replays) > 0 else None
def get_replay(self, i=0):
if i >= len(self.replays):
sys.stderr.write("Invalid replay")
return None
return self.replays[i]
def __latest_replays(self, num=1, a=False):
replay_dir = '{}/../../replays/'.format(os.path.dirname(os.path.realpath(__file__)))
files = glob.glob('{}*.replay'.format(replay_dir))
files = sorted(files, key=os.path.getctime, reverse=True)
if a:
return files
return files[:num]
def load_files(self, num=1, a=False, f_names=[]):
self.replays = []
if len(f_names) > 0:
for f_name in f_names:
if f_name.find('replays') == -1:
self.replays.append(Replay('replays/'+f_name))
else:
self.replays.append(Replay(f_name))
else:
for f_name in self.__latest_replays(num, a):
self.replays.append(Replay(f_name))
# This is all almost directly copied from run_match.py
# Runs a single game
def run_single_game(process_command):
print("Start run a match")
p = subprocess.Popen(
process_command,
shell=True,
stdout=sys.stdout,
stderr=sys.stderr
)
# daemon necessary so game shuts down if this script is shut down by user
p.daemon = 1
p.wait()
print("Finished running match")
def run_match(a1='', a2=''):
# Get location of this run file
file_dir = os.path.dirname(os.path.realpath(__file__))
parent_dir = os.path.abspath('{}/..'.format(os.path.join(file_dir, os.pardir)))
# Get if running in windows OS
is_windows = sys.platform.startswith('win')
print("Is windows: {}".format(is_windows))
# Set default path for algos if script is run with no params
default_algo = parent_dir + "/algos/starter-algo-ZIPME/run.ps1" if is_windows else parent_dir + "/algos/starter-algo-ZIPME/run.sh"
algo1 = default_algo
algo2 = default_algo
if a1 != '':
algo1 = a1
if a2 != '':
algo2 = a2
# If folder path is given instead of run file path, add the run file to the path based on OS
# trailing_char deals with if there is a trailing \ or / or not after the directory name
if is_windows:
if "run.ps1" not in algo1:
trailing_char = "" if algo1.endswith("\\") else "/"
algo1 = algo1 + trailing_char + "run.ps1"
if "run.ps1" not in algo2:
trailing_char = "" if algo2.endswith("\\") else "/"
algo2 = algo2 + trailing_char + "run.ps1"
else:
if "run.sh" not in algo1:
trailing_char = "" if algo1.endswith('/') else "/"
algo1 = algo1 + trailing_char + "run.sh"
if "run.sh" not in algo2:
trailing_char = "" if algo2.endswith('/') else "/"
algo2 = algo2 + trailing_char + "run.sh"
print("Algo 1: ", algo1)
print("Algo 2:", algo2)
match = mp.Process(target=run_single_game, args=("cd {} && java -jar engine.jar work {} {}".format(parent_dir, algo1, algo2),))
match.start()
def main(args):
global BLIT
BLIT = args['blit'] # get whether blit is enabled
save = args['save'] # get whether save is enabled
writers = args['writers'] # get save modes
keep_trying = args['keep_trying'] # get whether to keep trying writer types
if args['run_match'][0] != 'empty':
# inside here we are now running a match and displaying real-time data
# warn the user about run-time and saving
if save != '':
print ('\n\nWARNING: You specified a save file, but nothing will be saved since this is running real time. Wait for the match to end.')
elif 'empty' not in writers:
print ('\n\nWARNING: You specified a writer mode, but nothing will be saved since this is running real time. Wait for the match to end.')
elif keep_trying:
print ('\n\nWARNING: You specified keep trying writers, but nothing will be saved since this is running real time. Wait for the match to end.')
fh = FileHandler() # create a file handler object
fh.load_files(1,False,args['file']) # load latest replay
previous_replay = str(fh.get_last_replay()) # get the replay that was last created
if len(args['run_match']) > 1: run_match(args['run_match'][0], args['run_match'][1]) # run the match with both algos specified
else: run_match(args['run_match'][0]) # run the match with one algo specified
# wait to open visualizer until a new replay has been created
while str(fh.get_last_replay()) == previous_replay:
fh.load_files()
time.sleep(.5)
# keep trying to load the replay file until it is capable of getting data from it - then start the visualizer
while True:
try:
fh.load_files(1,False,args['file'])
replay = fh.get_last_replay()
animatedReplay = Graph(replay.frames, replay.frames_in_turn, replay.healths, writers, keep_trying, fh=fh) # create our Graph object
break
except RuntimeError: # we raised this error when data was nothing in Graph init()
time.sleep(.5)
else:
# here we know the replay file is already created an finished
# warn the user about saving
if save == '' and 'empty' not in writers:
print ('You specified a writer type, but it will not be used since you are not saving the animation')
elif save == '' and keep_trying:
print ('You specified to keep trying writers, but it will not be used since you are not saving the animation')
fh = FileHandler() # create a file handler object
fh.load_files(1,False,args['file']) # load latest replay
replay = fh.get_last_replay() # get latest replay
animatedReplay = Graph(replay.frames, replay.frames_in_turn, replay.healths, writers, keep_trying, save=save) # create our Graph object
if __name__ == '__main__':
args = parse_args() # get command line arguments
main(args) # run program
# print (animation.writers.list())