scala circe 라이브러리를 사용하다 아래와 같은 에러를 만났다.

could not find implicit value for parameter encoder: io.circe.Encoder[Message]



대충 코드는 이렇다. Message case class를 json으로 변경하는 것이다.

object MessageType extends Enumeration {
type MessageType = Value
val DEL = Value
val INSERT = Value
}

case class Message(
val version: String,
val pipelineType: MessageType.Value,
val headers: Map[String, Object],
val createdAt: String
)


scala enumeration은 아래와 같이 처리해 주었는데.. 역시 동일한 에러이다. 

implicit val decoder: Decoder[MessageType.Value] = Decoder.enumDecoder(MessageType)
implicit val encoder: Encoder[MessageType.Value] = Encoder.enumEncoder(MessageType)



혹시나 circe는  Map의 value type을 중요하게 본다. 역시 에러다

case class Message(

val version: String,
val pipelineType: MessageType.Value,
val headers: Map[String, Any],
val createdAt: String
)



아래와 같이 value type을 String으로 변경하니 동작한다

case class Message(
val version: String,
val pipelineType: MessageType.Value,
val headers: Map[String, String],
val createdAt: String
)



경험해보니 최대한 간결한 패턴의 case class를 써야 circe가 잘 동작한다. 




Posted by '김용환'
,


보통 소켓을 다루는 간단한 자바 애플리케이션 예시의 경우, socket을 close하지 않아도 자연스럽게 정리된다.


공식 RabbitMQ 자바 Client을 사용할 때 

publish 코드에서 사용하는 connection을 종료하지 않으면 계속 hang된다.



val connection = connectionFactory.newConnection
val channel = connection.createChannel
channel.exchangeDeclare(exchange, builtinExchangeType, false)
messages.foreach { message =>
channel.basicPublish(exchange, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"))
}


jstack을 통해  비동기 쓰레드 폴링을 확인할 수 있다.

"Process Proxy: RabbitPublisher" #473 prio=6 os_prio=31 tid=0x00007f854bd53000 nid=0x1572b runnable [0x000070000cde6000]

   java.lang.Thread.State: RUNNABLE

at sun.nio.ch.KQueue.keventPoll(Native Method)

at sun.nio.ch.KQueuePort$EventHandlerTask.poll(KQueuePort.java:196)

at sun.nio.ch.KQueuePort$EventHandlerTask.run(KQueuePort.java:276)

at sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)



항상 자원을 close를 처리할 필요가 있다.

val connection = connectionFactory.newConnection
val channel = connection.createChannel
channel.exchangeDeclare(exchange, builtinExchangeType, false)
messages.foreach { message =>
channel.basicPublish(exchange, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"))
}
channel.close()
connection.close()


Posted by '김용환'
,