Java long running task

Handling Long-Running Tasks

Often, server-side tasks can take a long time to complete. For example, a task that requires fetching a large amount of data from the database can take a long time to finish. In such cases, a poorly designed application can freeze the UI and prevent the user from interacting with the application.

This guide shows you how to handle long-running tasks in Vaadin applications in a way that:

  1. Allows the user to continue interacting with the application while the task is running, possibly allowing the user to cancel the task.
  2. Allows the user to see the progress of the task.

The examples used here are based on Spring Boot, but a similar approach can be used with other technologies.

The Problem with Long-Running Tasks

To illustrate how a poorly designed handling of long-running tasks can affect users, consider the following back-end service that simulates a long-running task.

    @Service
    public class BackendService {

        public String longRunningTask() {
            try {
                // Simulate a long running task
                Thread.sleep(6000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Some result";
        }
    }

Consider also that the BackendService.longRunningTask() method is being called from the following Vaadin UI:

    @Route("")
    public class MainView extends VerticalLayout {

        public MainView(BackendService backendService) {
            Button startButton = new Button("Start long-running task", clickEvent -> {
                // Runs on the request thread, so the response waits for the task
                String result = backendService.longRunningTask();
                Notification.show(result);
            });

            Button isBlockedButton = new Button("Is UI blocked?", clickEvent -> {
                Notification.show("UI isn't blocked!");
            });

            add(startButton, isBlockedButton);
        }
    }

In this example, if the user clicks the "Start long-running task" button, the application freezes the UI and prevents the user from interacting with other parts of the application (for example, with the isBlockedButton). This happens because Vaadin waits for the long-running task to finish before it sends the response back to the browser. Only then can the user continue interacting with the application.


Handling Long-Running Tasks Asynchronously

The recommended way to handle long-running tasks is to use an asynchronous approach. This means that the long-running task is executed in a separate thread, and the UI isn’t blocked while the task is running.
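As a minimal sketch of this idea in plain Java, independent of Vaadin and Spring, the following uses CompletableFuture.supplyAsync() to run a task on another thread while the caller carries on. The class name AsyncSketch, the slowTask() method, and the 200 ms delay are illustrative assumptions, not part of the application described in this guide.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class AsyncSketch {

    // Hypothetical stand-in for a slow back-end call.
    static String slowTask() {
        try {
            TimeUnit.MILLISECONDS.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "Interrupted";
        }
        return "Some result";
    }

    // Starts the task on another thread and returns immediately.
    static CompletableFuture<String> startTask() {
        return CompletableFuture.supplyAsync(AsyncSketch::slowTask);
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = startTask();
        // The calling thread is not blocked by the task at this point.
        System.out.println("Caller is free while the task runs");
        // join() is used here only so the demo waits before the JVM exits.
        future.thenAccept(result -> System.out.println("Task finished: " + result)).join();
    }
}
```

Without an explicit executor, supplyAsync() runs the task on the common ForkJoinPool. A server application would more likely pass its own executor as the second argument.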

An asynchronous model can be achieved in several ways. In a Spring Boot application, one way is to use the @Async annotation, which marks a method for asynchronous execution. On the Vaadin UI side, the response to the user request is sent back to the browser immediately, so the user can continue interacting with the application without waiting for the long-running task. When the asynchronous task finishes, Vaadin uses Server Push to let the user know that the task is completed.

The following example shows how the BackendService.longRunningTask() method can be adjusted to run asynchronously in a separate thread.

    @Service
    public class BackendService {

        @Async // (1)
        public ListenableFuture<String> longRunningTask() { // (2)
            try {
                // Simulate a long running task
                Thread.sleep(6000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return AsyncResult.forValue("Some result"); // (3)
        }
    }

  1. @Async annotation to mark the method for asynchronous execution.
  2. The method now returns a ListenableFuture object.
  3. The method's return value is a ListenableFuture object that contains the result of the asynchronous task.

Now the BackendService.longRunningTask() method is annotated with @Async, so the long-running task is executed in a separate thread. The method now returns a ListenableFuture instead of a String: an @Async method that produces a result must return a Future type such as ListenableFuture or CompletableFuture. A ListenableFuture is a special type of Future that allows the caller to register callbacks that are invoked when the task completes.
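The callback idea can be tried without Spring. The sketch below swaps in the JDK's CompletableFuture (the other return type named above) for ListenableFuture, and uses handle() to map either outcome to a message. The class and method names are hypothetical, chosen only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class CallbackSketch {

    // Runs the task asynchronously and maps either outcome to a message,
    // mirroring a pair of success and failure callbacks.
    static CompletableFuture<String> runWithCallbacks(Supplier<String> task) {
        return CompletableFuture.supplyAsync(task)
                .handle((result, error) -> error == null
                        ? "Task finished: " + result
                        : "Task failed: " + rootMessage(error));
    }

    // supplyAsync() wraps a thrown exception in CompletionException; unwrap it for display.
    static String rootMessage(Throwable error) {
        Throwable cause = error.getCause() != null ? error.getCause() : error;
        return cause.getMessage();
    }

    public static void main(String[] args) {
        System.out.println(runWithCallbacks(() -> "Some result").join());
        System.out.println(runWithCallbacks(() -> {
            throw new IllegalStateException("database unavailable");
        }).join());
    }
}
```

Running main prints "Task finished: Some result" followed by "Task failed: database unavailable".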

With these changes in place, you can change the UI to allow the user to start the long-running task and still be able to interact with the application. Vaadin can then use the ListenableFuture and the UI.access() method of Server Push to notify the user when the task is completed. This is how MainView.java could look now:

    @Route("")
    public class MainView extends VerticalLayout {

        public MainView(BackendService backendService) {
            Button startButton = new Button("Start long-running task", clickEvent -> {
                UI ui = clickEvent.getSource().getUI().orElseThrow(); // (1)
                ListenableFuture<String> future = backendService.longRunningTask();
                future.addCallback(
                        successResult -> updateUi(ui, "Task finished: " + successResult), // (2)
                        failureException -> updateUi(ui, "Task failed: " + failureException.getMessage()) // (3)
                );
            });

            Button isBlockedButton = new Button("Is UI blocked?", clickEvent -> {
                Notification.show("UI isn't blocked!");
            });

            add(startButton, isBlockedButton);
        }

        private void updateUi(UI ui, String result) { // (4)
            ui.access(() -> {
                Notification.show(result);
            });
        }
    }

  1. Save the current UI in a local variable, so that you can use it later to update the UI through the UI.access() method.
  2. The callback is called when the task is completed successfully.
  3. The callback is called if the task failed.
  4. The UI.access() method is used to update the UI in a thread-safe manner through server-side push.
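To see why background threads should not touch UI state directly, consider this plain-Java analogy. It is not how Vaadin implements UI.access() (Vaadin runs the command while holding the session lock). Instead, the sketch funnels every update through a single-threaded executor so that only one thread ever mutates the state. All names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class UiAccessSketch {

    // Hypothetical UI state; ArrayList is not thread-safe, so only uiThread touches it.
    private final List<String> notifications = new ArrayList<>();
    private final ExecutorService uiThread = Executors.newSingleThreadExecutor();

    // Queues an update to run on the single UI thread, whichever thread calls this.
    CompletableFuture<Void> access(Runnable update) {
        return CompletableFuture.runAsync(update, uiThread);
    }

    // Must only be called from inside access().
    void show(String message) {
        notifications.add(message);
    }

    // Reads the state on the UI thread as well, then hands back a copy.
    List<String> snapshot() {
        return CompletableFuture.supplyAsync(() -> List.copyOf(notifications), uiThread).join();
    }

    void shutdown() {
        uiThread.shutdown();
    }

    public static void main(String[] args) {
        UiAccessSketch ui = new UiAccessSketch();
        List<CompletableFuture<Void>> updates = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            int taskNumber = i;
            // Each background task finishes on a pool thread, then posts its update.
            updates.add(CompletableFuture
                    .supplyAsync(() -> "Task " + taskNumber + " finished")
                    .thenCompose(message -> ui.access(() -> ui.show(message))));
        }
        updates.forEach(CompletableFuture::join);
        System.out.println(ui.snapshot().size() + " notifications shown");
        ui.shutdown();
    }
}
```

The design point carried over from the Vaadin example is that the background callback never mutates UI state itself. It only hands a small update to the component that owns the state.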

You’re still not done. For the above example to work as intended, you need two extra annotations for the @Async annotation and the UI.access() method to work.

  • For the @Async annotation, you need to add the @EnableAsync annotation to the application.
  • For the UI.access() method, you need to add the @Push annotation to the class implementing the AppShellConfigurator interface.

You can make both changes in the same class: an Application class that extends SpringBootServletInitializer, implements AppShellConfigurator, and carries both the @EnableAsync and the @Push annotations.


Spring Boot REST API for Long-Running Tasks

Learn to design and implement a Spring Boot REST API that accepts long-running tasks and publishes the execution progress to a Kafka topic.

A long-running task is an operation that requires a considerable amount of server resources, time, or both. To avoid blocking the client, the task must be completed asynchronously, without a persistent connection between the client and the server. After submitting the task, the client polls a provided URL for the task-execution progress.

In this tutorial, we will create REST APIs with Spring Boot that do the following:

  • Accept the API request and return the 202 (Accepted) status code along with a URL to poll for task-execution status in the Location header. The URL points to an API that polls an Apache Kafka topic.
  • Publish the task execution status to the Kafka topic.
  • Poll the Kafka topic to get the current status of the task.
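The submit-then-poll flow in the list above can be sketched without Kafka or HTTP. In the following plain-Java sketch, a ConcurrentHashMap stands in for the per-task Kafka topic, and submit() returns the relative URL that a real endpoint would place in the Location header. The class name, the reduced Status enum, and the timings are assumptions made for illustration only.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TaskRegistrySketch {

    enum Status { SUBMITTED, RUNNING, FINISHED, TERMINATED }

    // In-memory stand-in for the per-task Kafka topic: task id -> latest status.
    private final Map<String, Status> latestStatus = new ConcurrentHashMap<>();
    private final ExecutorService workers = Executors.newCachedThreadPool();

    // Accepts the task and returns at once with the relative URL to poll.
    String submit(Runnable work) {
        String taskId = UUID.randomUUID().toString();
        latestStatus.put(taskId, Status.SUBMITTED);
        workers.execute(() -> {
            latestStatus.put(taskId, Status.RUNNING);
            try {
                work.run();
                latestStatus.put(taskId, Status.FINISHED);
            } catch (RuntimeException e) {
                latestStatus.put(taskId, Status.TERMINATED);
            }
        });
        return "/tasks/" + taskId + "/progress";
    }

    // Returns the latest known status, or null for an unknown task id.
    Status poll(String progressUrl) {
        String taskId = progressUrl.split("/")[2];
        return latestStatus.get(taskId);
    }

    void shutdown() {
        workers.shutdown();
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        TaskRegistrySketch registry = new TaskRegistrySketch();
        String url = registry.submit(() -> sleepQuietly(300));
        Status status;
        do {
            sleepQuietly(100); // client-side polling interval
            status = registry.poll(url);
            System.out.println(url + " -> " + status);
        } while (status != Status.FINISHED && status != Status.TERMINATED);
        registry.shutdown();
    }
}
```

Unlike a Kafka topic, the map keeps only the latest status and loses everything on restart, which is one reason the tutorial uses a broker.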

2. Designing REST API for Long-Running Tasks

We will create a REST API that accepts a long-running task in the POST URL:

HTTP POST: http://localhost:9000/tasks

It returns an empty response with the Location header containing the Kafka topic URL. A new Kafka topic is created programmatically for each submitted task so its result can be polled without affecting the polling for results of other tasks.

In the following response sample, eaa98908-0115-4cd7-8884-0b2155f7811e is a dynamically generated identifier and it is the topic name. The client can, itself, poll the Kafka topic or it can invoke the provided API URL that will fetch the task status.

HTTP 202 Location: http://localhost:9000/tasks/eaa98908-0115-4cd7-8884-0b2155f7811e/progress

When the client invokes the API, the server polls the Kafka topic and returns the current status of the long-running task. The progress request is:

HTTP GET: http://localhost:9000/tasks/eaa98908-0115-4cd7-8884-0b2155f7811e/progress

Once the task is finished, it can either provide the task execution result in the same response, or it can provide another URL that will return the task execution result.

HTTP GET: http://localhost:9000/tasks/eaa98908-0115-4cd7-8884-0b2155f7811e/progress

    {
      "taskId": "eaa98908-0115-4cd7-8884-0b2155f7811e",
      "taskName": "test-task",
      "percentageComplete": 100.0,
      "status": "FINISHED",
      "resultUrl": "http://localhost:9000/tasks/eaa98908-0115-4cd7-8884-0b2155f7811e/result"
    }

HTTP GET: http://localhost:9000/tasks/eaa98908-0115-4cd7-8884-0b2155f7811e/result

    {
      "taskId": "eaa98908-0115-4cd7-8884-0b2155f7811e",
      "taskName": "test-task",
      "status": "FINISHED",
      "sys-log-location":"/log/. ",
      "err-log-location":"/log/. "
    }

3. Implementation of Long-running REST API with Spring Boot

Let us implement the above-discussed API design in a Spring Boot application.

The models consist of three classes: TaskRequest, TaskResponse and TaskStatus. We can add/modify the fields according to application requirements.

    // Accessors and constructors are omitted here; the service code below
    // relies on getters and an all-arguments TaskStatus constructor.
    public class TaskRequest {
        private String name;
    }

    public class TaskResponse implements Serializable {
        private String taskId;
        private String name;
    }

    public class TaskStatus implements Serializable {
        private String taskId;
        private String taskName;
        private float percentageComplete;
        private Status status;
        private String resultUrl;

        public enum Status {
            SUBMITTED, STARTED, RUNNING, FINISHED, TERMINATED
        }
    }

We are writing a very basic REST API that accepts the task request and returns the progress URL in the Location header.

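    @Autowired
    TaskService taskService;

    @PostMapping
    public ResponseEntity<Void> processAsync(@RequestBody TaskRequest taskRequest, UriComponentsBuilder b) {
        String taskId = UUID.randomUUID().toString();
        // Build on a copy: path() appends to the builder, and the service
        // reuses b later to build the result URL
        UriComponents progressURL = b.cloneBuilder().path("/tasks/{taskId}/progress").buildAndExpand(taskId);
        taskService.process(taskId, taskRequest, b);
        // 202 Accepted with the polling URL in the Location header
        return ResponseEntity.accepted().location(progressURL.toUri()).build();
    }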

The TaskService uses an @Async method to process the task asynchronously. It first creates a new Kafka topic so it can publish the progress. Then it processes the task and regularly posts the task progress status to the topic. We are using Thread.currentThread().sleep() to simulate the task processing time.

Also, notice that we are using the KafkaAdmin to create new topics. After we create a new Kafka topic, we call KafkaConsumer.subscribe() to subscribe to the topic so the application can later poll it.

    @Log
    @Service
    public class TaskService {

        @Autowired
        KafkaConsumer<String, TaskStatus> kafkaConsumer;

        @Autowired
        KafkaProducerService kafkaProducerService;

        @Autowired
        KafkaAdmin kafkaAdmin;

        final static Logger LOGGER = LoggerFactory.getLogger(TaskService.class);

        @Async
        public void process(String taskId, TaskRequest taskRequest, UriComponentsBuilder b) {
            try {
                createNewTopic(taskId);
                updateTaskExecutionProgess(new TaskStatus(taskId, taskRequest.getName(), 0.0f, Status.SUBMITTED, null));

                Thread.currentThread().sleep(2000L);
                updateTaskExecutionProgess(new TaskStatus(taskId, taskRequest.getName(), 10.0f, Status.STARTED, null));

                Thread.currentThread().sleep(5000L);
                updateTaskExecutionProgess(new TaskStatus(taskId, taskRequest.getName(), 50.0f, Status.RUNNING, null));

                UriComponents resultURL = b.path("/tasks/{taskId}/result").buildAndExpand(taskId);
                Thread.currentThread().sleep(5000L);
                updateTaskExecutionProgess(new TaskStatus(taskId, taskRequest.getName(), 100.0f, Status.FINISHED, resultURL.toUriString()));
            } catch (InterruptedException | ExecutionException e) {
                updateTaskExecutionProgess(new TaskStatus(taskId, taskRequest.getName(), 100.0f, Status.TERMINATED, null));
                throw new RuntimeException(e);
            }
        }

        private void createNewTopic(String topicName) throws ExecutionException, InterruptedException {
            Map<String, String> topicConfig = new HashMap<>();
            topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(24 * 60 * 60 * 1000)); // 24 hours retention
            NewTopic newTopic = new NewTopic(topicName, 1, (short) 1).configs(topicConfig);
            try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
                // Block until the broker confirms that the topic exists
                adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
            }
            // Note: subscribe() replaces any previous subscription of this consumer
            kafkaConsumer.subscribe(Collections.singletonList(topicName));
        }

        void updateTaskExecutionProgess(TaskStatus taskStatus) {
            // The task id serves as both the topic name and the message key
            kafkaProducerService.send(taskStatus.getTaskId(), taskStatus.getTaskId(), taskStatus);
        }
    }
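The progress-reporting sequence above can be exercised without a broker. The following plain-Java sketch replaces the Kafka producer with a Consumer callback and shortens the delays. The Progress class is a trimmed-down, hypothetical version of TaskStatus, and all names are illustrative assumptions.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

public class ProgressSketch {

    enum Status { SUBMITTED, STARTED, RUNNING, FINISHED, TERMINATED }

    // Trimmed-down, hypothetical version of the TaskStatus model.
    static final class Progress {
        final String taskId;
        final float percentageComplete;
        final Status status;

        Progress(String taskId, float percentageComplete, Status status) {
            this.taskId = taskId;
            this.percentageComplete = percentageComplete;
            this.status = status;
        }
    }

    // Runs the simulated work and reports each step to the publisher,
    // which stands in for the Kafka producer.
    static void process(String taskId, long stepMillis, Consumer<Progress> publisher) {
        try {
            publisher.accept(new Progress(taskId, 0.0f, Status.SUBMITTED));
            Thread.sleep(stepMillis);
            publisher.accept(new Progress(taskId, 10.0f, Status.STARTED));
            Thread.sleep(stepMillis);
            publisher.accept(new Progress(taskId, 50.0f, Status.RUNNING));
            Thread.sleep(stepMillis);
            publisher.accept(new Progress(taskId, 100.0f, Status.FINISHED));
        } catch (InterruptedException e) {
            // An interrupt ends the task early; report it and restore the flag.
            publisher.accept(new Progress(taskId, 100.0f, Status.TERMINATED));
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Progress> published = new CopyOnWriteArrayList<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> running = executor.submit(() -> process("task-1", 50, published::add));
        running.get(); // block here only so the demo can print the full history
        for (Progress p : published) {
            System.out.println(p.status + " " + p.percentageComplete + "%");
        }
        executor.shutdown();
    }
}
```

In this sketch, calling cancel(true) on the returned Future while the task is sleeping interrupts the worker thread, and the task then reports TERMINATED. That gives a simple way to experiment with user-initiated cancellation.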

3.3. Publishing the Task Execution Progress

Notice that we are using the KafkaProducerService.send() to send the progress status to the topic. It internally uses the default configured KafkaTemplate to publish new messages.

    @Service
    @Log
    public class KafkaProducerService {

        private final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerService.class);

        @Autowired
        KafkaTemplate<String, TaskStatus> kafkaTemplate;

        public void send(String topicName, String key, TaskStatus value) {
            var future = kafkaTemplate.send(topicName, key, value);
            // The future is already complete inside the callback, so only log the outcome
            future.whenComplete((sendResult, exception) -> {
                if (exception != null) {
                    LOGGER.error("Failed to send task status to Kafka topic : " + value, exception);
                } else {
                    LOGGER.info("Task status sent to Kafka topic : " + value);
                }
            });
        }
    }

3.4. Polling the Task Execution Status

To get the current status of the long-running task, we can invoke the API URL in the Location header. It uses the KafkaConsumerService.getLatestTaskStatus() method to get the latest status published to the topic.

    @Autowired
    KafkaConsumerService kafkaConsumerService;

    // Assumes the controller class is mapped to /tasks
    @GetMapping("/{taskId}/progress")
    public ResponseEntity<TaskStatus> processAsync(@PathVariable String taskId) {
        TaskStatus taskStatus = kafkaConsumerService.getLatestTaskStatus(taskId);
        if (taskStatus == null) {
            // Nothing has been published for this task yet
            return ResponseEntity.noContent().build();
        }
        return ResponseEntity.ok().body(taskStatus);
    }

The KafkaConsumerService uses the KafkaConsumer bean and polls the last message published to the topic.

    @Service
    @Log
    public class KafkaConsumerService {

        private final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerService.class);

        @Autowired
        KafkaConsumer<String, TaskStatus> kafkaConsumer;

        public TaskStatus getLatestTaskStatus(String taskId) {
            ConsumerRecord<String, TaskStatus> latestUpdate = null;
            ConsumerRecords<String, TaskStatus> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
            if (!consumerRecords.isEmpty()) {
                // Keep only the last record published to this task's topic
                for (ConsumerRecord<String, TaskStatus> record : consumerRecords.records(taskId)) {
                    latestUpdate = record;
                }
            }
            if (latestUpdate == null) {
                // The poll returned no records for this topic
                return null;
            }
            LOGGER.info("Latest updated status : " + latestUpdate.value());
            return latestUpdate.value();
        }
    }

We are using the TaskStatus class for publishing the message body. Kafka, by default, supports String keys and values. To support custom message types, we need to define beans of the type ProducerFactory and ConsumerFactory as follows. These beans use JsonSerializer and JsonDeserializer to serialize and deserialize the TaskStatus instances.

    @Configuration
    public class KafkaConfig {

        @Autowired
        KafkaProperties kafkaProperties;

        @Bean
        public KafkaAdmin kafkaAdmin() {
            KafkaAdmin kafkaAdmin = new KafkaAdmin(kafkaProperties.buildAdminProperties());
            kafkaAdmin.setFatalIfBrokerNotAvailable(kafkaProperties.getAdmin().isFailFast());
            return kafkaAdmin;
        }

        @Bean
        public ConsumerFactory<String, TaskStatus> consumerFactory() {
            Map<String, Object> configProps = new HashMap<>();
            configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
            configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "task-group");
            configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
            configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            return new DefaultKafkaConsumerFactory<>(configProps);
        }

        @Bean
        public KafkaConsumer<String, TaskStatus> kafkaConsumer() {
            return (KafkaConsumer<String, TaskStatus>) consumerFactory().createConsumer();
        }

        @Bean
        public ProducerFactory<String, TaskStatus> producerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
            return new DefaultKafkaProducerFactory<>(props);
        }

        @Bean
        public KafkaTemplate<String, TaskStatus> kafkaTemplate() {
            var kafkaTemplate = new KafkaTemplate<>(producerFactory());
            kafkaTemplate.setConsumerFactory(consumerFactory());
            return kafkaTemplate;
        }
    }

Let us submit a new task using the POST API http://localhost:9000/tasks

Figure: Submit a new long-running task

We can hit the API after some time to check the final status of the task.

In this Spring Boot tutorial, we learned to create a REST API that accepts long-running tasks and publishes the execution progress/status to a Kafka topic. We also learned to configure Kafka beans to support the publishing and polling of messages containing the task progress.
