package org.cerberus.service.kafka.impl;

import com.jayway.jsonpath.PathNotFoundException;
import com.mchange.v2.c3p0.subst.C3P0Substitutions;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tika.mime.MimeTypesReaderMetKeys;
import org.cerberus.crud.entity.AppService;
import org.cerberus.crud.entity.AppServiceContent;
import org.cerberus.crud.entity.AppServiceHeader;
import org.cerberus.crud.entity.TestCaseExecution;
import org.cerberus.crud.entity.TestCaseStep;
import org.cerberus.crud.entity.TestCaseStepAction;
import org.cerberus.crud.entity.TestDataLib;
import org.cerberus.crud.factory.IFactoryAppService;
import org.cerberus.crud.factory.IFactoryAppServiceContent;
import org.cerberus.crud.factory.IFactoryAppServiceHeader;
import org.cerberus.crud.service.IAppServiceService;
import org.cerberus.crud.service.IParameterService;
import org.cerberus.engine.entity.MessageEvent;
import org.cerberus.engine.entity.MessageGeneral;
import org.cerberus.engine.execution.IRecorderService;
import org.cerberus.engine.gwt.IVariableService;
import org.cerberus.enums.MessageEventEnum;
import org.cerberus.enums.MessageGeneralEnum;
import org.cerberus.exception.CerberusEventException;
import org.cerberus.exception.CerberusException;
import org.cerberus.service.json.IJsonService;
import org.cerberus.service.kafka.IKafkaService;
import org.cerberus.service.proxy.IProxyService;
import org.cerberus.util.StringUtil;
import org.cerberus.util.answer.AnswerItem;
import org.json.JSONArray;
import org.json.JSONObject;
import org.quartz.impl.jdbcjobstore.Constants;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:WEB-INF/classes/org/cerberus/service/kafka/impl/KafkaService.class */
public class KafkaService implements IKafkaService {

    // Spring-injected collaborator; not referenced in the part of the class reviewed here.
    @Autowired
    IRecorderService recorderService;

    // Factory used to build the AppServiceHeader entries added to outgoing requests (e.g. "cerberus-token").
    @Autowired
    IFactoryAppServiceHeader factoryAppServiceHeader;

    // Factory used to build the AppServiceContent entries that carry the Kafka client properties.
    @Autowired
    IFactoryAppServiceContent factoryAppServiceContent;

    // Spring-injected collaborator; not referenced in the part of the class reviewed here.
    @Autowired
    IParameterService parameterService;

    // Factory used to build the AppService object returned by produceEvent.
    @Autowired
    IFactoryAppService factoryAppService;

    // Used to guess the response content type and to load service definitions with their dependencies.
    @Autowired
    IAppServiceService appServiceService;

    // Spring-injected collaborator; not referenced in the part of the class reviewed here.
    @Autowired
    IProxyService proxyService;

    // Used by searchEvent to extract the filter value from a record payload.
    @Autowired
    IJsonService jsonService;

    // Used by getAllConsumers to decode variables in topic, service path, headers and contents.
    @Autowired
    private IVariableService variableService;
    // Per-instance logger bound to the runtime class (subclasses log under their own name).
    protected final Logger LOG = LogManager.getLogger(getClass());

    /**
     * Builds the identifier used in log messages for a Kafka client, in the form
     * {@code <str2>|<str>}.
     *
     * @param str  second part of the key (callers in this class pass the topic name)
     * @param str2 first part of the key (callers in this class pass the bootstrap servers)
     * @return the two values joined by a pipe character, {@code str2} first
     */
    @Override // org.cerberus.service.kafka.IKafkaService
    public String getKafkaConsumerKey(String str, String str2) {
        StringBuilder consumerKey = new StringBuilder();
        consumerKey.append(str2);
        consumerKey.append("|");
        consumerKey.append(str);
        return consumerKey.toString();
    }

