package com.thebeastshop.datahub.client.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson.parser.Feature;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.google.common.collect.Lists;
import com.thebeastshop.datahub.client.DatahubClient;
import com.thebeastshop.datahub.client.annotation.AppId;
import com.thebeastshop.datahub.client.annotation.Creator;
import com.thebeastshop.datahub.client.annotation.DataCreateTime;
import com.thebeastshop.datahub.client.annotation.DataUpdateTime;
import com.thebeastshop.datahub.client.annotation.DbId;
import com.thebeastshop.datahub.client.annotation.Reviser;
import com.thebeastshop.datahub.client.criteria.Group;
import com.thebeastshop.datahub.client.criteria.Query;
import com.thebeastshop.datahub.client.exception.DatahubCreatorEmpty;
import com.thebeastshop.datahub.client.exception.DatahubIdEmpty;
import com.thebeastshop.datahub.client.exception.DatahubQueryException;
import com.thebeastshop.datahub.client.exception.DatahubQueryResultException;
import com.thebeastshop.datahub.client.exception.DatahubReviserEmpty;
import com.thebeastshop.datahub.client.exception.DatahubTimeoutException;
import com.thebeastshop.datahub.client.kafka.ResponseMessageListener;
import com.thebeastshop.datahub.client.utils.CriteriaUtils;
import com.thebeastshop.datahub.client.utils.DatahubBeanUtil;
import com.thebeastshop.datahub.client.utils.DatahubCallback;
import com.thebeastshop.datahub.client.utils.DatahubProperty;
import com.thebeastshop.datahub.client.utils.MessageUtil;
import com.thebeastshop.datahub.client.utils.MetaUtils;
import com.thebeastshop.datahub.common.api.AggregateResult;
import com.thebeastshop.datahub.common.api.QueryResult;
import com.thebeastshop.datahub.common.api.Result;
import com.thebeastshop.datahub.common.enums.DataTypeEnum;
import com.thebeastshop.datahub.common.enums.MessageOperationEnum;
import com.thebeastshop.datahub.common.enums.MessageProtocolEnum;
import com.thebeastshop.datahub.common.enums.ResultCodeEnum;
import com.thebeastshop.datahub.common.vo.AggregationResult;
import com.thebeastshop.datahub.common.vo.BusinessMessage;
import com.thebeastshop.datahub.common.vo.BusinessProperty;
import com.thebeastshop.datahub.common.vo.BusinessRecord;
import com.thebeastshop.datahub.common.vo.BusinessStruct;
import com.thebeastshop.datahub.common.vo.ResponseMessage;
import com.thebeastshop.kit.prop.PropConstants;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Type;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.NoHttpResponseException;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.kafka.core.KafkaTemplate;

/* loaded from: input_file:com/thebeastshop/datahub/client/impl/KafkaDatahubClient.class */
public class KafkaDatahubClient implements DatahubClient {
    private static final int DEFAULT_TIMEOUT = 30;
    private static int BATCH_SIZE = 100;
    private static int SLEEP_TIME_EACH_BATCH = 10;
    private static int HTTP_RETRY_COUNT = 5;
    private String appId;
    private String serialization;
    private KafkaTemplate kafkaTemplate;
    private ResponseMessageListener responseMessageListener;
    private String uri;
    private RequestConfig requestConfig;
    private long defaultResponseTimeout;
    private Logger log = LoggerFactory.getLogger(getClass());
    private final PoolingHttpClientConnectionManager poolingConnManager = new PoolingHttpClientConnectionManager();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/thebeastshop/datahub/client/impl/KafkaDatahubClient$BatchCallback.class */
    public interface BatchCallback<T> {
        void call(List<T> list);
    }

