loyalty.dev

How We Solved a Critical Memory Issue When Ingesting a Large Gzip JSON File in Ruby

In this article, we pull back the curtain on how the team investigated an out-of-memory (OOM) issue, optimised memory usage, and validated the fix to ensure that our system remains robust as the Gzip JSON file we ingest continues to grow.

Introduction

In software engineering, it is not uncommon to build a solution that works initially, only to watch it falter as the scale grows. For more than a year, the team responsible for integrating with external services to ingest data has reliably handled small files with a few hundred records. More recently, however, we encountered out-of-memory (OOM) issues as the file size grew well beyond what the existing solution could handle.

From Smooth Sailing to Stormy Seas

Our system had been ingesting a 1.25MB file daily, but our most recent integration requires us to process a GZIP JSON file that is 55MB when decompressed and contains 22k records. When ingesting a file 44 times larger than what we were accustomed to, the ingestion failed: according to the logs, the 512MB container crashed, causing the app to restart. Our Grafana metrics also showed that the container's memory usage was hovering around 400MB before the crash.

At this point, I began to hypothesise that an OOM error caused the container to crash, but let's apply a hypothesis-driven validation approach and dig deeper, challenging our hypothesis through testing.

Donning the Detective Hat

To validate memory usage locally, we installed get_process_mem, a Ruby gem that reports the process's memory usage at any point in the code, which is exactly what we need to validate our hypothesis.
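
If you're using Bundler, that's just a one-line addition to the Gemfile (version constraint omitted):

gem 'get_process_mem'

Then we instrument the ingestion code at the points we care about: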

require 'get_process_mem'

def call
  mb = GetProcessMem.new.mb
  p "before downloading MEMORY USAGE(MB): #{ mb.round }"

  gzip_file = download_file(remote_file_path)
  mb = GetProcessMem.new.mb
  p "after downloading MEMORY USAGE(MB): #{ mb.round }"

  result = JSON.parse(gzip_file.read)
  mb = GetProcessMem.new.mb
  p "after reading file MEMORY USAGE(MB): #{ mb.round }"
end

Here’s the result of the above code:

before downloading MEMORY USAGE(MB): 328
after downloading MEMORY USAGE(MB): 328
after reading file MEMORY USAGE(MB): 475 # +150mb

Below are our findings, which align with our earlier hypothesis that the container crashed due to OOM, and which point to the root cause:

  • Reading and parsing the file inflated memory usage by roughly 2.7 times the decompressed file size (55MB)
  • As mentioned in the previous section, our 512MB instance was already running at around 400MB, so consuming an additional 150MB would push us over the edge by 7.4% (see the quick arithmetic below) 💥
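
As a quick sanity check, here is the arithmetic behind those two figures (all numbers taken from the measurements above, rounded):

parse_overhead_mb = 150.0                      # 475MB - 328MB, rounded
inflation_factor  = parse_overhead_mb / 55.0   # => ~2.7 times the 55MB decompressed size

overshoot = ((400.0 + parse_overhead_mb) / 512.0 - 1) * 100
# => ~7.4, i.e. about 7.4% over the 512MB limit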

Optimising Memory Usage

There are 2 main areas that we need to optimise here:

  1. Parsing the JSON file
  2. Ingesting the raw file

Evaluating Oj vs. JSON for Better Performance

In the previous section, we discovered that parsing the JSON file inflated memory usage by 2.7 times the actual file size. The Oj gem is known to be more performant at parsing JSON than the standard JSON library, and it can be used as a drop-in replacement: Oj.load(gzip_file.read) instead of JSON.parse(gzip_file.read).
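
For reference, the swap in our measurement snippet is a one-liner (a sketch; download_file and remote_file_path are the same helpers as in the earlier snippet):

require 'oj'
require 'get_process_mem'

p "before downloading MEMORY USAGE(MB): #{GetProcessMem.new.mb.round}"

gzip_file = download_file(remote_file_path)
p "after downloading MEMORY USAGE(MB): #{GetProcessMem.new.mb.round}"

# Drop-in replacement for JSON.parse(gzip_file.read)
result = Oj.load(gzip_file.read)
p "after reading file MEMORY USAGE(MB): #{GetProcessMem.new.mb.round}"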

Benchmarking the Oj gem against JSON does show that Oj is more performant, but it doesn't fully address our memory concerns. Oj inflates the file by about 2.3 times instead of 2.7, which still doesn't scale for our needs.

before downloading MEMORY USAGE(MB): 274
after downloading MEMORY USAGE(MB): 274
after reading file MEMORY USAGE(MB): 401 # +130mb

Streaming File in Chunks

As highlighted previously, the 22k JSON records are too large to load into memory all at once. To manage this, we need to load and process the records in smaller chunks. By iteratively loading a batch of records into memory, processing it, and relying on garbage collection to free it before loading the next batch, we avoid holding all 22k records in memory and keep memory utilisation in check.

Remember that we are streaming a GZIP file that decompresses to one long stringified JSON array. This adds complexity when processing it in batches, since we need a way to determine where each JSON record starts and ends. For example, the JSON data could look like this:

"[{\"status\":\"active\",\"name\":{\"first\":\"Lily\",\"middle\":\"Billie\",\"last\":\"Johns\"},\"phoneNumber\":\"+12345678\",\"location\":{\"street\":\"325 Franey Valleys\",\"city\":\"South Hailieville\",\"state\":\"North Dakota\",\"country\":\"Mali\",\"zip\":\"13593\",\"coordinates\":{\"latitude\":73.7041,\"longitude\":-29.4307}},\"uuid\":\"b5a81d5f-f768-4f70-88ef-2c820b2516a4\"}]"

Chunking the data at a fixed size (say 10KB) could cut a record off mid-way, like this:

"[{\"status\":\"active\",\"name\":{\"first"

Building the Chisel

Unfortunately, there isn't a gem available that streams and processes a stringified JSON array in batches, so let's build our own streamer and processor in the simplest way possible.

First, we'll need a JsonArrayStreamer to detect the start and end of arrays and hashes within the stringified JSON. This lets us pinpoint when a complete record has been loaded. The Oj gem introduced earlier ships with Oj::ScHandler, which does exactly the trick: it exposes callbacks such as array_start and array_append, so let's take advantage of it to build our JsonArrayStreamer.

We'll initialise JsonArrayStreamer with @level = 0, which tracks how deeply nested the current array is. This matters because the array_append callback fires whenever Oj finishes parsing an element of a JSON array, and we only want to append an object to @batch when it is a complete element of the top-level JSON array.

require 'oj'
require 'piperator'

class JsonArrayStreamer < Oj::ScHandler
  def initialize
    @batch = []
    @level = 0
    super
  end

  ...

  def array_start
    @level += 1

    # Return a fresh array for nested arrays only; the top-level array itself
    # is never materialised, since its elements are collected into @batch instead
    [] if @level > 1
  end

  def array_end
    @level -= 1
  end

  def array_append(array, value)
    if @level == 1
      # Append to the batch once one full top-level record is loaded
      @batch << value
      # Flush the batch once it reaches the configured size
      if @batch.size == @batch_size
        @yielder.(@batch)
        @batch = []
      end
    else
      array << value
    end
  end
end

For example, given the data below, the closing of [1, 2, 3] does not mean a top-level record is complete, because "foo": "bar" still belongs to the same record. Hence, we use @level to determine when an element of the top-level JSON array is complete.

[{ "qwer": [1, 2, 3], "foo": "bar" }, { "qwer": [3, 2, 1], "foo": "rab" }]

Next, hash_start and hash_set take care of building each record: hash_start returns a fresh hash, and hash_set fills it in whenever a key-value pair such as "foo": "bar" is encountered. Once one full record, e.g. { "qwer": [1, 2, 3], "foo": "bar" }, is complete, the array_append callback fires; this time @level is 1, so we append the record to @batch.


class JsonArrayStreamer < Oj::ScHandler
  ...

  def hash_start
    {}
  end

  def hash_set(hash, key, value)
    hash[key] = value
  end
end

The below diagram further illustrates the state at each callback:
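
In text form, the callbacks for the first record of the sample data fire roughly in this order (a hand-traced sketch, not program output):

# Parsing [{ "qwer": [1, 2, 3], "foo": "bar" }, ...]
#
# array_start      -> @level: 0 -> 1 (top-level array opens)
# hash_start       -> returns a fresh {} for the first record
# array_start      -> @level: 1 -> 2, returns a fresh [] for [1, 2, 3]
# array_append x3  -> @level == 2, so 1, 2 and 3 go into the nested array
# array_end        -> @level: 2 -> 1
# hash_set x2      -> hash["qwer"] = [1, 2, 3], then hash["foo"] = "bar"
# array_append     -> @level == 1, so the completed record is pushed into @batch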

Finally, the missing piece that gels our JsonArrayStreamer together is the #run method. It is responsible for feeding the JSON data to Oj.sc_parse in chunks, and Oj.sc_parse parses the JSON string and dispatches to the callbacks described above.

When the top-level JSON array has been read completely, the add_value callback is called, so let's remember to flush whatever is left in the batch there, in case the final batch has fewer records than the batch size.

class JsonArrayStreamer < Oj::ScHandler
  ...

  def run(enumerable_data_source, batch_size: 500, &block)
    @yielder = block
    @batch_size = batch_size

    # Piperator::IO (from the piperator gem) turns your enumerator into an
    # IO-like object, very handy as Oj's sc_parse method wants an IO object.
    io = Piperator::IO.new(enumerable_data_source)
    Oj.sc_parse(self, io)
  end

  # Process remaining data in the batch even if it has not met the batch size
  def add_value(*)
    @yielder.(@batch) unless @batch.empty?
  end
end
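
Before wiring the streamer up to a GZIP reader, here is a minimal smoke test of it on its own (a sketch; the data is made up, and the oj and piperator gems are assumed to be loaded as above):

chunks = ['[{"id": 1}, {"id": 2},', ' {"id": 3}]'].each

JsonArrayStreamer.new.run(chunks, batch_size: 2) do |batch|
  p batch
end
# => [{"id"=>1}, {"id"=>2}]   flushed as soon as the batch size is hit
# => [{"id"=>3}]              the leftover record, flushed by add_value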

Next, we'll develop JsonGzipProcessor, which decompresses the GZIP file and streams the data in chunks.

class JsonGzipProcessor
  CHUNK_SIZE = 1024 * 1024 # 1MB
  BATCH_SIZE = 500

  def initialize(streamer: JsonArrayStreamer.new)
    @streamer = streamer
  end

  def call(file_path:, chunk_size: CHUNK_SIZE, batch_size: BATCH_SIZE, &block)
    raise ArgumentError.new("Block must be provided to process batch") unless block_given?

    parser(file_path, chunk_size, batch_size).each(&block)
  end

  private

    attr_reader :streamer
end

The following methods are responsible for:

  • #fetch_chunks — Recall that we want to avoid loading all 22k records into memory. By reading the file in 1MB chunks, only 1MB is held in memory at a time and is garbage collected once it is no longer needed. This keeps the amount of data in memory small.
  • #parser — We rely on JsonArrayStreamer to stream the JSON data and process it in batches of 500, so that our array never needs to hold all 22k records.

class JsonGzipProcessor
  ...

  private

    def parser(file_path, chunk_size, batch_size)
      Enumerator.new do |yielder|
        data = fetch_chunks(file_path, chunk_size)
        streamer.run(data, batch_size:) do |batch|
          yielder << batch
        end
      rescue Zlib::GzipFile::Error, Errno::ENOENT, Oj::ParseError => e
        raise Errors::IllFormattedFile.new("Failed to process #{file_path}: #{e.message}")
      end
    end

    def fetch_chunks(file_path, chunk_size)
      Enumerator.new do |yielder|
        # Zlib::GzipReader (from Ruby's standard zlib library) decompresses the
        # file lazily, so only chunk_size bytes are read into memory at a time.
        Zlib::GzipReader.open(file_path) do |gz|
          while (chunk = gz.read(chunk_size))
            yielder << chunk
          end
        end
      end
    end
end

By marrying JsonGzipProcessor and JsonArrayStreamer, our solution can manage memory usage more effectively and process data with greater efficiency.
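
For completeness, here is roughly how the processor is invoked (a sketch; the file path and what you do with each batch are illustrative):

processor = JsonGzipProcessor.new
processor.call(file_path: 'records.json.gz') do |batch|
  # `batch` holds at most BATCH_SIZE parsed records (hashes), so only a small
  # slice of the 22k records is ever in memory at once. Persist or transform
  # the records here, e.g. with a bulk insert.
  p "processing #{batch.size} records"
end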

Putting Our Chisel to the Test

Having crafted our helper classes to stream the GZIP file in manageable chunks, let’s put the memory optimisation to the test. By leveraging the JsonArrayStreamer and JsonGzipProcessor, we transformed our memory footprint from a hefty 150MB down to a lean 15MB, demonstrating a notable enhancement in efficiency.

after downloading MEMORY USAGE(MB): 310
after processing file MEMORY USAGE(MB): 325 # +15MB

On the staging environment, we trimmed memory usage down to just 16MB, an 89% reduction. This means we can now reliably process the GZIP JSON file on our 512MB instance 🎉.

The Calm After the Storm

Ingesting small files is straightforward, but large files can overwhelm the memory available to our services. Strategies such as streaming a file in chunks help ensure that processing remains reliable and efficient even as file sizes grow.

As Ascenda continues to scale our systems, this is just one of the many tales we have to tell. If you’re eager to learn more about our journey and what we do, be sure to check us out at our Ascenda page!
