Akka流不断地消耗websocket

0

的问题

Im还挺新斯卡拉和Akka流和我试图获得JSON串消息从一个websocket,并推动他们以一种卡夫卡的主题。

现在我只有工作"得到的消息从一个ws"的一部分。

消息来自的看起来是这样的:

{  
   "bitcoin":"6389.06534240",
   "ethereum":"192.93111286",
   "monero":"108.90302506",
   "litecoin":"52.25484165"
}

我想要分割这个JSON消息,多个消息:

   {"coin": "bitcoin", "price": "6389.06534240"}
   {"coin": "ethereum", "price": "192.93111286"}
   {"coin": "monero", "price": "108.90302506"}
   {"coin": "litecoin", "price": "52.25484165"}

然后按每个这些信息卡夫卡的主题。

这是我迄今为止取得:

val message_decomposition: Flow[Message, String, NotUsed] = Flow[Message].mapConcat(
    msg => msg.toString.replaceAll("[{})(]", "").split(",")
  ).map( msg => {
    val splitted = msg.split(":")
    s"{'coin': ${splitted(0)}, 'price': ${splitted(1)}}"
  })

val sink: Sink[String, Future[Done]] = Sink.foreach[String](println)

val flow: Flow[Message, Message, Promise[Option[Message]]] =
    Flow.fromSinkAndSourceMat(
      message_decomposition.to(sink),
      Source.maybe[Message])(Keep.right)

val (upgradeResponse, promise) = Http().singleWebSocketRequest(
      WebSocketRequest("wss://ws.coincap.io/prices?assets=ALL"),
      flow)

它的工作即时得到所期望输出的Json消息,但我在想,如果我可以写这个制作人在一个更加"Akka上下的"风格,使用GraphDSL. 所以我有几个问题:

  • 它是可能持续消耗一个WebSocket使用GraphDSL? 如果是的话,你可以给我一个例子吗?
  • 这是一个好的想法,以消耗WS使用GraphDSL?
  • 我应该分解收到的Json这样的消息im做之前发送给卡夫卡? 或者它是更好地发送它,因为它是对于较低的延迟?
  • 后产生的消息为卡夫卡,我是规划消费的使用Apache风暴,这是一个好主意吗? 或者我应该坚持Akka?

谢谢你读我的问候, 传统的

akka akka-stream apache-kafka scala
2021-11-20 14:01:02
1

最好的答案

1

代码是多Akka上下: scaladsl 只是为Akka作 GraphDSL 或实现一个自定义 GraphStage. 的唯一原因,海事组织/E,去 GraphDSL 是如果实际形的曲线图并不是容易地表达在 scaladsl.

我会亲自去路线的定义 CoinPrice 类做的模型是明确的

case class CoinPrice(coin: String, price: BigDecimal)

然后有一个 Flow[Message, CoinPrice, NotUsed] 其分析1来的消息成为零或者更多 CoinPrices. 东西(使用玩JSON在这里),如:

val toCoinPrices =
  Flow[Message]
    .mapConcat { msg =>
      Json.parse(msg.toString)
        .asOpt[JsObject]
        .toList
        .flatMap { json =>
          json.underlying.flatMap { kv =>
            import scala.util.Try

            kv match {
              case (coin, JsString(priceStr)) =>
                Try(BigDecimal(priceStr)).toOption
                  .map(p => CoinPrice(coin, p))                

              case (coin, JsNumber(price)) => Some(CoinPrice(coin, price))
              case _ => None
            }
          }
        }
    }

你可能,取决于什么尺寸的JSONs的消息是,要想打破这一成不同的流阶段,以允许对一个异步之间的边界id分析和提取到 CoinPrices. 例如,

Flow[Message]
  .mapConcat { msg =>
    Json.parse(msg.toString).asOpt[JsObject].toList
  }
  .async
  .mapConcat { json =>
    json.underlying.flatMap { kv =>
      import scala.util.Try

      kv match {
        case (coin, JsString(priceStr)) =>
          Try(BigDecimal(priceStr)).toOption
            .map(p => CoinPrice(coin, p))

        case (coin, JsNumber(price)) => Some(CoinPrice(coin, price))
        case _ => None
      }
    }
  }

在上述,本阶段的任一侧 async 边界将执行在独立的行动者和因此,可能同时(如果有足够的CPU核可等等), 在成本的额外开销的行动者协调和交换信息。 额外的协调/通信的开销(cf。 Gunther的普遍可扩展性法律)是仅仅是值得的,如果星都足够大的和即将在充分迅速(持续进来之前一个已完成的处理)。

如果你的目的是要消费的,直到程序停止,你可能会发现它只使用更清晰的 Source.never[Message].

2021-11-21 12:42:30

谢谢你的回答,这是非常清楚,我只有一个问题寿。 我怎么能打破我的响应为不同的流阶段。 你可以你只要给我一个小例子吗? 或定向我的适当文件的一部分?
Arès

其他语言

此页面有其他语言版本

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................