Java分页、集合拆分、分批多线程执行工具类SplitUtil

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * 分批工具类
 */
public class SplitUtil {

    /**
     * 集合转成ArrayList
     *
     * @param collection
     * @param <T>
     * @return
     */
    public static <T> List<T> toArrayList(Collection<T> collection) {
        return collection == null ? new ArrayList<>() : new ArrayList<>(collection);
    }

    /**
     * 对List进行截取即subList,返回ArrayList
     *
     * @param list
     * @param fromIndex
     * @param toIndex
     * @param <T>
     * @return
     */
    public static <T> List<T> subList(List<T> list, int fromIndex, int toIndex) {
        fromIndex = Math.max(fromIndex, 0);
        toIndex = Math.max(toIndex, 0);
        if (fromIndex > toIndex) {
            throw new IllegalArgumentException("toIndex must be greater than fromIndex");
        }
        return list.stream().skip(fromIndex).limit(toIndex - fromIndex).collect(Collectors.toList());
    }

    /**
     * 对List进行分页(页码从1开始),返回ArrayList
     *
     * @param list
     * @param pageNum
     * @param pageSize
     * @param <T>
     * @return
     */
    public static <T> List<T> pageList(List<T> list, int pageNum, int pageSize) {
        if (pageNum <= 0 || pageSize <= 0) {
            throw new IllegalArgumentException("pageNum or pageSize must be greater than 0");
        }
        int fromIndex = (pageNum - 1) * pageSize;
        int toIndex = fromIndex + pageSize;
        return subList(list, fromIndex, toIndex);
    }

    /**
     * 对集合进行拆分,返回ArrayList
     *
     * @param collection 原始集合
     * @param size       子集合长度
     * @param <T>
     * @return
     */
    public static <T> List<List<T>> splitList(Collection<T> collection, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be greater than 0");
        }
        List<T> list = toArrayList(collection);
        //计算可以拆分成几个list
        int count = (int) Math.ceil(list.size() / (double) size);
        //使用流进行拆分
        return Stream.iterate(1, n -> n + 1).limit(count)
                .map(i -> pageList(list, i, size)).collect(Collectors.toList());
    }

    /**
     * 分批执行任务(同步)
     *
     * @param collection
     * @param size
     * @param consumer
     * @param <T>
     */
    public static <T> void splitRun(Collection<T> collection, int size, Consumer<List<T>> consumer) {
        List<List<T>> list = splitList(collection, size);
        for (List<T> part : list) {
            consumer.accept(part);
        }
    }

    /**
     * 分批执行任务(异步多线程)
     *
     * @param collection
     * @param size
     * @param consumer
     * @param <T>
     */
    public static <T> void splitRunAsync(Collection<T> collection, int size, Consumer<List<T>> consumer) {
        splitRunAsync(collection, size, consumer, null);
    }

    /**
     * 分批执行任务(异步多线程)
     *
     * @param collection
     * @param size
     * @param consumer
     * @param executor
     * @param <T>
     */
    public static <T> void splitRunAsync(Collection<T> collection, int size, Consumer<List<T>> consumer, Executor executor) {
        List<List<T>> list = splitList(collection, size);
        List<CompletableFuture<Void>> futureList = new ArrayList<>();
        for (List<T> part : list) {
            futureList.add(runAsync(() -> consumer.accept(part), executor));
        }
        //等待所有任务完成
        CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
    }

    /**
     * 异步执行
     *
     * @param runnable
     * @return
     */
    public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return runAsync(runnable, null);
    }

    /**
     * 异步执行
     *
     * @param runnable
     * @param executor
     * @return
     */
    public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
        if (executor == null) {
            //默认使用通用的executor
            executor = COMMON_EXECUTOR;
        }
        return CompletableFuture.runAsync(runnable, executor);
    }

    /**
     * 通用线程池
     */
    public static final ThreadPoolExecutor COMMON_EXECUTOR = new ThreadPoolExecutor(
            4, 8,
            60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(32),
            new ThreadPoolExecutor.CallerRunsPolicy()
    );

}

觉得内容还不错?打赏个钢镚鼓励鼓励!!👍