3.4 Optimization Method Implementation
This section contains the complete details of the optimization method implementation
discussed in Section 2.4.
3.4.1 Example Organization Model
This section contains the code listing of the code interfacing the example
organization model from Section 2.3.3 to the proposed genetic algorithm in Section
2.4.
import NSGAII import OrgModel import numpy import random
from scipy import stats g_number_of_spares_a = 30 g_number_of_spares_b = 30
g_number_of_spares_c = 30 g_max_stock_level_bits = 5
g_reorder_level_bits = 5 g_float_bits_resolution = 6 class ComponentTypes:
A = OrgModel.EquipmentComponentType("A", 200, 1.5, 2, 10000)
B = OrgModel.EquipmentComponentType("B", 400, 1.5, 4, 5000)
C = OrgModel.EquipmentComponentType("C", 600, 3.0, 6, 40000)
class UnitTypes: ARCHER = OrgModel.EquipmentUnitType("ARCHER",
[(1, ComponentTypes.A),
(2, ComponentTypes.B),
(2, ComponentTypes.C)])
class ExampleOrg: def __init__(self, spare_allocations_a,
spare_allocations_b, spare_allocations_c,
location1_policy_dict, location2_policy_dict):
self.spare_allocations_a = spare_allocations_a
self.spare_allocations_b = spare_allocations_b
self.spare_allocations_c = spare_allocations_c
self.location1_policy_dict = location1_policy_dict
self.location2_policy_dict = location2_policy_dict
self.cost_results = [] self.availability_results = []
def run(self): # Simulation setup
evaluationScore = OrgModel.EvaluationScore()
components = OrgModel.EquipmentComponentObjects( evaluationScore)
units = OrgModel.EquipmentUnitObjects( evaluationScore,components)
# Create given spare sets for each location agent
spares = [[],[],[]] for i in self.spare_allocations_a:
spares[i].append(components.new(ComponentTypes.A))
for i in self.spare_allocations_b:
spares[i].append(components.new(ComponentTypes.B))
for i in self.spare_allocations_c:
spares[i].append(components.new(ComponentTypes.C))
# Create location agents upstream_loc = OrgModel.LocationManagerAgent(
"L0", evaluationScore, spares[0])
location1 = OrgModel.LocationManagerAgent( "L1",
evaluationScore, spares[1],
[units.new(UnitTypes.ARCHER), units.new(UnitTypes.ARCHER),
units.new(UnitTypes.ARCHER), units.new(UnitTypes.ARCHER)],
[OrgModel.MaintainerAgent("M1", evaluationScore, 200),
OrgModel.MaintainerAgent("M2", evaluationScore, 200)],
OrgModel.MaintenancePolicyObject(
self.location1_policy_dict), 1.0,10.0)
location2 = OrgModel.LocationManagerAgent( "L2",
evaluationScore, spares[2],
[units.new(UnitTypes.ARCHER), units.new(UnitTypes.ARCHER)],
[OrgModel.MaintainerAgent("M3", evaluationScore, 200)],
OrgModel.MaintenancePolicyObject( self.location2_policy_dict),
1.0,10.0) # Create distribution channels
channel1 = OrgModel.DistributionChannelObject( "DC1",
evaluationScore, upstream_loc, location1,
72.0,1000) channel2 = OrgModel.DistributionChannelObject(
"DC2", evaluationScore, upstream_loc,
location2, 2.0,100) # Run simulation t = 0
while t < 720: dt = 1.0 location1.step(dt)
location2.step(dt) upstream_loc.step(dt)
channel1.step(dt) channel2.step(dt)
t += 1 # Collect results. Zero-values are replaced with
# near-zero values to enable use of harmonic mean
cost = evaluationScore.getCost() if cost < 1:
self.cost_results.append(1) else:
self.cost_results.append(cost)
avail = evaluationScore.getAvailability() if avail < 0.01:
self.availability_results.append(0.01) else:
self.availability_results.append(avail)
class GAAdapter(NSGAII.Individual): def __init__(self, genome, drop_size,
confidence_level, max_iters=500): self.drop_size = drop_size
self.confidence_level = confidence_level self.max_iters = max_iters
super(GAAdapter,self).__init__(genome) def crossover(self, individual):
""" Perform crossover between self and specified individual.
Returns a tuple with the resulting two children, unless
called with self as the argument, then returns None. """
if individual == self: return None
n = random.sample(xrange(len(self.genome)),1)[0] cg1 = self.genome[:n]
cg1.extend(individual.genome[n:]) cg2 = individual.genome[:n]
cg2.extend(self.genome[n:]) # Handle cross-over at a float value
spares_offset = (g_number_of_spares_a +
g_number_of_spares_b +
g_number_of_spares_c) if n > spares_offset:
comp = spares_offset step = g_max_stock_level_bits + \
g_reorder_level_bits + \
g_float_bits_resolution while comp+step < n:
comp += step if (n - comp) >= (g_max_stock_level_bits +
g_reorder_level_bits):
i = n - comp - (g_max_stock_level_bits +
g_reorder_level_bits) + 1
cg1[n+g_float_bits_resolution-i] = \ random.uniform(
self.genome[n+g_float_bits_resolution-i],
individual.genome[n+g_float_bits_resolution-i])
cg2[n+g_float_bits_resolution-i] = \ random.uniform(
self.genome[n+g_float_bits_resolution-i],
individual.genome[n+g_float_bits_resolution-i])
return (GAAdapter(cg1,self.drop_size,
self.confidence_level,self.max_iters),
GAAdapter(cg2,self.drop_size,
self.confidence_level,self.max_iters))
def mutate(self, n): """ Mutate genome at bit ’n’. """
spares_offset = (g_number_of_spares_a +
g_number_of_spares_b +
g_number_of_spares_c)
if n < spares_offset: # Randomly reassign spare
s = [0,1,2] s.remove(self.genome[n])
self.genome[n] = random.sample(s,1)[0] else:
comp = spares_offset step = g_max_stock_level_bits + \
g_reorder_level_bits + \
g_float_bits_resolution while comp+step < n:
comp += step if (n - comp) < (g_max_stock_level_bits +
g_reorder_level_bits): # Mutation of integer
if self.genome[n] == 0: self.genome[n] = 1
else: self.genome[n] = 0
else: # Mutation of float
i = n - comp - (g_max_stock_level_bits +
g_reorder_level_bits) + 1
if random.random() > 0.5:
self.genome[n+g_float_bits_resolution-i] *= \
2**i else:
self.genome[n+g_float_bits_resolution-i] /= \
2**i # Reinterpret genome
self.refresh() def objective_keys(self):
""" Returns a set of lambda function pointers usable as
keys for comparing objectives. """ keys = []
keys.append(lambda obj: obj.availability_mean)
keys.append(lambda obj: -obj.availability_var_upper_bound)
keys.append(lambda obj: -obj.cost_mean)
keys.append(lambda obj: -obj.cost_var_upper_bound) return keys
def refresh(self): """ Interpret genome and construct an organization model
using the interpreted parameter values. """ offset = 0
# Initial location of type A spares spares_loc_a = \
self.genome[offset:offset+g_number_of_spares_a]
offset += g_number_of_spares_a # Initial location of type B spares
spares_loc_b = \ self.genome[offset:offset+g_number_of_spares_b]
offset += g_number_of_spares_b # Initial location of type C spares
spares_loc_c = \ self.genome[offset:offset+g_number_of_spares_c]
offset += g_number_of_spares_c
comp_step = g_max_stock_level_bits + \ g_reorder_level_bits + \
g_float_bits_resolution # Location 1 policy
l1_policy_dict = {} l1_policy_dict[ComponentTypes.A] = \
self.refresh_component_policy(offset)
offset += comp_step l1_policy_dict[ComponentTypes.B] = \
self.refresh_component_policy(offset)
offset += comp_step l1_policy_dict[ComponentTypes.C] = \
self.refresh_component_policy(offset)
offset += comp_step # Location 2 policy
l2_policy_dict = {} l2_policy_dict[ComponentTypes.A] = \
self.refresh_component_policy(offset)
offset += comp_step l2_policy_dict[ComponentTypes.B] = \
self.refresh_component_policy(offset)
offset += comp_step l2_policy_dict[ComponentTypes.C] = \
self.refresh_component_policy(offset) offset += comp_step
# Create the model and run the first drop self.org_model = ExampleOrg(
spares_loc_a, spares_loc_b, spares_loc_c,
l1_policy_dict, l2_policy_dict) self.drop()
def refresh_component_policy(self, offset): # Policy for max stock level
max_stock_level = 0 for i in xrange(g_max_stock_level_bits):
max_stock_level += self.genome[offset] << i
offset += 1 # Policy for reorder level
reorder_level = 0 for i in xrange(g_reorder_level_bits):
reorder_level += self.genome[offset] << i
offset += 1 # Policy for replacement threshold
# The float value is stored in the last "bit"
offset += g_float_bits_resolution threshold = self.genome[offset-1]
return { "Max Stock Level" : max_stock_level,
"Reorder Level" : reorder_level,
"Preventive Hazard Threshold" : threshold }
def drop(self): """ Run the organization model for a the number of
iterations specified in drop_size. """
nsamples = len(self.org_model.cost_results) if nsamples > self.max_iters:
return for i in xrange(self.drop_size):
self.org_model.run() nsamples += 1
# Get the score self.availability_var = numpy.var(
self.org_model.availability_results)
self.availability_var_upper_bound = (
self.availability_var*(nsamples-1) /
stats.chi2.ppf(1-self.confidence_level, nsamples-1))
self.availability_mean = numpy.mean(
self.org_model.availability_results)
self.cost_var = numpy.var( self.org_model.cost_results)
self.cost_var_upper_bound = ( self.cost_var*(nsamples-1) /
stats.chi2.ppf(1-self.confidence_level, nsamples-1))
self.cost_mean = numpy.mean( self.org_model.cost_results)
3.4.2 NSGAII Implementation
This section contains the code listing of a fairly general implementation of the
non-dominated search genetic algorithm (NSGAII). It was this implementation
of the algorithm that was used to create the results presented in Section
2.4.3.
import random import sys class Individual(object):
""" Template class for usage with NSGAII. Classes representing
individuals in the implementation of NSGAII in this package
needs to implement the functions and variables that this
class implements. """ def __init__(self, genome):
self.domination_count = 0 self.distance = 0.0
self.genome = genome
self.refresh() def dominates(self, individual):
""" Test domination relationship compared to ’individual’.
Return -1,0,1, similar to the built in function
’cmp’. """ positive = False negative = False
for k in self.objective_keys(): d = cmp(k(self),k(individual))
if d > 0: positive = True elif d < 0:
negative = True if positive and negative:
return 0 if positive: return 1 if negative:
return -1 return 0 def crossover(self, individual):
""" Perform crossover between self and specified individual.
Returns a tuple with the resulting two children, unless
called with self as the argument, then returns None. """
sys.stderr.write("Unimplemented abstract function " +
"NSGAII.Indivual.crossover.\n"); exit(1)
def mutate(self, n): """ Mutates gene number ’n’ in the genome. """
sys.stderr.write("Unimplemented abstract function " +
"NSGAII.Individual.mutate.\n");
exit(1) def objective_keys(self):
""" Returns a set of lambda function pointers usable as
keys for sorting based on each objective. Required
for the distance sorting part of the NSGA-II algorithm. """
sys.stderr.write("Unimplemented abstract function " +
"NSGAII.Individual.objective_keys.\n"); exit(1)
def refresh(self): """ Parses the individuals bit string. """
sys.stderr.write("Unimplemented abstract function " +
"NSGAII.Individual.refresh.\n"); exit(1)
class NSGAII: """ Implements NSGA-II """ def __init__(self,
population, mutation_probability=0.005):
self.population = population
self.mutation_probability = mutation_probability
self.population_size = len(self.population)
if (self.population_size%2) == 1:
sys.stderr.write("Population size must be even") exit(1)
self.generation_counter = 0 self.selection()
def step(self): """ Generates the next generation """
self.crossover() self.mutation() self.selection()
# Increase resolution of selected individuals using
# the drop-by-drop technique if applicable
if hasattr(self.population[0],"drop"):
for i in self.population: i.drop()
self.generation_counter += 1 def crossover(self):
""" Create children, increasing the size of the current
population. """ reproductive_pool = self.reproduce()
pool_size = len(reproductive_pool)
index = random.sample(xrange(pool_size),pool_size)
for i in xrange(0,pool_size,2):
children = self.population[index[i]].crossover(
self.population[index[i+1]]); if children is not None:
self.population.append(children[0])
self.population.append(children[1]) def mutation(self):
""" Flip bits (in place) in the population with some
probability, while preserving the non-dominated set
(elitism). This function implements a so called
mutation-clock operator. """
pi = iter(self.population[self.non_dominated_set_size:])
p = pi.next() i = 0 n = 0 try:
while True: i += random.randrange(
int(1/self.mutation_probability)*2)
while (n+len(p.genome)-1) < i: n += len(p.genome)
p = pi.next() p.mutate(i-n)
except StopIteration: pass def selection(self):
""" Reduces the population size to N, where N is the
desired population size. Selection of the N
individuals is done using NSGA-II. """
ordered_sets = self.non_dominated_sort() new_population = []
i = 0 while (len(new_population) + len(ordered_sets[i])) <= \
self.population_size: new_population.extend(ordered_sets[i])
i += 1 if i == len(ordered_sets):
break if len(new_population) != self.population_size:
s = self.distance_sort(ordered_sets[i]) new_population.extend(
s[0:(self.population_size-len(new_population))])
self.population = new_population
if len(ordered_sets[0]) > self.population_size:
self.non_dominated_set_size = self.population_size else:
self.non_dominated_set_size = len(ordered_sets[0]) def reproduce(self):
""" Creates a pool of candidates for reproduction through
tournament selection. """
reproductive_pool = [] # First round in tournament
index = random.sample(xrange(self.population_size),
self.population_size)
for i in xrange(0,self.population_size,2):
d = self.population[index[i]].dominates(
self.population[index[i+1]]) if d == 1:
reproductive_pool.append( self.population[index[i]])
elif d == -1: reproductive_pool.append(
self.population[index[i+1]])
else: reproductive_pool.append(
random.sample([self.population[index[i]],
self.population[index[i+1]]], 1))
# Second round in tournament
index = random.sample(xrange(self.population_size),
self.population_size)
for i in xrange(0,self.population_size,2):
d = self.population[index[i]].dominates(
self.population[index[i+1]]) if d == 1:
reproductive_pool.append( self.population[index[i]])
elif d == -1: reproductive_pool.append(
self.population[index[i+1]])
else: reproductive_pool.append(
random.sample([self.population[index[i]],
self.population[index[i+1]]], 1))
return reproductive_pool def non_dominated_sort(self):
""" Creates and returns a list of sets, containing the
individuals in the list ’population’ sorted by
domination rank. """ for p in self.population:
p.domination_count = 0 p.dominated_set = set()
for i in xrange(len(self.population)):
for j in xrange(i+1,len(self.population)):
d = self.population[i].dominates(self.population[j])
if d == 1: # i dominates j
self.population[i].dominated_set.add(
self.population[j])
self.population[j].domination_count += 1
elif d == -1: # j dominates i
self.population[j].dominated_set.add(
self.population[i])
self.population[i].domination_count += 1
ordered_sets = [] current_set = set() added_individuals = 0
while added_individuals < len(self.population):
if len(current_set) == 0: for p in self.population:
if p.domination_count == 0:
current_set.add(p) ordered_sets.append(current_set)
added_individuals += len(current_set)
current_set = set() for c in ordered_sets[-1]:
for p in c.dominated_set: p.domination_count -= 1
if p.domination_count == 0:
current_set.add(p)
return ordered_sets def distance_sort(self, front):
""" Sorts the individuals in the set ’front’ according to
relative distance, using the method outlined in
NSGA-II. """ if len(front) < 2: return front
infinity = 1e16 sorted_front = [] for p in front:
p.distance = 0.0 sorted_front.append(p)
for key_func in sorted_front[0].objective_keys():
sorted_front.sort(key=key_func)
frange = key_func(sorted_front[-1]) - \
key_func(sorted_front[0]) if frange != 0.0:
sorted_front[0].distance = infinity
sorted_front[-1].distance = infinity
for i in xrange(1,len(sorted_front)-1):
sorted_front[i].distance += \
(key_func(sorted_front[i+1]) - \
key_func(sorted_front[i-1])) / frange
sorted_front.sort(cmp=lambda x,y: \
cmp(y.distance,x.distance)) return sorted_front