Pitfalls of Connecting Node.js to Kafka
Background
Most of the backend services on my architecture team are written in Node.js, and we now need to ship all of their logs into the company's Kafka cluster. The ops team allocated us only a single topic, and asked us to distinguish the applications by the structure of the messages themselves.
Requirements:
- All of our applications are deployed as containers, so ideally the containers' console logs go straight to Kafka without any code changes
- If the no-code-change route turns out to be infeasible, fall back to changing the code
kafka-server:
Version: 3.2.0
Security protocol: SASL_PLAINTEXT
SASL mechanism: SCRAM-SHA-256
Shipping container console logs to Kafka
First attempt: use Alibaba's open-source log-pilot to collect the container logs and forward them to Kafka.
docker-compose:
version: '3.2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  pilot:
    image: hz-log-pilot:1.0.3
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - /:/host
    privileged: true
    environment:
      - LOGGING_OUTPUT=kafka
      - KAFKA_BROKERS=test-kafka-idc-1.xxx.com:9092
      - KAFKA_VERSION=0.11.0
      - KAFKA_USERNAME=admin
      - KAFKA_PASSWORD=xxx
      - KAFKA_MECHANISM=SCRAM-SHA-256
      - KAFKA_SSL=true
      - KAFKA_PROTOCOL=PLAIN
    labels:
      aliyun.global: true
  hello-node:
    image: hello-node:1.0
    ports:
   - "9003:8888"
     vironment:
        - aliyun_logs_topic-devops=stdout
          aliyun_logs_topic-devops_format=json
             - aliyun_logs_topic-devops_tags=appId=backend,appName=test-app
Testing showed that against our company's Kafka server configuration, the logs never arrived, even though the exact same setup delivered fine to a Kafka server I stood up myself.
After a round of digging, it turned out that the filebeat bundled inside log-pilot does not support SCRAM-SHA-256 authentication. Infuriating.
The code-change route: picking a client library
A quick Bing search for "nodejs kafka" showed that most people recommend kafkajs for talking to Kafka from Node.js:
https://kafka.js.org/docs/getting-started
https://github.com/tulios/kafkajs
So I wrote a test:
kafkajs
kafka.test-spec.ts:
const { Kafka, CompressionTypes } = require('kafkajs')
const kafka = new Kafka({
    // clientId: 'devops',
    logLevel: 5, // DEBUG (kafkajs logLevel enum)
    brokers: ['10.105.141.164:49171'],
    authenticationTimeout: 1000,
    reauthenticationThreshold: 10000,
    sasl: {
        mechanism: "SCRAM-SHA-256",
        username: "admin",
        password: "xxxx"
    }
});
const topic = 'test1'
const producer = kafka.producer()
const sendMessage = async () => {
    await producer.connect()
    return producer
        .send({
            topic,
            compression: CompressionTypes.None,
            messages: [
                { value: JSON.stringify({ name: 'jack', age: '120' }), partition: 0 }
            ],
        })
        .then(console.log)
        .catch(e => console.error(`[example/producer] ${e.message}`, e))
}
describe('kafka tests', function () {
    describe('send Tests', function () {
        this.timeout(15000);
        it('should connect successfully', async function () {
            await sendMessage();
        });
    });
});
Running it in WebStorm (which integrates Node.js unit testing) shows it cannot connect:

The apparent cause: the Kafka server uses SASL_PLAINTEXT with SCRAM-SHA-256, but I could only ever get kafkajs to authenticate with the PLAIN mechanism, so the connection failed.
On top of that, our broker runs Kafka 3.2.0; kafkajs claims to support Kafka 0.11+, but it's possible the newer broker versions aren't fully covered.
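For completeness: kafkajs's own configuration docs do show SCRAM support, written with a lowercase mechanism string. This is the documented form (I never got it authenticating against our broker, so treat it as a pointer rather than a verified fix):
const { Kafka } = require('kafkajs')

// SASL/SCRAM as documented at https://kafka.js.org/docs/configuration;
// note the lowercase mechanism name.
const kafka = new Kafka({
    brokers: ['test-kafka-idc-1.xxx.com:9092'],
    sasl: {
        mechanism: 'scram-sha-256',
        username: 'admin',
        password: 'xxx'
    }
})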
I then saw kafka-node also being widely recommended, so I wrote another test.
kafka-node
https://www.npmjs.com/package/kafka-node/v/5.0.0#producer
Kafka-node is a Node.js client for Apache Kafka 0.9 and later.
https://github.com/SOHU-Co/kafka-node
It's SOHU's library, and it claims to support Kafka 0.9 and later. The code:
var kafka = require('kafka-node'),
    Producer = kafka.Producer,
    client = new kafka.KafkaClient({
        kafkaHost: 'test-kafka-idc-1.xxx.com:9092',
        connectTimeout: 3000,
        sasl: { mechanism: 'scram-sha-256', username: 'admin', password: 'xxx' }
    });
var producerOption = {
    requireAcks: 1,
    ackTimeoutMs: 100,
    partitionerType: 0 // 0 = default partitioner
};
var producer = new Producer(client, producerOption);
function getPayloads(){
    return [
        {topic:"test",messages:JSON.stringify({"name":"jack","age":"120"}),partition:0}
    ];
}
describe('kafka tests', function() {
    describe('send Tests', function() {
        this.timeout(15000);
        it('should connect successfully', function (done) {
            producer.on('ready', function () {
                producer.send(getPayloads(), function (err, data) {
                    if (err) {
                        console.log('[kafka-producer -> test1]: broker update failed');
                    } else {
                        console.log('[kafka-producer -> test1]: broker update success');
                    }
                    done();
                });
            });
            producer.on('error', function (err) {
                console.log('error: ' + err.toString());
                done(err);
            });
        });
    });
});
// Helper: synchronous wait, in seconds (not used by the test above)
function wait(second) {
    // execSync blocks; for an async variant see node.js child_process
    require('child_process').execSync('sleep ' + second);
}
Still no connection; it fails with a connection timeout:

On GitHub I noticed the project hasn't been updated in two years, so it looks unmaintained; presumably it doesn't support this SASL mechanism either, or the client and broker versions don't match.
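That reading is consistent with kafka-node's README, which only documents SASL/PLAIN for the sasl option; SCRAM mechanisms aren't listed at all. The documented form looks like this, and it simply doesn't match our broker's SCRAM-SHA-256 requirement:
const kafka = require('kafka-node');

// kafka-node's README documents only SASL/PLAIN; no SCRAM option is listed.
const client = new kafka.KafkaClient({
    kafkaHost: 'test-kafka-idc-1.xxx.com:9092',
    sasl: { mechanism: 'plain', username: 'admin', password: 'xxx' }
});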
So, back to the official Kafka site. The Apache wiki maintains a list of client libraries:
https://cwiki.apache.org/confluence/display/KAFKA/Clients
It lists the Node.js libraries, and the first recommendation is node-rdkafka.

kafka-node sits in second place, but it evidently doesn't support our Kafka server's protocol configuration.
node-rdkafka
https://github.com/Blizzard/node-rdkafka
Same routine, another test, rd-kafka.test-spec.ts:
const Kafka = require('node-rdkafka')
const ERR_TOPIC_ALREADY_EXISTS = 36;
const config = {
    'bootstrap.servers': 'test-kafka-idc-1.xxx.com:9092',
    'sasl.username': 'admin',
    'sasl.password': 'xxx',
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanisms': 'SCRAM-SHA-256'
};
const topic = 'topic-devops';
describe('kafka tests', function() {
    describe('send Tests', function() {
        this.timeout(15000);
        it('should connect successfully', async function () {
            await produceExample();
        });
    });
});
function ensureTopicExists() {
    const adminClient = Kafka.AdminClient.create(config);
    return new Promise((resolve, reject) => {
        adminClient.createTopic({
            topic: topic,
            num_partitions: 1,
            replication_factor: 3
        }, (err) => {
            if (!err) {
                console.log(`Created topic ${topic}`);
                return resolve();
            }
            if (err.code === ERR_TOPIC_ALREADY_EXISTS) {
                return resolve();
            }
            return reject(err);
        });
    });
}
async function produceExample() {
    // await ensureTopicExists();
    const producer = await createProducer((err, report) => {
        if (err) {
            console.warn('Error producing', err)
        } else {
            const {topic, partition, value} = report;
            console.log(`Successfully produced record to topic "${topic}" partition ${partition} ${value}`);
        }
    });
    for (let idx = 0; idx < 10; ++idx) {
        const key = 'alice';
        const value = Buffer.from(JSON.stringify({ name:key+idx,count: idx }));
        console.log(`Producing record ${key}\t${value}`);
        producer.produce(topic, -1, value, key);
    }
    producer.flush(10000, () => {
        producer.disconnect();
    });
}
function createProducer(onDeliveryReport) {
    // Copy the shared config instead of mutating it, and enable per-message delivery reports
    const proConfig = Object.assign({}, config, { 'dr_msg_cb': true });
    const producer = new Kafka.Producer(proConfig);
    return new Promise((resolve, reject) => {
        producer
            .on('ready', () => {
                producer.setPollInterval(100); // poll so 'delivery-report' events actually fire
                resolve(producer);
            })
            .on('delivery-report', onDeliveryReport)
            .on('event.error', (err) => {
                console.warn('event.error', err);
                reject(err);
            });
        producer.connect();
    });
}
Run it from WebStorm:

The send succeeded this time, and a check on the Kafka side confirmed the messages landed. Fantastic.
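To double-check arrivals from code rather than the server console, a minimal node-rdkafka consumer can subscribe to the same topic. A sketch, reusing the same broker and SASL settings as above; the group id 'devops-verify' is made up for illustration:
const Kafka = require('node-rdkafka');

// Same broker/SASL settings as the producer above; 'devops-verify' is a hypothetical group id.
const consumer = new Kafka.KafkaConsumer({
    'bootstrap.servers': 'test-kafka-idc-1.xxx.com:9092',
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanisms': 'SCRAM-SHA-256',
    'sasl.username': 'admin',
    'sasl.password': 'xxx',
    'group.id': 'devops-verify'
}, { 'auto.offset.reset': 'earliest' });

consumer.connect();

consumer
    .on('ready', () => {
        consumer.subscribe(['topic-devops']);
        consumer.consume(); // flowing mode: every message fires a 'data' event
    })
    .on('data', (msg) => {
        console.log(`${msg.topic} [${msg.partition}] @${msg.offset}: ${msg.value.toString()}`);
    });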
This problem had been haunting me for nearly a week, so I'm recording it here for the next person who runs into it.
Reference: https://docs.confluent.io/platform/current/tutorials/examples/clients/docs/nodejs.html
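Finally, circling back to the original requirement: with node-rdkafka working, the code-change route is just a thin logging shim that writes structured records into the shared topic. A rough sketch; the helper name logToKafka is made up, and the appId/appName tags mirror the ones from the docker-compose attempt above:
const Kafka = require('node-rdkafka');

// Hypothetical app-side shim: push structured log records into the one shared topic.
const producer = new Kafka.Producer({
    'bootstrap.servers': 'test-kafka-idc-1.xxx.com:9092',
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanisms': 'SCRAM-SHA-256',
    'sasl.username': 'admin',
    'sasl.password': 'xxx'
});
producer.connect();

function logToKafka(level, message) {
    const record = Buffer.from(JSON.stringify({
        appId: 'backend',      // same tags we tried to attach through log-pilot
        appName: 'test-app',
        level: level,
        message: message,
        time: new Date().toISOString()
    }));
    producer.produce('topic-devops', -1, record); // -1: let librdkafka pick the partition
}

producer.on('ready', () => logToKafka('info', 'service started'));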

