Java分页、集合拆分、分批多线程执行工具类SplitUtil
2024-05-09 16:08
Java
阅78
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* 分批工具类
*/
public class SplitUtil {
/**
* 集合转成ArrayList
*
* @param collection
* @param <T>
* @return
*/
public static <T> List<T> toArrayList(Collection<T> collection) {
return collection == null ? new ArrayList<>() : new ArrayList<>(collection);
}
/**
* 对List进行截取即subList,返回ArrayList
*
* @param list
* @param fromIndex
* @param toIndex
* @param <T>
* @return
*/
public static <T> List<T> subList(List<T> list, int fromIndex, int toIndex) {
fromIndex = Math.max(fromIndex, 0);
toIndex = Math.max(toIndex, 0);
if (fromIndex > toIndex) {
throw new IllegalArgumentException("toIndex must be greater than fromIndex");
}
return list.stream().skip(fromIndex).limit(toIndex - fromIndex).collect(Collectors.toList());
}
/**
* 对List进行分页(页码从1开始),返回ArrayList
*
* @param list
* @param pageNum
* @param pageSize
* @param <T>
* @return
*/
public static <T> List<T> pageList(List<T> list, int pageNum, int pageSize) {
if (pageNum <= 0 || pageSize <= 0) {
throw new IllegalArgumentException("pageNum or pageSize must be greater than 0");
}
int fromIndex = (pageNum - 1) * pageSize;
int toIndex = fromIndex + pageSize;
return subList(list, fromIndex, toIndex);
}
/**
* 对集合进行拆分,返回ArrayList
*
* @param collection 原始集合
* @param size 子集合长度
* @param <T>
* @return
*/
public static <T> List<List<T>> splitList(Collection<T> collection, int size) {
if (size <= 0) {
throw new IllegalArgumentException("size must be greater than 0");
}
List<T> list = toArrayList(collection);
//计算可以拆分成几个list
int count = (int) Math.ceil(list.size() / (double) size);
//使用流进行拆分
return Stream.iterate(1, n -> n + 1).limit(count)
.map(i -> pageList(list, i, size)).collect(Collectors.toList());
}
/**
* 分批执行任务(同步)
*
* @param collection
* @param size
* @param consumer
* @param <T>
*/
public static <T> void splitRun(Collection<T> collection, int size, Consumer<List<T>> consumer) {
List<List<T>> list = splitList(collection, size);
for (List<T> part : list) {
consumer.accept(part);
}
}
/**
* 分批执行任务(异步多线程)
*
* @param collection
* @param size
* @param consumer
* @param <T>
*/
public static <T> void splitRunAsync(Collection<T> collection, int size, Consumer<List<T>> consumer) {
splitRunAsync(collection, size, consumer, null);
}
/**
* 分批执行任务(异步多线程)
*
* @param collection
* @param size
* @param consumer
* @param executor
* @param <T>
*/
public static <T> void splitRunAsync(Collection<T> collection, int size, Consumer<List<T>> consumer, Executor executor) {
List<List<T>> list = splitList(collection, size);
List<CompletableFuture<Void>> futureList = new ArrayList<>();
for (List<T> part : list) {
futureList.add(runAsync(() -> consumer.accept(part), executor));
}
//等待所有任务完成
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
}
/**
* 异步执行
*
* @param runnable
* @return
*/
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return runAsync(runnable, null);
}
/**
* 异步执行
*
* @param runnable
* @param executor
* @return
*/
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
if (executor == null) {
//默认使用通用的executor
executor = COMMON_EXECUTOR;
}
return CompletableFuture.runAsync(runnable, executor);
}
/**
* 通用线程池
*/
public static final ThreadPoolExecutor COMMON_EXECUTOR = new ThreadPoolExecutor(
4, 8,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(32),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}