Pitfalls of Connecting Node.js to Kafka
Background
Many of the backend applications in our architecture team are written in Node.js, and we now need to send all of their logs to the company's Kafka. The ops team allocated us only a single topic, so we have to tell applications apart by the structure of the messages themselves (see the sketch after the requirements list).
Requirements:
- All of my applications are deployed as containers, so ideally the containers' console logs should go straight to Kafka with no code changes
- If the no-code-change route turns out to be infeasible, solve it by changing the code
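Since every application shares the one topic, each record has to carry enough identifying fields to be told apart. A hypothetical envelope (the field names are my own illustration; appId/appName mirror the tags used in the docker-compose file below):

const record = {
  appId: 'backend',        // which system the log belongs to (illustrative)
  appName: 'test-app',     // which application produced it (illustrative)
  level: 'info',
  message: 'request handled',
  ts: new Date().toISOString(),
};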
kafka-server:
Version: 3.2.0
Security protocol: SASL_PLAINTEXT
SASL mechanism: SCRAM-SHA-256
Shipping container logs to Kafka
The idea: use Alibaba's open-source log-pilot to collect container logs and forward them to Kafka.
docker-compose:
version: '3.2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  pilot:
    image: hz-log-pilot:1.0.3
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - /:/host
    privileged: true
    environment:
      - LOGGING_OUTPUT=kafka
      - KAFKA_BROKERS=test-kafka-idc-1.xxx.com:9092
      - KAFKA_VERSION=0.11.0
      - KAFKA_USERNAME=admin
      - KAFKA_PASSWORD=xxx
      - KAFKA_MECHANISM=SCRAM-SHA-256
      - KAFKA_SSL=true
      - KAFKA_PROTOCOL=PLAIN
    labels:
      aliyun.global: true
  hello-node:
    image: hello-node:1.0
    ports:
      - "9003:8888"
    environment:
      - aliyun_logs_topic-devops=stdout
      - aliyun_logs_topic-devops_format=json
      - aliyun_logs_topic-devops_tags=appId=backend,appName=test-app
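For context, log-pilot's convention (as I understand its docs) is that an environment variable of the form aliyun_logs_<name>=stdout on a container tells pilot to collect that container's stdout and ship it to a topic named <name>; the _format and _tags suffixes declare the log format and attach extra key=value fields to every record.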
Testing showed that against our company's Kafka server configuration, no logs ever got delivered, even though the exact same setup worked fine against a Kafka server I stood up myself.
After a lot of digging, it turned out that the filebeat that log-pilot uses under the hood does not support the SCRAM-SHA-256 mechanism. Infuriating.
The code-change route: picking a library
A quick Bing search for the keywords "nodejs kafka" shows that most people recommend kafkajs for talking to Kafka from Node.js.
https://kafka.js.org/docs/getting-started
https://github.com/tulios/kafkajs
So I wrote a test:
kafkajs
kafka.test-spec.ts:
const { Kafka, CompressionTypes } = require('kafkajs')

const kafka = new Kafka({
  // clientId: 'devops',
  logLevel: 5, // 5 = DEBUG in kafkajs's logLevel enum
  brokers: ['10.105.141.164:49171'],
  authenticationTimeout: 1000,
  reauthenticationThreshold: 10000,
  sasl: {
    mechanism: 'scram-sha-256', // the kafkajs docs use the lowercase mechanism name
    username: 'admin',
    password: 'xxxx'
  }
});

const topic = 'test1'
const producer = kafka.producer()

const sendMessage = async () => {
  await producer.connect()
  return producer
    .send({
      topic,
      compression: CompressionTypes.None, // a named export of kafkajs, not a static on Kafka
      // kafkajs messages are { key, value, partition } objects
      messages: [
        { value: JSON.stringify({ name: 'jack', age: '120' }), partition: 0 }
      ]
    })
    .then(console.log)
    .catch(e => console.error(`[example/producer] ${e.message}`, e))
}

describe('kafka tests', function () {
  describe('send Tests', function () {
    this.timeout(15000);
    // an async mocha test must not also take a `done` callback
    it('should connect and send', async function () {
      await sendMessage();
    });
  });
});
Running it in WebStorm (with its integrated Node.js test runner), the connection failed.
My conclusion at the time: the Kafka server uses the SASL_PLAINTEXT protocol, and kafkajs appeared to support only the PLAIN mechanism, so it could not connect. (Strictly speaking, SASL_PLAINTEXT is the security protocol, i.e. SASL over an unencrypted connection, while PLAIN and SCRAM-SHA-256 are SASL mechanisms; the kafkajs docs do list scram-sha-256, so the real cause may also have been a version or configuration mismatch.)
On top of that, the Kafka server runs version 2.3.0. kafkajs claims to support Kafka 0.11+, but possibly 2.x was not yet fully supported.
I then saw that kafka-node is also quite popular, so I wrote another test.
kafka-node
https://www.npmjs.com/package/kafka-node/v/5.0.0#producer
Kafka-node is a Node.js client for Apache Kafka 0.9 and later.
https://github.com/SOHU-Co/kafka-node
It is Sohu's library and claims to support Kafka 0.9 and later. The code:
var assert = require('assert');
var expect = require('chai').expect;
var kafka = require('kafka-node'),
  Producer = kafka.Producer,
  KeyedMessage = kafka.KeyedMessage,
  client = new kafka.KafkaClient({
    kafkaHost: 'test-kafka-idc-1.xxx.com:9092',
    connectTimeout: 3000,
    sasl: { mechanism: 'scram-sha-256', username: 'admin', password: 'xxx' }
  }),
  km = new KeyedMessage('key', 'message');

var producerOption = {
  requireAcks: 1,
  ackTimeoutMs: 100,
  partitionerType: 0 // 0 = default partitioner (always the first partition)
};
var producer = new Producer(client, producerOption);

function getPayloads() {
  return [
    { topic: 'test', messages: JSON.stringify({ name: 'jack', age: '120' }), partition: 0 }
  ];
}

describe('kafka tests', function () {
  describe('send Tests', function () {
    this.timeout(15000);
    it('should connection success', function (done) {
      producer.on('ready', function () {
        producer.send(getPayloads(), function (err, data) {
          if (err) {
            console.log('[kafka-producer -> test1]: broker update failed');
          } else {
            console.log('[kafka-producer -> test1]: broker update success');
          }
          done();
        });
      });
      producer.on('error', function (err) {
        console.log('error: ' + err.toString());
        done(err);
      });
    });
  });
});

// Helper: block for the given number of seconds.
function wait(second) {
  // execSync is synchronous; for an async variant see node's child_process docs.
  require('child_process').execSync('sleep ' + second);
}
Still no luck: the connection timed out:
On its GitHub page I noticed the project hasn't been updated in about two years, so it is effectively unmaintained; most likely it simply does not support this protocol, or the client and broker versions don't match. (See the sketch below.)
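In hindsight the kafka-node README spells it out: its sasl option supports only SASL/PLAIN. That alone would explain the timeout against a SCRAM-SHA-256-only listener. The most the client can be configured for looks like this (same broker address and credentials as the test above; a sketch, not a fix):

const kafka = require('kafka-node');

// Per the kafka-node docs, 'plain' is the only supported SASL mechanism;
// passing 'scram-sha-256' does not give you SCRAM authentication.
const client = new kafka.KafkaClient({
  kafkaHost: 'test-kafka-idc-1.xxx.com:9092',
  sasl: { mechanism: 'plain', username: 'admin', password: 'xxx' },
});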
Time to check the official Kafka site, which maintains a list of supported clients:
https://cwiki.apache.org/confluence/display/KAFKA/Clients
Among the Node.js libraries it lists, the first recommendation is node-rdkafka.
kafka-node is listed second, but as established above, it does not support our Kafka server's protocol configuration.
node-rdkafka
https://github.com/Blizzard/node-rdkafka
Same drill, another test. node-rdkafka is a binding to the native librdkafka, which implements the full range of SASL mechanisms, including SCRAM-SHA-256. rd-kafka.test-spec.ts:
var assert = require('assert');
var expect = require('chai').expect;
const Kafka = require('node-rdkafka');

const ERR_TOPIC_ALREADY_EXISTS = 36;

const config = {
  'bootstrap.servers': 'test-kafka-idc-1.xxx.com:9092',
  'sasl.username': 'admin',
  'sasl.password': 'xxx',
  'security.protocol': 'SASL_PLAINTEXT',
  'sasl.mechanisms': 'SCRAM-SHA-256'
};
const topic = 'topic-devops';

describe('kafka tests', function () {
  describe('send Tests', function () {
    this.timeout(15000);
    // async test, so no `done` callback; mocha awaits the returned promise
    it('should connect and produce', async function () {
      await produceExample();
    });
  });
});

function ensureTopicExists() {
  const adminClient = Kafka.AdminClient.create(config);
  return new Promise((resolve, reject) => {
    adminClient.createTopic({
      topic: topic,
      num_partitions: 1,
      replication_factor: 3
    }, (err) => {
      if (!err) {
        console.log(`Created topic ${topic}`);
        return resolve();
      }
      // an already-existing topic is fine too
      if (err.code === ERR_TOPIC_ALREADY_EXISTS) {
        return resolve();
      }
      return reject(err);
    });
  });
}

async function produceExample() {
  // await ensureTopicExists();
  const producer = await createProducer((err, report) => {
    if (err) {
      console.warn('Error producing', err);
    } else {
      const { topic, partition, value } = report;
      console.log(`Successfully produced record to topic "${topic}" partition ${partition} ${value}`);
    }
  });
  for (let idx = 0; idx < 10; ++idx) {
    const key = 'alice';
    const value = Buffer.from(JSON.stringify({ name: key + idx, count: idx }));
    console.log(`Producing record ${key}\t${value}`);
    // partition -1 lets librdkafka's partitioner pick the partition
    producer.produce(topic, -1, value, key);
  }
  // flush outstanding messages, then disconnect
  producer.flush(10000, () => {
    producer.disconnect();
  });
}

function createProducer(onDeliveryReport) {
  // copy the config instead of mutating the shared object;
  // dr_msg_cb enables per-message delivery reports
  const producerConfig = { ...config, 'dr_msg_cb': true };
  const producer = new Kafka.Producer(producerConfig);
  // delivery reports are only emitted while the client is polled
  producer.setPollInterval(100);
  return new Promise((resolve, reject) => {
    producer
      .on('ready', () => resolve(producer))
      .on('delivery-report', onDeliveryReport)
      .on('event.error', (err) => {
        console.warn('event.error', err);
        reject(err);
      });
    producer.connect();
  });
}
Run it in WebStorm: the send succeeded, and checking on the Kafka side confirmed the messages had arrived. Fantastic!
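To double-check delivery from the client side as well, here is a minimal node-rdkafka consumer sketch. It assumes the same `config` object as in the test above; the 'group.id' value is hypothetical:

const Kafka = require('node-rdkafka');

const consumer = new Kafka.KafkaConsumer(
  { ...config, 'group.id': 'devops-verify' }, // 'group.id' is a made-up value
  { 'auto.offset.reset': 'earliest' } // topic config: start from the oldest records
);
consumer.connect();
consumer
  .on('ready', () => {
    consumer.subscribe(['topic-devops']);
    consumer.consume(); // flowing mode: one 'data' event per message
  })
  .on('data', (msg) => {
    console.log(`offset ${msg.offset}: ${msg.value.toString()}`);
  });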
This problem had been bothering me for almost a week, so I'm writing it down here for anyone who runs into the same thing.
Reference: https://docs.confluent.io/platform/current/tutorials/examples/clients/docs/nodejs.html