Concatinating两个流动Akka流

0

的问题

我想concat两个流动和我不能解释输出我的执行情况。

val source = Source(1 to 10)
val sink = Sink.foreach(println)

val flow1 = Flow[Int].map(s => s + 1)
val flow2 = Flow[Int].map(s => s * 10)

val flowGraph = Flow.fromGraph(
    GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val concat = builder.add(Concat[Int](2))
      val broadcast = builder.add(Broadcast[Int](2))

      broadcast ~> flow1 ~> concat.in(0)
      broadcast ~> flow2 ~> concat.in(1)

      FlowShape(broadcast.in, concat.out)
    }
  )

source.via(flowGraph).runWith(sink)

我希望以下的产出从这个代码。

2
3
4
.
.
.
11
10
20
.
.
.
100

相反,我看到的只有"2"正在印制。 能否请你解释什么是错误的,在我implmentation和我应该如何改变程序,以获得所需的产出。

akka akka-stream scala
2021-10-21 17:29:00
2

最好的答案

3

从Akka流的API文档:

Concat:

发射时的电流流经的一个元素;如果目前的输入完成,它试图在下一个

Broadcast:

发射时所有的产出停止backpressuring和有输入单元的可用

这两个运营商不会的工作结合为存在冲突中如何,他们的工作-- Concat 试图把所有的元素中的一个 Broadcast's输出之前转换到另一个,而 Broadcast 不会这些的除非有需求的所有其产出。

你需要什么,可以合并使用 concat 作为建议的评论:

source.via(flow1).concat(source.via(flow2)).runWith(sink)

或者,使用 Source.combine 如下:

Source.combine(source.via(flow1), source.via(flow2))(Concat[Int](_)).runWith(sink)
2021-10-21 22:34:04
0

使用 GraphDSL,这是一个简化版本的执行情况的 来源。结合:

val sg = Source.fromGraph(
  GraphDSL.create(){ implicit builder =>
    import GraphDSL.Implicits._

    val concat = builder.add(Concat[Int](2))

    source ~> flow1 ~> concat
    source ~> flow2 ~> concat

    SourceShape(concat.out)
  }
)

sg.runWith(sink)
2021-10-26 19:23:56

其他语言

此页面有其他语言版本

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................