kafka broker id

根据Kafka官方文档 (见下文)

Broker Node Registry

/brokers/ids/[0...N] --> host:port (ephemeral node)

This is a list of all present broker nodes, each of which provides a unique logical broker id which identifies it to consumers (which must be given as part of its configuration). On startup, a broker node registers itself by creating a znode with the logical broker id under /brokers/ids. The purpose of the logical broker id is to allow a broker to be moved to a different physical machine without affecting consumers. An attempt to register a broker id that is already in use (say because two servers are configured with the same broker id) is an error.

Since the broker registers itself in ZooKeeper using ephemeral znodes, this registration is dynamic and will disappear if the broker is shutdown or dies (thus notifying consumers it is no longer available).

每个broker的信息是保存在zookeeper的/brokers/ids/[id]中的, 其值是该broker的一些信息, 例如

{
    "jmx_port": -1,
    "timestamp": "1433071965453",
    "host": "qc-bj2-kafka4",
    "version": 1,
    "port": 9092
}

这里用到的id就是配置文件中需要配置的broker.id

同时, 因为这个/brokers/ids/[id]是一个ephemeral node, 所以可以用它来实现kafka broker的健康检查, 即watch /brokers/ids 节点, 当有子节点的变化的时候则通知相应的服务, 具体可以参见kakfa源码中的kafka.server.KafkaHealthcheck类.

注: kafka的java client已经通过这种方式来自动感知broker的变化了.

这里还有个小问题是默认情况下, kafka broker是对外暴露的是hostname, 这就要求集群中配置好DNS或者每个机器都要维护好/etc/hosts, 比较麻烦, 或者如果通过VPN连接到集群中的话, 也会遇到本地机器无法通过hostname来访问到kafka broker的问题.

这个问题可以通过配置advertised.host.name属性为broker ip来解决

自动分配Broker ID

因为一个Kafka Cluster中的broker id不能够重复, 这就给自动部署引起了很大的麻烦, 因为集群中的每个节点都需要一份不同的配置文件了.

例如, Sematext[^1]的人就提问了, 他们是把所有的服务都封装进一个VM里面了, 这样的话, 部署就很方便了, 直接把VM跑起来就可以, 但是当需要scale out的时候, 问题出来了, 因为VM是同一个, 每个VM实例中的配置文件自然也就相同, 于是kafka cluster跑不起来……

自己动手丰衣足食

不过广大人民的智慧是无穷的, 你kafka不就是要求broker.id是一个小于Integer.MAX_VALUE的不重复自然数么, 啥符合要求呢? IP地址啊,集群中的每个机器的IP肯定是不一样的, 那么去掉每个机器IP地址中间的点, 自然就是在一个不重复的数字了.

不过因为Integer.MAX_VALUE=2147483647, 而IP最大的为255255255255, 比要求的大了两个数量级呢, 还是有一定风险的, 但是因为自己的系统的IP号段分配状况自己知道, 所以很容易就可以判断出这种方案可不可行.

如果IP号段符合的话(也就是所有自己集群中可用的IP地址去掉其中的句点之后的数字都比2147483647)小的话, 那就可以先用一个配置文件去部署kafka, 在启动前再用个脚本去获取IP地址然后去更新配置文件就行了, Puppet, Chef之类的工具也很容易的能够做到这一点.

这个方案的变种还有使用MAC地址之类的, 就不多说了.

KAFKA-1070

需要的人多了, 于是乎, 就有人在Kafka 的JIRA上提了这个改进的需求[^2]

[^1]: Sematext是美国的一个APM服务提供商, 其提供kafka的监控服务
[^2]: 虽然这个JIRA的类型是Bug, 但是我认为实际上应该是Improvement或者New Feature的, 因为Bug的定义是行为和承诺不符, 如果文档中写了kafka会自动分配broker id, 却没有自动分配或者分配了重复的ID, 那么是一个bug, 可是现在kafka的文档中明明写着需要用户自己配置broker.id的属性, 所以并不存在行为和预期不符的情况, 这个JIRA更多的是需要kafka提供一个新的功能来自动分配broker id, 所以应该Improvement或者New Feature.

启用JMX

可以在kafka-run-class.sh的脚本中加入下面两行的方式来启用kafka的JMX支持

export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
export JMX_PORT=${JMX_PORT:-9999}                                    

另外, 可以下载MX4j并把其中的mx4j-tools.jar放到kafka的libs目录下, kafka会自动加载的.

MX4j能够提供给我们一种通过Web方式来访问系统的JMX服务, 避免了通过jconsole, jvisualvm的麻烦, 这样我们就能够直接在web端通过标准的JMX服务来查看或者动态调整一些系统参数和指标了.

MX4j不仅在kafka中有所应用, 在其他很多开源项目中也都有被使用, 例如cassandra

因为MX4j提供的是一个web服务, 所以需要制定端口号和绑定的IP