    public KafkaDatahubClient(Map<String, String> map, KafkaTemplate kafkaTemplate, ResponseMessageListener responseMessageListener) {
        this.kafkaTemplate = kafkaTemplate;
        this.responseMessageListener = responseMessageListener;
        this.appId = StringUtils.isNotBlank(map.get("appId")) ? map.get("appId") : PropConstants.getAppId();
        if (StringUtils.isBlank(this.appId)) {
            throw new NoSuchFieldError("appId is blank");
        }
        this.serialization = StringUtils.isNotBlank(map.get("serialization")) ? map.get("serialization") : "kryo";
        this.defaultResponseTimeout = Long.parseLong((StringUtils.isNotBlank(map.get("responseTimeout")) ? map.get("responseTimeout") : "60000").trim());
        String str = map.get("address");
        if (StringUtils.isBlank(str)) {
            throw new NoSuchFieldError("address is blank");
        }
        if (str.startsWith("http")) {
            this.uri = str;
        } else {
            this.uri = "http://" + str;
        }
        this.uri += "/datahub/" + this.appId;
        this.poolingConnManager.setMaxTotal(50);
        this.poolingConnManager.setDefaultMaxPerRoute(50);
        int intValue = StringUtils.isNotBlank(map.get("timeout")) ? Integer.valueOf(map.get("timeout")).intValue() : DEFAULT_TIMEOUT;
        this.requestConfig = RequestConfig.custom().setConnectTimeout(intValue * 1000).setConnectionRequestTimeout(intValue * 1000).setSocketTimeout(intValue * 1000).build();
        JSON.DEFAULT_GENERATE_FEATURE |= SerializerFeature.DisableCircularReferenceDetect.getMask();
    }

