3.4 Optimization Method Implementation

This section contains the complete details of the optimization method implementation discussed in Section 2.4.

3.4.1 Example Organization Model

This section contains the code listing of the code interfacing the example organization model from Section 2.3.3 to the proposed genetic algorithm in Section 2.4.

  import NSGAII
  import OrgModel
  import numpy
  import random
  from scipy import stats
  
  g_number_of_spares_a = 30
  g_number_of_spares_b = 30
  g_number_of_spares_c = 30
  g_max_stock_level_bits  = 5
  g_reorder_level_bits    = 5
  g_float_bits_resolution = 6
  
  class ComponentTypes:
      A = OrgModel.EquipmentComponentType("A", 200, 1.5, 2, 10000)
      B = OrgModel.EquipmentComponentType("B", 400, 1.5, 4, 5000)
      C = OrgModel.EquipmentComponentType("C", 600, 3.0, 6, 40000)
  
  class UnitTypes:
      ARCHER = OrgModel.EquipmentUnitType("ARCHER",
                                          [(1, ComponentTypes.A),
                                           (2, ComponentTypes.B),
                                           (2, ComponentTypes.C)])
  
  class ExampleOrg:
  
      def __init__(self,
                   spare_allocations_a,
                   spare_allocations_b,
                   spare_allocations_c,
                   location1_policy_dict,
                   location2_policy_dict):
          self.spare_allocations_a = spare_allocations_a
          self.spare_allocations_b = spare_allocations_b
          self.spare_allocations_c = spare_allocations_c
          self.location1_policy_dict = location1_policy_dict
          self.location2_policy_dict = location2_policy_dict
          self.cost_results = []
          self.availability_results = []
  
      def run(self):
          # Simulation setup
          evaluationScore = OrgModel.EvaluationScore()
          components = OrgModel.EquipmentComponentObjects(
              evaluationScore)
          units = OrgModel.EquipmentUnitObjects(
              evaluationScore,components)
  
          # Create given spare sets for each location agent
          spares = [[],[],[]]
          for i in self.spare_allocations_a:
              spares[i].append(components.new(ComponentTypes.A))
          for i in self.spare_allocations_b:
              spares[i].append(components.new(ComponentTypes.B))
          for i in self.spare_allocations_c:
              spares[i].append(components.new(ComponentTypes.C))
  
          # Create location agents
          upstream_loc = OrgModel.LocationManagerAgent(
              "L0", evaluationScore, spares[0])
          location1 = OrgModel.LocationManagerAgent(
              "L1",
              evaluationScore,
              spares[1],
              [units.new(UnitTypes.ARCHER),
               units.new(UnitTypes.ARCHER),
               units.new(UnitTypes.ARCHER),
               units.new(UnitTypes.ARCHER)],
              [OrgModel.MaintainerAgent("M1", evaluationScore, 200),
               OrgModel.MaintainerAgent("M2", evaluationScore, 200)],
              OrgModel.MaintenancePolicyObject(
                  self.location1_policy_dict),
              1.0,10.0)
          location2 = OrgModel.LocationManagerAgent(
              "L2",
              evaluationScore,
              spares[2],
              [units.new(UnitTypes.ARCHER),
               units.new(UnitTypes.ARCHER)],
              [OrgModel.MaintainerAgent("M3", evaluationScore, 200)],
              OrgModel.MaintenancePolicyObject(
                  self.location2_policy_dict),
              1.0,10.0)
  
          # Create distribution channels
          channel1 = OrgModel.DistributionChannelObject(
              "DC1",
              evaluationScore,
              upstream_loc,
              location1,
              72.0,1000)
          channel2 = OrgModel.DistributionChannelObject(
              "DC2",
              evaluationScore,
              upstream_loc,
              location2,
              2.0,100)
  
          # Run simulation
          t = 0
          while t < 720:
              dt = 1.0
              location1.step(dt)
              location2.step(dt)
              upstream_loc.step(dt)
              channel1.step(dt)
              channel2.step(dt)
              t += 1
  
          # Collect results. Zero-values are replaced with
          # near-zero values to enable use of harmonic mean
          cost = evaluationScore.getCost()
          if cost < 1:
              self.cost_results.append(1)
          else:
              self.cost_results.append(cost)
          avail = evaluationScore.getAvailability()
          if avail < 0.01:
              self.availability_results.append(0.01)
          else:
              self.availability_results.append(avail)
  
  
  class GAAdapter(NSGAII.Individual):
  
      def __init__(self, genome, drop_size,
                   confidence_level, max_iters=500):
          self.drop_size = drop_size
          self.confidence_level = confidence_level
          self.max_iters = max_iters
          super(GAAdapter,self).__init__(genome)
  
      def crossover(self, individual):
          """ Perform crossover between self and specified individual.
              Returns a tuple with the resulting two children, unless
              called with self as the argument, then returns None. """
          if individual == self:
              return None
          n = random.sample(xrange(len(self.genome)),1)[0]
          cg1 = self.genome[:n]
          cg1.extend(individual.genome[n:])
          cg2 = individual.genome[:n]
          cg2.extend(self.genome[n:])
  
          # Handle cross-over at a float value
          spares_offset = (g_number_of_spares_a +
                           g_number_of_spares_b +
                           g_number_of_spares_c)
          if n > spares_offset:
              comp = spares_offset
              step = g_max_stock_level_bits + \
                      g_reorder_level_bits + \
                      g_float_bits_resolution
              while comp+step < n:
                  comp += step
              if (n - comp) >= (g_max_stock_level_bits +
                                g_reorder_level_bits):
                  i = n - comp - (g_max_stock_level_bits +
                                  g_reorder_level_bits) + 1
                  cg1[n+g_float_bits_resolution-i] = \
                      random.uniform(
                      self.genome[n+g_float_bits_resolution-i],
                      individual.genome[n+g_float_bits_resolution-i])
                  cg2[n+g_float_bits_resolution-i] = \
                      random.uniform(
                      self.genome[n+g_float_bits_resolution-i],
                      individual.genome[n+g_float_bits_resolution-i])
  
          return (GAAdapter(cg1,self.drop_size,
                            self.confidence_level,self.max_iters),
                  GAAdapter(cg2,self.drop_size,
                            self.confidence_level,self.max_iters))
  
      def mutate(self, n):
          """ Mutate genome at bit ’n’. """
          spares_offset = (g_number_of_spares_a +
                           g_number_of_spares_b +
                           g_number_of_spares_c)
          if n < spares_offset:
              # Randomly reassign spare
              s = [0,1,2]
              s.remove(self.genome[n])
              self.genome[n] = random.sample(s,1)[0]
          else:
              comp = spares_offset
              step = g_max_stock_level_bits + \
                      g_reorder_level_bits + \
                      g_float_bits_resolution
              while comp+step < n:
                  comp += step
              if (n - comp) < (g_max_stock_level_bits +
                               g_reorder_level_bits):
                  # Mutation of integer
                  if self.genome[n] == 0:
                      self.genome[n] = 1
                  else:
                      self.genome[n] = 0
              else:
                  # Mutation of float
                  i = n - comp - (g_max_stock_level_bits +
                                  g_reorder_level_bits) + 1
                  if random.random() > 0.5:
                      self.genome[n+g_float_bits_resolution-i] *= \
                          2**i
                  else:
                      self.genome[n+g_float_bits_resolution-i] /= \
                          2**i
          # Reinterpret genome
          self.refresh()
  
      def objective_keys(self):
          """ Returns a set of lambda function pointers usable as 
              keys for comparing objectives. """
          keys = []
          keys.append(lambda obj: obj.availability_mean)
          keys.append(lambda obj: -obj.availability_var_upper_bound)
          keys.append(lambda obj: -obj.cost_mean)
          keys.append(lambda obj: -obj.cost_var_upper_bound)
          return keys
  
      def refresh(self):
          """ Interpret genome and construct an organization model
              using the interpreted parameter values. """
          offset = 0
          # Initial location of type A spares
          spares_loc_a = \
              self.genome[offset:offset+g_number_of_spares_a]
          offset += g_number_of_spares_a
          # Initial location of type B spares
          spares_loc_b = \
              self.genome[offset:offset+g_number_of_spares_b]
          offset += g_number_of_spares_b
          # Initial location of type C spares
          spares_loc_c = \
              self.genome[offset:offset+g_number_of_spares_c]
          offset += g_number_of_spares_c
  
          comp_step = g_max_stock_level_bits + \
              g_reorder_level_bits + \
              g_float_bits_resolution
          # Location 1 policy
          l1_policy_dict = {}
          l1_policy_dict[ComponentTypes.A] = \
              self.refresh_component_policy(offset)
          offset += comp_step
          l1_policy_dict[ComponentTypes.B] = \
              self.refresh_component_policy(offset)
          offset += comp_step
          l1_policy_dict[ComponentTypes.C] = \
              self.refresh_component_policy(offset)
          offset += comp_step
          # Location 2 policy
          l2_policy_dict = {}
          l2_policy_dict[ComponentTypes.A] = \
              self.refresh_component_policy(offset)
          offset += comp_step
          l2_policy_dict[ComponentTypes.B] = \
              self.refresh_component_policy(offset)
          offset += comp_step
          l2_policy_dict[ComponentTypes.C] = \
              self.refresh_component_policy(offset)
          offset += comp_step
  
          # Create the model and run the first drop
          self.org_model = ExampleOrg(
              spares_loc_a,
              spares_loc_b,
              spares_loc_c,
              l1_policy_dict,
              l2_policy_dict)
          self.drop()
  
      def refresh_component_policy(self, offset):
          # Policy for max stock level
          max_stock_level = 0
          for i in xrange(g_max_stock_level_bits):
              max_stock_level += self.genome[offset] << i
              offset += 1
          # Policy for reorder level
          reorder_level = 0
          for i in xrange(g_reorder_level_bits):
              reorder_level += self.genome[offset] << i
              offset += 1
          # Policy for replacement threshold
          # The float value is stored in the last "bit"
          offset += g_float_bits_resolution
          threshold = self.genome[offset-1]
  
          return { "Max Stock Level" : max_stock_level,
                   "Reorder Level"   : reorder_level,
                   "Preventive Hazard Threshold" : threshold }
  
      def drop(self):
          """ Run the organization model for a the number of 
              iterations specified in drop_size. """
          nsamples = len(self.org_model.cost_results)
          if nsamples > self.max_iters:
              return
  
          for i in xrange(self.drop_size):
              self.org_model.run()
              nsamples += 1
  
          # Get the score
          self.availability_var = numpy.var(
              self.org_model.availability_results)
          self.availability_var_upper_bound = (
              self.availability_var*(nsamples-1) /
              stats.chi2.ppf(1-self.confidence_level, nsamples-1))
          self.availability_mean = numpy.mean(
              self.org_model.availability_results)
  
          self.cost_var = numpy.var(
              self.org_model.cost_results)
          self.cost_var_upper_bound = (
              self.cost_var*(nsamples-1) /
              stats.chi2.ppf(1-self.confidence_level, nsamples-1))
          self.cost_mean = numpy.mean(
              self.org_model.cost_results)

