Splitter
Time for my splitter. Which will be even more complex.
It will need a consumer that reads from transactions
. Then will load the transactions into a case class. Finally based on the transactionType
will use one of two Producers to write the content to either the withdrawals
or deposits
topics.
Case Class Transaction
case class Transaction(name: String, email: String, transactionType: String, amount: Double)
I will want to chop up the CSV string and load the data into a Scala case class.
Get Property Functions
Have to configure Kafka.
def getConsumerProperties(): Properties = {
val props: Properties = new Properties()
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "withdrawal-consumer")
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
props
}
def getProducerProperties(): Properties = {
val props: Properties = new Properties()
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.setProperty(ProducerConfig.ACKS_CONFIG, "1")
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props
}
Main Method
Again the main method is handling a lot. I’ll break it down.
def main(args: Array[String]): Unit = {
val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](getConsumerProperties())
consumer.subscribe(util.Arrays.asList("transactions"))
val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](getProducerProperties())
val depositProducer: KafkaProducer[String, String] = new KafkaProducer[String, String](getProducerProperties())
while(true) {
val records: ConsumerRecords[String, String] = consumer.poll(Duration.ofMillis(5000))
records.forEach((record) => {
val stringTransaction = record.value()
val listTransaction = stringTransaction.split(",")
val transaction: Transaction = Transaction(listTransaction(0), listTransaction(1),
listTransaction(2), listTransaction(3).toDouble)
if (transaction.transactionType == "withdrawl") {
// println(transaction)
val producerRecord: ProducerRecord[String, String] = new ProducerRecord[String, String]("withdrawals",
f"${transaction.name},${transaction.email},${transaction.transactionType},${transaction.amount}")
producer.send(producerRecord)
}
else {
val producerRecord: ProducerRecord[String, String] = new ProducerRecord[String, String]("deposits",
f"${transaction.name},${transaction.email},${transaction.transactionType},${transaction.amount}")
depositProducer.send(producerRecord)
}
})
}
}
Consumer
val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](getConsumerProperties())
consumer.subscribe(util.Arrays.asList("transactions"))
Creating the consumer and subscribing to the topic.
Producers
val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](getProducerProperties())
val depositProducer: KafkaProducer[String, String] = new KafkaProducer[String, String](getProducerProperties())
Infinitely Loop
while(true) {
val records: ConsumerRecords[String, String] = consumer.poll(Duration.ofMillis(5000))
Every 5 seconds grab all of the records out of the consumer.
Foreach
records.forEach((record) => {
val stringTransaction = record.value()
Read each record
Split CSV String Record into List
val listTransaction = stringTransaction.split(",")
Convert to Case Class
val transaction: Transaction = Transaction(listTransaction(0), listTransaction(1),
listTransaction(2), listTransaction(3).toDouble)
Determine Transaction Type and Write to Appropriate Topic
if (transaction.transactionType == "withdrawl") {
val producerRecord: ProducerRecord[String, String] = new ProducerRecord[String, String]("withdrawals",
f"${transaction.name},${transaction.email},${transaction.transactionType},${transaction.amount}")
producer.send(producerRecord)
}
else {
val producerRecord: ProducerRecord[String, String] = new ProducerRecord[String, String]("deposits",
f"${transaction.name},${transaction.email},${transaction.transactionType},${transaction.amount}")
depositProducer.send(producerRecord)
}
Let’s try this whole thing out.
Validations
To truly see it working we will need to have both Scala main methods running in their own threads. We will also need to be watching all three topics with data going into them.
This should be fun and good use of tmux
to watch the fireworks.
Before turning it on, this is what it looks like:
- Deposits in the left screen
- Transactions in the middle screen
- Withdrawals in the right screen
And we’re off! 60 second gif of what it looks like:
This project made me feel real good. I think I’m getting a handle of Kafka!