kafka官方golang库?

Redis、Kafka或RabbitMQ:哪个作为微服务消息代理最合适?

将异步通信用于微服务的场合,通常使用消息代理(Message Broker)。消息代理确保不同微服务之间的通信可靠稳定,保证消息在系统内得到管理和监视,并且消息不会被丢失。

开发者可以选择的一些消息代理有很多,它们的规模和数据功能各不相同。本篇文章将比较三种最受欢迎的消息代理:RabbitMQ,Kafka与Redis。

首先让我们了解微服务通信。

在微服务之间有常见的两种通信方式:同步与异步。

在同步通信中,调用方在发送下一条消息之前等待响应,并且它作为HTTP之上的REST协议运行。相反,在异步通信中,无需等待响应即可发送消息。这适用于分布式系统,通常需要消息代理来管理消息。

你选择的通信类型应考虑不同的参数,例如微服务的结构方式,适当的基础架构,延迟,规模,依赖关系以及通信目的。异步通信的建立可能会更加复杂,并且需要添加更多组件才能堆叠,但是将异步通信用于微服务的好处远大于缺点。

首先根据定义,异步通信是非阻塞的;第二,它也比同步操作支持更好的缩放;第三,在微服务崩溃的情况下,异步通信机制提供了各种恢复技术,通常更擅长处理与崩溃有关的错误。

另外,当使用代理而不是REST协议时,接收通信的服务实际上并不需要彼此了解。在旧的服务运行了很长时间之后,甚至可以引入新的服尺誉务,即能做到更好的解耦服务。

最后,在选择异步操作时,您将增强将来创建集中发现,监视,负载平衡甚至策略执行器的能力。这将为您提供在代码和系统构建中具有灵活性,可伸缩性和更多功能的功能。

异步通信通常通过消息代理进行管理。改掘也有其他方法,例如aysncio,但它们更加稀少和有限。

在选择代理执行异步操作时,应考虑以下几点:

一对一

一对多

我们检查了那里最新和最出色的服务,以找出这三个类别中最强的提供商。

RabbitMQ(AMQP)

规模:根据配置和资源,这里的运行速度约为每秒50K msg。

持久性:支持持久性消息和瞬时消息。

一对一与一对多的消费者:两者都有。

RabbitMQ于2007年发布,是最早创建的常见消息代理之一。它是一个开放源代码,通过实现高级消息队列协议(AMQP)通过点对点和pub-sub方法传递消息。它旨在支持复杂的路由逻辑。

有一些托管服务可让您将其用作SaaS,但它不是本机主要云提供商堆栈的一部分。RabbitMQ支持所有主要语言,包括Python,Java,.NET,PHP,Ruby,JavaScript,Go,Swift等。

在持久模式下,可能会遇到一些性能问题。

kafka

规模:每秒最多可以发送一百万条消息。

持久性:是的。

一对一vs一对多的消费者:只有一对多陵歼段(乍一看似乎很奇怪,对吧?!)。

Kafka曾在Azure,AWS和Confluent上管理SaaS。他们都是Kafka项目的创建者和主要贡献者。Kafka支持所有主要语言,包括Python,Java,C C ++,Clojure,.NET,PHP,Ruby,JavaScript,Go,Swift等。

Redis

规模:每秒最多可以发送一百万条消息。

持久性:基本上不是,它是内存中的数据存储。

一对一与一对多的消费者:两者都有。

Redis与其他消息代理有点不同。Redis的核心是一个内存中的数据存储,可以用作高性能键值存储或消息代理。另一个区别是Redis没有持久性,而是将其内存转储到Disk DB中。它还非常适合实时数据处理。

最初,Redis不是一对一和一对多的。但是,由于Redis 5.0引入了pub-sub,因此功能得到了增强,一对多成为真正的选择。

我们介绍了RabbitMQ,Kafka和Redis的一些特征。这三种动物都是它们的类别,但是如上所述,它们的运行方式大不相同。这是我们建议正确的消息代理根据不同用例使用的建议。

短命消息:Redis

Redis的内存数据库几乎适用于不需要持久性的消息短暂的用例。因为Redis提供了非常快速的服务和内存功能,所以它是短保留消息的理想选择,在这些消息中持久性不是很重要,您可以容忍一些丢失。随着5.0中Redis流的发布,它也成为了一对多用例的候选者,由于局限性和旧的pub-sub功能,绝对需要使用它。

大量数据:Kafka

Kafka是一个高吞吐量的分布式队列,用于长时间存储大量数据。对于需要持久性的一对多用例,Kafka是理想的选择。

复杂路由:RabbitMQ

RabbitMQ是一个较老但很成熟的代理,具有许多支持复杂路由的功能。当所需速率不高(超过数万msg sec)时,它甚至将支持复杂的路由通信。

