Im还挺新斯卡拉和Akka流和我试图获得JSON串消息从一个websocket,并推动他们以一种卡夫卡的主题。
现在我只有工作"得到的消息从一个ws"的一部分。
消息来自的看起来是这样的:
{
"bitcoin":"6389.06534240",
"ethereum":"192.93111286",
"monero":"108.90302506",
"litecoin":"52.25484165"
}
我想要分割这个JSON消息,多个消息:
{"coin": "bitcoin", "price": "6389.06534240"}
{"coin": "ethereum", "price": "192.93111286"}
{"coin": "monero", "price": "108.90302506"}
{"coin": "litecoin", "price": "52.25484165"}
然后按每个这些信息卡夫卡的主题。
这是我迄今为止取得:
val message_decomposition: Flow[Message, String, NotUsed] = Flow[Message].mapConcat(
msg => msg.toString.replaceAll("[{})(]", "").split(",")
).map( msg => {
val splitted = msg.split(":")
s"{'coin': ${splitted(0)}, 'price': ${splitted(1)}}"
})
val sink: Sink[String, Future[Done]] = Sink.foreach[String](println)
val flow: Flow[Message, Message, Promise[Option[Message]]] =
Flow.fromSinkAndSourceMat(
message_decomposition.to(sink),
Source.maybe[Message])(Keep.right)
val (upgradeResponse, promise) = Http().singleWebSocketRequest(
WebSocketRequest("wss://ws.coincap.io/prices?assets=ALL"),
flow)
它的工作即时得到所期望输出的Json消息,但我在想,如果我可以写这个制作人在一个更加"Akka上下的"风格,使用GraphDSL. 所以我有几个问题:
- 它是可能持续消耗一个WebSocket使用GraphDSL? 如果是的话,你可以给我一个例子吗?
- 这是一个好的想法,以消耗WS使用GraphDSL?
- 我应该分解收到的Json这样的消息im做之前发送给卡夫卡? 或者它是更好地发送它,因为它是对于较低的延迟?
- 后产生的消息为卡夫卡,我是规划消费的使用Apache风暴,这是一个好主意吗? 或者我应该坚持Akka?
谢谢你读我的问候, 传统的