SNMP-CAMEL-KAFKA - PullRequest
       20

SNMP-CAMEL-KAFKA

0 голосов
/ 22 января 2019

Я ищу пример простой программы, которая отправляет SNMP-ловушку (trap) в топик Apache Kafka с помощью Apache Camel.

Пожалуйста, помогите мне, если кто-то может объяснить это с помощью простой Java-программы.

Моя конфигурация RouteBuilder

import org.apache.camel.builder.RouteBuilder;

/**
 * Camel route definition that connects an SNMP poll endpoint on the local
 * host to a Kafka endpoint.
 */
public class SimpleRouteBuilder extends RouteBuilder {

    /**
     * Assembles the Kafka endpoint URI, echoes it to standard output and
     * routes every newline-separated token of the SNMP message body to it.
     *
     * @throws Exception declared by the overridden {@code configure} method
     */
    @Override
    public void configure() throws Exception {
        // Endpoint base and its query options, kept apart so each option is easy to spot.
        final String kafkaEndpoint = "kafka:localhost:9092";
        final String topicOption = "topic=first_topic";
        final String zooKeeperOptions = "zookeeperHost=localhost&zookeeperPort=2181";
        final String serializerOption = "serializerClass=kafka.serializer.StringEncoder";

        final String toKafka = kafkaEndpoint + "?" + topicOption
                + "&" + zooKeeperOptions
                + "&" + serializerOption;
        System.out.println(toKafka);

        from("snmp:127.0.0.1:161?protocol=udp&type=POLL&oids=1.3.6.1.2.1.1.5.0")
                .split()
                .tokenize("\n")
                .to(toKafka);
    }
}

Основной метод

import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultCamelContext;
import org.snmp4j.Snmp;

/**
 * Command-line entry point that runs the routes of {@code SimpleRouteBuilder}
 * in a stand-alone Camel context for a fixed period of time.
 */
public class MainApp {

    /** How long the routes are kept running before the context is stopped. */
    private static final long RUN_TIME_MILLIS = 5 * 60 * 1000;

    /**
     * Registers the routes, starts the Camel context, keeps it running for
     * five minutes and then stops it.
     *
     * <p>The context is stopped in a {@code finally} block so that it is shut
     * down even when route registration, start-up or the sleep fails.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        CamelContext ctx = new DefaultCamelContext();
        boolean interrupted = false;
        try {
            ctx.addRoutes(new SimpleRouteBuilder());
            ctx.start();
            Thread.sleep(RUN_TIME_MILLIS);
        } catch (InterruptedException e) {
            // Remember the interruption; the flag is restored after shutdown
            // so that stopping the context is not cut short by it.
            interrupted = true;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                ctx.stop();
            } catch (Exception e) {
                e.printStackTrace();
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

1 Ответ

0 голосов
/ 28 января 2019

Я двигался в неверном направлении. Правильная последовательность действий следующая:

  1. Создать программу-отправитель ловушек.
  2. Создать программу приёма (прослушивания) ловушек.
  3. Внутри приёмника (слушателя) ловушек получить ловушку и отправить её в топик Apache Kafka через Apache Camel.

pom.xml

Добавьте следующие зависимости:

  1. camel-core
  2. snmp4j
  3. camel-kafka

Программа отправителя ловушек

package <>;

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.snmp4j.*;
import org.snmp4j.event.ResponseEvent;
import org.snmp4j.mp.MPv2c;
import org.snmp4j.mp.MPv3;
import org.snmp4j.mp.SnmpConstants;
import org.snmp4j.security.*;
import org.snmp4j.smi.*;
import org.snmp4j.transport.DefaultUdpTransportMapping;

import java.util.Date;

/**
 * Stand-alone program that sends a single SNMP trap to
 * {@link #ipAddress}:{@link #port} over UDP.
 */
public class Trapsender {

    /** Community string set on the trap target. */
    public static final String community = "public";

    /** OID used both as the snmpTrapOID value and as the name of the payload variable. */
    public static final String Oid = ".1.3.6.1.2.1.1.8";

    /** Host the trap is sent to. */
    public static final String ipAddress = "127.0.0.1";

    /** UDP port the trap is sent to. */
    public static final int port = 162;

