package org.cerberus.service.kafka.impl;

import com.jayway.jsonpath.PathNotFoundException;
import com.mchange.v2.c3p0.subst.C3P0Substitutions;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tika.mime.MimeTypesReaderMetKeys;
import org.cerberus.crud.entity.AppService;
import org.cerberus.crud.entity.AppServiceHeader;
import org.cerberus.crud.entity.TestCaseStep;
import org.cerberus.crud.entity.TestCaseStepAction;
import org.cerberus.crud.factory.IFactoryAppService;
import org.cerberus.crud.factory.IFactoryAppServiceHeader;
import org.cerberus.crud.service.IAppServiceService;
import org.cerberus.crud.service.IParameterService;
import org.cerberus.engine.entity.MessageEvent;
import org.cerberus.engine.entity.MessageGeneral;
import org.cerberus.engine.execution.IRecorderService;
import org.cerberus.enums.MessageEventEnum;
import org.cerberus.enums.MessageGeneralEnum;
import org.cerberus.exception.CerberusException;
import org.cerberus.service.json.IJsonService;
import org.cerberus.service.kafka.IKafkaService;
import org.cerberus.service.proxy.IProxyService;
import org.cerberus.util.StringUtil;
import org.cerberus.util.answer.AnswerItem;
import org.json.JSONArray;
import org.json.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:WEB-INF/classes/org/cerberus/service/kafka/impl/KafkaService.class */
public class KafkaService implements IKafkaService {

    // Injected collaborator; not referenced anywhere in this class.
    @Autowired
    IRecorderService recorderService;

    // Builds the synthetic configuration headers (bootstrap servers, (de)serializers, ...)
    // that each produce/seek/search call appends to the caller's header list.
    @Autowired
    IFactoryAppServiceHeader factoryAppServiceHeader;

    // Injected collaborator; not referenced anywhere in this class.
    @Autowired
    IParameterService parameterService;

    // Creates the AppService instance that produceEvent fills in and returns.
    @Autowired
    IFactoryAppService factoryAppService;

    // Used to guess the response content type (produceEvent) and to load service
    // definitions by key (getAllConsumers).
    @Autowired
    IAppServiceService appServiceService;

    // Injected collaborator; not referenced anywhere in this class.
    @Autowired
    IProxyService proxyService;

    // Extracts the filter value from a record payload in searchEvent.
    @Autowired
    IJsonService jsonService;
    // Per-instance logger named after the runtime class (getClass()).
    protected final Logger LOG = LogManager.getLogger(getClass());

    /**
     * Builds the lookup key identifying a consumer: the two arguments joined by a
     * pipe character. Within this class it is always called with a topic name
     * first and the bootstrap servers / service path second.
     *
     * @param str  first key component (the topic at every call site in this class)
     * @param str2 second key component (the bootstrap servers at every call site in this class)
     * @return {@code str + "|" + str2}; a {@code null} component is rendered as the text "null"
     */
    @Override // org.cerberus.service.kafka.IKafkaService
    public String getKafkaConsumerKey(String str, String str2) {
        StringBuilder consumerKey = new StringBuilder();
        consumerKey.append(str);
        consumerKey.append("|");
        consumerKey.append(str2);
        return consumerKey.toString();
    }

