Race conditions are a thing. They happen, and as devs we hate them because they are difficult to track down. Once we have tracked one down, we are confronted with a new, more insidious problem: how do we know we've fixed it?

This blog post presents a class that I wrote for testing race conditions, named Breakpoints. It uses some neat features of Ruby threads. I'll also cover some issues we faced when attempting to use an existing Ruby gem (fork_break) for such testing.

Our Problem

At Flipp, our flyer processing pipeline has many steps, with some steps that can benefit from running subtasks in parallel. Because we use Delayed Job on a number of servers to achieve this parallelism, we ran into (shock!) a race condition.

To illustrate, let's pretend we're racing horses. We need to feed each horse before it can race, and the race can only take place once all the horses are fed:

class Horse
  def give_feed
    buy_feed_and_feed_horse
    horse_race.start_race_if_ready
  end
end

class HorseRace
  def start_race_if_ready
    # Check-then-act: nothing stops two processes from both passing this check
    if horses.all?(&:fed)
      Delayed::Job.enqueue(Jobs::RunRace.new)
    end
  end
end

Now suppose two horses are fed at about the same time, on two different processes:

  • Horse 1 is fed on one process.

  • Horse 2 is fed on another process.

  • Process 1 checks the database to see if all the horses are fed. Yes, they are! Start the race!

  • Process 2 checks the database to see if all the horses are fed. Yes, they are! Start the race!

In this scenario the race is started twice, which will likely confuse and annoy all the bookies. Depending on the implementation, the second attempt might instead crash with a validation error.
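To see the interleaving without Rails, Delayed Job, or a database, here is a minimal in-memory sketch. Horse and HorseRace are hypothetical plain-Ruby stand-ins for our models, an array stands in for the Delayed::Job table, and two Queues force both threads to finish feeding before either one runs its check:

```ruby
# Hypothetical in-memory stand-ins for the real ActiveRecord models.
Horse = Struct.new(:name, :fed)

class HorseRace
  attr_reader :horses, :starts

  def initialize(horses)
    @horses = horses
    @starts = [] # stands in for the Delayed::Job table
  end

  # Same check-then-act shape as start_race_if_ready above.
  def start_race_if_ready
    @starts << :run_race if horses.all?(&:fed)
  end
end

race = HorseRace.new([Horse.new("Horse 1", false), Horse.new("Horse 2", false)])
fed = Queue.new # each worker reports here once its horse is fed
go = Queue.new  # the main thread releases the workers through here

workers = race.horses.map do |horse|
  Thread.new do
    horse.fed = true
    fed << horse.name
    go.pop # wait until every horse has been fed
    race.start_race_if_ready
  end
end

race.horses.size.times { fed.pop }   # both horses are now fed
race.horses.size.times { go << :go } # let both workers run their check
workers.each(&:join)

puts "race started #{race.starts.size} times"
```

Both workers pass the all-fed check, so the race is recorded twice: the same double start as in the scenario above.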

The solution to our specific problem was to put the check and insertion into a single atomic SQL query (i.e. insert where not exists) - no more race condition. But how do we know - really, really know - that what we did worked?
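Our real fix lived in SQL and depends on our schema, so here is only an in-process analogue of the same idea (a swapped-in technique, not the query we used): a Mutex turns the check and the insert into a single step, and the starts.empty? test plays the role of WHERE NOT EXISTS. Horse and HorseRace are again hypothetical stand-ins, with the same forced interleaving:

```ruby
# Hypothetical in-memory stand-ins for the real models.
Horse = Struct.new(:name, :fed)

class HorseRace
  attr_reader :horses, :starts

  def initialize(horses)
    @horses = horses
    @starts = []      # stands in for the Delayed::Job table
    @lock = Mutex.new # makes check + insert one atomic step
  end

  def start_race_if_ready
    @lock.synchronize do
      # Insert only if everyone is fed AND no race exists yet ("where not exists").
      @starts << :run_race if horses.all?(&:fed) && @starts.empty?
    end
  end
end

race = HorseRace.new([Horse.new("Horse 1", false), Horse.new("Horse 2", false)])
fed = Queue.new
go = Queue.new

workers = race.horses.map do |horse|
  Thread.new do
    horse.fed = true
    fed << horse.name
    go.pop # both horses are fed before either check runs
    race.start_race_if_ready
  end
end

race.horses.size.times { fed.pop }
race.horses.size.times { go << :go }
workers.each(&:join)

puts "race started #{race.starts.size} time(s)"
```

A Mutex only protects threads inside one Ruby process. With Delayed Job workers spread across several servers, the atomicity has to come from the database itself, which is why the real fix was a single SQL statement.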

RSpec and fork_break

Our testing framework uses RSpec and FactoryGirl. A previous attempt to simulate a race condition involved putting hooks partway through a method, stubbing them out, and then calling them manually later. It was not very elegant, to say the least.

Fortunately, a little research uncovered a pretty brilliant little gem called fork_break. It lets you put a "breakpoint" anywhere in a process, much as you would in a debugger. You then create forks, run each one until it reaches the breakpoint you need, and let the next process take over as necessary.
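The core idea of a breakpoint in a forked process can be sketched with plain fork and IO.pipe: the child reports that it reached a named point, then blocks until the parent tells it to continue. This is a hypothetical minimal sketch of the concept, not fork_break's actual implementation or API, and it needs a platform that supports fork (so not Windows):

```ruby
# Two one-way pipes: child -> parent (status) and parent -> child (commands).
status_r, status_w = IO.pipe
command_r, command_w = IO.pipe

pid = fork do
  status_r.close
  command_w.close
  # ... work before the breakpoint (e.g. feeding the horse) ...
  status_w.puts "reached before_start_race_if_ready"
  status_w.flush
  command_r.gets # the "breakpoint": block until the parent says continue
  # ... work after the breakpoint ...
  status_w.puts "finished"
  status_w.flush
  exit!(0) # skip at_exit hooks inherited from the parent
end

status_w.close
command_r.close

at_breakpoint = status_r.gets.chomp # the child is now paused
command_w.puts "continue"
command_w.flush
at_end = status_r.gets.chomp
Process.wait(pid)

puts at_breakpoint
puts at_end
```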

I devised a way to wrap any instance method of an existing class in breakpoints and run the two subtasks simultaneously:

require 'fork_break'

# Add breakpoints around a method to a class. Used for concurrency testing
# using ForkBreak.
# @param klass [Class] the class to add breakpoints to.
# @param method [Symbol] the method to add breakpoints around.
def add_breakpoints(klass, method)
  # method_defined? checks instance methods; klass.respond_to? would only
  # check methods on the class object itself, so the guard would never fire.
  return if klass.method_defined?(:"_before_breakpoint_#{method}")
  klass.send(:include, ForkBreak::Breakpoints)
  klass.send(:alias_method, :"_before_breakpoint_#{method}", method)
  klass.send(:define_method, method) do |*args|
    breakpoints << :"before_#{method}"
    ret_val = send(:"_before_breakpoint_#{method}", *args)
    breakpoints << :"after_#{method}"
    ret_val # keep the wrapped method's return value
  end
end

# add breakpoints
add_breakpoints(HorseRace, :start_race_if_ready)

describe 'Horse Racing' do
  before do
    # don't actually feed the horse, we are only testing the race condition
    Horse.any_instance.stub(:buy_feed_and_feed_horse)
  end

  # assumes `horses` (the horses in one race) is defined, e.g. via let
  it 'should race only once' do
    processes = horses.map do |horse|
      ForkBreak::Process.new do
        horse.give_feed
      end
    end
    processes.first.run_until(:before_start_race_if_ready).wait
    processes.last.finish.wait
    Delayed::Job.where("handler LIKE '%RunRace%'").
      count.should == 1
    processes.first.finish.wait
    Delayed::Job.where("handler LIKE '%RunRace%'").
      count.should == 1
  end
end
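Stripped of ForkBreak, the wrapping trick in add_breakpoints is ordinary alias_method plus define_method. In this self-contained sketch, a hypothetical EVENTS array stands in for ForkBreak's breakpoints, and HorseRace is a stub class:

```ruby
# Hypothetical stand-in for ForkBreak's breakpoints: just records what happened.
EVENTS = []

# Hypothetical stub of the class under test.
class HorseRace
  def start_race_if_ready
    EVENTS << :checked
    :race_enqueued
  end
end

def wrap_with_events(klass, method)
  original = :"_before_breakpoint_#{method}"
  # method_defined? looks at instance methods, so re-wrapping is skipped.
  return if klass.method_defined?(original)

  klass.send(:alias_method, original, method)
  klass.send(:define_method, method) do |*args, &block|
    EVENTS << :"before_#{method}"
    result = send(original, *args, &block)
    EVENTS << :"after_#{method}"
    result # keep the original return value
  end
end

wrap_with_events(HorseRace, :start_race_if_ready)
wrap_with_events(HorseRace, :start_race_if_ready) # no-op: already wrapped

result = HorseRace.new.start_race_if_ready
p EVENTS
p result
```

The guard matters more than it looks. Without it, a second call would re-alias the wrapper onto the `_before_breakpoint_` name, so the new wrapper would call the old wrapper, which would call itself until the stack overflows.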

Issues With fork_break

This looked great, but we ran into a few problems. The first is how fork_break passes data between processes. Deep inside the forking code, Marshal.dump is used to serialize Ruby objects into a byte string. However, Marshal.dump refuses to dump objects that have singleton methods (it raises a TypeError), and singleton methods are exactly what RSpec uses for its stubs. It would have been fine if we were marshaling the Horse object from our example directly, but we were actually sending a HorseRace, which has a Horse as an attribute. After several hours of searching for a clean solution, I was forced to hack this in:

class Horse  
  def marshal_dump
    self.dup
  end
end  
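A minimal illustration of the Marshal limitation, using a hypothetical Struct in place of our ActiveRecord model: defining a method on one object gives it a singleton class (which, as described above, is what RSpec's stubs do), and Marshal.dump then raises a TypeError:

```ruby
# Hypothetical plain-Ruby stand-in for the Horse model.
Horse = Struct.new(:name)

plain = Horse.new("Horse 1")
round_trip = Marshal.load(Marshal.dump(plain)) # ordinary objects serialize fine

stubbed = Horse.new("Horse 2")
# Roughly what a stub does: add a method to this one object only.
def stubbed.buy_feed_and_feed_horse
  nil
end

dump_error = begin
  Marshal.dump(stubbed)
  nil
rescue TypeError => e
  e
end

puts round_trip.name
puts dump_error.class
puts dump_error.message
```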

With this, the spec ran beautifully, on my local Mac. Once we pushed the code to our Jenkins server, however, we were faced with a huge set of seemingly random errors. One Jenkins box ran out of memory because we were creating entirely new Rails processes. RSpec would report "stack level too deep" but could not tell us where. In true Heisenbug fashion, adding puts statements would change the error to something else entirely. Rails lost its connection to the database, and after hours of work re-establishing it, we were back to the stack-level errors and the cycle started all over again. Eventually the team realized we would either need to try something else or be stuck debugging for days or weeks.

Spinning Our Own Threads

Rather than take the brute-force approach of forking entirely separate processes, we reluctantly put fork_break aside and built our own solution using threads. Our spec looks quite similar with the new solution:

# add breakpoints (using the Breakpoints class below, not the ForkBreak helper)
Breakpoints.add_breakpoints(HorseRace, :start_race_if_ready)

describe 'Horse Racing' do
  before do
    # don't actually feed the horse, we are only testing the race condition
    Horse.any_instance.stub(:buy_feed_and_feed_horse)
  end

  it 'should race only once' do
    threads = horses.map do |horse|
      Breakpoints::Thread.new do
        horse.give_feed
      end
    end
    threads.first.run_until(:before_start_race_if_ready)
    threads.last.finish
    Delayed::Job.where("handler LIKE '%RunRace%'").
      count.should == 1
    threads.first.finish
    Delayed::Job.where("handler LIKE '%RunRace%'").
      count.should == 1
  end
end

The magic here is with our shiny new Breakpoints class:

require 'set'

# Fail the spec if any thread throws an error
::Thread.abort_on_exception = true

# Class that can run a thread until a given breakpoint.
class Breakpoints

  class << self
    attr_accessor :main_thread
    attr_accessor :breakpoints

    # Add breakpoints around a method to a class. This can be passed into
    # the run_until method.
    # @param klass [Class] the class to add breakpoints to.
    # @param method [Symbol] the method to add breakpoints around.
    def add_breakpoints(klass, method)
      # method_defined? checks instance methods; klass.respond_to? would not.
      return if klass.method_defined?(:"_before_breakpoint_#{method}")
      klass.send(:alias_method, :"_before_breakpoint_#{method}", method)
      klass.send(:define_method, method) do |*args|
        Breakpoints.break(:"before_#{method}")
        ret_val = self.send(:"_before_breakpoint_#{method}", *args)
        Breakpoints.break(:"after_#{method}")
        ret_val
      end
    end

    # Break the current thread if the breakpoint is set.
    # @param breakpoint [Symbol]
    def break(breakpoint)
      if ::Thread.current[:breakpoints] &&
        ::Thread.current[:breakpoints].include?(breakpoint)
          ::Thread.current[:breakpoints_reached] << breakpoint
          # hand control back to the main (spec) thread, then pause here
          self.main_thread.run
          ::Thread.stop
      end
    end
  end

  class Thread
    # Create a thread to run later.
    def initialize(&block)
      @block_to_run = block
    end

    # Continue a thread to stop at the next breakpoint or finish running.
    def finish
      if @thread
        unless @thread.alive?
          breakpoints = (@thread[:breakpoints] || []).to_a
          breakpoints_reached = (@thread[:breakpoints_reached] || []).to_a
          missed_breakpoints = breakpoints - breakpoints_reached
          if missed_breakpoints.any?
            raise "Breakpoint(s) #{missed_breakpoints.join(', ')} " +
              "not reached!"
          end
          # A finished thread can't be woken up (wakeup raises ThreadError).
          return
        end
        @thread.wakeup
        ::Thread.stop
      else
        run_until(nil)
      end
    end

    # Run the thread until it reaches the given breakpoint.
    # @param breakpoint [Symbol]
    def run_until(breakpoint=nil)
      Breakpoints.main_thread = ::Thread.current
      if @thread
        # Already started: register the new breakpoint (if any) and resume.
        if breakpoint
          @thread[:breakpoints] ||= Set.new
          @thread[:breakpoints_reached] ||= Set.new
          @thread[:breakpoints] << breakpoint
        end
        self.finish
        return
      end

      @thread = ::Thread.new do
        if breakpoint
          ::Thread.current[:breakpoints] = Set[breakpoint]
          ::Thread.current[:breakpoints_reached] = Set.new
        end
        @block_to_run.call
        if breakpoint &&
          !::Thread.current[:breakpoints_reached].include?(breakpoint)
          raise "Breakpoint #{breakpoint} not reached!"
        end
        # return control back to main thread
        Breakpoints.main_thread.run
      end
      ::Thread.stop
    end
  end
end

This solution relies on two neat features of Ruby threads:

  • Thread variables. These are values you can set directly on a thread. We set two: breakpoints, which stores the breakpoints we've set on the thread, and breakpoints_reached, which tells us whether the thread barrelled past a breakpoint without stopping. (Strictly speaking, values set with Thread#[] are fiber-local; Thread#thread_variable_set provides true thread-local storage.)

  • Thread scheduling. We can hand control from the current thread to another by calling other_thread.wakeup followed by Thread.stop. We ran into some timing issues when creating multiple threads, which is why we sometimes use wakeup (which only marks a thread as eligible to run) and sometimes use run (which also invokes the scheduler right away). When a thread calls Thread.stop, it is put to sleep and won't be scheduled again until another thread calls wakeup or run on it. This lets us simulate a breakpoint: the thread stops itself when it reaches the breakpoint, and the spec starts it again when it chooses.
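Both features fit in a few lines of plain Ruby. In this minimal sketch (not the Breakpoints class itself), a worker records a breakpoint in a thread variable and stops itself; the main thread then inspects it and resumes it. Polling Thread#stop? before waking the worker is our addition here, a hedge against calling wakeup on a thread that has not gone to sleep yet:

```ruby
log = []

worker = Thread.new do
  # Thread#[] stores a value on the thread itself (fiber-local, strictly).
  Thread.current[:breakpoints_reached] = []
  log << :fed
  Thread.current[:breakpoints_reached] << :before_start_race_if_ready
  Thread.stop # the "breakpoint": take this thread off the scheduler
  log << :race_checked
end

# stop? is true once the worker is sleeping (or dead).
Thread.pass until worker.stop?

log_at_breakpoint = log.dup                # the worker is paused here
reached = worker[:breakpoints_reached].dup # read its variable from outside

worker.run # wake the worker and invoke the scheduler
worker.join

p log_at_breakpoint
p reached
p log
```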

As an added bonus, this code runs much faster than the previous solution, and if you’re running it in debug mode, it doesn’t need to attach a debugger to each process.

Caveats

There are two caveats to this solution:

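  1. When running this, you have to make sure transactional fixtures are turned off in RSpec. In our setup each thread checks out its own connection from ActiveRecord's pool, so rows written inside the spec's uncommitted transaction would be invisible to the other threads:

    RSpec.configure do |config|
      config.use_transactional_fixtures = false
    end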
    
  2. You need to make sure all connections are cleaned up after each test:

    after(:each) { ActiveRecord::Base.connection_pool.disconnect! }
    

Skipping either step may result in deadlocks, at least with MySQL. We ran into this in particular when using the Database Cleaner gem to keep the database in a pristine state between tests.

So there you have it - a reusable, intuitive way to add breakpoints and test race conditions.