    private HttpClient buildHttpClient() {
        return HttpClients.custom().setConnectionManager(this.poolingConnManager).setDefaultRequestConfig(this.requestConfig).build();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <T> BusinessStruct convertT2Struct(T t) {
        BusinessStruct businessStruct = new BusinessStruct(MetaUtils.getBusinessName(t.getClass()));
        Map<String, DatahubProperty> allProperties = DatahubBeanUtil.getMetaClass(t.getClass()).getAllProperties();
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(allProperties.size());
        for (DatahubProperty datahubProperty : allProperties.values()) {
            if (!datahubProperty.isCommon()) {
                String name = datahubProperty.getName();
                try {
                    DataTypeEnum dataType = datahubProperty.getDataType();
                    if (dataType != null) {
                        newArrayListWithCapacity.add(new BusinessProperty(dataType, name, dataType.castValue(datahubProperty.getValue(t), (Type) null)));
                    }
                } catch (IllegalAccessException e) {
                    this.log.error("泛型实例转换业务结构出错: 字段: {}", name, e);
                } catch (InvocationTargetException e2) {
                    this.log.error("泛型实例转换业务结构出错: 字段: {}", name, e2);
                }
            }
        }
        businessStruct.setProperties(newArrayListWithCapacity);
        return businessStruct;
    }

    private <T> T convertStruct2T(BusinessRecord businessRecord, Class<T> cls) {
        T t = null;
        try {
            t = cls.newInstance();
            HashMap hashMap = new HashMap();
            hashMap.put(DbId.class, businessRecord.getId());
            hashMap.put(AppId.class, businessRecord.getAppId());
            hashMap.put(Creator.class, businessRecord.getCreator());
            hashMap.put(Reviser.class, businessRecord.getReviser());
            hashMap.put(DataCreateTime.class, businessRecord.getCreateTime());
            hashMap.put(DataUpdateTime.class, businessRecord.getUpdateTime());
            HashMap hashMap2 = new HashMap();
            for (BusinessProperty businessProperty : businessRecord.getBusiness().getProperties()) {
                Object value = businessProperty.getValue();
                if (value != null) {
                    hashMap2.put(businessProperty.getName(), value);
                }
            }
            for (Map.Entry<String, DatahubProperty> entry : DatahubBeanUtil.getMetaClass(cls).getAllProperties().entrySet()) {
                String key = entry.getKey();
                DatahubProperty value2 = entry.getValue();
                Object obj = value2.isCommon() ? hashMap.get(value2.getAnnotationType()) : hashMap2.get(key);
                if (obj != null) {
                    try {
                        value2.setValue(t, value2.getDataType().castValue(obj, value2.getGenericPropertyType()));
                    } catch (Throwable th) {
                        this.log.error("设置泛型实例属性值出错: fieldName: {}", value2.getName(), th);
                    }
                }
            }
        } catch (Exception e) {
            this.log.error("业务结构转换泛型实例出错:", e);
        }
        return t;
    }

    private <T> void batchProcess(List<T> list, BatchCallback batchCallback) {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        int size = list.size() / BATCH_SIZE;
        if (list.size() % BATCH_SIZE > 0) {
            size++;
        }
        this.log.info("[DATAHUB] batch start =>> batch size: " + size);
        int i = 0;
        for (int i2 = 0; i2 < size; i2++) {
            ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(BATCH_SIZE);
            int i3 = BATCH_SIZE;
            if (i2 == size - 1) {
                i3 = list.size() - (BATCH_SIZE * (size - 1));
            }
            for (int i4 = 0; i4 < i3; i4++) {
                int i5 = i;
                i++;
                newArrayListWithExpectedSize.add(list.get(i5));
            }
            batchCallback.call(newArrayListWithExpectedSize);
            if (i2 < size - 1) {
                try {
                    Thread.sleep(SLEEP_TIME_EACH_BATCH);
                } catch (InterruptedException e) {
                }
            }
        }
    }

    private ResponseMessage awaitMessage(boolean z, BlockingQueue<ResponseMessage> blockingQueue, long j) {
        ResponseMessage responseMessage = null;
        try {
            responseMessage = z ? blockingQueue.poll(j, TimeUnit.MILLISECONDS) : blockingQueue.poll(this.defaultResponseTimeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if (responseMessage == null) {
            throw new DatahubTimeoutException();
        }
        return responseMessage;
    }

    @Override // com.thebeastshop.datahub.client.DatahubClient
    public List<String> getBusinessNameList() {
        try {
            String httpGet = httpGet("biznames");
            if (StringUtils.isNotBlank(httpGet)) {
                return JSON.parseArray(httpGet, String.class);
            }
            throw new DatahubQueryException(new Exception("获取业务实体名称列表时发生错误"));
        } catch (IOException e) {
            if (e instanceof SocketTimeoutException) {
                this.log.error("[DATAHUB] 查询网络超时", e);
                throw new RuntimeException("[DATAHUB] 查询网络超时");
            }
            this.log.error("[DATAHUB] 根据条件查询记录出错：", e);
            throw new DatahubQueryException(e);
        }
    }

    @Override // com.thebeastshop.datahub.client.DatahubClient
    public <T> String create(T t) {
        return batchCreate(Lists.newArrayList(new Object[]{t}));
    }

    @Override // com.thebeastshop.datahub.client.DatahubClient
    public <T> String create(T t, DatahubCallback datahubCallback) {
        return batchCreate(Lists.newArrayList(new Object[]{t}), datahubCallback);
    }

    private <T> ResponseMessage createAndWait(boolean z, T t, long j) {
        String messageId = MessageUtil.getMessageId();
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        batchCreate(messageId, Lists.newArrayList(new Object[]{t}), responseMessage -> {
            linkedBlockingQueue.add(responseMessage);
        });
        return awaitMessage(z, linkedBlockingQueue, j);
    }

    @Override // com.thebeastshop.datahub.client.DatahubClient
    public <T> ResponseMessage createAndWait(T t) {
        return createAndWait(false, t, this.defaultResponseTimeout);
    }

    @Override // com.thebeastshop.datahub.client.DatahubClient
    public <T> ResponseMessage createAndWait(T t, long j) {
        return createAndWait(true, t, j);
    }

    private <T> void batchCreate(final String str, List<T> list, final DatahubCallback datahubCallback) {
        batchProcess(list, new BatchCallback<T>() { // from class: com.thebeastshop.datahub.client.impl.KafkaDatahubClient.1
            @Override // com.thebeastshop.datahub.client.impl.KafkaDatahubClient.BatchCallback
            public void call(List<T> list2) {
                if (CollectionUtils.isEmpty(list2)) {
                    return;
                }
                ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(list2.size());
                for (T t : list2) {
                    String creator = DatahubBeanUtil.getCreator(t);
                    if (StringUtils.isBlank(creator)) {
                        throw new DatahubCreatorEmpty(t.getClass());
                    }
                    Date createTime = DatahubBeanUtil.getCreateTime(t);
                    Date updateTime = DatahubBeanUtil.getUpdateTime(t);
                    BusinessRecord businessRecord = new BusinessRecord();
                    businessRecord.setAppId(KafkaDatahubClient.this.appId);
                    businessRecord.setCreator(creator);
                    businessRecord.setCreateTime(createTime);
                    businessRecord.setUpdateTime(updateTime);
                    businessRecord.setBusiness(KafkaDatahubClient.this.convertT2Struct(t));
                    newArrayListWithExpectedSize.add(businessRecord);
                }
                KafkaDatahubClient.this.kafkaTemplate.send("datahub", new BusinessMessage(str, KafkaDatahubClient.this.responseMessageListener.getResponseTopicName(), MessageOperationEnum.CREATE, MessageProtocolEnum.valueOf(KafkaDatahubClient.this.serialization.toUpperCase()), newArrayListWithExpectedSize));
                KafkaDatahubClient.this.responseMessageListener.putCallback(str, datahubCallback);
            }
        });
    }

    @Override // com.thebeastshop.datahub.client.DatahubClient
    public <T> String batchCreate(List<T> list) {
        return batchCreate(list, null);
    }

    @Override // com.thebeastshop.datahub.client.DatahubClient
    public <T> String batchCreate(List<T> list, DatahubCallback datahubCallback) {
        String messageId = MessageUtil.getMessageId();
        batchCreate(messageId, list, datahubCallback);
        return messageId;
    }

    private <T> ResponseMessage batchCreateAndWait(boolean z, List<T> list, long j) {
        String messageId = MessageUtil.getMessageId();
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        batchCreate(messageId, list, responseMessage -> {
            linkedBlockingQueue.add(responseMessage);
        });
        return awaitMessage(z, linkedBlockingQueue, j);
    }

    @Override // com.thebeastshop.datahub.client.DatahubClient
    public <T> ResponseMessage batchCreateAndWait(List<T> list) {
        return batchCreateAndWait(false, list, this.defaultResponseTimeout);
    }

    @Override // com.thebeastshop.datahub.client.DatahubClient
    public <T> ResponseMessage batchCreateAndWait(List<T> list, long j) {
        return batchCreateAndWait(true, list, j);
    }

    @Override // com.thebeastshop.datahub.client.DatahubClient
    public <T> void update(T t) {
        batchUpdate(Lists.newArrayList(new Object[]{t}));
    }

    private <T> void batchUpdate(final String str, List<T> list, final DatahubCallback datahubCallback) {
        batchProcess(list, new BatchCallback<T>() { // from class: com.thebeastshop.datahub.client.impl.KafkaDatahubClient.2
            @Override // com.thebeastshop.datahub.client.impl.KafkaDatahubClient.BatchCallback
            public void call(List<T> list2) {
                if (CollectionUtils.isEmpty(list2)) {
                    return;
                }
                ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(list2.size());
                for (T t : list2) {
                    String id = DatahubBeanUtil.getId(t);
                    if (id == null) {
                        throw new DatahubIdEmpty(t.getClass(), id);
                    }
                    String reviser = DatahubBeanUtil.getReviser(t);
                    if (reviser == null) {
                        throw new DatahubReviserEmpty(t.getClass(), id);
                    }
                    BusinessRecord businessRecord = new BusinessRecord();
                    businessRecord.setId(id);
                    businessRecord.setReviser(reviser);
                    businessRecord.setAppId(KafkaDatahubClient.this.appId);
                    businessRecord.setBusiness(KafkaDatahubClient.this.convertT2Struct(t));
                    newArrayListWithExpectedSize.add(businessRecord);
                }
                KafkaDatahubClient.this.kafkaTemplate.send("datahub", new BusinessMessage(str, KafkaDatahubClient.this.responseMessageListener.getResponseTopicName(), MessageOperationEnum.UPDATE, MessageProtocolEnum.valueOf(KafkaDatahubClient.this.serialization.toUpperCase()), newArrayListWithExpectedSize));
                KafkaDatahubClient.this.responseMessageListener.putCallback(str, datahubCallback);
            }
        });
    }

    @Override // com.thebeastshop.datahub.client.DatahubClient
    public <T> String batchUpdate(List<T> list) {
        String messageId = MessageUtil.getMessageId();
        batchUpdate(messageId, list, null);
        return messageId;
    }

    @Override // com.thebeastshop.datahub.client.DatahubClient
    public <T> String batchUpdate(List<T> list, DatahubCallback datahubCallback) {
        String messageId = MessageUtil.getMessageId();
        batchUpdate(messageId, list, datahubCallback);
        return messageId;
    }

    private <T> ResponseMessage batchUpdateAndWait(boolean z, List<T> list, long j) {
        String messageId = MessageUtil.getMessageId();
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        batchUpdate(messageId, list, responseMessage -> {
            linkedBlockingQueue.add(responseMessage);
        });
        return awaitMessage(z, linkedBlockingQueue, j);
    }

    @Override // com.thebeastshop.datahub.client.DatahubClient
    public <T> ResponseMessage batchUpdateAndWait(List<T> list) {
        return batchUpdateAndWait(false, list, this.defaultResponseTimeout);
    }

    @Override // com.thebeastshop.datahub.client.DatahubClient
    public <T> ResponseMessage batchUpdateAndWait(List<T> list, long j) {
        return batchUpdateAndWait(true, list, j);
    }

    @Override // com.thebeastshop.datahub.client.DatahubClient
    public <T> T getByDbId(Class<T> cls, String str) {
        return (T) getByDbId(cls, str, 0);
    }

    /* JADX WARN: Multi-variable type inference failed */
    public <T> T getByDbId(Class<T> cls, String str, int i) {
        try {
            String entityUtils = EntityUtils.toString(buildHttpClient().execute(new HttpGet(this.uri + "/" + DatahubBeanUtil.getMetaClass(cls).getName() + "/" + str)).getEntity());
            if (!StringUtils.isNotBlank(entityUtils)) {
                return null;
            }
            Result result = (Result) JSON.parseObject(entityUtils, new TypeReference<Result<BusinessRecord>>() { // from class: com.thebeastshop.datahub.client.impl.KafkaDatahubClient.3
            }, new Feature[0]);
            if (!result.getCode().equals(ResultCodeEnum.OK.code)) {
                this.log.error("[DATAHUB] 根据dbId查询记录api出错: code: {}, message: {}", result.getCode(), result.getMessage());
                throw new DatahubQueryResultException(result);
            }
            T t = null;
            if (result.getData() != null) {
                t = convertStruct2T((BusinessRecord) result.getData(), cls);
            }
            return t;
        } catch (Exception e) {
            if (e instanceof DatahubQueryResultException) {
                throw ((DatahubQueryResultException) e);
            }
            if (e instanceof SocketTimeoutException) {
                this.log.error("[DATAHUB] 查询网络超时", e);
                throw new RuntimeException("[DATAHUB] 查询网络超时");
            }
            if (!(e instanceof NoHttpResponseException)) {
                this.log.error("[DATAHUB] 根据条件查询记录出错：", e);
                throw new DatahubQueryException(e);
            }
            if (i >= HTTP_RETRY_COUNT) {
                throw new RuntimeException("[DATAHUB] 查询时网络出现错误, 重试次数：" + i + ", 原因：" + e);
            }
            this.log.warn("[DATAHUB] 查询时出现网络错误，进行重新查询，重试次数：" + i);
            return (T) getByDbId(cls, str, i + 1);
        }
    }

    @Override // com.thebeastshop.datahub.client.DatahubClient
    public List<AggregateResult> aggregate(Query query, Group... groupArr) {
        return aggregate(query, groupArr, 0);
    }

    public List<AggregateResult> aggregate(Query query, Group[] groupArr, int i) {
        ArrayList newArrayList;
        try {
            String httpPost = httpPost(MetaUtils.getBusinessName(query.getEntityClass()) + "/aggregate", JSON.toJSONString(CriteriaUtils.toAggregateNode(query, groupArr)));
            if (!StringUtils.isNotBlank(httpPost)) {
                return null;
            }
            Result result = (Result) JSON.parseObject(httpPost, new TypeReference<Result<List<AggregationResult>>>() { // from class: com.thebeastshop.datahub.client.impl.KafkaDatahubClient.4
            }, new Feature[0]);
            if (!result.getCode().equals(ResultCodeEnum.OK.code)) {
                this.log.error("[DATAHUB] 根据条件查询记录api出错: code: {}, message: {}", result.getCode(), result.getMessage());
                throw new DatahubQueryResultException(result);
            }
            List<AggregationResult> list = (List) result.getData();
            if (list != null) {
                newArrayList = Lists.newArrayListWithCapacity(list.size());
                for (AggregationResult aggregationResult : list) {
                    AggregateResult aggregateResult = new AggregateResult();
                    BeanUtils.copyProperties(aggregationResult, aggregateResult);
                    newArrayList.add(aggregateResult);
                }
            } else {
                newArrayList = Lists.newArrayList();
            }
            return newArrayList;
        } catch (Throwable th) {
            if (th instanceof DatahubQueryResultException) {
                throw ((DatahubQueryResultException) th);
            }
            if (th instanceof SocketTimeoutException) {
                this.log.error("[DATAHUB] 查询网络超时", th);
                throw new RuntimeException("[DATAHUB] 查询网络超时");
            }
            if (!(th instanceof NoHttpResponseException)) {
                this.log.error("[DATAHUB] 根据条件查询记录出错：", th);
                throw new DatahubQueryException(th);
            }
            if (i >= HTTP_RETRY_COUNT) {
                throw new RuntimeException("[DATAHUB] 查询时网络出现错误，重试次数：" + i + "，错误：" + th);
            }
            int i2 = i + 1;
            this.log.warn("[DATAHUB] 查询时出现网络错误，进行重新查询，重试次数：" + i2);
            return aggregate(query, groupArr, i2);
        }
    }

    @Override // com.thebeastshop.datahub.client.DatahubClient
    public <T> QueryResult<T> find(Query query) {
        return find(query, 0);
    }

    public <T> QueryResult<T> find(Query query, int i) {
        try {
            String httpPost = httpPost(MetaUtils.getBusinessName(query.getEntityClass()) + "/search", JSON.toJSONString(CriteriaUtils.toQueryNode(query)));
            if (!StringUtils.isNotBlank(httpPost)) {
                return null;
            }
            Result result = (Result) JSON.parseObject(httpPost, new TypeReference<Result<QueryResult<BusinessRecord>>>() { // from class: com.thebeastshop.datahub.client.impl.KafkaDatahubClient.5
            }, new Feature[0]);
            if (!result.getCode().equals(ResultCodeEnum.OK.code)) {
                this.log.error("[DATAHUB] 根据条件查询记录api出错: code: {}, message: {}", result.getCode(), result.getMessage());
                throw new DatahubQueryResultException(result);
            }
            ArrayList arrayList = null;
            QueryResult<T> queryResult = new QueryResult<>();
            QueryResult queryResult2 = (QueryResult) result.getData();
            List data = queryResult2.getData();
            if (data != null) {
                arrayList = Lists.newArrayListWithCapacity(data.size());
                Iterator it = data.iterator();
                while (it.hasNext()) {
                    arrayList.add(convertStruct2T((BusinessRecord) it.next(), query.getEntityClass()));
                }
            }
            queryResult.setData(arrayList);
            queryResult.setPage(queryResult2.getPage());
            queryResult.setPageSize(queryResult2.getPageSize());
            queryResult.setNextId(queryResult2.getNextId());
            queryResult.setHasNextPage(queryResult2.getHasNextPage());
            queryResult.setTotalCount(queryResult2.getTotalCount());
            return queryResult;
        } catch (Throwable th) {
            if (th instanceof DatahubQueryResultException) {
                throw ((DatahubQueryResultException) th);
            }
            if (th instanceof SocketTimeoutException) {
                this.log.error("[DATAHUB] 查询网络超时", th);
                throw new RuntimeException("[DATAHUB] 查询网络超时");
            }
            if (!(th instanceof NoHttpResponseException)) {
                this.log.error("[DATAHUB] 根据条件查询记录出错：", th);
                throw new DatahubQueryException(th);
            }
            if (i >= HTTP_RETRY_COUNT) {
                throw new RuntimeException("[DATAHUB] 查询时网络出现错误，重试次数：" + i + "，错误：" + th);
            }
            int i2 = i + 1;
            this.log.warn("[DATAHUB] 查询时出现网络错误，进行重新查询，重试次数：" + i2);
            return find(query, i2);
        }
    }

    @Override // com.thebeastshop.datahub.client.DatahubClient
    public QueryResult<BusinessRecord> findAsRecords(Query query, String str) {
        return findAsRecord(query, str, 0);
    }

    public QueryResult<BusinessRecord> findAsRecord(Query query, String str, int i) {
        try {
            String httpPost = httpPost(str + "/search", JSON.toJSONString(CriteriaUtils.toQueryNode(query)));
            if (StringUtils.isNotBlank(httpPost)) {
                return (QueryResult) ((Result) JSON.parseObject(httpPost, new TypeReference<Result<QueryResult<BusinessRecord>>>() { // from class: com.thebeastshop.datahub.client.impl.KafkaDatahubClient.6
                }, new Feature[0])).getData();
            }
            return null;
        } catch (Throwable th) {
            if (th instanceof DatahubQueryResultException) {
                throw ((DatahubQueryResultException) th);
            }
            if (th instanceof SocketTimeoutException) {
                this.log.error("[DATAHUB] 查询网络超时", th);
                throw new RuntimeException("[DATAHUB] 查询网络超时");
            }
            if (!(th instanceof NoHttpResponseException)) {
                this.log.error("[DATAHUB] 根据条件查询记录出错：", th);
                throw new DatahubQueryException(th);
            }
            if (i >= HTTP_RETRY_COUNT) {
                throw new RuntimeException("[DATAHUB] 查询时网络出现错误，重试次数：" + i + "，错误：" + th);
            }
            int i2 = i + 1;
            this.log.warn("[DATAHUB] 查询时出现网络错误，进行重新查询，重试次数：" + i2);
            return findAsRecord(query, str, i2);
        }
    }

    private String httpPost(String str, String str2) throws IOException {
        HttpClient buildHttpClient = buildHttpClient();
        HttpPost httpPost = new HttpPost(this.uri + "/" + str);
        httpPost.setEntity(new StringEntity(str2, ContentType.APPLICATION_JSON));
        return EntityUtils.toString(buildHttpClient.execute(httpPost).getEntity());
    }

    private String httpGet(String str) throws IOException {
        return EntityUtils.toString(buildHttpClient().execute(new HttpGet(this.uri + "/" + str)).getEntity());
    }
}
