I’ve been working with Kiba ETL a lot lately, and I love this tool. It doesn’t do much itself, and its premise is very simple: it’s a DSL that asks objects to implement a small interface so it can orchestrate an ETL pipeline. Sources enumerate data with #each, transformations receive each row through a #process method, and destinations are handed rows via #write. Kiba helps you compose many of these elements together in a way that is very nice to work with and extend. If you aren’t familiar with it, I’d recommend some of Thibaut’s content: https://github.com/thbar/kiba#useful-links
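
To make those three roles concrete, here is a minimal sketch using hypothetical class names (ArraySource, Doubler, ArrayDestination). The loop at the bottom only approximates what Kiba's runner does for one source, one transform and one destination; it is not Kiba's actual implementation.

```ruby
# Hypothetical minimal source: anything that responds to #each and yields rows.
class ArraySource
  def initialize(rows)
    @rows = rows
  end

  def each(&block)
    @rows.each(&block)
  end
end

# Hypothetical minimal transform: #process takes a row and returns a row (or nil to drop it).
class Doubler
  def process(row)
    row.merge(value: row[:value] * 2)
  end
end

# Hypothetical minimal destination: #write receives each row, #close is called at the end.
class ArrayDestination
  attr_reader :rows

  def initialize
    @rows = []
  end

  def write(row)
    @rows << row
  end

  def close; end
end

# A rough, hand-wired approximation of the flow Kiba orchestrates.
# In a real job you'd declare these with source / transform / destination instead.
source = ArraySource.new([{ value: 1 }, { value: 2 }])
transform = Doubler.new
destination = ArrayDestination.new

source.each do |row|
  out = transform.process(row)
  destination.write(out) if out
end
destination.close
```

Nothing in these classes knows about Kiba, which is exactly what makes the patterns below possible.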

I wanted to jot down some of my moves and patterns for future reference. Each of the examples below is self-contained, and they generally use the kiba and awesome_print gems.

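Since it’s a long post with lots of code, here’s a quick table of contents:

Making transformations versatile by accepting input_key: and output_key: args
A simple dedupe filter
Using pre-allocated Kiba ETL classes outside of Kiba’s direct supervision
  Composing Transformations
  Holding reference to a reporting type transformation
  Aggregating data and chaining destinations
Write records in batches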

Making transformations versatile by accepting input_key: and output_key: args

Many transformations operate on a specific field, so accepting the key names as arguments makes a transformation reusable across different datasets. It also keeps the transformation agnostic to whether your rows use symbol or string keys. This KeyGenerator has been useful in many different projects.

require 'digest'

class KeyGenerator
  attr_reader :input_keys, :output_key, :digest

  # input_keys: an array of fields to read the values from
  # output_key: where to store the calculated result
  # digest: set this to true if you want to hash the values.
  #   This is helpful if you have many fields, or they aren't meaningful.
  def initialize(input_keys: [], output_key: :key, digest: false)
    @input_keys = input_keys
    @output_key = output_key
    @digest = digest
  end

  # Normalize each input value, join them with ':' and optionally MD5 the result.
  def process(row)
    key = row.values_at(*input_keys)
             .map { |val| remove_insignificant_chars(val) }
             .join(':')
    key = Digest::MD5.hexdigest(key) if digest
    row[output_key] = key
    row
  end

  # Normalize data by removing anything that isn't [a-z0-9].
  def remove_insignificant_chars(val)
    val.to_s.downcase.gsub(/[^a-z0-9]/, '')
  end
end

class FakeData
  def each
    100.times do
      yield(
        website: ['www.example.com', '   www.example.com!  '].sample,
        zip_code: %w(55802 90210).sample,
        brand: 'Tireco',
        sku: %w(19953 16516).sample,
        mpc: %w(123123123 456456456).sample,
        section: 205,
        aspect: 50,
        rim: 16,
        load: [nil, 87].sample,
        speed: [nil, 'W'].sample,
        ply: [nil, 10].sample,
        sidewall: [nil, 'BL'].sample
      )
    end
  end
end

require 'kiba'
require 'awesome_print'

Kiba.run(
  Kiba.parse do
    source FakeData

    transform KeyGenerator,
      input_keys: [:website, :zip_code],
      output_key: :location_key

    transform KeyGenerator,
      input_keys: [:brand, :mpc],
      output_key: :product_key

    transform KeyGenerator,
      input_keys: [:website, :sku],
      output_key: :retailer_product_key

    transform KeyGenerator,
      input_keys: [:retailer_product_key, :section, :aspect, :rim, :load, :speed, :ply, :sidewall],
      digest: true

    transform { |row| ap row }
  end
)

{
                 :website => "   www.example.com!  ",
                :zip_code => "90210",
                   :brand => "Tireco",
                     :sku => "16516",
                     :mpc => "456456456",
                 :section => 205,
                  :aspect => 50,
                     :rim => 16,
                    :load => 87,
                   :speed => nil,
                     :ply => 10,
                :sidewall => "BL",
            :location_key => "wwwexamplecom:90210",
             :product_key => "tireco:456456456",
    :retailer_product_key => "wwwexamplecom:16516",
                     :key => "187291fc8d24640cc83d0bb463df4528"
}
{
                 :website => "www.example.com",
                :zip_code => "90210",
                   :brand => "Tireco",
                     :sku => "19953",
                     :mpc => "456456456",
                 :section => 205,
                  :aspect => 50,
                     :rim => 16,
                    :load => 87,
                   :speed => nil,
                     :ply => nil,
                :sidewall => nil,
            :location_key => "wwwexamplecom:90210",
             :product_key => "tireco:456456456",
    :retailer_product_key => "wwwexamplecom:19953",
                     :key => "6f9b364254a06cc06d99910520a58bdb"
}
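
The same move works for single-field transforms. Here is a minimal sketch with a hypothetical Stripper transform, where output_key defaults to input_key so the value is overwritten in place unless you ask for a new field. Because the keys are just arguments, the class doesn't care whether rows use symbols or strings.

```ruby
# Hypothetical single-field transform that strips surrounding whitespace.
# output_key defaults to input_key, so by default the value is overwritten in place.
class Stripper
  def initialize(input_key:, output_key: input_key)
    @input_key = input_key
    @output_key = output_key
  end

  def process(row)
    value = row[@input_key]
    row[@output_key] = value.is_a?(String) ? value.strip : value
    row
  end
end

# The same class works with symbol keys...
symbol_row = Stripper.new(input_key: :website).process({ website: '  www.example.com ' })

# ...and with string keys, this time writing the result to a new field.
string_row = Stripper.new(input_key: 'website', output_key: 'clean_website')
                     .process({ 'website' => '  www.example.com ' })
```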

A simple dedupe filter

Ruby’s Set makes quick work of this. Set#add? adds a value and returns the set, or returns nil if the value was already present. If we can’t add a row’s value, we must have already seen it.

It’s easy to create a deduplication filter because Ruby evaluates hash and array equality by their contained values. Sweet!

[1, 'Thing'] == [1, 'Thing'] #=> true

Under the hood, Set relies on Object#hash (together with #eql?), which returns an Integer. Arrays with equal contents return the same #hash, so we can store that Integer instead of the actual values to save memory when working with large strings or many objects. The trade-off is that two different rows could, rarely, collide on the same Integer, and one of them would be dropped.

Note: The value returned by #hash is not consistent between runs of your application, so unlike the MD5 digest above, it doesn’t make a good persistent key.
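
A quick way to see those pieces in isolation, using only the standard library (the sample values are made up):

```ruby
require 'set'
require 'digest'

# Two separately built arrays with the same contents.
row_a = ['Buddy', 21]
row_b = ['Buddy', 21]

# Equal contents mean equal arrays and equal #hash values (within one process).
same_hash = (row_a == row_b) && (row_a.hash == row_b.hash)

seen = Set.new

# Set#add? returns the set itself when the value is new...
first_add = seen.add?(row_a.hash)

# ...and nil when the value is already present, which is how the filter spots a duplicate.
second_add = seen.add?(row_b.hash)

# A digest of the same content, by contrast, is stable across runs.
stable_key = Digest::MD5.hexdigest(row_a.join(':'))
```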

require 'set'

class DedupeFilter
  attr_reader :keys

  def initialize(keys: [])
    @keys = keys
    @seen = Set.new
  end

  # If we are allowed to add the row's "value" to the set, return the row.
  # Otherwise return nil (which drops it from the pipeline).
  def process(row)
    @seen.add?(row.values_at(*keys).hash) ? row : nil
  end
end

class FakeData
  def each
    1000.times do
      yield(name: %w(Buddy Pal Guy Dude Bro).sample)
    end
  end
end

require 'kiba'

Kiba.run(
  Kiba.parse do
    source FakeData
    transform DedupeFilter, keys: [:name]
    transform { |row| puts row }
  end
)

Using pre-allocated Kiba ETL classes outside of Kiba’s direct supervision

The simplicity of Kiba comes from its simple, generic workflow. A Ruby class just needs to respond to the right method (#each, #process or #write) to play the role of a source, transformation or destination. Kiba itself doesn’t do much beyond directing traffic between these objects. Because of this, the objects are just as useful outside of Kiba’s ETL declarations. You can instantiate any of them and use them in new ways, even if you’re not using Kiba at all!
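
For example, a transform doesn't need Kiba at all to be useful on a plain array. This sketch uses a hypothetical EmailNormalizer transform together with Ruby's Enumerable#filter_map (Ruby 2.7+), which conveniently drops nil results the same way Kiba drops a row when #process returns nil.

```ruby
# Hypothetical transform: drops rows without an email and downcases the rest.
class EmailNormalizer
  def process(row)
    email = row[:email]
    return nil if email.nil? || email.empty?

    row.merge(email: email.downcase)
  end
end

normalizer = EmailNormalizer.new
rows = [{ email: 'Pal@Example.com' }, { email: nil }, { email: 'BRO@example.com' }]

# filter_map keeps only the truthy results, mirroring Kiba's "nil drops the row" convention.
cleaned = rows.filter_map { |row| normalizer.process(row) }
```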

The following examples involve maintaining a reference to an ETL object in your job, instead of letting Kiba manage its lifecycle:

Composing Transformations

Since a transformation class only needs to respond to the #process message, taking a row as input and returning a row as output, it is easy to use outside of the main Kiba transform pipeline.

Here’s an example that composes multiple transformations into a single class transformation using delegation.

# Transformation to violently convert all string values to #upcase.
# Maybe we should be using the input_key move above? Oh well.
class Upcaser
  def process(row)
    row.transform_values { |val| val.is_a?(String) ? val.upcase : val }
  end
end

# Transformation to violently stuff things between all string value chars.
# (why???????)
class Spacer
  def initialize(space = ' ')
    @space = space
  end

  def process(row)
    row.transform_values { |val| val.is_a?(String) ? val.chars.join(@space) : val }
  end
end

# Why don't we glue them all together? This series of transformations
#   seems like a really common thing I want to do...
# More importantly, it's really easy to compose Transformations together
# because they all follow the Kiba Transform interface.
class FieldSpacer
  # Forever immortalize this maligned text butcherizer.
  # Make it nice and easy for everyone to butcher text with.
  # Notice that if we _did_ use the input_key move from above,
  # we can forward that on down to each of these guys as well.
  def initialize
    @transformers = [
      Upcaser.new,
      Spacer.new(' : ')
    ]
  end

  # "Reduce" the row by applying the list of transformations against it
  def process(row)
    @transformers.reduce(row) { |out, transformer| transformer.process(out) }
  end
end

class FakeData
  def each
    10.times do
      yield(
        name: %w(Buddy Pal Guy Dude Bro).sample,
        age: rand(21..100)
      )
    end
  end
end

require 'kiba'
require 'awesome_print'

Kiba.run(
  Kiba.parse do
    source FakeData
    transform FieldSpacer
    transform { |row| ap row }
  end
)
{
    :name => "P : A : L",
     :age => 48
}
{
    :name => "B : R : O",
     :age => 97
}
{
    :name => "B : R : O",
     :age => 46
}
{
    :name => "P : A : L",
     :age => 71
}
{
    :name => "P : A : L",
     :age => 29
}
{
    :name => "D : U : D : E",
     :age => 76
}
{
    :name => "G : U : Y",
     :age => 26
}
{
    :name => "D : U : D : E",
     :age => 74
}
{
    :name => "G : U : Y",
     :age => 44
}
{
    :name => "D : U : D : E",
     :age => 76
}
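
One caveat with the reduce above: if a composed transform ever returns nil to drop a row, the next one gets called with nil. Here is a sketch of a more general composer (the ComposedTransform, UpcaseName and RejectShortNames names are hypothetical) that stops at the first nil, the same way Kiba drops a row.

```ruby
# Hypothetical general-purpose composer: runs each transform in order and
# returns nil as soon as any of them drops the row.
class ComposedTransform
  def initialize(*transforms)
    @transforms = transforms
  end

  def process(row)
    @transforms.each do |transform|
      row = transform.process(row)
      return nil if row.nil?
    end
    row
  end
end

# Two tiny hypothetical transforms to exercise the composer.
class UpcaseName
  def process(row)
    row.merge(name: row[:name].upcase)
  end
end

class RejectShortNames
  def process(row)
    row[:name].length > 3 ? row : nil
  end
end

pipeline = ComposedTransform.new(RejectShortNames.new, UpcaseName.new)
kept = pipeline.process({ name: 'Buddy' })
dropped = pipeline.process({ name: 'Pal' })
```

Taking the transforms as constructor arguments, instead of hard-coding them like FieldSpacer does, lets you build new combinations without writing a new class each time.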

Holding reference to a reporting type transformation

Holding a reference to your transformation makes it easy to do more with it whenever you want. Kiba lets you use either a class or a block transform, and because a class transform is just an object with a #process method, you can call one from inside a block transform and use both at the same time.

In this example, we create a simple package of counters using the typical Kiba #process method, but the actual reporting from them happens outside of Kiba’s row processing.

class FieldCounter
  attr_reader :input_key
  attr_accessor :counter

  def initialize(input_key)
    @input_key = input_key
    @counter = Hash.new(0)
  end

  # Tally the row's value for our field, then pass the row along unchanged.
  def process(row)
    counter[row[input_key]] += 1
    row
  end

  alias count counter
end

class FakeData
  def each
    1000.times do
      yield(
        name: %w(Buddy Pal Guy Dude Bro).sample,
        age: rand(21..30),
        favorite_color: %w(red green blue).sample
      )
    end
  end
end

require 'kiba'
require 'json'

Kiba.run(
  Kiba.parse do
    # Initialize our transforms in our own scope so that we can use them
    #   outside of row processing, during post_process.
    counters = [:name, :age, :favorite_color].map { |field| FieldCounter.new(field) }

    source FakeData

    # A class transform is easy to use inside a block transform
    # since they follow Kiba's expected API.
    # This is also a way to "wrap" a class transform with additional functionality.
    transform do |row|
      counters.each { |counter| counter.process(row) }
      row
    end

    # At this point, the Kiba pipeline is fully executed.
    # Sources have sourced, transformers have transformed, and destinations have destined.
    # We can leverage our reference to the transformations to output a simple report at the end.
    # Since it's just a class with data, we can do whatever we want with it.
    post_process do
      puts '-' * 30
      puts 'Counter summary:'
      puts '-' * 30

      counters.each do |counter|
        puts counter.input_key
        puts JSON.pretty_generate(counter.count)
      end

      puts "\n" * 2
      puts '-' * 30
      puts "We can interact with these however we want. Maybe this should go to a Slack channel?"
      puts '-' * 30

      puts JSON.pretty_generate(
        # Combine the counters into a single hash:
        counters.reduce({}) { |out, counter| out.merge(counter.input_key => counter.count) }
      )
    end
  end
)
------------------------------
Counter summary:
------------------------------
name
{
  "Pal": 163,
  "Dude": 221,
  "Buddy": 223,
  "Guy": 197,
  "Bro": 196
}
age
{
  "22": 101,
  "26": 93,
  "24": 87,
  "27": 100,
  "29": 117,
  "23": 104,
  "30": 102,
  "25": 106,
  "28": 106,
  "21": 84
}
favorite_color
{
  "blue": 331,
  "red": 324,
  "green": 345
}


------------------------------
We can interact with these however we want. Maybe this should go to a Slack channel?
------------------------------
{
  "name": {
    "Pal": 163,
    "Dude": 221,
    "Buddy": 223,
    "Guy": 197,
    "Bro": 196
  },
  "age": {
    "22": 101,
    "26": 93,
    "24": 87,
    "27": 100,
    "29": 117,
    "23": 104,
    "30": 102,
    "25": 106,
    "28": 106,
    "21": 84
  },
  "favorite_color": {
    "blue": 331,
    "red": 324,
    "green": 345
  }
}

Aggregating data and chaining destinations

This example collects records until all rows have been processed. It then applies an aggregation to the entire dataset, and finally forwards the data as rows to a new destination that we instantiate outside of Kiba’s pipeline.

Note: As of Kiba 2.5, this pattern isn’t strictly necessary: transformation classes can implement a #close method of their own, and the streaming runner introduced in 2.5 lets a transformation yield multiple records. This means a transformation can play the part of “hold all of the records until I’ve seen them all, then modify and send them”.
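
For comparison, here is a minimal sketch of that newer style, assuming the StreamingRunner (opt-in in Kiba 2.5, and as far as I know the default from Kiba 3 onward). PriceSummaryTransform and its group_by: option are hypothetical names. The bottom of the block drives the transform by hand so it runs without Kiba; inside a job you would declare it as a regular class transform instead.

```ruby
# Hypothetical streaming-style aggregation: buffer rows in #process,
# then emit one summary row per group from #close.
class PriceSummaryTransform
  def initialize(group_by:)
    @group_by = group_by
    @rows = []
  end

  # Hold on to every row; returning nil means nothing goes downstream yet.
  def process(row)
    @rows << row
    nil
  end

  # Called once the source is exhausted; under the StreamingRunner,
  # each yielded row continues down the pipeline.
  def close
    @rows.group_by { |row| row.slice(*@group_by) }.each do |key, rows|
      prices = rows.map { |row| row[:price] }
      yield key.merge(sample_size: prices.size, min_price: prices.min, max_price: prices.max)
    end
  end
end

# Driving it by hand (no Kiba needed): feed rows, then collect what #close yields.
summary = PriceSummaryTransform.new(group_by: [:sku])
input = [{ sku: 'A', price: 10.0 }, { sku: 'A', price: 20.0 }, { sku: 'B', price: 5.0 }]
process_results = input.map { |row| summary.process(row) }

emitted = []
summary.close { |row| emitted << row }
```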

# A destination that collects all of the data before doing a summarization, which triggers a new destination
class ChainableAggregateDestination
  # aggregation: A proc that transforms an array of rows
  # next_destination: a Kiba destination that will receive the transformed rows
  def initialize(aggregation:, next_destination:)
    @aggregation = aggregation
    @next_destination = next_destination
    @records = []
  end

  # Temporarily hold the records in an array so we can reprocess them once we've received all records
  def write(row)
    @records << row
  end

  # Call the aggregation transformation, and send the resulting data to the next destination.
  # Additionally, trigger the next_destination's close method, since Kiba will not be directly
  #   interacting with this object.
  def close
    @aggregation.call(@records).each { |row| @next_destination.write(row) }
    @next_destination.close
  end
end

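# A common CSV destination class
require 'csv'

class CSVDestination
  attr_reader :headers

  def initialize(filename: 'output.csv')
    @csv = CSV.open(filename, 'w')
  end

  # Use the first row's keys as the header row, then write each row's values in that order.
  def write(row)
    unless headers
      @headers = row.keys
      @csv << headers
    end
    @csv << row.values_at(*headers)
  end

  def close
    @csv.close
  end
end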

# Some product data
class FakeData
  def each
    1000.times do
      yield(
        website: 'www.example.com',
        zip_code: %w(55802 55811).sample,
        sku: %w(11111 33333 55555).sample,
        price: rand(50.0..200.0).round(2)
      )
    end
  end
end

require 'kiba'
require 'awesome_print'

job = Kiba.parse do
  # A simple helper function to keep the code below cleaner
  def median(array)
    sorted = array.sort
    len = sorted.length
    (sorted[(len - 1) / 2] + sorted[len / 2]) / 2.0
  end

  source FakeData

  # We'll save all of the source data in a CSV
  destination CSVDestination, filename: 'alldata.csv'

  # And also perform an aggregation against all of the data, and save that too.
  destination ChainableAggregateDestination,
    aggregation: ->(rows) {
      rows.group_by { |row| row.slice(:website, :zip_code, :sku) }.map do |key, values|
        prices = values.map { |row| row[:price] }

        key.merge(
          sample_size: prices.size,
          min_price: prices.min,
          median_price: median(prices),
          max_price: prices.max
        ).tap { |row| ap row }
      end
    },
    # Notice that we are instantiating our own Destination class, instead of letting Kiba handle it directly
    next_destination: CSVDestination.new(filename: 'summary_data.csv')
end

Kiba.run(job)
    # Notice that we are instantiating our own Destination class, instead of letting Kiba handle it directly

{
         :website => "www.example.com",
        :zip_code => "55802",
             :sku => "33333",
     :sample_size => 175,
       :min_price => 51.15,
    :median_price => 117.88,
       :max_price => 199.77
}
{
         :website => "www.example.com",
        :zip_code => "55811",
             :sku => "33333",
     :sample_size => 157,
       :min_price => 50.76,
    :median_price => 130.42,
       :max_price => 199.41
}
{
         :website => "www.example.com",
        :zip_code => "55802",
             :sku => "55555",
     :sample_size => 166,
       :min_price => 50.15,
    :median_price => 128.875,
       :max_price => 199.38
}
{
         :website => "www.example.com",
        :zip_code => "55811",
             :sku => "55555",
     :sample_size => 169,
       :min_price => 50.73,
    :median_price => 125.67,
       :max_price => 199.86
}
{
         :website => "www.example.com",
        :zip_code => "55811",
             :sku => "11111",
     :sample_size => 174,
       :min_price => 53.09,
    :median_price => 130.97,
       :max_price => 199.65
}
{
         :website => "www.example.com",
        :zip_code => "55802",
             :sku => "11111",
     :sample_size => 159,
       :min_price => 50.66,
    :median_price => 125.34,
       :max_price => 199.94
}

Write records in batches

Collecting my records in a destination before actually doing something with them reminded me of a move I do for batching. This is helpful when posting data to an API, or writing to a DB. I won’t do a full example, but consider this destination:

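class SqlServerDestination
  attr_reader :batch_size

  # dataset: assumed here to be a Sequel dataset for the target table
  def initialize(dataset:, batch_size: 1_000)
    @dataset = dataset
    @batch_size = batch_size

    @rows = []
  end

  def write(row)
    @rows << row
    flush if @rows.size >= batch_size
  end

  # Flush whatever is left over in the final, partial batch
  def close
    flush
  end

  def flush
    return if @rows.empty?

    # Assumes a Sequel dataset; swap in your own bulk insert or API call here
    @dataset.multi_insert(@rows)
    @rows = []
  end
end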
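
If you want to try the batching move without a database, here is a storage-agnostic sketch. BatchingDestination and its on_flush: callable are hypothetical names; on_flush stands in for whatever bulk write you would actually do (a DB multi-insert, an API POST, and so on).

```ruby
# Hypothetical batching destination: buffers rows and hands each full batch
# to an injected callable, then flushes the leftovers on #close.
class BatchingDestination
  attr_reader :batch_size

  def initialize(batch_size: 1_000, on_flush:)
    @batch_size = batch_size
    @on_flush = on_flush
    @rows = []
  end

  def write(row)
    @rows << row
    flush if @rows.size >= batch_size
  end

  # Kiba calls #close at the end of the job; send the final partial batch here.
  def close
    flush
  end

  private

  def flush
    return if @rows.empty?

    @on_flush.call(@rows)
    @rows = []
  end
end

# Record only the size of each batch that gets flushed.
batches = []
destination = BatchingDestination.new(batch_size: 2, on_flush: ->(rows) { batches << rows.size })
5.times { |i| destination.write({ id: i }) }
destination.close
```

Injecting the flush behavior as a callable keeps the batching logic testable on its own, and the same class can then back several different sinks.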

Whew. That’s basically every move I know.