Unverified Commit 117732c5 authored by Bennett Piater's avatar Bennett Piater
Browse files

add consumer

parent 7ae13130
const { Kafka, logLevel } = require('kafkajs')
const request = require('request')
function saveToCouchDB(msg) {
const url = "https://2ce3adea-3579-4bbf-a74b-bfdc057c2806-bluemix.cloudant.com/sensors"
request.post({
url: url,
json: true,
body: msg,
auth: {
user: 'heyedgivisaguseeparrywhi',
pass: '40868bfdce1b6046a07044f3dd51b43afad3d951',
sendImmediately: true
}
})
}
async function main({id}) {
const kafka = new Kafka({
clientId: 'consumer',
brokers: ['ark-01.srvs.cloudkafka.com:9094', 'ark-02.srvs.cloudkafka.com:9094', 'ark-03.srvs.cloudkafka.com:9094'],
ssl: true,
sasl: {
mechanism: 'scram-sha-256',
username: 'mra5q18k',
password: 'SddBJrfRgVWkWYCYhaw-J_5pDDDamZsS'
}
});
const consumer = kafka.consumer({groupId: 'mra5q18k-consumer'})
await consumer.connect()
await consumer.subscribe({topic: 'mra5q18k-sensors', fromBeginning: true})
return new Promise((resolve, reject) => {
consumer.run({
eachMessage: ({message}) => {
consumer.disconnect();
msg = JSON.parse(message.value.toString());
msg.consumer = id;
saveToCouchDB(msg);
console.log(msg);
resolve(); // return after one read
},
})
});
}
exports.main = main;
{
"name": "producer",
"requires": true,
"lockfileVersion": 1,
"dependencies": {
"kafkajs": {
"version": "1.4.4",
"resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-1.4.4.tgz",
"integrity": "sha512-f+cYW9QSkCsThY5qlzAKTedF9WpyNq+mzGZF/4lx2OJwqFkOHyMaqXgp2ILcIJZ+8O7fo7uqxuH9ODoJqy5BfQ==",
"requires": {
"long": "^4.0.0"
},
"dependencies": {
"long": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/long/-/long-4.0.0.tgz",
"integrity": "sha512-XsP+KhQif4bjX1kbuSiySJFNAehNxgLb6hPRGJ9QsUr8ajHkuXGdrHmFUTUUXhDwVX2R5bY4JNZEwbUiMhV+MA=="
}
}
}
}
}
{
"name": "producer",
"main": "index.js",
"dependencies": {
"kafkajs": "*"
}
}
......@@ -10,3 +10,9 @@ for i in {0..1}; do
$wsk -i action update producer$i $(dirname $0)/producer.zip --param id $i --kind nodejs:8
done
pushd $(dirname $0)/consumer
zip -ru ../consumer.zip * || true
popd
for i in {0..1}; do
$wsk -i action update consumer$i $(dirname $0)/consumer.zip --param id $i --kind nodejs:8
done
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment