package org.egov.wm.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Properties;
import javax.annotation.PostConstruct;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.egov.wm.model.VehicleInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.annotation.Order;
import org.springframework.core.env.Environment;
import org.springframework.core.io.ResourceLoader;
import org.springframework.stereotype.Component;

/**
 * Spring component that wires up and starts a Kafka Streams topology once the bean has been
 * constructed.
 *
 * <p>Every record read from the configured input topic is
 * <ol>
 *   <li>forwarded unchanged to the configured Cassandra topic,</li>
 *   <li>pushed to Socket.IO through {@link SocketIO#pushToSocketIO}, and</li>
 *   <li>deserialized into a {@link VehicleInfo} and stored through {@link CassandraConnector}.</li>
 * </ol>
 */
@Component
@Order(1)
public class Processor {

    /** Class logger; left public because the field was public in the original class. */
    public static final Logger logger = LoggerFactory.getLogger(Processor.class);

    /** Kafka Streams application id used by this topology. */
    private static final String APPLICATION_ID = "wm-kafka-streams";

    /** Shared JSON mapper, created once instead of once per record. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    // NOTE(review): Spring does not inject static fields, so the two fields below stay null unless
    // something else assigns them. They are kept unchanged because they are part of the class's
    // existing surface and nothing in this class reads them.
    @Autowired
    public static ResourceLoader resourceLoader;

    @Autowired
    private static Environment env;

    @Autowired
    private SocketIO socketIO;

    @Autowired
    private CassandraConnector cassandraConnector;

    @Value("${spring.kafka.bootstrap.servers}")
    private String bootStrapServer;

    @Value("${kafka.stream.in.topic}")
    private String kafkaInStreamTopic;

    @Value("${kafka.stream.cassandra.topic}")
    private String kafkaStreamCassandraTopic;

    /**
     * Builds the stream topology and starts it.
     *
     * <p>Start-up failures are logged and swallowed on purpose so that a Kafka problem does not
     * abort bean initialization. A JVM shutdown hook closes the streams instance so its threads
     * and clients are released on exit.
     */
    @PostConstruct
    public void kafkaStreamProcessor() {
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootStrapServer);
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        logger.info("Into the method");

        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream<String, String> stream = streamsBuilder.stream(this.kafkaInStreamTopic);
        stream.to(this.kafkaStreamCassandraTopic);
        stream.foreach(new ForeachAction<String, String>() {
            @Override
            public void apply(String key, String value) {
                logger.info("{}: {}", key, value);
                try {
                    socketIO.pushToSocketIO(key, value);
                } catch (Exception e) {
                    // A Socket.IO failure must not prevent the record from being persisted.
                    logger.error("Couldn't post to socketIO: ", e);
                }
                // Persist exactly once, whether or not the Socket.IO push succeeded.
                transformDataAndPersist(cassandraConnector, value);
            }
        });

        try {
            final KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), properties);
            // Registered before start() so the instance is closed even if start-up fails midway.
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                @Override
                public void run() {
                    streams.close();
                }
            }, APPLICATION_ID + "-shutdown"));
            streams.start();
        } catch (Throwable th) {
            logger.error("Exception while starting stream: ", th);
        }
    }

    /**
     * Deserializes a JSON payload into a {@link VehicleInfo} and stores it through the given
     * connector.
     *
     * <p>If the payload cannot be parsed, the error is logged and a default-constructed
     * {@link VehicleInfo} is stored instead. In both cases the connector is connected and the
     * insert is issued exactly once.
     *
     * @param cassandraConnector connector used to open the connection and insert the record
     * @param str JSON representation of a {@link VehicleInfo}
     */
    public void transformDataAndPersist(CassandraConnector cassandraConnector, String str) {
        logger.info("Transforming Data...");
        VehicleInfo vehicleInfo = new VehicleInfo();
        try {
            vehicleInfo = OBJECT_MAPPER.readValue(str, VehicleInfo.class);
        } catch (Exception e) {
            // Fall through with the default-constructed instance.
            logger.error("Couldn't convert to VehicleInfo: ", e);
        }
        cassandraConnector.connect();
        cassandraConnector.insertVehicleInfo(vehicleInfo);
    }
}