在kafka中, 是通过-Dmx4jport=8082-Dmx4jaddress=0.0.0.0来完成的, 这两个也是kafka中的默认值, 具体实现可以参考kafka源码中的kafka.utils.Mx4jLoader类.

最常用的情景是我们可以通过JMX的方式来动态的调整一个类的log level, 在定位系统问题的时候会很有帮助.

监控

https://github.com/criteo/kafka-ganglia

replica相关的参数

  • default.replication.factor (1)

    The default replication factor for automatically created topics

    默认的副本数

  • replica.lag.time.max.ms (10000)

    follower的最大lag时间,即如果leader在这个时间内都没有收到follower的fetch请求,就认为它dead

    If a follower hasn’t sent any fetch requests for this window of time, the leader will remove the follower from ISR (in-sync replicas) and treat it as dead.

  • replica.lag.max.messages (4000)

    最大lag消息数,超过这个消息数,leader会认为该follower dead

    If a replica falls more than this many messages behind the leader, the leader will remove the follower from ISR and treat it as dead.

  • replica.socket.timeout.ms (30 * 1000)

    The socket timeout for network requests to the leader for replicating data.

  • replica.socket.receive.buffer.bytes (64 * 1024)

    The socket receive buffer for network requests to the leader for replicating data.

  • replica.fetch.max.bytes (1024 * 1024)

    一次fetch request最大可以fetch的数据size

    The number of byes of messages to attempt to fetch for each partition in the fetch requests the replicas send to the leader.

  • replica.fetch.wait.max.ms (500)

    fetch request的等待超时

    The maximum amount of time to wait time for data to arrive on the leader in the fetch requests sent by the replicas to the leader.

  • replica.fetch.min.bytes (1)

    Minimum bytes expected for each fetch response for the fetch requests from the replica to the leader. If not enough bytes, wait up to replica.fetch.wait.max.ms for this many bytes to arrive.

  • num.replica.fetchers (1)

    follower上用于同步leader数据的线程数

    Number of threads used to replicate messages from leaders. Increasing this value can increase the degree of I/O parallelism in the follower broker.

  • replica.high.watermark.checkpoint.interval.ms (5000)

    checkpoint HW的interval

    The frequency with which each replica saves its high watermark to disk to handle recovery.

Mac 初始化指南

安装xcode command line

xcode-select --install

安装rvm

curl -sSL https://get.rvm.io | bash -s stable   

安装brew

ruby -e "$(curl -fsSL https://raw.github.com/Homebrew/homebrew/go/install)"

安装brew cask

brew install caskroom/cask/brew-cask

通过brew安装的工具

brew install ansible
brew install ctags
brew install git
brew install libev
brew install libyaml
brew install readline
brew install vim
brew install ant
brew install curl
brew install gmp4
brew install libevent
brew install maven
brew install redis
brew install wget
brew install autoconf
brew install dos2unix
brew install gnu-sed
brew install libgpg-error
brew install mpfr2
brew install scala
brew install wxmac
brew install automake
brew install ejabberd
brew install go
brew install libksba
brew install openssl
brew install sqlite
brew install xz
brew install boost
brew install erlang
brew install gradle
brew install libmpc08
brew install pcre
brew install ssh-copy-id
brew install brew-cask
brew install gcc46
brew install htop-osx
brew install libpng
brew install pkg-config
brew install thrift
brew install cassandra
brew install gdbm
brew install jpeg
brew install libtiff
brew install ppl011
brew install tmux
brew install cloog-ppl015
brew install gettext
brew install legit
brew install libtool
brew install python3
brew install unixodbc    

通过brew cask安装应用程序

brew cask install adium
brew cask install firefox
brew cask install limechat
brew cask install skype
brew cask install xquartz
brew cask install alfred
brew cask install github
brew cask install mou
brew cask install textmate
brew cask install youdao
brew cask install android-file-transfer
brew cask install google-chrome
brew cask install mplayerx
brew cask install thunder
brew cask install bittorrent-sync
brew cask install onyx
brew cask install vagrant
brew cask install emacs
brew cask install intellij-idea
brew cask install picasa
brew cask install virtualbox
brew cask install evernote
brew cask install libreoffice
brew cask install qq
brew cask install wechat

配置zsh

我使用的是oh-my-zsh的配置

配置vim

vim我选择的是amix的配置

配置tmux

这个基本上就是标准配置了, 只不过改了快捷键

青云,阿里云, 腾讯云内网网速测试

今天使用iperf3来测试了一下现在国内主流的三大IaaS厂商的内网通讯速度, 发现阿里云最慢啊.

注: 这并不是一个非常严谨的测试, 个人不承担相应的责任, 不过自己在考虑使用哪个平台的云服务的时候, 很是一个指标.

