上班第一天,老板就甩来30G文件是种什么体验...( 二 )


线程池执行流程图
由于我们上述线程池设置的核心线程数为 5,很快就到达了最大核心线程数,后续任务只能被加入队列 。
为了后续任务不被线程池拒绝,我们可以采用如下方案:
以上两种方案都存在同样的问题,第一种是相当于将文件所有内容加载到内存,将会占用过多内存 。
而第二种创建过多的线程,同样也会占用过多内存 。
一旦内存占用过多,GC 无法清理,就可能会引起频繁的 「Full GC」,甚至导致 「OOM」,导致程序导入速度过慢 。
当然了,我们还可以第三种方案,综合前两种,设置合适队列长度,以及合适最大线程数 。不过呢,**「合适」**这个度真不好把握,另外也还是有 「OOM」 问题 。
所以为了解决这个问题,研究出两个解决方案:
批量执行
JDK 提供的,可以让主线程等待子线程都执行完成之后,再继续往下执行 。
利用这个特性,我们可以改造多线程导入的代码,主体逻辑如下:
try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {// 存储每个任务执行的行数List lines = Lists.newArrayList();// 存储异步任务List tasks = Lists.newArrayList();while (iterator.hasNext()) {String line = iterator.nextLine();lines.add(line);// 设置每个线程执行的行数if (lines.size() == 1000) {// 新建异步任务,注意这里需要创建一个 Listtasks.add(new ConvertTask(Lists.newArrayList(lines)));lines.clear();}if (tasks.size() == 10) {asyncBatchExecuteTask(tasks);}}// 文件读取结束,但是可能还存在未被内容tasks.add(new ConvertTask(Lists.newArrayList(lines)));// 最后再执行一次asyncBatchExecuteTask(tasks);}
这段代码中,每个异步任务将会导入 1000 行数据,等积累了 10 个异步任务,然后将会调用 k 使用线程池异步执行 。
/*** 批量执行任务** @param tasks*/private static void asyncBatchExecuteTask(List tasks) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(tasks.size());for (ConvertTask task : tasks) {task.setCountDownLatch(countDownLatch);executorService.submit(task);}// 主线程等待异步线程 countDownLatch 执行结束countDownLatch.await();// 清空,重新添加任务tasks.clear();}
k 方法内将会创建,然后主线程内调用 await方法等待所有异步线程执行结束 。
异步任务逻辑如下:
/*** 异步任务* 等数据导入完成之后,一定要调用 countDownLatch.countDown()* 不然,这个主线程将会被阻塞,*/private static class ConvertTask implements Runnable {private CountDownLatch countDownLatch;private List lines;public ConvertTask(List lines) {this.lines = lines;}public void setCountDownLatch(CountDownLatch countDownLatch) {this.countDownLatch = countDownLatch;}@Overridepublic void run() {try {for (String line : lines) {convertToDB(line);}} finally {countDownLatch.countDown();}}}
任务类逻辑就非常简单,遍历所有行,将其导入到数据库中 。所有数据导入结束,调用 # 。
一旦所有异步线程执行结束,调用 #,主线程将会被唤醒,继续执行文件读取 。
虽然这种方式解决上述问题,但是这种方式,每次都需要积累一定任务数才能开始异步执行所有任务 。
另外每次都需要等待所有任务执行结束之后,才能开始下一批任务,批量执行消耗的时间等于最慢的异步任务消耗的时间 。
这种方式线程池中线程存在一定的闲置时间,那有没有办法一直压榨线程池,让它一直在干活呢?
扩展线程池
回到最开始的问题,文件读取导入,其实就是一个**「生产者-消费者」**消费模型 。
主线程作为生产者不断读取文件,然后将其放置到队列中 。