发/生产json的消息通过卡夫卡

0

的问题

这是我第一次的时间使用卡夫卡和我计划使用卡夫卡。净

我想知道,如果我可以发送JSON作为一个消息时我制造一个事件

我下面的教程: https://developer.confluent.io/get-started/dotnet/#build-producer

此外,有没有办法,值映射到一个模型,以便值/json结构是总是联系在一起的模式

所以,例如:如果我想要我的json值

{
  "customerName":"anything",
  "eventType":"one-of-three-enums",
  "columnsChanged": "string value or something"
}

大多数例子,我可以看到的是这样的:

using Confluent.Kafka;
using System;
using Microsoft.Extensions.Configuration;

class Producer {
    static void Main(string[] args)
    {
        if (args.Length != 1) {
            Console.WriteLine("Please provide the configuration file path as a command line argument");
        }

        IConfiguration configuration = new ConfigurationBuilder()
            .AddIniFile(args[0])
            .Build();

        const string topic = "purchases";

        string[] users = { "eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther" };
        string[] items = { "book", "alarm clock", "t-shirts", "gift card", "batteries" };

        using (var producer = new ProducerBuilder<string, string>(
            configuration.AsEnumerable()).Build())
        {
            var numProduced = 0;
            const int numMessages = 10;
            for (int i = 0; i < numMessages; ++i)
            {
                Random rnd = new Random();
                var user = users[rnd.Next(users.Length)];
                var item = items[rnd.Next(items.Length)];

                producer.Produce(topic, new Message<string, string> { Key = user, Value = item },
                    (deliveryReport) =>
                    {
                        if (deliveryReport.Error.Code != ErrorCode.NoError) {
                            Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                        }
                        else {
                            Console.WriteLine($"Produced event to topic {topic}: key = {user,-10} value = {item}");
                            numProduced += 1;
                        }
                    });
            }

            producer.Flush(TimeSpan.FromSeconds(10));
            Console.WriteLine($"{numProduced} messages were produced to topic {topic}");
        }
    }
}

我想该项目是一个类关系图来描述软件所需的依赖结构。

.net apache-kafka asp.net-core
2021-11-23 21:53:21
1

最好的答案

0

想知道,如果我可以发送JSON作为一个消息时我制造一个事件

是的。 卡夫卡的商店字节和转换为字节使用的序列化程序。 当建立一个生产者,你的呼叫 SetValueSerializer.

一些内在的序列化程序,可以发现在- https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/src/Confluent.Kafka/Serializers.cs

你会需要写你自己来一般性地处理任何JSON模式类型。

当使用的Utf8Serializer字符串,则需要预先serialize的对象是从你的模型类,然后发送这样的价值。 在你的榜样,你要替换 var item 有一些化的对象。

我怎么打开C#对象入JSON字符串中。净?

当使用的模型类数据通常会被强类型直到你开始手写id或使用的典型类型。 如果你想要的外部信息验证,汇合的模式注册是一个实例,它支持JSONSchema和 JsonSerializerconfluent-dotnet-kafka 项目支持这一点。

2021-11-23 22:27:28

只是一个后续行动的问题。 你知道如果我可以限制消息的大小以及是否有办法对于生产者来检查是什么尺寸的事件之前,发送和不允许发送信息如果的尺寸是多限制?
Learn AspNet

卡夫卡默认限制的1MB消息的批。 如果你的尺寸化byte array,这应该是一个近似值的个人记录的大小(有额外的开销等记录的标题和时间戳,不过)
OneCricketeer

谢谢你。 可以请你回答: stackoverflow.com/questions/70097676/...
Learn AspNet

其他语言

此页面有其他语言版本

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................