3.4.2 NSGAII Implementation

This section contains the code listing of a fairly general implementation of the non-dominated search genetic algorithm (NSGAII). It was this implementation of the algorithm that was used to create the results presented in Section 2.4.3.

  import random
  import sys
  
  class Individual(object):
      """ Template class for usage with NSGAII. Classes representing
          individuals in the implementation of NSGAII in this package
          needs to implement the functions and variables that this
          class implements. """
  
      def __init__(self, genome):
          self.domination_count = 0
          self.distance         = 0.0
          self.genome           = genome
          self.refresh()
  
      def dominates(self, individual):
          """ Test domination relationship compared to ’individual’.
              Return -1,0,1, similar to the built in function 
              ’cmp’. """
          positive = False
          negative = False
          for k in self.objective_keys():
              d = cmp(k(self),k(individual))
              if d > 0:
                  positive = True
              elif d < 0:
                  negative = True
          if positive and negative:
              return 0
          if positive:
              return 1
          if negative:
              return -1
          return 0
  
      def crossover(self, individual):
          """ Perform crossover between self and specified individual.
              Returns a tuple with the resulting two children, unless
              called with self as the argument, then returns None. """
          sys.stderr.write("Unimplemented abstract function " +
                           "NSGAII.Indivual.crossover.\n");
          exit(1)
  
      def mutate(self, n):
          """ Mutates gene number ’n’ in the genome. """
          sys.stderr.write("Unimplemented abstract function " +
                           "NSGAII.Individual.mutate.\n");
          exit(1)
  
      def objective_keys(self):
          """ Returns a set of lambda function pointers usable as 
              keys for sorting based on each objective. Required 
              for the distance sorting part of the NSGA-II 
              algorithm. """
          sys.stderr.write("Unimplemented abstract function " +
                           "NSGAII.Individual.objective_keys.\n");
          exit(1)
  
      def refresh(self):
          """ Parses the individuals bit string. """
          sys.stderr.write("Unimplemented abstract function " +
                           "NSGAII.Individual.refresh.\n");
          exit(1)
  
  class NSGAII:
      """ Implements NSGA-II """
  
      def __init__(self,
                   population,
                   mutation_probability=0.005):
          self.population             = population
          self.mutation_probability   = mutation_probability
          self.population_size        = len(self.population)
          if (self.population_size%2) == 1:
              sys.stderr.write("Population size must be even")
              exit(1)
          self.generation_counter     = 0
          self.selection()
  
      def step(self):
          """ Generates the next generation """
          self.crossover()
          self.mutation()
          self.selection()
          # Increase resolution of selected individuals using
          # the drop-by-drop technique if applicable
          if hasattr(self.population[0],"drop"):
              for i in self.population:
                  i.drop()
          self.generation_counter += 1
  
      def crossover(self):
          """ Create children, increasing the size of the current 
              population. """
          reproductive_pool = self.reproduce()
          pool_size = len(reproductive_pool)
          index = random.sample(xrange(pool_size),pool_size)
          for i in xrange(0,pool_size,2):
              children = self.population[index[i]].crossover(
                  self.population[index[i+1]]);
              if children is not None:
                  self.population.append(children[0])
                  self.population.append(children[1])
  
      def mutation(self):
          """ Flip bits (in place) in the population with some 
              probability, while preserving the non-dominated set 
              (elitism). This function implements a so called 
              mutation-clock operator. """
          pi = iter(self.population[self.non_dominated_set_size:])
          p = pi.next()
          i = 0
          n = 0
          try:
              while True:
                  i += random.randrange(
                      int(1/self.mutation_probability)*2)
                  while (n+len(p.genome)-1) < i:
                      n += len(p.genome)
                      p = pi.next()
                  p.mutate(i-n)
          except StopIteration:
              pass
  
      def selection(self):
          """ Reduces the population size to N, where N is the 
              desired population size. Selection of the N
              individuals is done using NSGA-II. """
          ordered_sets = self.non_dominated_sort()
  
          new_population = []
          i = 0
          while (len(new_population) + len(ordered_sets[i])) <= \
                  self.population_size:
              new_population.extend(ordered_sets[i])
              i += 1
              if i == len(ordered_sets):
                  break
  
          if len(new_population) != self.population_size:
              s = self.distance_sort(ordered_sets[i])
              new_population.extend(
                  s[0:(self.population_size-len(new_population))])
  
          self.population = new_population
          if len(ordered_sets[0]) > self.population_size:
              self.non_dominated_set_size = self.population_size
          else:
              self.non_dominated_set_size = len(ordered_sets[0])
  
      def reproduce(self):
          """ Creates a pool of candidates for reproduction through
              tournament selection. """
          reproductive_pool = []
          # First round in tournament
          index = random.sample(xrange(self.population_size),
                                self.population_size)
          for i in xrange(0,self.population_size,2):
              d = self.population[index[i]].dominates(
                  self.population[index[i+1]])
              if d == 1:
                  reproductive_pool.append(
                      self.population[index[i]])
              elif d == -1:
                  reproductive_pool.append(
                      self.population[index[i+1]])
              else:
                  reproductive_pool.append(
                      random.sample([self.population[index[i]],
                                     self.population[index[i+1]]], 1))
          # Second round in tournament
          index = random.sample(xrange(self.population_size),
                                self.population_size)
          for i in xrange(0,self.population_size,2):
              d = self.population[index[i]].dominates(
                  self.population[index[i+1]])
              if d == 1:
                  reproductive_pool.append(
                      self.population[index[i]])
              elif d == -1:
                  reproductive_pool.append(
                      self.population[index[i+1]])
              else:
                  reproductive_pool.append(
                      random.sample([self.population[index[i]],
                                     self.population[index[i+1]]], 1))
  
          return reproductive_pool
  
      def non_dominated_sort(self):
          """ Creates and returns a list of sets, containing the 
              individuals in the list ’population’ sorted by 
              domination rank. """
          for p in self.population:
              p.domination_count = 0
              p.dominated_set = set()
  
          for i in xrange(len(self.population)):
              for j in xrange(i+1,len(self.population)):
                  d = self.population[i].dominates(self.population[j])
                  if d == 1:    # i dominates j
                      self.population[i].dominated_set.add(
                          self.population[j])
                      self.population[j].domination_count += 1
                  elif d == -1: # j dominates i
                      self.population[j].dominated_set.add(
                          self.population[i])
                      self.population[i].domination_count += 1
  
          ordered_sets = []
          current_set = set()
          added_individuals = 0
          while added_individuals < len(self.population):
              if len(current_set) == 0:
                  for p in self.population:
                      if p.domination_count == 0:
                          current_set.add(p)
  
              ordered_sets.append(current_set)
              added_individuals += len(current_set)
              current_set = set()
              for c in ordered_sets[-1]:
                  for p in c.dominated_set:
                      p.domination_count -= 1
                      if p.domination_count == 0:
                          current_set.add(p)
  
          return ordered_sets
  
      def distance_sort(self, front):
          """ Sorts the individuals in the set ’front’ according to
              relative distance, using the method outlined in 
              NSGA-II. """
          if len(front) < 2:
              return front
  
          infinity = 1e16
  
          sorted_front = []
          for p in front:
              p.distance = 0.0
              sorted_front.append(p)
  
          for key_func in sorted_front[0].objective_keys():
              sorted_front.sort(key=key_func)
              frange = key_func(sorted_front[-1]) - \
                  key_func(sorted_front[0])
              if frange != 0.0:
                  sorted_front[0].distance  = infinity
                  sorted_front[-1].distance = infinity
                  for i in xrange(1,len(sorted_front)-1):
                      sorted_front[i].distance += \
                          (key_func(sorted_front[i+1]) - \
                               key_func(sorted_front[i-1])) / frange
  
          sorted_front.sort(cmp=lambda x,y: \
                                cmp(y.distance,x.distance))
          return sorted_front