    /**
     * Sends one message to a Kafka topic and reports the partition and offset it was written to.
     *
     * <p>The method appends its own entries to the lists it receives: a {@code cerberus-token}
     * header when {@code str5} is not empty, and the producer properties (bootstrap servers,
     * idempotence, serializers, timeouts) to {@code list2}. Only active content entries are
     * copied into the Kafka client configuration. Both lists are then stored on the returned
     * {@link AppService}.
     *
     * <p>Failures are never thrown to the caller: any exception is converted into an
     * {@code ACTION_FAILED_CALLSERVICE_PRODUCEKAFKA} result message, and partition/offset stay
     * at {@code -1}. The producer is always flushed and closed, whatever the outcome.
     *
     * @param str   topic to produce to
     * @param str2  record key
     * @param str3  record value (also stored as the service request)
     * @param str4  bootstrap servers (also stored as the service path)
     * @param list  record headers; must be mutable, only active entries are sent
     * @param list2 extra producer properties; must be mutable
     * @param str5  optional token sent as the {@code cerberus-token} header
     * @param i     value used for both {@code request.timeout.ms} and the delivery timeout
     * @return an answer holding the populated {@link AppService} and the result message
     */
    @Override // org.cerberus.service.kafka.IKafkaService
    public AnswerItem<AppService> produceEvent(String str, String str2, String str3, String str4, List<AppServiceHeader> list, List<AppServiceContent> list2, String str5, int i) {
        AnswerItem<AppService> answerItem = new AnswerItem<>();
        AppService create = this.factoryAppService.create("", AppService.TYPE_KAFKA, AppService.METHOD_KAFKAPRODUCE, "", "", "", "", "", "", "", "", "", true, "", "", "", null, "", null, null);
        if (!StringUtil.isNullOrEmpty(str5)) {
            list.add(this.factoryAppServiceHeader.create(null, "cerberus-token", str5, "Y", 0, "", "", null, "", null));
        }

        // NOTE(review): Avro is switched on by an *inactive* entry whose key contains
        // "enable_avro". This may be deliberate (keeps the pseudo-property out of the Kafka
        // config) or an inverted condition - confirm before changing.
        boolean avroEnabled = false;
        for (AppServiceContent appServiceContent : list2) {
            if (!StringUtil.parseBoolean(appServiceContent.getActive()) && appServiceContent.getKey().contains("enable_avro")) {
                avroEnabled = true;
            }
        }

        Properties properties = new Properties();
        list2.add(this.factoryAppServiceContent.create(null, "bootstrap.servers", str4, "Y", 0, "", "", null, "", null));
        list2.add(this.factoryAppServiceContent.create(null, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true", "Y", 0, "", "", null, "", null));
        list2.add(this.factoryAppServiceContent.create(null, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer", "Y", 0, "", "", null, "", null));
        if (!avroEnabled) {
            list2.add(this.factoryAppServiceContent.create(null, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer", "Y", 0, "", "", null, "", null));
        }
        list2.add(this.factoryAppServiceContent.create(null, "request.timeout.ms", String.valueOf(i), "Y", 0, "", "", null, "", null));
        list2.add(this.factoryAppServiceContent.create(null, ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, String.valueOf(i), "Y", 0, "", "", null, "", null));
        for (AppServiceContent appServiceContent2 : list2) {
            if (StringUtil.parseBoolean(appServiceContent2.getActive())) {
                properties.put(appServiceContent2.getKey(), appServiceContent2.getValue());
            }
        }
        if (avroEnabled) {
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
            properties.put("value.subject.name.strategy", TopicRecordNameStrategy.class.getName());
        }

        create.setServicePath(str4);
        create.setKafkaTopic(str);
        create.setKafkaKey(str2);
        create.setServiceRequest(str3);
        create.setHeaderList(list);
        create.setContentList(list2);

        int partition = -1;
        long offset = -1;
        boolean interrupted = false;
        MessageEvent messageEvent;
        KafkaProducer<String, Object> kafkaProducer = null;
        try {
            this.LOG.info("Open Producer : " + getKafkaConsumerKey(str, str4));
            kafkaProducer = new KafkaProducer<>(properties);
            ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(str, str2, str3);
            for (AppServiceHeader appServiceHeader : list) {
                if (StringUtil.parseBoolean(appServiceHeader.getActive())) {
                    // Explicit UTF-8 so the bytes on the wire do not depend on the JVM default charset.
                    producerRecord.headers().add(new RecordHeader(appServiceHeader.getKey(), appServiceHeader.getValue().getBytes(java.nio.charset.StandardCharsets.UTF_8)));
                }
            }
            this.LOG.debug("Producing Kafka message - topic : " + str + " key : " + str2 + " message : " + str3);
            // Block until the broker acknowledges so partition and offset can be reported.
            RecordMetadata recordMetadata = kafkaProducer.send(producerRecord).get();
            partition = recordMetadata.partition();
            offset = recordMetadata.offset();
            this.LOG.debug("Produced Kafka message - topic : " + str + " key : " + str2 + " partition : " + partition + " offset : " + offset);
            messageEvent = new MessageEvent(MessageEventEnum.ACTION_SUCCESS_CALLSERVICE_PRODUCEKAFKA);
        } catch (Exception e) {
            // Remember the interruption; the flag is restored once cleanup is done so that
            // flush/close below are not aborted by it.
            interrupted = e instanceof InterruptedException;
            messageEvent = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_PRODUCEKAFKA);
            messageEvent.setDescription(messageEvent.getDescription().replace("%EX%", e.toString() + " " + StringUtil.getExceptionCauseFromString(e)));
            this.LOG.error(e, e);
        } finally {
            if (kafkaProducer != null) {
                // flush and close are guarded separately: a failing flush must not prevent the close.
                try {
                    kafkaProducer.flush();
                } catch (Exception flushError) {
                    this.LOG.error(flushError, flushError);
                }
                try {
                    kafkaProducer.close();
                } catch (Exception closeError) {
                    this.LOG.error(closeError, closeError);
                }
                this.LOG.info("Closed Producer : " + getKafkaConsumerKey(str, str4));
            } else {
                this.LOG.info("Producer not opened : " + getKafkaConsumerKey(str, str4));
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        create.setKafkaResponseOffset(offset);
        create.setKafkaResponsePartition(partition);
        create.setResponseHTTPBodyContentType(this.appServiceService.guessContentType(create, "JSON"));
        answerItem.setItem(create);
        messageEvent.setDescription(messageEvent.getDescription().replace("%SERVICEMETHOD%", AppService.METHOD_KAFKAPRODUCE));
        messageEvent.setDescription(messageEvent.getDescription().replace("%TOPIC%", str));
        messageEvent.setDescription(messageEvent.getDescription().replace("%PART%", String.valueOf(partition)));
        messageEvent.setDescription(messageEvent.getDescription().replace("%OFFSET%", String.valueOf(offset)));
        answerItem.setResultMessage(messageEvent);
        return answerItem;
    }

    /**
     * Reads the current end offset of every partition of a topic.
     *
     * <p>The consumer properties (bootstrap servers, deserializers, timeouts, ...) are appended
     * to {@code list}, and only active entries are copied into the Kafka client configuration.
     * A consumer is opened, assigned to all partitions of the topic, moved to the end, and its
     * end offsets are returned. The consumer is always closed before returning.
     *
     * <p>Failures are not thrown: they are converted into an
     * {@code ACTION_FAILED_CALLSERVICE_SEEKKAFKA} result message and the answer item is left unset.
     *
     * @param str  topic to inspect
     * @param str2 bootstrap servers
     * @param list extra consumer properties; must be mutable
     * @param i    value used for the request, session and default API timeouts
     * @return an answer holding the end offset per partition and the result message
     */
    @Override // org.cerberus.service.kafka.IKafkaService
    public AnswerItem<Map<TopicPartition, Long>> seekEvent(String str, String str2, List<AppServiceContent> list, int i) {
        MessageEvent messageEvent = new MessageEvent(MessageEventEnum.ACTION_SUCCESS_CALLSERVICE_SEARCHKAFKA);
        AnswerItem<Map<TopicPartition, Long>> answerItem = new AnswerItem<>();
        KafkaConsumer<String, String> kafkaConsumer = null;
        try {
            Properties properties = new Properties();
            list.add(this.factoryAppServiceContent.create(null, "bootstrap.servers", str2, "Y", 0, "", "", null, "", null));
            list.add(this.factoryAppServiceContent.create(null, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", "Y", 0, "", "", null, "", null));
            // NOTE(review): max.poll.records borrows an unrelated c3p0 constant; this looks like a
            // decompiler substitution for a plain string literal - confirm the intended value.
            list.add(this.factoryAppServiceContent.create(null, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, C3P0Substitutions.TRACE, "Y", 0, "", "", null, "", null));
            list.add(this.factoryAppServiceContent.create(null, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer", "Y", 0, "", "", null, "", null));
            list.add(this.factoryAppServiceContent.create(null, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer", "Y", 0, "", "", null, "", null));
            list.add(this.factoryAppServiceContent.create(null, "request.timeout.ms", String.valueOf(i), "Y", 0, "", "", null, "", null));
            list.add(this.factoryAppServiceContent.create(null, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, String.valueOf(i), "Y", 0, "", "", null, "", null));
            list.add(this.factoryAppServiceContent.create(null, ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, String.valueOf(i), "Y", 0, "", "", null, "", null));
            for (AppServiceContent appServiceContent : list) {
                if (StringUtil.parseBoolean(appServiceContent.getActive())) {
                    properties.put(appServiceContent.getKey(), appServiceContent.getValue());
                }
            }

            this.LOG.info("Open Consumer : " + getKafkaConsumerKey(str, str2));
            kafkaConsumer = new KafkaConsumer<>(properties);
            List<PartitionInfo> partitionsFor = kafkaConsumer.partitionsFor(str);
            if (partitionsFor == null) {
                messageEvent = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_SEEKKAFKA);
                messageEvent.setDescription(messageEvent.getDescription().replace("%EX%", "Maybe Topic does not exist.").replace("%TOPIC%", str).replace("%HOSTS%", str2));
            } else {
                List<TopicPartition> topicPartitions = partitionsFor.stream()
                        .map(partitionInfo -> new TopicPartition(str, partitionInfo.partition()))
                        .collect(Collectors.toList());
                kafkaConsumer.assign(topicPartitions);
                kafkaConsumer.seekToEnd(topicPartitions);
                answerItem.setItem(kafkaConsumer.endOffsets(topicPartitions));
            }
        } catch (Exception e) {
            messageEvent = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_SEEKKAFKA);
            messageEvent.setDescription(messageEvent.getDescription().replace("%EX%", e.toString() + " " + StringUtil.getExceptionCauseFromString(e)).replace("%TOPIC%", str).replace("%HOSTS%", str2));
            this.LOG.debug(e, e);
        } finally {
            // Single cleanup point: the consumer is released on success, on failure and on Error.
            if (kafkaConsumer != null) {
                try {
                    kafkaConsumer.close();
                } catch (Exception closeError) {
                    this.LOG.error(closeError, closeError);
                }
                this.LOG.info("Closed Consumer : " + getKafkaConsumerKey(str, str2));
            } else {
                this.LOG.info("Consumer not opened : " + getKafkaConsumerKey(str, str2));
            }
        }
        answerItem.setResultMessage(messageEvent);
        return answerItem;
    }

    /**
     * Reads records from a topic, starting at the given offsets, and collects those matching
     * an optional JSON filter.
     *
     * <p>Polling stops as soon as {@code i} matching records were collected (full success) or
     * once {@code i2} seconds have elapsed (partial result). Each poll waits at most
     * {@code min(i2, 5)} seconds. Records whose value is not valid JSON, or whose processing
     * fails, are logged and skipped.
     *
     * <p>The consumer properties are appended to {@code list2}, and only active entries are
     * copied into the Kafka client configuration. The consumer is always closed before
     * returning. Failures are not thrown: they are converted into an
     * {@code ACTION_FAILED_CALLSERVICE_SEARCHKAFKA} result message. The answer item always
     * holds the JSON array (possibly empty) of the records collected so far.
     *
     * @param map   offset to start from, per partition
     * @param str   topic to read
     * @param str2  bootstrap servers
     * @param list  service headers (not used by this method)
     * @param list2 extra consumer properties; must be mutable
     * @param str3  JSON path used to filter records; no filtering when null or empty
     * @param str4  value the filtered path must equal for a record to be kept
     * @param i     number of matching records after which the search stops
     * @param i2    overall search timeout, in seconds
     * @return an answer holding the matching records as a JSON array string and the result message
     */
    @Override // org.cerberus.service.kafka.IKafkaService
    public AnswerItem<String> searchEvent(Map<TopicPartition, Long> map, String str, String str2, List<AppServiceHeader> list, List<AppServiceContent> list2, String str3, String str4, int i, int i2) {
        MessageEvent messageEvent;
        AnswerItem<String> answerItem = new AnswerItem<>();
        Instant searchStart = Instant.now();
        JSONArray matchingRecords = new JSONArray();
        int nbFound = 0;
        int nbRead = 0;
        KafkaConsumer<String, String> kafkaConsumer = null;
        try {
            Properties properties = new Properties();
            list2.add(this.factoryAppServiceContent.create(null, "bootstrap.servers", str2, "Y", 0, "", "", null, "", null));
            list2.add(this.factoryAppServiceContent.create(null, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", "Y", 0, "", "", null, "", null));
            // NOTE(review): max.poll.records borrows an unrelated c3p0 constant; this looks like a
            // decompiler substitution for a plain string literal - confirm the intended value.
            list2.add(this.factoryAppServiceContent.create(null, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, C3P0Substitutions.TRACE, "Y", 0, "", "", null, "", null));
            list2.add(this.factoryAppServiceContent.create(null, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer", "Y", 0, "", "", null, "", null));
            list2.add(this.factoryAppServiceContent.create(null, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer", "Y", 0, "", "", null, "", null));
            for (AppServiceContent appServiceContent : list2) {
                if (StringUtil.parseBoolean(appServiceContent.getActive())) {
                    properties.put(appServiceContent.getKey(), appServiceContent.getValue());
                }
            }

            this.LOG.info("Open Consumer : " + getKafkaConsumerKey(str, str2));
            kafkaConsumer = new KafkaConsumer<>(properties);
            List<PartitionInfo> partitionsFor = kafkaConsumer.partitionsFor(str);
            if (partitionsFor == null) {
                messageEvent = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_SEARCHKAFKA);
                messageEvent.setDescription(messageEvent.getDescription().replace("%EX%", "Maybe Topic does not exist.").replace("%TOPIC%", str).replace("%HOSTS%", str2));
            } else {
                List<TopicPartition> topicPartitions = partitionsFor.stream()
                        .map(partitionInfo -> new TopicPartition(str, partitionInfo.partition()))
                        .collect(Collectors.toList());
                kafkaConsumer.assign(topicPartitions);
                for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {
                    kafkaConsumer.seek(entry.getKey(), entry.getValue().longValue());
                    this.LOG.debug("Partition : " + entry.getKey().partition() + " set to offset : " + entry.getValue());
                }

                boolean limitReached = false;
                boolean timedOut = false;
                long deadlineMillis = Instant.now().plusSeconds(i2).toEpochMilli();
                int pollSeconds = Math.min(i2, 5);
                while (!limitReached && !timedOut) {
                    this.LOG.debug("Start Poll.");
                    ConsumerRecords<String, String> polled = kafkaConsumer.poll(Duration.ofSeconds(pollSeconds));
                    this.LOG.debug("End Poll.");
                    if (Instant.now().toEpochMilli() > deadlineMillis) {
                        // The batch just received is still processed; no further poll is issued.
                        this.LOG.debug("Timed out searching for record");
                        timedOut = true;
                    }
                    for (ConsumerRecord<String, String> consumerRecord : polled) {
                        try {
                            this.LOG.debug("New record " + consumerRecord.topic() + " " + consumerRecord.partition() + " " + consumerRecord.offset());
                            this.LOG.debug("  " + consumerRecord.key() + " | " + consumerRecord.value());
                            // Parsing first: a record whose value is not JSON is skipped and not counted.
                            JSONObject recordValue = new JSONObject(consumerRecord.value());
                            JSONObject recordHeaders = new JSONObject();
                            for (Header header : consumerRecord.headers()) {
                                byte[] rawValue = header.value();
                                // Kafka allows null header values; keep the record instead of failing on it.
                                recordHeaders.put(header.key(), rawValue == null ? JSONObject.NULL : new String(rawValue, java.nio.charset.StandardCharsets.UTF_8));
                            }
                            nbRead++;
                            if (recordMatchesFilter(consumerRecord.value(), str3, str4)) {
                                JSONObject match = new JSONObject();
                                match.put("key", consumerRecord.key());
                                match.put("value", recordValue);
                                // NOTE(review): the JSON key comes from an unrelated Tika constant; this looks
                                // like a decompiler substitution for a string literal - confirm.
                                match.put(MimeTypesReaderMetKeys.MATCH_OFFSET_ATTR, consumerRecord.offset());
                                match.put("partition", consumerRecord.partition());
                                match.put("header", recordHeaders);
                                matchingRecords.put(match);
                                nbFound++;
                                if (nbFound >= i) {
                                    limitReached = true;
                                    break;
                                }
                            }
                        } catch (Exception recordError) {
                            this.LOG.error(recordError, recordError);
                        }
                    }
                }

                // Reaching the requested number of records is a full success even when the deadline
                // passed during the last poll; otherwise the timeout yields a partial result.
                MessageEventEnum outcome = limitReached
                        ? MessageEventEnum.ACTION_SUCCESS_CALLSERVICE_SEARCHKAFKA
                        : MessageEventEnum.ACTION_SUCCESS_CALLSERVICE_SEARCHKAFKAPARTIALRESULT;
                messageEvent = new MessageEvent(outcome).resolveDescription("NBEVENT", String.valueOf(nbFound)).resolveDescription("NBTOT", String.valueOf(nbRead)).resolveDescription("NBSEC", String.valueOf(Duration.between(searchStart, Instant.now()).getSeconds()));
            }
        } catch (NullPointerException e) {
            messageEvent = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_SEARCHKAFKA);
            messageEvent.setDescription(messageEvent.getDescription().replace("%EX%", e.toString()).replace("%TOPIC%", str).replace("%HOSTS%", str2));
            this.LOG.error(e, e);
        } catch (Exception e) {
            messageEvent = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_SEARCHKAFKA);
            messageEvent.setDescription(messageEvent.getDescription().replace("%EX%", e.toString() + " " + StringUtil.getExceptionCauseFromString(e)).replace("%TOPIC%", str).replace("%HOSTS%", str2));
            this.LOG.debug(e, e);
        } finally {
            // Single cleanup point: the consumer is released on success, on failure and on Error.
            if (kafkaConsumer != null) {
                try {
                    kafkaConsumer.close();
                } catch (Exception closeError) {
                    this.LOG.error(closeError, closeError);
                }
                this.LOG.info("Closed Consumer : " + getKafkaConsumerKey(str, str2));
            } else {
                this.LOG.info("Consumer not opened : " + getKafkaConsumerKey(str, str2));
            }
        }
        answerItem.setItem(matchingRecords.toString());
        messageEvent.setDescription(messageEvent.getDescription().replace("%SERVICEMETHOD%", AppService.METHOD_KAFKASEARCH).replace("%TOPIC%", str));
        answerItem.setResultMessage(messageEvent);
        return answerItem;
    }

    /**
     * Tells whether a record value passes the optional JSON filter of searchEvent.
     *
     * @param recordValue raw record value
     * @param filterPath  JSON path to extract; when null or empty every record matches
     * @param filterValue value the extracted content must equal
     * @return {@code true} when no filter is defined or the extracted value equals {@code filterValue}
     */
    private boolean recordMatchesFilter(String recordValue, String filterPath, String filterValue) {
        if (StringUtil.isNullOrEmpty(filterPath)) {
            return true;
        }
        boolean matches = true;
        String extracted = "";
        try {
            extracted = this.jsonService.getStringFromJson(recordValue, filterPath);
        } catch (PathNotFoundException e) {
            matches = false;
            this.LOG.debug("Record discarded - Path not found.");
        } catch (Exception e) {
            // Extraction failed for another reason: the comparison below runs against "".
            this.LOG.error(e, e);
        }
        this.LOG.debug("Filtered value : " + extracted);
        // A null extraction is treated as "different" rather than raising a NullPointerException.
        if (extracted == null || !extracted.equals(filterValue)) {
            matches = false;
            this.LOG.debug("Record discarded - Value different.");
        }
        return matches;
    }

    @Override // org.cerberus.service.kafka.IKafkaService
    public HashMap<String, Map<TopicPartition, Long>> getAllConsumers(List<TestCaseStep> list, TestCaseExecution testCaseExecution) throws CerberusException {
        HashMap<String, Map<TopicPartition, Long>> hashMap = new HashMap<>();
        new AnswerItem();
        new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_SEEKALLTOPICS);
        Iterator<TestCaseStep> it = list.iterator();
        while (it.hasNext()) {
            for (TestCaseStepAction testCaseStepAction : it.next().getActions()) {
                if (testCaseStepAction.getAction().equals(TestCaseStepAction.ACTION_CALLSERVICE) && !testCaseStepAction.getConditionOperator().equals("never")) {
                    AnswerItem<AppService> readByKeyWithDependency = this.appServiceService.readByKeyWithDependency(testCaseStepAction.getValue1(), "Y");
                    if (readByKeyWithDependency.getItem() != null && readByKeyWithDependency.getItem().getType().equals(AppService.TYPE_KAFKA) && readByKeyWithDependency.getItem().getMethod().equals(AppService.METHOD_KAFKASEARCH)) {
                        try {
                            AnswerItem<String> decodeStringCompletly = this.variableService.decodeStringCompletly(readByKeyWithDependency.getItem().getKafkaTopic(), testCaseExecution, null, false);
                            String item = decodeStringCompletly.getItem();
                            if (!decodeStringCompletly.isCodeStringEquals("OK")) {
                                String str = "Kafka Topic of Service '" + readByKeyWithDependency.getItem().getService() + "'";
                                MessageEvent resolveDescription = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_SEEKALLTOPICS).resolveDescription(Constants.COL_DESCRIPTION, decodeStringCompletly.getResultMessage().resolveDescription("FIELD", str).getDescription());
                                this.LOG.debug("Getting all consumers interupted due to decode '" + str + "'.");
                                MessageGeneral messageGeneral = new MessageGeneral(MessageGeneralEnum.NO_DATA_FOUND);
                                messageGeneral.setDescription(resolveDescription.getDescription());
                                throw new CerberusException(messageGeneral);
                            }
                            AnswerItem<String> decodeStringCompletly2 = this.variableService.decodeStringCompletly(readByKeyWithDependency.getItem().getServicePath(), testCaseExecution, null, false);
                            String item2 = decodeStringCompletly2.getItem();
                            if (!decodeStringCompletly2.isCodeStringEquals("OK")) {
                                String str2 = "Kafka Service Path of Service '" + readByKeyWithDependency.getItem().getService() + "'";
                                MessageEvent resolveDescription2 = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_SEEKALLTOPICS).resolveDescription(Constants.COL_DESCRIPTION, decodeStringCompletly2.getResultMessage().resolveDescription("FIELD", str2).getDescription());
                                this.LOG.debug("Getting all consumers interupted due to decode '" + str2 + "'.");
                                MessageGeneral messageGeneral2 = new MessageGeneral(MessageGeneralEnum.NO_DATA_FOUND);
                                messageGeneral2.setDescription(resolveDescription2.getDescription());
                                throw new CerberusException(messageGeneral2);
                            }
                            List<AppServiceHeader> headerList = readByKeyWithDependency.getItem().getHeaderList();
                            ArrayList arrayList = new ArrayList();
                            for (AppServiceHeader appServiceHeader : headerList) {
                                AnswerItem<String> decodeStringCompletly3 = this.variableService.decodeStringCompletly(appServiceHeader.getKey(), testCaseExecution, null, false);
                                appServiceHeader.setKey(decodeStringCompletly3.getItem());
                                if (!decodeStringCompletly3.isCodeStringEquals("OK")) {
                                    String str3 = "Header Key " + appServiceHeader.getKey() + " of Service '" + readByKeyWithDependency.getItem().getService() + "'";
                                    MessageEvent resolveDescription3 = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_SEEKALLTOPICS).resolveDescription(Constants.COL_DESCRIPTION, decodeStringCompletly3.getResultMessage().resolveDescription("FIELD", str3).getDescription());
                                    this.LOG.debug("Getting all consumers interupted due to decode '" + str3 + "'.");
                                    MessageGeneral messageGeneral3 = new MessageGeneral(MessageGeneralEnum.NO_DATA_FOUND);
                                    messageGeneral3.setDescription(resolveDescription3.getDescription());
                                    throw new CerberusException(messageGeneral3);
                                }
                                AnswerItem<String> decodeStringCompletly4 = this.variableService.decodeStringCompletly(appServiceHeader.getValue(), testCaseExecution, null, false);
                                appServiceHeader.setValue(decodeStringCompletly4.getItem());
                                if (!decodeStringCompletly4.isCodeStringEquals("OK")) {
                                    String str4 = "Header Value " + appServiceHeader.getKey() + " of Service '" + readByKeyWithDependency.getItem().getService() + "'";
                                    MessageEvent resolveDescription4 = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_SEEKALLTOPICS).resolveDescription(Constants.COL_DESCRIPTION, decodeStringCompletly4.getResultMessage().resolveDescription("FIELD", str4).getDescription());
                                    this.LOG.debug("Getting all consumers interupted due to decode '" + str4 + "'.");
                                    MessageGeneral messageGeneral4 = new MessageGeneral(MessageGeneralEnum.NO_DATA_FOUND);
                                    messageGeneral4.setDescription(resolveDescription4.getDescription());
                                    throw new CerberusException(messageGeneral4);
                                }
                                arrayList.add(appServiceHeader);
                            }
                            List<AppServiceContent> contentList = readByKeyWithDependency.getItem().getContentList();
                            ArrayList arrayList2 = new ArrayList();
                            for (AppServiceContent appServiceContent : contentList) {
                                AnswerItem<String> decodeStringCompletly5 = this.variableService.decodeStringCompletly(appServiceContent.getKey(), testCaseExecution, null, false);
                                appServiceContent.setKey(decodeStringCompletly5.getItem());
                                if (!decodeStringCompletly5.isCodeStringEquals("OK")) {
                                    String str5 = "Header Key " + appServiceContent.getKey() + " of Service '" + readByKeyWithDependency.getItem().getService() + "'";
                                    MessageEvent resolveDescription5 = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_SEEKALLTOPICS).resolveDescription(Constants.COL_DESCRIPTION, decodeStringCompletly5.getResultMessage().resolveDescription("FIELD", str5).getDescription());
                                    this.LOG.debug("Getting all consumers interupted due to decode '" + str5 + "'.");
                                    MessageGeneral messageGeneral5 = new MessageGeneral(MessageGeneralEnum.NO_DATA_FOUND);
                                    messageGeneral5.setDescription(resolveDescription5.getDescription());
                                    throw new CerberusException(messageGeneral5);
                                }
                                AnswerItem<String> decodeStringCompletly6 = this.variableService.decodeStringCompletly(appServiceContent.getValue(), testCaseExecution, null, false);
                                appServiceContent.setValue(decodeStringCompletly6.getItem());
                                if (!decodeStringCompletly6.isCodeStringEquals("OK")) {
                                    String str6 = "Header Value " + appServiceContent.getKey() + " of Service '" + readByKeyWithDependency.getItem().getService() + "'";
                                    MessageEvent resolveDescription6 = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_SEEKALLTOPICS).resolveDescription(Constants.COL_DESCRIPTION, decodeStringCompletly6.getResultMessage().resolveDescription("FIELD", str6).getDescription());
                                    this.LOG.debug("Getting all consumers interupted due to decode '" + str6 + "'.");
                                    MessageGeneral messageGeneral6 = new MessageGeneral(MessageGeneralEnum.NO_DATA_FOUND);
                                    messageGeneral6.setDescription(resolveDescription6.getDescription());
                                    throw new CerberusException(messageGeneral6);
                                }
                                arrayList2.add(appServiceContent);
                            }
                            AnswerItem<Map<TopicPartition, Long>> seekEvent = seekEvent(item, item2, arrayList2, this.parameterService.getParameterIntegerByKey("cerberus_callservice_timeoutms", testCaseExecution.getSystem(), 60000).intValue());
                            if (!seekEvent.isCodeEquals(MessageEventEnum.ACTION_SUCCESS_CALLSERVICE_SEARCHKAFKA.getCode())) {
                                this.LOG.debug("TestCase interupted due to error when opening Kafka consume. " + seekEvent.getMessageDescription());
                                throw new CerberusException(new MessageGeneral(MessageGeneralEnum.VALIDATION_FAILED_KAFKACONSUMERSEEK).resolveDescription(TestDataLib.TYPE_SERVICE, readByKeyWithDependency.getItem().getService()).resolveDescription("DETAIL", seekEvent.getMessageDescription()));
                            }
                            this.LOG.debug("Saving Map to key : " + getKafkaConsumerKey(readByKeyWithDependency.getItem().getKafkaTopic(), readByKeyWithDependency.getItem().getServicePath()));
                            hashMap.put(getKafkaConsumerKey(item, item2), seekEvent.getItem());
                        } catch (CerberusEventException e) {
                            this.LOG.error(e);
                        }
                    }
                }
            }
        }
        this.LOG.debug(hashMap.size() + " consumers lastest offset retrieved.");
        return hashMap;
    }
}