    /**
     * Sends one record to a Kafka topic and waits synchronously for the broker
     * acknowledgement.
     *
     * <p>Four configuration headers (bootstrap servers, idempotence, key and value
     * serializers) are appended to {@code list} — the caller's list is mutated and
     * is also stored on the returned service. Every active header is then copied
     * into the producer properties; because the appended headers come last, they
     * win over caller-supplied headers with the same key.
     *
     * <p>Any failure (including an invalid producer configuration) is reported
     * through the result message rather than thrown; partition and offset stay
     * at {@code -1} in that case.
     *
     * @param str  topic to produce to
     * @param str2 record key
     * @param str3 record value (also stored as the service request)
     * @param str4 bootstrap servers (also stored as the service path)
     * @param list service headers used as extra producer properties; must be non-null and mutable
     * @return answer holding the populated {@link AppService} and a success or failure message
     * @throws InterruptedException declared by the interface; interruption is reported as a
     *                              failure message and the thread's interrupt flag is restored
     * @throws ExecutionException   declared by the interface; send failures are reported as a
     *                              failure message
     */
    @Override // org.cerberus.service.kafka.IKafkaService
    public AnswerItem<AppService> produceEvent(String str, String str2, String str3, String str4, List<AppServiceHeader> list) throws InterruptedException, ExecutionException {
        AnswerItem<AppService> answerItem = new AnswerItem<>();
        AppService create = this.factoryAppService.create("", AppService.TYPE_KAFKA, AppService.METHOD_KAFKAPRODUCE, "", "", "", "", "", "", "", "", "", "", "", "", null, "", null, null);

        // Mandatory settings are appended last so they override same-named caller headers.
        Properties properties = new Properties();
        list.add(this.factoryAppServiceHeader.create(null, "bootstrap.servers", str4, "Y", 0, "", "", null, "", null));
        list.add(this.factoryAppServiceHeader.create(null, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true", "Y", 0, "", "", null, "", null));
        list.add(this.factoryAppServiceHeader.create(null, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer", "Y", 0, "", "", null, "", null));
        list.add(this.factoryAppServiceHeader.create(null, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer", "Y", 0, "", "", null, "", null));
        for (AppServiceHeader appServiceHeader : list) {
            if (StringUtil.parseBoolean(appServiceHeader.getActive())) {
                properties.put(appServiceHeader.getKey(), appServiceHeader.getValue());
            }
        }
        create.setServicePath(str4);
        create.setKafkaTopic(str);
        create.setKafkaKey(str2);
        create.setServiceRequest(str3);
        create.setHeaderList(list);

        MessageEvent messageEvent;
        int partition = -1;
        long offset = -1;
        boolean interrupted = false;
        KafkaProducer<String, String> kafkaProducer = null;
        try {
            // Created inside the try so that a configuration error becomes a failure message.
            kafkaProducer = new KafkaProducer<>(properties);
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(str, str2, str3);
            this.LOG.debug("Producing Kafka message - topic : " + str + " key : " + str2 + " message : " + str3);
            // Blocking on the future makes the send synchronous.
            RecordMetadata recordMetadata = kafkaProducer.send(producerRecord).get();
            partition = recordMetadata.partition();
            offset = recordMetadata.offset();
            this.LOG.debug("Produced Kafka message - topic : " + str + " key : " + str2 + " partition : " + partition + " offset : " + offset);
            messageEvent = new MessageEvent(MessageEventEnum.ACTION_SUCCESS_CALLSERVICE_PRODUCEKAFKA);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Remember the interruption; the flag is restored only after the producer
                // is closed so that flush()/close() are not themselves interrupted.
                interrupted = true;
            }
            messageEvent = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_PRODUCEKAFKA);
            messageEvent.setDescription(messageEvent.getDescription().replace("%EX%", e.toString()));
            this.LOG.debug(e, e);
        } finally {
            try {
                if (kafkaProducer != null) {
                    kafkaProducer.flush();
                    kafkaProducer.close();
                    this.LOG.info("Closed producer");
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        create.setKafkaResponseOffset(offset);
        create.setKafkaResponsePartition(partition);
        create.setResponseHTTPBodyContentType(this.appServiceService.guessContentType(create, "JSON"));
        answerItem.setItem(create);
        messageEvent.setDescription(messageEvent.getDescription()
                .replace("%SERVICEMETHOD%", AppService.METHOD_KAFKAPRODUCE)
                .replace("%TOPIC%", str)
                .replace("%PART%", String.valueOf(partition))
                .replace("%OFFSET%", String.valueOf(offset)));
        answerItem.setResultMessage(messageEvent);
        return answerItem;
    }

    /**
     * Opens a short-lived consumer on every partition of a topic and returns the
     * current end offset of each partition.
     *
     * <p>Five configuration headers (bootstrap servers, auto-commit off, max poll
     * records, key and value deserializers) are appended to {@code list} — the
     * caller's list is mutated. Every active header is then copied into the
     * consumer properties; the appended headers come last and therefore override
     * caller-supplied headers with the same key.
     *
     * <p>Failures are reported through the result message (seek-failure event with
     * the exception text and topic filled in); the answer then carries no item.
     *
     * @param str  topic whose end offsets are requested
     * @param str2 bootstrap servers
     * @param list service headers used as extra consumer properties; must be non-null and mutable
     * @return answer holding the end offset per partition and a success or failure message
     * @throws InterruptedException declared by the interface; not thrown by this implementation
     * @throws ExecutionException   declared by the interface; not thrown by this implementation
     */
    @Override // org.cerberus.service.kafka.IKafkaService
    public AnswerItem<Map<TopicPartition, Long>> seekEvent(String str, String str2, List<AppServiceHeader> list) throws InterruptedException, ExecutionException {
        MessageEvent messageEvent = new MessageEvent(MessageEventEnum.ACTION_SUCCESS_CALLSERVICE_SEARCHKAFKA);
        AnswerItem<Map<TopicPartition, Long>> answerItem = new AnswerItem<>();
        KafkaConsumer<String, String> kafkaConsumer = null;
        try {
            Properties properties = new Properties();
            list.add(this.factoryAppServiceHeader.create(null, "bootstrap.servers", str2, "Y", 0, "", "", null, "", null));
            list.add(this.factoryAppServiceHeader.create(null, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", "Y", 0, "", "", null, "", null));
            // NOTE(review): the max.poll.records value is taken from a c3p0 constant, which looks
            // like a decompiler substitution for a string literal — confirm the value and inline it.
            list.add(this.factoryAppServiceHeader.create(null, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, C3P0Substitutions.TRACE, "Y", 0, "", "", null, "", null));
            list.add(this.factoryAppServiceHeader.create(null, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer", "Y", 0, "", "", null, "", null));
            list.add(this.factoryAppServiceHeader.create(null, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer", "Y", 0, "", "", null, "", null));
            for (AppServiceHeader appServiceHeader : list) {
                if (StringUtil.parseBoolean(appServiceHeader.getActive())) {
                    properties.put(appServiceHeader.getKey(), appServiceHeader.getValue());
                }
            }
            this.LOG.info("Open Consumer : " + getKafkaConsumerKey(str, str2));
            kafkaConsumer = new KafkaConsumer<>(properties);

            // Manually assign every partition of the topic (no consumer-group subscription).
            List<TopicPartition> partitions = kafkaConsumer.partitionsFor(str).stream()
                    .map(partitionInfo -> new TopicPartition(str, partitionInfo.partition()))
                    .collect(Collectors.toList());
            kafkaConsumer.assign(partitions);
            kafkaConsumer.seekToEnd(partitions);
            answerItem.setItem(kafkaConsumer.endOffsets(partitions));
        } catch (Exception e) {
            messageEvent = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_SEEKKAFKA);
            messageEvent.setDescription(messageEvent.getDescription().replace("%EX%", e.toString()).replace("%TOPIC%", str));
            this.LOG.debug(e, e);
        } finally {
            // The consumer is null when the failure happened before or during its construction;
            // closing unconditionally would throw and hide the real error.
            if (kafkaConsumer != null) {
                kafkaConsumer.close();
                this.LOG.info("Closed Consumer : " + getKafkaConsumerKey(str, str2));
            }
        }
        answerItem.setResultMessage(messageEvent);
        return answerItem;
    }

    /**
     * Reads a topic from the given starting offsets and collects the records that
     * match an optional JSON filter, until enough records were found or the time
     * budget is exhausted.
     *
     * <p>Five configuration headers (bootstrap servers, auto-commit off, max poll
     * records, key and value deserializers) are appended to {@code list} — the
     * caller's list is mutated — and every active header becomes a consumer
     * property, the appended ones overriding same-named caller headers.
     *
     * <p>Each polled record value is parsed as a JSON object; records that cannot
     * be parsed are logged and skipped. When {@code str3} is non-empty, the value
     * extracted by the JSON service must equal {@code str4} for the record to be
     * kept. Kept records are returned as a JSON array of objects with the record
     * key, parsed value, offset and partition.
     *
     * <p>Outcome reported in the result message:
     * <ul>
     *   <li>success — {@code i} matching records were collected;</li>
     *   <li>partial result — the timeout elapsed first (the consumer is woken up and
     *       the next poll raises {@link WakeupException});</li>
     *   <li>failure — any other exception; its text replaces {@code %EX%}.</li>
     * </ul>
     * The item is always the JSON array of records collected so far.
     *
     * @param map  offset to start reading from, per partition
     * @param str  topic to read
     * @param str2 bootstrap servers
     * @param list service headers used as extra consumer properties; must be non-null and mutable
     * @param str3 filter path evaluated on each record value; null or empty disables filtering
     * @param str4 value the filter path must resolve to for a record to be kept
     * @param i    number of matching records after which the search stops
     * @param i2   time budget in seconds; each poll waits at most {@code min(i2, 5)} seconds
     * @return answer holding the matching records as a JSON array string and the outcome message
     * @throws InterruptedException declared by the interface; not thrown by this implementation
     * @throws ExecutionException   declared by the interface; not thrown by this implementation
     */
    @Override // org.cerberus.service.kafka.IKafkaService
    public AnswerItem<String> searchEvent(Map<TopicPartition, Long> map, String str, String str2, List<AppServiceHeader> list, String str3, String str4, int i, int i2) throws InterruptedException, ExecutionException {
        MessageEvent messageEvent;
        AnswerItem<String> answerItem = new AnswerItem<>();
        Instant now = Instant.now();
        JSONArray jSONArray = new JSONArray();
        KafkaConsumer<String, String> kafkaConsumer = null;
        int matchedCount = 0;
        int scannedCount = 0;
        try {
            Properties properties = new Properties();
            list.add(this.factoryAppServiceHeader.create(null, "bootstrap.servers", str2, "Y", 0, "", "", null, "", null));
            list.add(this.factoryAppServiceHeader.create(null, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", "Y", 0, "", "", null, "", null));
            // NOTE(review): the max.poll.records value is taken from a c3p0 constant, which looks
            // like a decompiler substitution for a string literal — confirm the value and inline it.
            list.add(this.factoryAppServiceHeader.create(null, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, C3P0Substitutions.TRACE, "Y", 0, "", "", null, "", null));
            list.add(this.factoryAppServiceHeader.create(null, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer", "Y", 0, "", "", null, "", null));
            list.add(this.factoryAppServiceHeader.create(null, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer", "Y", 0, "", "", null, "", null));
            for (AppServiceHeader appServiceHeader : list) {
                if (StringUtil.parseBoolean(appServiceHeader.getActive())) {
                    properties.put(appServiceHeader.getKey(), appServiceHeader.getValue());
                }
            }
            this.LOG.info("Open Consumer : " + getKafkaConsumerKey(str, str2));
            kafkaConsumer = new KafkaConsumer<>(properties);

            // Manually assign every partition of the topic, then rewind each one to its start offset.
            List<TopicPartition> partitions = kafkaConsumer.partitionsFor(str).stream()
                    .map(partitionInfo -> new TopicPartition(str, partitionInfo.partition()))
                    .collect(Collectors.toList());
            kafkaConsumer.assign(partitions);
            for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {
                kafkaConsumer.seek(entry.getKey(), entry.getValue().longValue());
                this.LOG.debug("Partition : " + entry.getKey().partition() + " set to offset : " + entry.getValue());
            }

            boolean searching = true;
            long deadlineMillis = Instant.now().plusSeconds(i2).toEpochMilli();
            int pollSeconds = Math.min(i2, 5);
            while (searching) {
                this.LOG.debug("Start Poll.");
                ConsumerRecords<String, String> polled = kafkaConsumer.poll(Duration.ofSeconds(pollSeconds));
                this.LOG.debug("End Poll.");
                if (Instant.now().toEpochMilli() > deadlineMillis) {
                    this.LOG.debug("Timed out searching for record");
                    // The current batch is still processed; the next poll() throws WakeupException,
                    // which is reported below as a partial result.
                    kafkaConsumer.wakeup();
                }
                // A plain for-each ends when the batch is exhausted, so the outer loop polls again.
                for (ConsumerRecord<String, String> consumerRecord : polled) {
                    try {
                        this.LOG.debug("New record " + consumerRecord.topic() + " " + consumerRecord.partition() + " " + consumerRecord.offset());
                        this.LOG.debug("  " + consumerRecord.key() + " | " + consumerRecord.value());
                        JSONObject recordValue = new JSONObject(consumerRecord.value());
                        scannedCount++;
                        boolean keep = true;
                        if (!StringUtil.isNullOrEmpty(str3)) {
                            String filtered = "";
                            try {
                                filtered = this.jsonService.getStringFromJson(consumerRecord.value(), str3);
                            } catch (PathNotFoundException e) {
                                keep = false;
                                this.LOG.debug("Record discarded - Path not found.");
                            } catch (Exception e2) {
                                this.LOG.error(e2, e2);
                            }
                            this.LOG.debug("Filtered value : " + filtered);
                            // Null-safe: a null extraction result simply does not match.
                            if (filtered == null || !filtered.equals(str4)) {
                                keep = false;
                                this.LOG.debug("Record discarded - Value different.");
                            }
                        }
                        if (keep) {
                            JSONObject match = new JSONObject();
                            match.put("key", consumerRecord.key());
                            match.put("value", recordValue);
                            // NOTE(review): the JSON field name comes from a Tika constant, which looks
                            // like a decompiler substitution for a string literal — confirm and inline it.
                            match.put(MimeTypesReaderMetKeys.MATCH_OFFSET_ATTR, consumerRecord.offset());
                            match.put("partition", consumerRecord.partition());
                            jSONArray.put(match);
                            matchedCount++;
                            if (matchedCount >= i) {
                                searching = false;
                                kafkaConsumer.wakeup();
                                break;
                            }
                        }
                    } catch (Exception e3) {
                        // A malformed record must not abort the whole search.
                        this.LOG.error(e3, e3);
                    }
                }
            }
            messageEvent = new MessageEvent(MessageEventEnum.ACTION_SUCCESS_CALLSERVICE_SEARCHKAFKA).resolveDescription("NBEVENT", String.valueOf(matchedCount)).resolveDescription("NBTOT", String.valueOf(scannedCount)).resolveDescription("NBSEC", String.valueOf(Duration.between(now, Instant.now()).getSeconds()));
        } catch (WakeupException e4) {
            // Raised by poll() after the timeout-triggered wakeup(): return what was found so far.
            messageEvent = new MessageEvent(MessageEventEnum.ACTION_SUCCESS_CALLSERVICE_SEARCHKAFKAPARTIALRESULT).resolveDescription("NBEVENT", String.valueOf(matchedCount)).resolveDescription("NBTOT", String.valueOf(scannedCount)).resolveDescription("NBSEC", String.valueOf(Duration.between(now, Instant.now()).getSeconds()));
        } catch (Exception e5) {
            messageEvent = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_SEARCHKAFKA);
            messageEvent.setDescription(messageEvent.getDescription().replace("%EX%", e5.toString()));
            this.LOG.debug(e5, e5);
        } finally {
            // Single cleanup point for every exit path.
            if (kafkaConsumer != null) {
                this.LOG.info("Closed Consumer : " + getKafkaConsumerKey(str, str2));
                kafkaConsumer.close();
            }
        }
        answerItem.setItem(jSONArray.toString());
        messageEvent.setDescription(messageEvent.getDescription().replace("%SERVICEMETHOD%", AppService.METHOD_KAFKASEARCH).replace("%TOPIC%", str));
        answerItem.setResultMessage(messageEvent);
        return answerItem;
    }

    /**
     * Collects the current end offsets of every Kafka topic that the given steps
     * will search, keyed by {@link #getKafkaConsumerKey(String, String)}.
     *
     * <p>An action is considered when it is a call-service action whose condition
     * operator is not {@code "never"} and whose service (looked up by the action's
     * first value) is a Kafka service using the search method. For each such
     * service {@link #seekEvent} is invoked; when the same topic/server pair occurs
     * several times, the last result replaces the earlier ones.
     *
     * @param list steps whose actions are inspected
     * @return map from consumer key to the end offset of each partition
     * @throws CerberusException    when a seek does not report success; the seek's message
     *                              description is included as detail
     * @throws InterruptedException propagated from {@link #seekEvent}
     * @throws ExecutionException   propagated from {@link #seekEvent}
     */
    @Override // org.cerberus.service.kafka.IKafkaService
    public HashMap<String, Map<TopicPartition, Long>> getAllConsumers(List<TestCaseStep> list) throws CerberusException, InterruptedException, ExecutionException {
        HashMap<String, Map<TopicPartition, Long>> hashMap = new HashMap<>();
        for (TestCaseStep step : list) {
            for (TestCaseStepAction testCaseStepAction : step.getTestCaseStepAction()) {
                // Constant-first comparisons tolerate null action / condition values.
                if (!TestCaseStepAction.ACTION_CALLSERVICE.equals(testCaseStepAction.getAction())
                        || "never".equals(testCaseStepAction.getConditionOper())) {
                    continue;
                }
                AppService service = this.appServiceService.readByKeyWithDependency(testCaseStepAction.getValue1(), "Y").getItem();
                if (service == null
                        || !AppService.TYPE_KAFKA.equals(service.getType())
                        || !AppService.METHOD_KAFKASEARCH.equals(service.getMethod())) {
                    continue;
                }
                AnswerItem<Map<TopicPartition, Long>> seekEvent = seekEvent(service.getKafkaTopic(), service.getServicePath(), service.getHeaderList());
                if (!seekEvent.isCodeEquals(MessageEventEnum.ACTION_SUCCESS_CALLSERVICE_SEARCHKAFKA.getCode())) {
                    this.LOG.debug("TestCase interupted due to error when opening Kafka consume. " + seekEvent.getMessageDescription());
                    throw new CerberusException(new MessageGeneral(MessageGeneralEnum.VALIDATION_FAILED_KAFKACONSUMERSEEK).resolveDescription("DETAIL", seekEvent.getMessageDescription()));
                }
                hashMap.put(getKafkaConsumerKey(service.getKafkaTopic(), service.getServicePath()), seekEvent.getItem());
            }
        }
        this.LOG.debug(hashMap.size() + " consumers lastest offset retrieved.");
        return hashMap;
    }
}
