/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.nacos.naming.consistency.ephemeral.distro;

import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.DataSyncer;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.SyncTask;
import com.alibaba.nacos.naming.misc.GlobalConfig;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NetUtils;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TaskDispatcher {
    @Autowired
    private GlobalConfig partitionConfig;
    @Autowired
    private DataSyncer dataSyncer;
    private List<TaskScheduler> taskSchedulerList = new ArrayList<TaskScheduler>();
    private final int cpuCoreCount = Runtime.getRuntime().availableProcessors();

    @PostConstruct
    public void init() {
        for (int i = 0; i < this.cpuCoreCount; ++i) {
            TaskScheduler taskScheduler = new TaskScheduler(i);
            this.taskSchedulerList.add(taskScheduler);
            GlobalExecutor.submitTaskDispatch(taskScheduler);
        }
    }

    public void addTask(String key) {
        this.taskSchedulerList.get(UtilsAndCommons.shakeUp(key, this.cpuCoreCount)).addTask(key);
    }

    public class TaskScheduler
    implements Runnable {
        private int index;
        private int dataSize = 0;
        private long lastDispatchTime = 0L;
        private BlockingQueue<String> queue = new LinkedBlockingQueue<String>(131072);

        public TaskScheduler(int index) {
            this.index = index;
        }

        public void addTask(String key) {
            this.queue.offer(key);
        }

        public int getIndex() {
            return this.index;
        }

        @Override
        public void run() {
            ArrayList<String> keys = new ArrayList<String>();
            while (true) {
                try {
                    while (true) {
                        String key = this.queue.poll(TaskDispatcher.this.partitionConfig.getTaskDispatchPeriod(), TimeUnit.MILLISECONDS);
                        if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank((CharSequence)key)) {
                            Loggers.DISTRO.debug("got key: {}", (Object)key);
                        }
                        if (TaskDispatcher.this.dataSyncer.getServers() == null || TaskDispatcher.this.dataSyncer.getServers().isEmpty() || StringUtils.isBlank((CharSequence)key)) continue;
                        if (this.dataSize == 0) {
                            keys = new ArrayList();
                        }
                        keys.add(key);
                        ++this.dataSize;
                        if (this.dataSize != TaskDispatcher.this.partitionConfig.getBatchSyncKeyCount() && System.currentTimeMillis() - this.lastDispatchTime <= (long)TaskDispatcher.this.partitionConfig.getTaskDispatchPeriod()) continue;
                        for (Member member : TaskDispatcher.this.dataSyncer.getServers()) {
                            if (NetUtils.localServer().equals(member.getAddress())) continue;
                            SyncTask syncTask = new SyncTask();
                            syncTask.setKeys(keys);
                            syncTask.setTargetServer(member.getAddress());
                            if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank((CharSequence)key)) {
                                Loggers.DISTRO.debug("add sync task: {}", (Object)JacksonUtils.toJson((Object)syncTask));
                            }
                            TaskDispatcher.this.dataSyncer.submit(syncTask, 0L);
                        }
                        this.lastDispatchTime = System.currentTimeMillis();
                        this.dataSize = 0;
                    }
                }
                catch (Exception e) {
                    Loggers.DISTRO.error("dispatch sync task failed.", (Throwable)e);
                    continue;
                }
                break;
            }
        }
    }
}

