Kiba ETL Patterns and Moves
I’ve been working with Kiba ETL a lot, and I love this tool. It doesn’t do much itself, and its premise is simple: it’s a DSL that asks objects to implement a very small interface and orchestrates them into an ETL pipeline. Sources enumerate data with #each, transformations run each row through a #process method, and destinations get written to with #write. Kiba helps you compose many elements together in a way that is very nice to work with and extend. If you aren’t familiar with it, I’d recommend Thibaut’s content: https://github.com/thbar/kiba#useful-links
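To make that interface concrete, here’s a minimal sketch of all three roles. ArraySource, AddGreeting, and StdoutDestination are throwaway names of my own, not anything shipped with Kiba:
require 'kiba'

# A source only needs #each, yielding one row (usually a Hash) at a time.
class ArraySource
  def initialize(rows)
    @rows = rows
  end

  def each
    @rows.each {|row| yield row}
  end
end

# A transformation only needs #process, taking a row and returning a row.
class AddGreeting
  def process(row)
    row.merge(greeting: "Hello, #{row[:name]}!")
  end
end

# A destination only needs #write (and optionally #close).
class StdoutDestination
  def write(row)
    puts row
  end
end

Kiba.run(
  Kiba.parse do
    source ArraySource, [{name: 'Buddy'}, {name: 'Pal'}]
    transform AddGreeting
    destination StdoutDestination
  end
)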
I wanted to jot down some of my moves and patterns for future reference. Each of the examples below is self-contained, and generally uses the kiba and awesome_print gems.
Since it’s a long post with lots of code, here’s a quick table of contents:
- Making transformations versatile by accepting input_key: and output_key: args
- A simple dedupe filter
- Using pre-allocated Kiba ETL classes outside of Kiba’s direct supervision
- Write records in batches
Making transformations versatile by accepting input_key: and output_key: args
Many transformations work on a specific value, and it’s easy to make your transformer flexible enough to be reused with different datasets by accepting the relevant keys as arguments. This also keeps it compatible regardless of whether your hash keys are symbols or strings. This KeyGenerator has been useful in many different projects.
require 'digest'
class KeyGenerator
attr_reader :input_keys, :output_key, :digest
# input_keys: an array of fields to fetch the values from
# output_key: Where to store the calculated result
# digest: Set this to true if you want to hash the values.
# This is helpful if you have many fields, or they aren't meaningful.
#
def initialize(input_keys: [], output_key: :key, digest: false)
@input_keys = input_keys
@output_key = output_key
@digest = digest
end
def process(row)
key = row.values_at(*input_keys)
.map{|val| remove_insignificant_chars(val)}
.join(':')
key = Digest::MD5.hexdigest(key) if digest
row[output_key] = key
row
end
# Normalize data by removing anything that isn't [a-z0-9].
def remove_insignificant_chars(val)
val.to_s.downcase.gsub(/[^a-z0-9]/, '')
end
end
class FakeData
def each
100.times.each do
yield({
website: ['www.example.com', ' www.example.com! '].sample,
zip_code: %w(55802 90210).sample,
brand: 'Tireco',
sku: %w(19953 16516).sample,
mpc: %w(123123123 456456456).sample,
section: 205,
aspect: 50,
rim: 16,
load: [nil, 87].sample,
speed: [nil, 'W'].sample,
ply: [nil, 10].sample,
sidewall: [nil, 'BL'].sample
})
end
end
end
require 'kiba'
require 'awesome_print'
Kiba.run(
Kiba.parse do
source FakeData
transform KeyGenerator,
input_keys: [:website, :zip_code],
output_key: :location_key
transform KeyGenerator,
input_keys: [:brand, :mpc],
output_key: :product_key
transform KeyGenerator,
input_keys: [:website, :sku],
output_key: :retailer_product_key
transform KeyGenerator,
input_keys: [:retailer_product_key, :section, :aspect, :rim, :load, :speed, :ply, :sidewall],
digest: true
transform {|row| ap row}
end
)
{
:website => " www.example.com! ",
:zip_code => "90210",
:brand => "Tireco",
:sku => "16516",
:mpc => "456456456",
:section => 205,
:aspect => 50,
:rim => 16,
:load => 87,
:speed => nil,
:ply => 10,
:sidewall => "BL",
:location_key => "wwwexamplecom:90210",
:product_key => "tireco:456456456",
:retailer_product_key => "wwwexamplecom:16516",
:key => "187291fc8d24640cc83d0bb463df4528"
}
{
:website => "www.example.com",
:zip_code => "90210",
:brand => "Tireco",
:sku => "19953",
:mpc => "456456456",
:section => 205,
:aspect => 50,
:rim => 16,
:load => 87,
:speed => nil,
:ply => nil,
:sidewall => nil,
:location_key => "wwwexamplecom:90210",
:product_key => "tireco:456456456",
:retailer_product_key => "wwwexamplecom:19953",
:key => "6f9b364254a06cc06d99910520a58bdb"
}
A simple dedupe filter
Ruby’s Set makes quick work of this. We can check whether we’re allowed to add a new value; if not, we must have already seen it.
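That check is Set#add?, which returns the set when the value is new and nil when it has already been seen:
require 'set'

seen = Set.new
seen.add?([1, 'Thing'])  # => #<Set: {[1, "Thing"]}> -- newly added, truthy
seen.add?([1, 'Thing'])  # => nil -- already in the set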
It’s easy to build a deduplication filter because Ruby evaluates hash and array equality by their contained values. Sweet!
[1, 'Thing'] == [1, 'Thing'] #=> true
Under the hood, Set membership checks use Object#hash, which returns an Integer. We can store that Integer instead of the actual values to save space when working with large strings or many objects.
Note: the value returned by #hash is not consistent between runs of your application, so it’s not a good persistent key like the MD5 digest we generated above.
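To make that concrete, here’s what the filter below actually stores:
[1, 'Thing'].hash == [1, 'Thing'].hash  # => true (stable within a single process)
[1, 'Thing'].hash                       # => some Integer, but a different one on the next run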
require 'set'
class DedupeFilter
attr_reader :keys
def initialize(keys: [])
@keys = keys
@seen = Set.new
end
# If we are allowed to add the row's "value" to the set, return the row.
# Otherwise return nil (which drops it from the pipeline).
#
def process(row)
@seen.add?(row.values_at(*keys).hash) ? row : nil
end
end
class FakeData
def each
1000.times.each do
yield({
name: %w(Buddy Pal Guy Dude Bro).sample,
})
end
end
end
require 'kiba'
Kiba.run(
Kiba.parse do
source FakeData
transform DedupeFilter, keys: [:name]
transform {|row| puts row}
end
)
{:name=>"Bro"}
{:name=>"Guy"}
{:name=>"Buddy"}
{:name=>"Pal"}
{:name=>"Dude"}
Using pre-allocated Kiba ETL classes outside of Kiba’s direct supervision
The simplicity of Kiba comes from its simple, generic workflow. A Ruby class just needs to respond to one of three methods to play the expected role of a source, transformation, or destination; Kiba itself does little more than direct traffic between them. Because of this, these simple objects are just as useful outside of Kiba’s ETL declarations. You can instantiate any of these same objects and use them in new ways, even if you’re not using Kiba at all!
The following examples involve maintaining a reference to an ETL object in your job, instead of letting Kiba manage its lifecycle.
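Before the bigger examples, here’s the core move in miniature. Doubler is just a throwaway class for illustration:
# Any class with a #process method is a usable transform -- with or without Kiba.
class Doubler
  def process(row)
    row.merge(value: row[:value] * 2)
  end
end

doubler = Doubler.new          # we hold the reference ourselves
doubler.process({value: 21})   # => {:value=>42}, no Kiba involved

# Inside a Kiba.parse block, the same pre-allocated object can be called
# from a block transform, and we keep access to it afterwards:
#
#   transform {|row| doubler.process(row)}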
Composing Transformations
Since a transformation class is simply an object that responds to the #process message by taking a row as input and returning a row as output, it’s easy to use outside of the main Kiba transform pipeline.
Here’s an example that composes multiple transformations into a single class transformation using delegation.
# Transformation to violently convert all string values to #upcase.
#
# Maybe we should be using the input_key move above? Oh well.
#
class Upcaser
def process(row)
row.transform_values{|val| val.upcase rescue val}
end
end
# Transformation to violently stuff things between all string value chars.
# (why???????)
#
class Spacer
def initialize(space = ' ')
@space = space
end
def process(row)
row.transform_values{|val| val.chars.join(@space) rescue val}
end
end
# Why don't we glue them all together? This series of transformations
# seems like a really common thing I want to do...
#
# More importantly, it's really easy to compose Transformations together
# because they all follow the Kiba Transform interface.
#
class FieldSpacer
##
# Forever immortalize this maligned text butcherizer.
# Make it nice and easy for everyone to butcher text with.
#
# Notice that if we _did_ use the input_key move from above,
# we can forward that on down to each of these guys as well.
#
def initialize
@transformers = [
Upcaser.new,
Spacer.new(':'),
Spacer.new
]
end
# "Reduce" the row by applying the list of transformations against it
def process(row)
@transformers.reduce(row){|out, transformer| transformer.process(out)}
end
end
class FakeData
def each
10.times.each do
yield({
name: %w(Buddy Pal Guy Dude Bro).sample,
age: rand(21..100)
})
end
end
end
require 'kiba'
require 'awesome_print'
Kiba.run(
Kiba.parse do
source FakeData
transform FieldSpacer
transform {|row| ap row}
end
)
{
:name => "P : A : L",
:age => 48
}
{
:name => "B : R : O",
:age => 97
}
{
:name => "B : R : O",
:age => 46
}
{
:name => "P : A : L",
:age => 71
}
{
:name => "P : A : L",
:age => 29
}
{
:name => "D : U : D : E",
:age => 76
}
{
:name => "G : U : Y",
:age => 26
}
{
:name => "D : U : D : E",
:age => 74
}
{
:name => "G : U : Y",
:age => 44
}
{
:name => "D : U : D : E",
:age => 76
}
Holding a reference to a reporting-type transformation
Holding a reference to your transformation makes it easy to do more with it whenever you want. Kiba allows you to use either a class or a block transform, and since they share the same interface, you can also use both at the same time.
In this example, we create a simple package of counters using the typical Kiba #process method, but the actual reporting from them happens outside of Kiba’s row processing.
class FieldCounter
attr_reader :input_key
attr_accessor :counter
def initialize(input_key)
@input_key = input_key
@counter = Hash.new(0)
end
def process(row)
counter[row[input_key]] += 1
row
end
alias count counter
end
class FakeData
def each
1000.times.each do
yield({
name: %w(Buddy Pal Guy Dude Bro).sample,
age: rand(21..30),
favorite_color: %w(red green blue).sample
})
end
end
end
require 'kiba'
require 'json'
Kiba.run(
Kiba.parse do
# Initialize our transforms in our own scope so that we can use them
# outside of row processing, during post_process.
counters = [:name, :age, :favorite_color].map{|field| FieldCounter.new field}
source FakeData
# A class transform is easy to use inside a block transform
# since they follow Kiba's expected API.
#
# This is also a way to "wrap" a class transform with additional functionality.
#
transform {|row| counters.each {|processor| processor.process(row)}; row}
# By the time post_process runs, the Kiba pipeline has fully executed.
# Sources have sourced, transformers have transformed, and destinations have destined.
#
# We can leverage our reference to the transformations to output a simple report at the end.
# Since it's just a class with data, we can do whatever we want with it.
post_process do
puts '-' * 30
puts 'Counter summary:'
puts '-' * 30
counters.each do |counter|
puts counter.input_key
puts JSON.pretty_generate counter.count
end
puts "\n" * 2
puts '-' * 30
puts "We can interact with these however we want. Maybe this should go to a Slack channel?"
puts '-' * 30
puts JSON.pretty_generate(
# Combine the counters into a single hash:
counters.reduce({}){|out, processor| out.merge(processor.input_key => processor.count)}
)
end
end
)
------------------------------
Counter summary:
------------------------------
name
{
"Pal": 163,
"Dude": 221,
"Buddy": 223,
"Guy": 197,
"Bro": 196
}
age
{
"22": 101,
"26": 93,
"24": 87,
"27": 100,
"29": 117,
"23": 104,
"30": 102,
"25": 106,
"28": 106,
"21": 84
}
favorite_color
{
"blue": 331,
"red": 324,
"green": 345
}
------------------------------
We can interact with these however we want. Maybe this should go to a Slack channel?
------------------------------
{
"name": {
"Pal": 163,
"Dude": 221,
"Buddy": 223,
"Guy": 197,
"Bro": 196
},
"age": {
"22": 101,
"26": 93,
"24": 87,
"27": 100,
"29": 117,
"23": 104,
"30": 102,
"25": 106,
"28": 106,
"21": 84
},
"favorite_color": {
"blue": 331,
"red": 324,
"green": 345
}
}
Aggregating data and chaining destinations
This example collects records until all rows have been processed. It then applies an aggregation to the entire dataset, and finally forwards the data as rows to a new destination that we instantiate outside of Kiba’s pipeline.
Note: as of Kiba 2.5, this example isn’t strictly necessary, because transformation classes can implement a #close method of their own, and the streaming runner introduced in 2.5 can yield multiple records. This means a transformation can play the part of “hold all of the records until I’ve seen them all, then modify and send them”.
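I haven’t rewritten this example in that style, but roughly, it could look like the sketch below. Treat it as an approximation from memory rather than gospel: it assumes the streaming runner is active (opt-in in 2.5, the default in Kiba 3), and that a row returned from a transform’s #close is appended to the stream after the last source row.
# Passes every row through untouched while keeping a running aggregate,
# then emits one extra summary row when the stream ends.
class SummaryAppender
  def initialize
    @count = 0
    @total = 0.0
  end

  def process(row)
    @count += 1
    @total += row[:price].to_f
    row
  end

  # Called after the last row; the returned row flows down the pipeline
  # like any other.
  def close
    {summary: true, rows_seen: @count, total_price: @total.round(2)}
  end
end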
# A destination that collects all of the data before doing a summarization, which triggers a new destination
class ChainableAggregateDestination
# aggregation: A proc that transforms an array of rows
# next_destination: a Kiba Destination that will receive the transformed rows
def initialize(aggregation:, next_destination:)
@aggregation = aggregation
@next_destination = next_destination
@records = []
end
# Temporarily hold the records in an array so we can reprocess them once we've received all records
def write(row)
@records << row
end
# Call the aggregation transformation, and send the resulting data to the next destination
# Additionally, trigger the next_destination's close method, since Kiba will not be directly
# interacting with this object.
def close
@aggregation.call(@records).each {|row| @next_destination.write(row)}
@next_destination.close
end
end
# A common CSV destination class
require 'csv'
class CSVDestination
attr_reader :headers
def initialize(filename: 'output.csv')
@csv = CSV.open(filename, 'w')
end
def write(row)
@csv << @headers = row.keys unless headers
@csv << row.values_at(*headers)
end
def close
@csv.close
end
end
# Some product data
class FakeData
def each
1000.times.each do
yield({
website: 'www.example.com',
zip_code: %w(55802 55811).sample,
sku: %w(11111 33333 55555).sample,
price: rand(50.0..200.0).round(2)
})
end
end
end
require 'kiba'
require 'awesome_print'
job = Kiba.parse do
# A simple helper function to keep the code below cleaner
def median(array)
sorted = array.sort
len = sorted.length
(sorted[(len - 1) / 2] + sorted[len / 2]) / 2.0
end
source FakeData
# We'll save all of the source data in a CSV
destination CSVDestination, filename: 'alldata.csv'
# And also perform an aggregation against all of the data, and save that too.
destination ChainableAggregateDestination,
aggregation: ->(rows){
rows.group_by{|row| row.slice(:website, :zip_code, :sku)}.map do |key, values|
prices = values.map{|row| row[:price]}
key.merge(
sample_size: prices.size,
min_price: prices.min,
median_price: median(prices),
max_price: prices.max,
).tap{|row| ap row}
end
},
next_destination: CSVDestination.new(filename: 'summary_data.csv')
# Notice that we are instantiating our own Destination class, instead of letting Kiba handle it directly
end
Kiba.run(job)
{
:website => "www.example.com",
:zip_code => "55802",
:sku => "33333",
:sample_size => 175,
:min_price => 51.15,
:median_price => 117.88,
:max_price => 199.77
}
{
:website => "www.example.com",
:zip_code => "55811",
:sku => "33333",
:sample_size => 157,
:min_price => 50.76,
:median_price => 130.42,
:max_price => 199.41
}
{
:website => "www.example.com",
:zip_code => "55802",
:sku => "55555",
:sample_size => 166,
:min_price => 50.15,
:median_price => 128.875,
:max_price => 199.38
}
{
:website => "www.example.com",
:zip_code => "55811",
:sku => "55555",
:sample_size => 169,
:min_price => 50.73,
:median_price => 125.67,
:max_price => 199.86
}
{
:website => "www.example.com",
:zip_code => "55811",
:sku => "11111",
:sample_size => 174,
:min_price => 53.09,
:median_price => 130.97,
:max_price => 199.65
}
{
:website => "www.example.com",
:zip_code => "55802",
:sku => "11111",
:sample_size => 159,
:min_price => 50.66,
:median_price => 125.34,
:max_price => 199.94
}
Write records in batches
Collecting records in a destination before actually doing something with them reminded me of a move I use for batching. It’s helpful when posting data to an API, or writing to a database. I won’t do a full example, but consider this destination:
class SqlServerDestination
attr_reader :batch_size
def initialize(dataset:, batch_size: 1_000)
@dataset = dataset
@batch_size = batch_size
@rows = []
end
def write(row)
@rows << row
flush if @rows.size >= batch_size
end
def close
flush
end
def flush
@dataset.multi_insert(@rows)
@rows = []
end
end
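Here, dataset is expected to be something that can bulk-insert an array of row hashes; multi_insert is Sequel’s API for exactly that. A hypothetical wiring (connection string, table name, and source are all placeholders) might look like:
require 'kiba'
require 'sequel'

DB = Sequel.connect(ENV.fetch('DATABASE_URL'))  # e.g. a tinytds URL for SQL Server

Kiba.run(
  Kiba.parse do
    source FakeData  # any source will do; the product data from the previous example works
    destination SqlServerDestination, dataset: DB[:prices], batch_size: 500
  end
)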
Whew. That’s basically every move I know.