Asynchronous File Processing with CompletableFuture

Welcome to the course on Mastering Java Concurrency with Practice! In this course, we’ll explore how to apply concurrency utilities such as Semaphores, Phasers, CompletableFuture, and more to real-world problems. Each unit will introduce a problem scenario, walk through its solution using concurrency tools, and follow up with similar practice problems to reinforce your understanding.

In this unit, we focus on Asynchronous File Processing using CompletableFuture. This lesson will build on the concurrency concepts you’ve already learned, showing you how to handle multiple file operations asynchronously and keep your programs efficient and responsive.

What You’ll Learn

In this unit, you’ll learn how to:

  • Apply CompletableFuture to handle asynchronous tasks.
  • Manage exceptions in an asynchronous environment.
  • Combine multiple asynchronous operations.
  • Aggregate results from concurrent file tasks efficiently.

By now, you should be comfortable with basic threading concepts, so we’ll dive deeper into how asynchronous file processing works in practice using CompletableFuture.

Recap: CompletableFuture and Its Role in Asynchronous Programming

In a previous lesson, you learned about CompletableFuture, a versatile class in Java’s java.util.concurrent package that simplifies asynchronous programming. It allows you to handle tasks without blocking the main thread, combine multiple asynchronous computations, handle exceptions gracefully, and chain operations together.

As a quick recap:

  • supplyAsync() runs a task asynchronously and returns a CompletableFuture representing the future result of that task.
  • thenApply() and thenAccept() allow you to process the result of a completed task.
  • exceptionally() provides a way to handle exceptions in an asynchronous pipeline.
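
As a small illustration of these three methods working together, here is a minimal sketch. The class name CompletableFutureRecap, the method lengthAsync, and the fallback value -1 are assumptions made for this example and are not part of the lesson's code.

```java
import java.util.concurrent.CompletableFuture;

class CompletableFutureRecap {

    // Trims the text on another thread, maps the result to its length,
    // and falls back to -1 (an arbitrary choice for this sketch) if any stage fails.
    static int lengthAsync(String text) {
        return CompletableFuture
                .supplyAsync(() -> text.trim())   // throws NullPointerException when text is null
                .thenApply(String::length)        // skipped if the previous stage failed
                .exceptionally(ex -> -1)          // converts the failure into a fallback value
                .join();                          // blocks here only to return a plain int
    }

    public static void main(String[] args) {
        System.out.println(lengthAsync("  hello  ")); // prints 5
        System.out.println(lengthAsync(null));        // prints -1
    }
}
```

The call to join() is only there so the sketch can return a plain value; in a real pipeline you would usually keep chaining stages instead of blocking.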

In this lesson, we’ll use CompletableFuture to process a series of text files asynchronously. Each file will be processed as its own task on a thread pool, and once all files have been processed, we’ll aggregate the results to get a total word count. This approach lets several files be processed at the same time, which improves throughput and responsiveness, particularly when dealing with large files.

Now, let’s see how to set up the AsynchronousFileProcessing class and its methods.

Creating the AsynchronousFileProcessing Class

We’ll start by creating a class that manages asynchronous file processing. This class will have an ExecutorService to handle the threads and methods to process files asynchronously.

This class will include methods for counting words in a file, processing files asynchronously, and aggregating the results.
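
One possible shape for this class is sketched below. Passing the ExecutorService in through the constructor, and the executor() accessor, are assumptions made for illustration; the lesson's original class may create or expose its executor differently.

```java
import java.util.Objects;
import java.util.concurrent.ExecutorService;

class AsynchronousFileProcessing {

    // Thread pool that runs the file tasks. It is supplied by the caller,
    // so the caller also stays responsible for shutting it down.
    private final ExecutorService executor;

    AsynchronousFileProcessing(ExecutorService executor) {
        this.executor = Objects.requireNonNull(executor, "executor");
    }

    // Hypothetical accessor, included only so the sketch can be exercised.
    ExecutorService executor() {
        return executor;
    }

    // The word-counting, asynchronous processing, and aggregation methods
    // described in the following sections would be added here.
}
```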

Using Stream API

Since we’ll be using Java Streams in this lesson to count words in files, let’s quickly recap the basics. Streams provide a functional way to process collections of data, offering a series of methods to transform, filter, and aggregate elements. In this case, we’ll use a stream to read file lines, split them into words, and count them. Streams make it easy to process large datasets in a concise and readable way, while leveraging the power of functional programming.

Here are some of the methods used and what they do:

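  • Files.lines(): Opens a file and returns its lines as a lazily populated Stream<String>. The stream holds the file open, so it should be closed after use, typically with try-with-resources.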
  • Stream.flatMap(): Flattens a stream of streams into a single stream, in this case, breaking each line into words.
  • Stream.of(): Creates a stream from the array of words after splitting a line.
  • Stream.filter(): Filters out empty words, ensuring only valid ones are counted.
  • Stream.count(): Aggregates the results by counting the total number of elements in the stream (in this case, the total number of words).

If you want to explore functional programming concepts further, see the "Functional Programming in Java" path.

Counting Words in Each File

Next, we need a method to count the words in a file. This method reads the file line by line, splits each line into words, filters out empty words, and counts the total. If an error occurs while reading the file, we throw a runtime exception with a detailed error message.
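
The description above can be sketched as follows, using the stream methods listed earlier. The class name WordCountSketch, the whitespace regex used for splitting, and the exact error message are assumptions; only the overall steps come from the lesson.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.stream.Stream;

class WordCountSketch {

    // Counts whitespace-separated words in a file, reading it lazily line by line.
    static long countWords(Path file) {
        // try-with-resources closes the file once the stream has been consumed
        try (Stream<String> lines = Files.lines(file)) {
            return lines
                    .flatMap(line -> Stream.of(line.split("\\s+"))) // each line becomes a stream of tokens
                    .filter(word -> !word.isEmpty())                 // blank lines and leading spaces yield ""
                    .count();
        } catch (IOException | UncheckedIOException e) {
            // IOException: the file could not be opened; UncheckedIOException: a read failed mid-stream
            throw new RuntimeException("Failed to read file: " + file, e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path sample = Files.createTempFile("sample", ".txt");
        Files.write(sample, Arrays.asList("hello world", "", "  foo bar baz "));
        System.out.println(countWords(sample)); // prints 5
        Files.delete(sample);
    }
}
```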

Now that we can process each file, we’ll define how to handle these tasks asynchronously.

Processing Files Asynchronously

We’ll create the processFileAsync method, which counts the number of words in the file asynchronously. This method will handle errors that may occur during the processing.

This method processes the file asynchronously using supplyAsync(). In case of errors, we handle them using exceptionally().
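
A minimal sketch of processFileAsync along these lines is shown below. The enclosing class name AsyncFileSketch, the compact countWords helper, and the decision to log the error and return 0 for a failed file are assumptions; the lesson only states that supplyAsync() and exceptionally() are used.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.Stream;

class AsyncFileSketch {

    private final ExecutorService executor;

    AsyncFileSketch(ExecutorService executor) {
        this.executor = executor;
    }

    // Counts the words of one file on the executor's threads.
    CompletableFuture<Long> processFileAsync(Path file) {
        return CompletableFuture
                .supplyAsync(() -> countWords(file), executor)
                .exceptionally(ex -> {
                    // Assumption: a failed file is logged and contributes 0 to the total
                    System.err.println("Error processing " + file + ": " + ex.getMessage());
                    return 0L;
                });
    }

    // Compact word counter so the sketch is self-contained.
    static long countWords(Path file) {
        try (Stream<String> lines = Files.lines(file)) {
            return lines.flatMap(line -> Stream.of(line.split("\\s+")))
                        .filter(word -> !word.isEmpty())
                        .count();
        } catch (IOException | UncheckedIOException e) {
            throw new RuntimeException("Failed to read file: " + file, e);
        }
    }
}
```

Passing the executor as the second argument to supplyAsync() keeps the file work on the dedicated pool rather than on the common ForkJoinPool. Returning 0 on failure keeps the total computable, at the cost of hiding which files failed from the caller.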

Aggregating Results from Multiple Files

Once all files are processed, we need to aggregate the results to calculate the total word count. We can combine the CompletableFuture instances from each file and aggregate their results.

This method waits for all the CompletableFuture instances to complete and then sums up the word counts using join(). Note how the reduce(0L, Long::sum) operation aggregates the word counts from all CompletableFuture instances by starting with an initial value of 0L (zero). It then sums each word count using Long::sum, effectively computing the total word count across all files.
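
The aggregation step might look like the sketch below. Using CompletableFuture.allOf() to wait for every future is an assumption about how the waiting is done, and the names AggregationSketch and totalWordCount are hypothetical; the join() and reduce(0L, Long::sum) calls follow the description above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AggregationSketch {

    // Returns a future that completes with the sum of all word counts.
    static CompletableFuture<Long> totalWordCount(List<CompletableFuture<Long>> futures) {
        // allOf completes once every future in the array has completed
        CompletableFuture<Void> allDone =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

        // By the time this stage runs, join() returns immediately for each future
        return allDone.thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join)
                .reduce(0L, Long::sum));
    }

    public static void main(String[] args) {
        List<CompletableFuture<Long>> counts = Arrays.asList(
                CompletableFuture.completedFuture(3L),
                CompletableFuture.supplyAsync(() -> 4L));
        System.out.println(totalWordCount(counts).join()); // prints 7
    }
}
```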

Next, we’ll configure the executor and then see how this class is used in the Main class to process the files.

Configuring the ExecutorService

Before we start processing the files, we’ll need to set up the ExecutorService, which manages the threads that will process the files.

This code creates a thread pool of 4 threads to process the files.
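
A minimal sketch of that setup, assuming a fixed-size pool created through the Executors factory (the helper name newFilePool and the class name ExecutorSetupSketch are hypothetical):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExecutorSetupSketch {

    // Creates a pool with 4 worker threads; at most 4 files are processed at once,
    // and any further tasks wait in the pool's queue.
    static ExecutorService newFilePool() {
        return Executors.newFixedThreadPool(4);
    }

    public static void main(String[] args) {
        ExecutorService executor = newFilePool();
        try {
            // Any task handed to supplyAsync with this executor runs on one of its threads
            String worker = CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), executor)
                    .join();
            System.out.println("Task ran on: " + worker);
        } finally {
            executor.shutdown(); // pool threads are non-daemon, so shut down to let the JVM exit
        }
    }
}
```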

Processing Files in the Main Class

Here’s how we initiate the asynchronous file processing in the Main class:

This code processes the files asynchronously and prints the total word count after all tasks are completed.
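
Putting the pieces together, a self-contained driver could look like the sketch below. The lesson's actual file names are not shown, so this example writes two temporary files to have something to count. The class name MainSketch, the totalWords helper, and the zero fallback for failed files are likewise assumptions.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class MainSketch {

    static long countWords(Path file) {
        try (Stream<String> lines = Files.lines(file)) {
            return lines.flatMap(line -> Stream.of(line.split("\\s+")))
                        .filter(word -> !word.isEmpty())
                        .count();
        } catch (IOException | UncheckedIOException e) {
            throw new RuntimeException("Failed to read file: " + file, e);
        }
    }

    // Starts one asynchronous task per file, then sums the results.
    static long totalWords(List<Path> files, ExecutorService executor) {
        // Collect first so every task is submitted before any join() blocks
        List<CompletableFuture<Long>> futures = files.stream()
                .map(file -> CompletableFuture
                        .supplyAsync(() -> countWords(file), executor)
                        .exceptionally(ex -> 0L)) // assumption: a failed file counts as 0
                .collect(Collectors.toList());
        return futures.stream().map(CompletableFuture::join).reduce(0L, Long::sum);
    }

    public static void main(String[] args) throws IOException {
        // Temporary sample files stand in for the lesson's input files
        Path first = Files.createTempFile("sample-a", ".txt");
        Path second = Files.createTempFile("sample-b", ".txt");
        Files.write(first, Arrays.asList("one two three"));
        Files.write(second, Arrays.asList("four five"));

        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            long total = totalWords(Arrays.asList(first, second), executor);
            System.out.println("Total word count: " + total); // prints "Total word count: 5"
        } finally {
            executor.shutdown();
            Files.deleteIfExists(first);
            Files.deleteIfExists(second);
        }
    }
}
```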

Handling Exceptions and Shutting Down the Executor

We also need to ensure proper cleanup by handling any exceptions and shutting down the executor service.

If an error occurs, it is caught and logged, and then we proceed to shut down the executor service.
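
The catch-and-clean-up pattern described here can be sketched with try/catch/finally. The helper runAndCleanUp, the class name CleanupSketch, and the -1 value returned on failure are assumptions made for illustration. Note that join() reports a failed task as a CompletionException whose cause is the original exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

class CleanupSketch {

    // Runs the work on the executor, logs any failure, and always shuts the executor down.
    static long runAndCleanUp(ExecutorService executor, Supplier<Long> work) {
        try {
            return CompletableFuture.supplyAsync(work, executor).join();
        } catch (CompletionException e) {
            // join() wraps the task's exception; getCause() recovers the original
            System.err.println("Processing failed: " + e.getCause());
            return -1L; // assumption: -1 signals failure in this sketch
        } finally {
            executor.shutdown(); // runs on success and on failure alike
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndCleanUp(Executors.newFixedThreadPool(2), () -> 42L)); // prints 42
        System.out.println(runAndCleanUp(Executors.newFixedThreadPool(2), () -> {
            throw new IllegalStateException("boom");
        })); // prints -1 after logging the failure
    }
}
```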

Shutting Down the Executor Service

Finally, we ensure the executor service shuts down properly to avoid resource leaks:

This method shuts down the executor service gracefully, and if it doesn’t shut down within 10 seconds, it forces a shutdown.
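
A common way to write such a method is sketched below, following the shutdown pattern from the ExecutorService documentation. The method name shutdownExecutor and the class name ShutdownSketch are assumptions; the 10-second timeout comes from the description above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownSketch {

    static void shutdownExecutor(ExecutorService executor) {
        executor.shutdown(); // stop accepting new tasks; already submitted tasks keep running
        try {
            // Wait up to 10 seconds for running tasks to finish
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // interrupt tasks that are still running
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        executor.execute(() -> System.out.println("working"));
        shutdownExecutor(executor);
        System.out.println(executor.isTerminated()); // prints true
    }
}
```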

Why It Matters

Asynchronous file processing is a powerful technique for modern applications that deal with large files or datasets. By processing files concurrently, you optimize resource usage and make your applications more efficient. This is especially valuable in systems such as web servers, data analysis applications, or file-processing services, where responsiveness is critical.

With CompletableFuture, you can design systems that remain responsive even under heavy workloads.

Now that you’ve understood how asynchronous file processing works, let’s move to the practice section to see how this code performs in action and apply similar concepts in real-world scenarios.