    /**
     * Sends one trap and exits.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Trapsender trapv3 = new Trapsender();
        trapv3.sendTrap_Version3();
    }

    /**
     * Builds a trap PDU and sends it to the configured target.
     *
     * <p>Despite its name, which is kept so existing callers still compile,
     * this method configures the target with {@code SnmpConstants.version2c}
     * and a community string, so the trap goes out as SNMPv2c.
     *
     * <p>Failures are printed to standard error. The SNMP session is closed
     * whether or not sending succeeds.
     */
    public void sendTrap_Version3() {
        Snmp snmp = null;
        try {
            // Create the transport and the session that owns it.
            DefaultUdpTransportMapping transport = new DefaultUdpTransportMapping();
            snmp = new Snmp(transport);
            transport.listen();

            // Create the target the trap is addressed to.
            CommunityTarget cTarget = new CommunityTarget();
            cTarget.setCommunity(new OctetString(community));
            cTarget.setVersion(SnmpConstants.version2c);
            cTarget.setAddress(new UdpAddress(ipAddress + "/" + port));
            cTarget.setRetries(2);
            cTarget.setTimeout(10000);

            // Create the trap PDU.
            PDU pdu = new PDU();
            pdu.setType(PDU.TRAP);

            // sysUpTime is a TimeTicks value (hundredths of a second), not a
            // string. The JVM uptime stands in for the agent uptime, and the
            // mask keeps the value inside the unsigned 32-bit range.
            long uptimeMillis =
                    java.lang.management.ManagementFactory.getRuntimeMXBean().getUptime();
            pdu.add(new VariableBinding(SnmpConstants.sysUpTime,
                    new TimeTicks((uptimeMillis / 10) & 0xFFFFFFFFL)));
            pdu.add(new VariableBinding(SnmpConstants.snmpTrapOID, new OID(Oid)));
            pdu.add(new VariableBinding(new OID(Oid), new OctetString("Major")));

            // Send the PDU; the return value is not needed here.
            System.out.println("Sending V2 Trap... Check Wheather NMS is Listening or not? ");
            snmp.send(pdu, cTarget);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (snmp != null) {
                try {
                    snmp.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

Программа приёма ловушек с Apache Camel

 package <>;
 import org.apache.camel.CamelContext;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.RoutesBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.snmp4j.*;
 import org.snmp4j.mp.MPv1;
 import org.snmp4j.mp.MPv2c;
 import org.snmp4j.security.Priv3DES;
 import org.snmp4j.security.SecurityProtocols;
 import org.snmp4j.smi.OctetString;
 import org.snmp4j.smi.TcpAddress;
 import org.snmp4j.smi.TransportIpAddress;
 import org.snmp4j.smi.UdpAddress;
 import org.snmp4j.transport.AbstractTransportMapping;
 import org.snmp4j.transport.DefaultTcpTransportMapping;
 import org.snmp4j.transport.DefaultUdpTransportMapping;
 import org.snmp4j.util.MultiThreadedMessageDispatcher;
 import org.snmp4j.util.ThreadPool;

 import java.io.IOException;

 public class Trapreceiver implements CommandResponder {

public static CamelContext ctx=null;
public static ProducerTemplate producer=null;

public static void main(String[] args) {

   Trapreceiver snmp4jTrapReceiver = new Trapreceiver();
   SimpleRouteBuilder routeBuilder = new SimpleRouteBuilder();
   ctx = new DefaultCamelContext();
   producer = ctx.createProducerTemplate();
   try {
       ctx.addRoutes(routeBuilder);
       ctx.start();
   }
   catch (Exception e) {
       e.printStackTrace();
   }


  // producer.sendBody("direct:start", snmp);
    try {
        snmp4jTrapReceiver.listen(new UdpAddress("localhost/162"), producer);
    } catch (IOException e) {
        e.printStackTrace();
    }

}

/**
 * Trap Listner
 */
public synchronized void listen(TransportIpAddress address, ProducerTemplate producer)
        throws IOException {
    AbstractTransportMapping transport;
    if (address instanceof TcpAddress) {
        transport = new DefaultTcpTransportMapping((TcpAddress) address);
    } else {
        transport = new DefaultUdpTransportMapping((UdpAddress) address);
    }

    ThreadPool threadPool = ThreadPool.create("DispatcherPool", 10);
    MessageDispatcher mDispathcher = new MultiThreadedMessageDispatcher(
            threadPool, new MessageDispatcherImpl());

    // add message processing models
    mDispathcher.addMessageProcessingModel(new MPv1());
    mDispathcher.addMessageProcessingModel(new MPv2c());

    // add all security protocols
    SecurityProtocols.getInstance().addDefaultProtocols();
    SecurityProtocols.getInstance().addPrivacyProtocol(new Priv3DES());

    // Create Target
    CommunityTarget target = new CommunityTarget();
    target.setCommunity(new OctetString("public"));

    Snmp snmp = new Snmp(mDispathcher, transport);
    snmp.addCommandResponder(this);

    transport.listen();
    System.out.println("Listening on " + address);

    try {
        this.wait();
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
    }
}

/**
 * This method will be called whenever a pdu is received on the given port
 * specified in the listen() method
 */
public synchronized void processPdu(CommandResponderEvent cmdRespEvent) {
    System.out.println("Received PDU...");
    PDU pdu = cmdRespEvent.getPDU();
    if (pdu != null) {
        System.out.println("Trap Type = " + pdu.getType());
        System.out.println("Variables = " + pdu.getVariableBindings());
        producer.sendBody("direct:start","Variables = " + pdu.getVariableBindings() );
    }
}

}
...