我们的服务有很多内网之间通讯的需求, 例如, cassandra数据节点之间的同步, 调用zookeeper, 发送消息到kafka中等等, 都需要使用大量的网络带宽, 所以也比较重视这一点.

具体的测试结果见图片:

阿里云的测试结果

ali

腾讯云的测试结果

qq

青云的测试结果

qing

从图中就可以看出, 阿里云几乎比另外两家慢了一倍, 而青云和腾讯云相比则略胜一筹.

另外, 发现iperf3真是个好东西啊, 用来模拟各种网络情况, 具体见下面的各种配置参数

-u, --udp                 use UDP rather than TCP
-b, --bandwidth #[KMG][/#] target bandwidth in bits/sec
                          (default 1 Mbit/sec for UDP, unlimited for TCP)
                          (optional slash and packet count for burst mode)
-t, --time      #         time in seconds to transmit for (default 10 secs)
-n, --num       #[KMG]    number of bytes to transmit (instead of -t)
-k, --blockcount #[KMG]   number of blocks (packets) to transmit (instead of -t or -n)
-l, --len       #[KMG]    length of buffer to read or write
                          (default 128 KB for TCP, 8 KB for UDP)
-P, --parallel  #         number of parallel client streams to run
-R, --reverse             run in reverse mode (server sends, client receives)
-w, --window    #[KMG]    TCP window size (socket buffer size)
-B, --bind      <host>    bind to a specific interface or multicast address
-C, --linux-congestion <algo>  set TCP congestion control algorithm (Linux only)
-M, --set-mss   #         set TCP maximum segment size (MTU - 40 bytes)
-N, --nodelay             set TCP no delay, disabling Nagle's Algorithm
-4, --version4            only use IPv4
-6, --version6            only use IPv6
-S, --tos N               set the IP 'type of service'
-L, --flowlabel N         set the IPv6 flow label (only supported on Linux)
-Z, --zerocopy            use a 'zero copy' method of sending data
-O, --omit N              omit the first n seconds
-T, --title str           prefix every output line with this string
--get-server-output       get results from server

We're Hiring

环信最新招聘信息

后台工程师

职责描述:

负责环信后台服务器程序的架构设计和开发

任职要求:

  1. 5年以上java开发工作经验,具有服务器开发工作经验者优先;
  2. 深入了解java开发工具及主流开发框架,具有扎实的技术功底,熟悉主流技术架构;
  3. 熟悉REST架构和HTTP协议,以及Nginx等;
  4. 熟悉Cassandra, Kafka,Zookeeper等流行的分布式系统及其架构;
  5. 熟悉TCP/IP协议,熟悉socket和多线程开发,具备高访问量web开发工作经验(10W同时在线或日PV达千万);
  6. 逻辑思维能力强,具有团队意识;
  7. 熟悉linux相关开发优先考虑;
  8. 熟悉ruby, python, bash 等脚本语言优先考虑;
  9. 有开源社区经验者优先考虑
  10. 全栈工程师,DevOps直接录取

前端工程师

职责描述:

负责环信网站, 管理后台, web im sdk等的开发设计

任职要求:

  1. 熟悉流行的前端技术, 包括但不限于bootstrap, html5, css3, saas, less, jquery, bower, grunt
  2. 熟悉AJAX, REST等原理和使用方式
  3. 熟悉HTTP, WebSocket, Spdy等协议
  4. 熟悉Haml, Jade, Slim等模板语言优先考虑
  5. 有设计美感者优先考虑
  6. 深刻理解Web标准,对可用性. 可访问性等相关知识有实际的了解和实践经验;
  7. 熟悉ruby, python, bash, nodejs 等脚本语言优先考虑;
  8. 有开源社区经验者优先考虑
  9. 全栈工程师,DevOps直接录取

运维工程师

职责描述:

  1. 负责网站应用的架构审核,监控,优化,排错.
  2. 能够快速定位故障的位置,原因和提供解决方案.
  3. 给其他工程师提供支持和帮助,促进团队的学习和提高.
  4. 跟踪业内新技术新动态,研究相关应用架构和运维技术,制定运维技术方案.
  5. 研究和建立运维系统,推动运维系统的规范化,标准化,自动化和智能化.
  6. 提升应用的速度,稳定性,可靠性,安全性,降低成本.

任职要求:

  1. 精通Linux以及主要Unix系统及原理,了解网络基本技术,熟悉TCP/IP协议工作原理;
  2. 熟悉nginx, tomcat, cassandra, zookeeper等技术的原理,优化,排错;
  3. 熟悉shell, perl, python, java, php, C++脚本或开发语言两种以上.
  4. 熟悉大型网站架构及优化,分布式系统,大型数据库,缓存,队列,运维系统架构等技术;
  5. 责任心强,积极主动,研究规划能力强,沟通和团队建设能力强,热爱分享.
  6. 有开源社区经验者优先考虑
  7. 全栈工程师,DevOps直接录取