考虑您的软件堆栈

当然,最后要考虑的是你当前的软件堆栈。如果你正在寻找一个相对简单的集成过程,并且不想在堆栈中维护其他代理,那么你可能更倾向于使用已由堆栈支持的代理。

例如,如果你在RabbitMQ之上的系统中使用Celery for Task Queue,那么您会获得与RabbitMQ或Redis一起使用的动力,而不是不支持Kafka且需要进行一些重写的Kafka。

我们通过平台的发展和壮大使用了以上所有内容,然后再进行一些使用!重要的是要记住,每种工具都有自己的优点和缺点,这与了解它们并为工作以及特定的时机,情况和要求选择合适的工具有关。

kafka官方golang库?  第1张

Golang kafka简述和操作(sarama同步异步和消费组)

一、Kafka简述

1. 为什么需要用到消息队列

异步:对比以前的串行同步方式来说,可以在同一时间做更多的事情,提高效率;

解耦:在耦合太高的场景,多个任务要对同一个数据进行操作消费的时候,会导致一个任务的处理因为另一个任务对数据的操作变得及其复杂。

缓冲:当遇到突发大流量的时候,消息队列可以先把所有消息有序保存起来,避免直接作用于系统主体,系统主题始终以一个平稳的速率去消费这些消息。

2.为什么选择kafka呢?

这没有绝对的好坏,看个人需求来选择,我这里就抄了一段他人总结的的优缺点,可见原文

kafka的优点:

1.支持多个生产者和消费者2.支持broker的横向拓展3.副本集机制,实现数据冗余,保证数据不丢失4.通过topic将数据进行分类5.通过分批发送压缩数据的方式,减少数据传输开销,提高吞高量6.支持多种模式的消息7.基于磁盘实现数据的持久化8.高性能的处理信息,在大数据的情况下,可以保证亚秒级的消息延迟9.一个消费者可以支持多种topic的消息10.对CPU和内存的消耗比较小11.对网络开销也比较小12.支持跨数据中心的数据复制13.支持镜像集群

kafka的缺点:

1.由于是批量发送,所以数据达简并不到真正的实时2.对于mqtt协议不支持3.不支持物联网传感数据直接接入4.只能支持统一分区内消息有序,无法实现全局消息有序5.监控不完善,需要安装插件6.需要配合zookeeper进行元数据管理7.会丢失数据,并且不支持事务8.可能会重复消费数据,消息会乱序,可用保证一个固定的partition内部的消息是有序的,但是一个topic有多个partition的话,就不能保证有序了,需要zookeeper的支持,topic一般需要人工创建,部署和维护一般都比mq高

3. Golang 操作kafka

3.1. kafka的环境

网上有很多搭建kafka环境教程,这里就不再搭建,就展示一下kafka的环境,在kubernetes上进行的搭建,有需要的私我,可以发yaml文件

3.2. 第三尘带方库拦兄迹

github.com/Shopify/sarama // kafka主要的库*github.com/bsm/sarama-cluster // kafka消费组

3.3. 消费者

单个消费者

