I’ve been working with Kiba ETL a lot, and I love this tool. It doesn’t do much itself, and its premise is simple: it’s a DSL that asks objects to implement a very small interface, then orchestrates them into an ETL pipeline. Sources enumerate data with #each, transformations run each row through a #process method, and destinations get #written to via #write. Kiba helps you compose many elements together in a way that is very pleasant to work with and extend. If you aren’t familiar with it, I’d love to recommend some of Thibaut’s content: https://github.com/thbar/kiba#useful-links
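
To make that interface concrete, here’s a minimal sketch of the three roles, wired together by hand roughly the way Kiba would wire them. The class names are mine, purely for illustration:

```ruby
# A source enumerates rows with #each.
class NumberSource
  def each
    [1, 2, 3].each { |n| yield({ value: n }) }
  end
end

# A transform receives a row via #process and returns a row
# (or nil, which drops the row from the pipeline).
class Doubler
  def process(row)
    row.merge(value: row[:value] * 2)
  end
end

# A destination receives each row via #write.
class MemoryDestination
  attr_reader :rows

  def initialize
    @rows = []
  end

  def write(row)
    @rows << row
  end
end

# Roughly what Kiba does for us: enumerate, process, write.
destination = MemoryDestination.new
doubler = Doubler.new
NumberSource.new.each { |row| destination.write(doubler.process(row)) }
# destination.rows now holds the three doubled rows
```

In a real job you’d hand these classes to Kiba’s `source`, `transform`, and `destination` declarations instead of wiring them yourself.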

I wanted to jot down some of my moves and patterns for future reference. Each of the examples below is self-contained, and generally uses the kiba and awesome_print gems.

Since it’s a long post with lots of code, here’s a quick table of contents:

- Making transformations versatile by accepting input_key: and output_key: args
- A simple dedupe filter
- Using pre-allocated Kiba ETL classes outside of Kiba’s direct supervision
  - Composing Transformations
  - Holding reference to a reporting type transformation
  - Aggregating data and chaining destinations
- Write records in batches

Making transformations versatile by accepting input_key: and output_key: args

Many transformations operate on a specific value, and it’s easy to make your transformer flexible enough to be reused across different datasets. Accepting the keys as arguments also keeps you compatible whether a dataset’s hash keys are symbols or strings. This KeyGenerator has been useful in many different projects.

require 'digest'

class KeyGenerator
  attr_reader :input_keys, :output_key, :digest

  # input_keys: an array of fields to take the values from
  # output_key: Where to store the calculated result
  # digest: Set this to true if you want to hash the values.
  #   This is helpful if you have many fields, or they aren't meaningful.
  #
  def initialize(input_keys: [], output_key: :key, digest: false)
    @input_keys = input_keys
    @output_key = output_key
    @digest = digest
  end

  def process(row)
    key = row.values_at(*input_keys)
             .map{|val| remove_insignificant_chars(val)}
             .join(':')
    key = Digest::MD5.hexdigest(key) if digest
    row[output_key] = key
    row
  end

  # Normalize data by removing anything that isn't [a-z0-9].
  def remove_insignificant_chars(val)
    val.to_s.downcase.gsub(/[^a-z0-9]/, '')
  end
end

class FakeData
  def each
    100.times.each do
      yield({
        website: ['www.example.com', '   www.example.com!  '].sample,
        zip_code: %w(55802 90210).sample,
        brand: 'Tireco',
        sku: %w(19953 16516).sample,
        mpc: %w(123123123 456456456).sample,
        section: 205,
        aspect: 50,
        rim: 16,
        load: [nil, 87].sample,
        speed: [nil, 'W'].sample,
        ply: [nil, 10].sample,
        sidewall: [nil, 'BL'].sample
      })
    end
  end
end

require 'kiba'
require 'awesome_print'

Kiba.run(
  Kiba.parse do
    source FakeData

    transform KeyGenerator,
      input_keys: [:website, :zip_code],
      output_key: :location_key

    transform KeyGenerator,
      input_keys: [:brand, :mpc],
      output_key: :product_key

    transform KeyGenerator,
      input_keys: [:website, :sku],
      output_key: :retailer_product_key

    transform KeyGenerator,
      input_keys: [:retailer_product_key, :section, :aspect, :rim, :load, :speed, :ply, :sidewall],
      digest: true

    transform {|row| ap row}
  end
)

{
                 :website => "   www.example.com!  ",
                :zip_code => "90210",
                   :brand => "Tireco",
                     :sku => "16516",
                     :mpc => "456456456",
                 :section => 205,
                  :aspect => 50,
                     :rim => 16,
                    :load => 87,
                   :speed => nil,
                     :ply => 10,
                :sidewall => "BL",
            :location_key => "wwwexamplecom:90210",
             :product_key => "tireco:456456456",
    :retailer_product_key => "wwwexamplecom:16516",
                     :key => "187291fc8d24640cc83d0bb463df4528"
}
{
                 :website => "www.example.com",
                :zip_code => "90210",
                   :brand => "Tireco",
                     :sku => "19953",
                     :mpc => "456456456",
                 :section => 205,
                  :aspect => 50,
                     :rim => 16,
                    :load => 87,
                   :speed => nil,
                     :ply => nil,
                :sidewall => nil,
            :location_key => "wwwexamplecom:90210",
             :product_key => "tireco:456456456",
    :retailer_product_key => "wwwexamplecom:19953",
                     :key => "6f9b364254a06cc06d99910520a58bdb"
}

A simple dedupe filter

Ruby’s Set makes quick work of this. We can check if we are allowed to add a new value. If not, we must have already seen it.

It’s easy to create a deduplication filter because Ruby evaluates hash and array equality by their contained values. Sweet!

[1, 'Thing'] == [1, 'Thing'] #=> true

Under the hood, this calls Object#hash, which returns an Integer. We can store that integer instead of the actual values to save space when working with large strings or many objects.

Note: The value returned by #hash is not consistent between runs of your application, so unlike the MD5 digest we used above, it’s not suitable as a persistent key.
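
A quick sketch of that tradeoff: within a single process, equal arrays return the same Integer from #hash, so a Set of small integers can stand in for a Set of full rows.

```ruby
require 'set'

a = [1, 'Thing']
b = [1, 'Thing']

# Equal arrays hash equally (within one run of the program).
a.hash == b.hash #=> true

# So we can remember the small integer instead of the whole value.
seen = Set.new
seen.add?(a.hash) #=> the Set itself, meaning "newly added"
seen.add?(b.hash) #=> nil, meaning "already seen"
```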

require 'set'
class DedupeFilter
  attr_reader :keys

  def initialize(keys: [])
    @keys = keys
    @seen = Set.new
  end

  # If we are allowed to add the row's "value" to the set, return the row.
  # Otherwise return nil (which drops it from the pipeline).
  #
  def process(row)
    @seen.add?(row.values_at(*keys).hash) ? row : nil
  end
end

class FakeData
  def each
    1000.times.each do
      yield({
        name: %w(Buddy Pal Guy Dude Bro).sample,
      })
    end
  end
end

require 'kiba'

Kiba.run(
  Kiba.parse do
    source FakeData
    transform DedupeFilter, keys: [:name]
    transform {|row| puts row}
  end
)
{:name=>"Bro"}
{:name=>"Guy"}
{:name=>"Buddy"}
{:name=>"Pal"}
{:name=>"Dude"}

Using pre-allocated Kiba ETL classes outside of Kiba’s direct supervision

The simplicity of Kiba comes from its simple, generic workflow. A Ruby class just needs to respond to one of three methods to perform the expected role of a source, transformation, or destination. Kiba’s declarations don’t actually do anything other than direct traffic. Because of this, these simple objects are just as useful outside of Kiba’s ETL declarations. You can instantiate any of these same objects and use them in new ways, even if you’re not using Kiba at all!
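
For instance, the DedupeFilter from the previous section works as a plain old filter over an array, no Kiba in sight (the class is repeated here so the sketch is self-contained):

```ruby
require 'set'

# Same DedupeFilter as above.
class DedupeFilter
  def initialize(keys: [])
    @keys = keys
    @seen = Set.new
  end

  def process(row)
    @seen.add?(row.values_at(*@keys).hash) ? row : nil
  end
end

rows = [
  { name: 'Buddy' },
  { name: 'Pal' },
  { name: 'Buddy' }
]

filter = DedupeFilter.new(keys: [:name])

# #filter_map drops the nils that #process returns for duplicates.
filter_rows = rows.filter_map { |row| filter.process(row) }
# filter_rows == [{ name: 'Buddy' }, { name: 'Pal' }]
```

Note that #filter_map requires Ruby 2.7 or later; on older Rubies, `map` plus `compact` does the same job.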

The following examples involve maintaining a reference to an ETL class in your job, instead of letting Kiba manage its lifecycle:

Composing Transformations

Since a transformation class is simply an object that responds to the #process message by taking a row input and returning a row output, it is easy to use outside of the main Kiba transform pipeline.

Here’s an example that composes multiple transformations into a single class transformation using delegation.

# Transformation to violently convert all string values to #upcase.
#
# Maybe we should be using the input_key move above? Oh well.
#
class Upcaser
  def process(row)
    row.transform_values{|val| val.upcase rescue val}
  end
end

# Transformation to violently stuff things between all string value chars.
# (why???????)
#
class Spacer
  def initialize(space = ' ')
    @space = space
  end

  def process(row)
    row.transform_values{|val| val.chars.join(@space) rescue val}
  end
end

# Why don't we glue them all together? This series of transformations
#   seems like a really common thing I want to do...
#
# More importantly, it's really easy to compose Transformations together
# because they all follow the Kiba Transform interface.
#
class FieldSpacer
  ##
  # Forever immortalize this maligned text butcherizer.
  # Make it nice and easy for everyone to butcher text with.
  #
  # Notice that if we _did_ use the input_key move from above,
  # we can forward that on down to each of these guys as well.
  #
  def initialize
    @transformers = [
      Upcaser.new,
      Spacer.new(':'),
      Spacer.new
    ]
  end

  # "Reduce" the row by applying the list of transformations against it
  def process(row)
    @transformers.reduce(row){|out, transformer| transformer.process(out)}
  end
end

class FakeData
  def each
    10.times.each do
      yield({
        name: %w(Buddy Pal Guy Dude Bro).sample,
        age: rand(21..100)
      })
    end
  end
end


require 'kiba'
require 'awesome_print'
Kiba.run(
  Kiba.parse do
    source FakeData
    transform FieldSpacer
    transform {|row| ap row}
  end
)
{
    :name => "P : A : L",
     :age => 48
}
{
    :name => "B : R : O",
     :age => 97
}
{
    :name => "B : R : O",
     :age => 46
}
{
    :name => "P : A : L",
     :age => 71
}
{
    :name => "P : A : L",
     :age => 29
}
{
    :name => "D : U : D : E",
     :age => 76
}
{
    :name => "G : U : Y",
     :age => 26
}
{
    :name => "D : U : D : E",
     :age => 74
}
{
    :name => "G : U : Y",
     :age => 44
}
{
    :name => "D : U : D : E",
     :age => 76
}

Holding reference to a reporting type transformation

Holding reference to your transformation makes it easy to do more with it whenever you want. Kiba lets you use either a class or a block transform, and because both follow the same tiny interface, you can use them together at the same time.

In this example, we create a simple package of counters using the typical Kiba #process method, but the actual reporting from them happens outside of Kiba’s row processing.

class FieldCounter
  attr_reader :input_key
  attr_accessor :counter

  def initialize(input_key)
    @input_key = input_key
    @counter = Hash.new(0)
  end

  def process(row)
    counter[row[input_key]] += 1
    row
  end

  alias count counter
end

class FakeData
  def each
    1000.times.each do
      yield({
        name: %w(Buddy Pal Guy Dude Bro).sample,
        age: rand(21..30),
        favorite_color: %w(red green blue).sample
      })
    end
  end
end


require 'kiba'
require 'json'

Kiba.run(
  Kiba.parse do
    # Initialize our transforms in our own scope so that we can use them
    #   outside of row processing, during post_process.
    counters = [:name, :age, :favorite_color].map{|field| FieldCounter.new field}

    source FakeData

    # A class transform is easy to use inside a block transform
    # since they follow Kiba's expected API.
    # 
    # This is also a way to "wrap" a class transform with additional functionality.
    #
    transform {|row| counters.each {|processor| processor.process(row)}}
    
    # By the time post_process runs, the Kiba pipeline has fully executed.
    # Sources have sourced, transformers have transformed, and destinations have destined.
    #
    # We can leverage our reference to the transformations to output a simple report at the end.
    # Since it's just a class with data, we can do whatever we want with it.
    post_process do
      puts '-' * 30
      puts 'Counter summary:'
      puts '-' * 30

      counters.each do |counter|
        puts counter.input_key
        puts JSON.pretty_generate counter.count
      end

      puts "\n" * 2
      puts '-' * 30
      puts "We can interact with these however we want. Maybe this should go to a Slack channel?"
      puts '-' * 30

      puts JSON.pretty_generate(
        # Combine the counters into a single hash:
        counters.reduce({}){|out, processor| out.merge(processor.input_key => processor.count)}
      )
    end
  end
)
------------------------------
Counter summary:
------------------------------
name
{
  "Pal": 163,
  "Dude": 221,
  "Buddy": 223,
  "Guy": 197,
  "Bro": 196
}
age
{
  "22": 101,
  "26": 93,
  "24": 87,
  "27": 100,
  "29": 117,
  "23": 104,
  "30": 102,
  "25": 106,
  "28": 106,
  "21": 84
}
favorite_color
{
  "blue": 331,
  "red": 324,
  "green": 345
}


------------------------------
We can interact with these however we want. Maybe this should go to a Slack channel?
------------------------------
{
  "name": {
    "Pal": 163,
    "Dude": 221,
    "Buddy": 223,
    "Guy": 197,
    "Bro": 196
  },
  "age": {
    "22": 101,
    "26": 93,
    "24": 87,
    "27": 100,
    "29": 117,
    "23": 104,
    "30": 102,
    "25": 106,
    "28": 106,
    "21": 84
  },
  "favorite_color": {
    "blue": 331,
    "red": 324,
    "green": 345
  }
}

Aggregating data and chaining destinations

This example collects records until all rows have been processed. It then applies an aggregation to the entire dataset, and finally forwards the data as rows to a new destination that we instantiate outside of Kiba’s pipeline.

Note: As of version 2.5 of Kiba, this example isn’t strictly necessary: transformation classes can implement a #close method of their own, and the streaming runner introduced in 2.5 can yield multiple records. This means a transformation can play the part of “hold all of the records until I’ve seen them all, then modify and send them”.
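
For reference, a buffering transform along those lines might look like this sketch. Check Kiba’s documentation for the exact StreamingRunner setup; AggregatingTransform is my own name, and the aggregation proc is an assumption, not part of Kiba’s API:

```ruby
# Sketch of a transform that buffers every row, then yields the
# aggregated results from #close. Assumes Kiba >= 2.5 with the
# streaming runner enabled, which invokes #process and #close with
# a block so that `yield` pushes rows downstream.
class AggregatingTransform
  # aggregation: a proc that maps an array of rows to an array of rows
  def initialize(aggregation:)
    @aggregation = aggregation
    @rows = []
  end

  # Swallow every row; returning nil emits nothing for now.
  def process(row)
    @rows << row
    nil
  end

  # Once the source is exhausted, yield each aggregated row downstream.
  def close
    @aggregation.call(@rows).each { |row| yield row }
    nil
  end
end
```

Inside a Kiba job you’d declare it like any other class transform; the streaming runner supplies the block that #close yields into.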

# A destination that collects all of the data before doing a summarization, which triggers a new destination
class ChainableAggregateDestination

  # aggregation: A proc that transforms an array of rows
  # next_destination: a Kiba Destination that will receive the transformed rows
  def initialize(aggregation:, next_destination:)
    @aggregation = aggregation
    @next_destination = next_destination
    @records = []
  end

  # Temporarily hold the records in an array so we can reprocess them once we've received all records
  def write(row)
    @records << row
  end

  # Call the aggregation transformation, and send the resulting data to the next destination
  # Additionally, trigger the next_destination's close method, since Kiba will not be directly
  #   interacting with this object.
  def close
    @aggregation.call(@records).each {|row| @next_destination.write(row)}
    @next_destination.close
  end
end


# A common CSV destination class
require 'csv'
class CSVDestination
  attr_reader :headers

  def initialize(filename: 'output.csv')
    @csv = CSV.open(filename, 'w')
  end

  def write(row)
    @csv << @headers = row.keys unless headers
    @csv << row.values_at(*headers)
  end

  def close
    @csv.close
  end
end

# Some product data
class FakeData
  def each
    1000.times.each do
      yield({
        website: 'www.example.com',
        zip_code: %w(55802 55811).sample,
        sku: %w(11111 33333 55555).sample,
        price: rand(50.0..200.0).round(2)
      })
    end
  end
end

require 'kiba'
require 'awesome_print'

job = Kiba.parse do

  # A simple helper function to keep the code below cleaner
  def median(array)
    sorted = array.sort
    len = sorted.length
    (sorted[(len - 1) / 2] + sorted[len / 2]) / 2.0
  end

  source FakeData

  # We'll save all of the source data in a CSV
  destination CSVDestination, filename: 'alldata.csv'

  # And also perform an aggregation against all of the data, and save that too.
  destination ChainableAggregateDestination,
    aggregation: ->(rows){
      rows.group_by{|row| row.slice(:website, :zip_code, :sku)}.map do |key, values|
        prices = values.map{|row| row[:price]}

        key.merge(
          sample_size: prices.size,
          min_price: prices.min,
          median_price: median(prices),
          max_price: prices.max,
        ).tap{|row| ap row}
      end
    },
    next_destination: CSVDestination.new(filename: 'summary_data.csv')
    # Notice that we are instantiating our own Destination class, instead of letting Kiba handle it directly
end

Kiba.run(job)
{
         :website => "www.example.com",
        :zip_code => "55802",
             :sku => "33333",
     :sample_size => 175,
       :min_price => 51.15,
    :median_price => 117.88,
       :max_price => 199.77
}
{
         :website => "www.example.com",
        :zip_code => "55811",
             :sku => "33333",
     :sample_size => 157,
       :min_price => 50.76,
    :median_price => 130.42,
       :max_price => 199.41
}
{
         :website => "www.example.com",
        :zip_code => "55802",
             :sku => "55555",
     :sample_size => 166,
       :min_price => 50.15,
    :median_price => 128.875,
       :max_price => 199.38
}
{
         :website => "www.example.com",
        :zip_code => "55811",
             :sku => "55555",
     :sample_size => 169,
       :min_price => 50.73,
    :median_price => 125.67,
       :max_price => 199.86
}
{
         :website => "www.example.com",
        :zip_code => "55811",
             :sku => "11111",
     :sample_size => 174,
       :min_price => 53.09,
    :median_price => 130.97,
       :max_price => 199.65
}
{
         :website => "www.example.com",
        :zip_code => "55802",
             :sku => "11111",
     :sample_size => 159,
       :min_price => 50.66,
    :median_price => 125.34,
       :max_price => 199.94
}

Write records in batches

Collecting my records in a destination before actually doing something with them reminded me of a move I use for batching. This is helpful when posting data to an API or writing to a database. I won’t do a full example, but consider this destination:

class SqlServerDestination
  attr_reader :batch_size

  def initialize(dataset:, batch_size: 1_000)
    @dataset = dataset
    @batch_size = batch_size

    @rows = []
  end

  def write(row)
    @rows << row
    flush if @rows.size >= batch_size
  end

  def flush
    @dataset.multi_insert(@rows)
    @rows = []
  end

  # Kiba calls #close on destinations when the pipeline finishes,
  # so flush any remaining partial batch rather than dropping it.
  def close
    flush if @rows.any?
  end
end

Whew. That’s basically every move I know.