/*
 * Decompiled with CFR 0.152.
 */
package org.egov.wm.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Properties;
import javax.annotation.PostConstruct;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.egov.wm.model.VehicleInfo;
import org.egov.wm.service.CassandraConnector;
import org.egov.wm.service.SocketIO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.annotation.Order;
import org.springframework.core.env.Environment;
import org.springframework.core.io.ResourceLoader;
import org.springframework.stereotype.Component;

/**
 * Spring component that wires up a Kafka Streams topology once the bean has
 * been constructed.
 *
 * <p>The topology reads records from the configured input topic, forwards
 * every record unchanged to the configured Cassandra topic, and additionally
 * runs a per-record {@link ForeachAction} on the same stream.
 *
 * <p>NOTE(review): this file is decompiler output. The body of the per-record
 * action was not recovered (see {@link #kafkaStreamProcessor()}), so the file
 * does not compile as-is.
 */
@Component
@Order(value=1)
public class Processor {
    // NOTE(review): Spring's annotation-driven injection does not populate
    // static fields, so these two are presumably always null at runtime.
    // They are left untouched because resourceLoader is public and may be
    // referenced from outside this class -- confirm and remove if unused.
    @Autowired
    public static ResourceLoader resourceLoader;
    @Autowired
    private static Environment env;
    @Autowired
    private SocketIO socketIO;
    @Autowired
    private CassandraConnector cassandraConnector;
    @Value(value="${spring.kafka.bootstrap.servers}")
    private String bootStrapServer;
    @Value(value="${kafka.stream.in.topic}")
    private String kafkaInStreamTopic;
    @Value(value="${kafka.stream.cassandra.topic}")
    private String kafkaStreamCassandraTopic;
    public static final Logger logger = LoggerFactory.getLogger(Processor.class);

    /** Shared JSON mapper; reused across records instead of allocating one per message. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * Builds and starts the Kafka Streams topology.
     *
     * <p>Runs as a {@code @PostConstruct} callback. Uses the application id
     * {@code wm-kafka-streams}, the injected bootstrap servers, and String
     * serdes for both keys and values. Any failure thrown by
     * {@code streams.start()} is logged and not rethrown.
     */
    @PostConstruct
    public void kafkaStreamProcessor() {
        Properties props = new Properties();
        props.put("application.id", "wm-kafka-streams");
        props.put("bootstrap.servers", this.bootStrapServer);
        props.put("default.key.serde", Serdes.String().getClass());
        props.put("default.value.serde", Serdes.String().getClass());
        logger.info("Into the method");
        StreamsBuilder builder = new StreamsBuilder();
        KStream source = builder.stream(this.kafkaInStreamTopic);
        // Pass-through branch: every incoming record is copied to the Cassandra topic.
        source.to(this.kafkaStreamCassandraTopic);
        // NOTE(review): the decompiler could not recover this anonymous
        // ForeachAction. The synthetic accessors at the bottom of the class
        // suggest it reads socketIO and cassandraConnector and calls
        // transformDataAndPersist -- restore it from the original source.
        source.foreach((ForeachAction)new /* Unavailable Anonymous Inner Class!! */);
        Topology topology = builder.build();
        // NOTE(review): this KafkaStreams instance is local and never closed;
        // consider retaining it and closing it when the bean is destroyed.
        KafkaStreams streams = new KafkaStreams(topology, props);
        try {
            streams.start();
        }
        catch (Throwable e) {
            logger.error("Exception while starting stream: ", e);
        }
    }

    /**
     * Parses a JSON payload into a {@link VehicleInfo} and persists it.
     *
     * <p>If the payload cannot be parsed, the error is logged and a
     * default-constructed {@link VehicleInfo} is persisted instead. The record
     * is written exactly once on both the success and the failure path.
     *
     * @param cassandraConnector connector used to connect and insert the record
     * @param value JSON text expected to describe a {@link VehicleInfo}
     */
    private void transformDataAndPersist(CassandraConnector cassandraConnector, String value) {
        logger.info("Transforming Data...");
        VehicleInfo vehicleInfo = new VehicleInfo();
        try {
            vehicleInfo = MAPPER.readValue(value, VehicleInfo.class);
        }
        catch (Exception e) {
            // Fall through with the blank VehicleInfo. The connect/insert that
            // used to be duplicated here made a parse failure write the blank
            // record twice.
            // NOTE(review): confirm that persisting a blank record on parse
            // failure is wanted at all, rather than skipping the write.
            logger.error("Couldn't convert to VehicleInfo: ", e);
        }
        cassandraConnector.connect();
        cassandraConnector.insertVehicleInfo(vehicleInfo);
    }

    // The three access$NNN methods below are marked synthetic by the
    // decompiler. They are presumably called by the anonymous inner class
    // that was not recovered, so they are kept unchanged.

    static /* synthetic */ SocketIO access$000(Processor x0) {
        return x0.socketIO;
    }

    static /* synthetic */ CassandraConnector access$100(Processor x0) {
        return x0.cassandraConnector;
    }

    static /* synthetic */ void access$200(Processor x0, CassandraConnector x1, String x2) {
        x0.transformDataAndPersist(x1, x2);
    }
}