funcconsumer(){varwg sync.WaitGroup  consumer, err := sarama.NewConsumer([]string{"172.20.3.13:30901"},nil)iferr !=nil{      fmt.Println("Failed to start consumer: %s", err)return}  partitionList, err := consumer.Partitions("test0")//获得该topic所有的分区iferr !=nil{      fmt.Println("Failed to get the list of partition:, ", err)return}forpartition :=rangepartitionList {      pc, err := consumer.ConsumePartition("test0",int32(partition), sarama.OffsetNewest)iferr !=nil{        fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)return}      wg.Add(1)gofunc(sarama.PartitionConsumer){//为每个分区开一个go协程去取值formsg :=rangepc.Messages() {//阻塞直到有值发送过来,然后再继续等待fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))        }deferpc.AsyncClose()        wg.Done()      }(pc)  }  wg.Wait()}funcmain(){  consumer()}

消费组

funcconsumerCluster(){  groupID :="group-1"config := cluster.NewConfig()  config.Group.Return.Notifications =trueconfig.Consumer.Offsets.CommitInterval =1* time.Second  config.Consumer.Offsets.Initial = sarama.OffsetNewest//初始从最新的offset开始c, err := cluster.NewConsumer(strings.Split("172.20.3.13:30901",","),groupID, strings.Split("test0",","), config)iferr !=nil{      glog.Errorf("Failed open consumer: %v", err)return}deferc.Close()gofunc(c *cluster.Consumer){      errors := c.Errors()      noti := c.Notifications()for{select{caseerr := -errors:            glog.Errorln(err)case-noti:        }      }  }(c)formsg :=rangec.Messages() {      fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))      c.MarkOffset(msg,"")//MarkOffset 并不是实时写入kafka,有可能在程序crash时丢掉未提交的offset}}funcmain(){goconsumerCluster()}

3.4. 生产者

同步生产者

packagemainimport("fmt""github.com/Shopify/sarama")funcmain(){  config := sarama.NewConfig()  config.Producer.RequiredAcks = sarama.WaitForAll//赋值为-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。config.Producer.Partitioner = sarama.NewRandomPartitioner//写到随机分区中,默认设置8个分区config.Producer.Return.Successes =truemsg := sarama.ProducerMessage{}  msg.Topic =`test0`msg.Value = sarama.StringEncoder("Hello World!")  client, err := sarama.NewSyncProducer([]string{"172.20.3.13:30901"}, config)iferr !=nil{      fmt.Println("producer close err, ", err)return}deferclient.Close()  pid, offset, err := client.SendMessage(msg)iferr !=nil{      fmt.Println("send message failed, ", err)return}  fmt.Printf("分区ID:%v, offset:%v \n", pid, offset)}

异步生产者

funcasyncProducer(){  config := sarama.NewConfig()  config.Producer.Return.Successes =true//必须有这个选项config.Producer.Timeout =5* time.Second  p, err := sarama.NewAsyncProducer(strings.Split("172.20.3.13:30901",","), config)deferp.Close()iferr !=nil{return}//这个部分一定要写,不然通道会被堵塞gofunc(p sarama.AsyncProducer){      errors := p.Errors()      success := p.Successes()for{select{caseerr := -errors:iferr !=nil{              glog.Errorln(err)            }case-success:        }      }  }(p)for{      v :="async: "+ strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))      fmt.Fprintln(os.Stdout, v)      msg := sarama.ProducerMessage{        Topic: topics,        Value: sarama.ByteEncoder(v),      }      p.Input() - msg      time.Sleep(time.Second *1)  }}funcmain(){goasyncProducer()select{      }}

3.5. 结果展示-

同步生产打印:

分区ID:0,offset:90

消费打印:

Partition:0,Offset:90,key:,value:Hello World!

异步生产打印:

async:7272async:7616async:998

消费打印:

Partition:0,Offset:91,key:,value:async:7272Partition:0,Offset:92,key:,value:async:7616Partition:0,Offset:93,key:,value:async:998

docker 配置 kafka+zookeeper,golang接入示例

配置zookeeper 使用kafka/bin/下自带的zk

运行 报错 卒。配置低了

docker-compose.yml

报错

换云搬瓦工的机器试一下滑颂粗

但是docker ps -a 发现只有zookeeper启动了,kafka失败, 检查日志 发现kafka运行需要java环境,而且对内存有要求,搬瓦工的vps不足够

因此修改docker-compose.yml 加入以下

stop 再启动

完美

测试

进入容器

查看已信镇经建好的topic (docker-compose.yml)

发送樱碧消息

接收消息

接下来是golang接入kafka了

运行

聊聊golang的zap的ZapKafkaWriter

本文主要研羡芹究一下golang的zap的ZapKafkaWriter

WriteSyncer内嵌了io.Writer接兄李毕口,定义了Sync方法;Sink接口内嵌了zapcore.WriteSyncer及io.Closer接口;扰搏ZapKafkaWriter实现Sink接口及zapcore.WriteSyncer接口,其Write方法直接将data通过kafka发送出去。

一次golang sarama kafka内存占用大的排查经历

环境:

现象:golang微服务内存占用超过镇信物1G,查看日坦明志发现大量kafka相关错误日志,继而查看kafka集群,其中一个kafka节点容器挂掉了。

疑问 为什么kafka集群只有一个broker挂了,客户端就大量报错呢

通过beego admin页面获取 mem-1.memprof

可以看到调用栈为 withRecover backgroundMetadataUpdataer refreshMeaatdata RefreshMetada tryRefreshMetadata ...

sarama-cluster: NewClient

为什么kafka集群只有一个broker,但是NewClient确失败了?

在kafka容器里查看topic, 发现Replicas和Isr只有一个,找御液到kafka官方配置说明,自动生成的topic需要配置default.replication.factor这个参数,才会生成3副本。

以上内容为新媒号(sinv.com.cn)为大家提供!新媒号,坚持更新大家所需的互联网后端知识。希望您喜欢!

版权申明:新媒号所有作品(图文、音视频)均由用户自行上传分享,仅供网友学习交流,不声明或保证其内容的正确性,如发现本站有涉嫌抄袭侵权/违法违规的内容。请发送邮件至 k2#88.com(替换@) 举报,一经查实,本站将立刻删除。

(0)
上一篇 2023-09-23 14:26
下一篇 2023-09-23 14:26

相关推荐

发表回复

登录后才能评论