Combining BukkitScheduler with Google Guava and libraries expecting a ScheduledServiceExecutor

Discussion in 'Resources' started by Comphenix, May 7, 2013.

Thread Status:
Not open for further replies.
  1. Offline

    Comphenix

    Ever since Java 1.5, the standard abstraction for scheduling a bunch of tasks is the Executor interface in java.util.concurrent - along with its two derived interfaces ExecutorService and ScheduledExecutorService that define additional functionality. Many libraries are able to work (or expect) with implementations of these interfaces, and in particular, Google Guava defines a very fluent and elegant system for "chaining" the output of scheduled tasks.

    With the help of ListenableFuture, you can track multiple asynchronous threads and apply some action on the output on a different thread (say the main thread), while also handling failures:
    Code:java
    1. ListenableFuture<Integer> first = async.submit(new Callable<Integer>() {
    2. @Override
    3. public Integer call() throws Exception {
    4. // This could do anything - fetching the result of an SQL query, downloading an URL or
    5. // perhaps do some number crunching.
    6. Thread.sleep(5000);
    7. return 1;
    8. }
    9. });
    10.  
    11. ListenableFuture<Integer> second = async.submit(new Callable<Integer>() {
    12. @Override
    13. public Integer call() throws Exception {
    14. Thread.sleep(5000);
    15. return 2;
    16. }
    17. });
    18.  
    19. Futures.addCallback(Futures.allAsList(first, second), new FutureCallback<List<Integer>>() {
    20. @Override
    21. public void onSuccess(List<Integer> values) {
    22. int sum = 0;
    23.  
    24. for (int value : values)
    25. sum += value;
    26. Bukkit.broadcastMessage("Result: " + sum);
    27. }
    28.  
    29. @Override
    30. public void onFailure(Throwable ex) {
    31. // Invoked if either future fails
    32. getLogger().log(Level.SEVERE, "Failure in async computation.", ex);
    33. }
    34. });

    That's just a small fraction of all the useful helper method in Futures, and I find it really simplifies and cleans up asynchronous handling without sacrificing performance.

    Unfortunately, while Bukkit does include Google Guava and its future library, it is far from compatible. BukkitScheduler doesn't implement any of the standard interfaces in java.util.concurrent, and only callSyncMethod - which only works for calling methods on the main thread - actually return a future.

    Of course, you can always create your own scheduled executor service, but that will waste some resources if you only plan on using it occasionally. It would be much better to reuse threads already allocated by Bukkit - and, you still have to schedule a synchronous task in order to process the output on the main thread (say, setting some blocks).

    Enter BukkitExecutors (download). It is a relatively small plugin that wraps BukkitScheduler with the standard interface ScheduledExecutorService (along with ListeningExecutorService). With that, you can implement the above in Bukkit:
    Code:java
    1. public class ExampleMod extends JavaPlugin implements Listener {
    2. private final BukkitScheduledExecutorService sync = BukkitExecutors.newSynchronous(this);
    3. private final BukkitScheduledExecutorService async = BukkitExecutors.newAsynchronous(this);
    4.  
    5. @SuppressWarnings("unchecked")
    6. @Override
    7. public boolean onCommand(CommandSender sender, Command command, String label, String[] args) {
    8. ListenableFuture<Integer> first = async.submit(new Callable<Integer>() {
    9. @Override
    10. public Integer call() throws Exception {
    11. Thread.sleep(5000);
    12. return 1;
    13. }
    14. });
    15.  
    16. ListenableFuture<Integer> second = async.submit(new Callable<Integer>() {
    17. @Override
    18. public Integer call() throws Exception {
    19. Thread.sleep(5000);
    20. return 2;
    21. }
    22. });
    23.  
    24. // Note that this callback is executed on the main thread
    25. Futures.addCallback(Futures.allAsList(first, second), new FutureCallback<List<Integer>>() {
    26. @Override
    27. public void onSuccess(List<Integer> values) {
    28. int sum = 0;
    29.  
    30. for (int value : values)
    31. sum += value;
    32. Bukkit.broadcastMessage("Result: " + sum);
    33. }
    34.  
    35. @Override
    36. public void onFailure(Throwable ex) {
    37. getLogger().log(Level.SEVERE, "Failure in async computation.", ex);
    38. }
    39. }, sync);
    40. return true;
    41. }
    42. }

    You can also wait for events:
    Code:java
    1. public class ExampleMod extends JavaPlugin implements Listener {
    2. private final BukkitScheduledExecutorService sync = BukkitExecutors.newSynchronous(this);
    3.  
    4. @Override
    5. public boolean onCommand(CommandSender sender, Command command, String label, String[] args) {
    6. ListenableFuture<AsyncPlayerChatEvent> chat = BukkitFutures.nextEvent(this, AsyncPlayerChatEvent.class);
    7.  
    8. Futures.addCallback(chat, new FutureCallback<AsyncPlayerChatEvent>() {
    9. @Override
    10. public void onSuccess(AsyncPlayerChatEvent event) {
    11. // This will only be executed for the next async chat event
    12. Bukkit.broadcastMessage(event.getMessage());
    13. }
    14.  
    15. @Override
    16. public void onFailure(Throwable ex) {
    17. // If the server has been cancelled
    18. }
    19. }, sync);
    20. return true;
    21. }
    22. }
     
  2. Offline

    Wolvereness Bukkit Team Member

    I like the concept here... However, I would like to point out that server.broadcastMessage(String) is not thread-safe.
     
    chaseoes likes this.
  3. Offline

    Comphenix

    Yup, it checks the permissions for every potential recipient, which isn't thread safe.

    But in fact, the overload of Futures.addCallback I use in both examples accepts an executor as its third parameter, which will be used to execute the callback. I pass in sync, a wrapper around BukkitScheduled.runTask(), so the entire callback will be run on the main thread. It's actually thread safe, and very easy to implement.

    Essentially, it's (about) equivalent to the following mess:
    Code:java
    1. final BukkitScheduler scheduler = getServer().getScheduler();
    2. final JavaPlugin plugin = this;
    3.  
    4. final List<Integer> results = new ArrayList<Integer>();
    5. final List<Exception> exceptions = new ArrayList<Exception>();
    6.  
    7. scheduler.runTaskAsynchronously(plugin, new Runnable() {
    8. @Override
    9. public void run() {
    10. final Object lock = new Object();
    11.  
    12. // Start thread #2
    13. scheduler.runTaskAsynchronously(plugin, new Runnable() {
    14. @Override
    15. public void run() {
    16. try {
    17. Thread.sleep(5000);
    18.  
    19. synchronized (lock) {
    20. results.add(1);
    21. lock.notifyAll();
    22. }
    23. } catch (Exception e) {
    24. synchronized (lock) {
    25. exceptions.add(e);
    26. lock.notifyAll();
    27. }
    28. }
    29. }
    30. });
    31.  
    32. // Thread #1
    33. try {
    34. Thread.sleep(5000);
    35.  
    36. synchronized (lock) {
    37. results.add(1);
    38. // Wait for #2
    39. lock.wait();
    40. }
    41. } catch (Exception e) {
    42. synchronized (lock) {
    43. exceptions.add(e);
    44. }
    45. }
    46.  
    47. scheduler.runTask(plugin, new Runnable() {
    48. @Override
    49. public void run() {
    50. // TODO Auto-generated method stub
    51. if (results.size() == 2) {
    52. int sum = 0;
    53.  
    54. for (int value : results)
    55. sum += value;
    56. Bukkit.broadcastMessage("Result: " + sum);
    57.  
    58. } else {
    59. getLogger().log(Level.SEVERE, "Failure in async computation.", exceptions.get(0));
    60. }
    61. }
    62. });
    63. }
    64. });

    It's doable for two threads, but you will have to use a more general method for three or more threads. There's a bit of noise, and you would probably want to break it up into multiple methods. It's also easy to make a mistake - I almost forgot to call notifyAll() in the catch block, which would have resulted in thread #1 waiting forever. Concurrency is hard. :p
     
  4. Offline

    TheE

    I hope you do not mind if I push this.

    I really like the idea here. Bukkit's current multi threading API is just a pain to work with if things get a little more complicated. And while it is relatively easy (and - depending on the task - even better) to handle all 'async' tasks with a custom ExecutionService, the lack of implementing standards interfaces out of java.util.concurrent for all sync tasks make it impossible to use advanced features such as those provided by Guava that can speed up and simplify multithreaded programming a lot - unless, of course, you provide the wrapper yourself with is a shitty fix for a public API.
     
    Comphenix likes this.
  5. Offline

    Comphenix

    Thanks,

    Yeah, there should be a really compelling reason for choosing a custom API over the equivalent standard interfaces, and I don't think there were in this case. True, it's a bit more effort to implement, and each plugin would have to hold its own ScheduledExecutorService due to the existence of ExecutorService.shutdown(), but in return you get compatibility with libraries such as Guava, and make it easier (and more powerful) for API users.

    Still, we're stuck with the current BukkitScheduler, so might as well write a wrapper around it. Of course, there are still a couple of problems: one, I know there might be some performance issues when scheduling hundreds of short-lived tasks a second, due to the need for shutdown() to work for every scheduled task as per the specification. Perhaps I could relax this a bit, and ignore the fate of "executed" tasks. Another issue is scheduledWithFixedDelay. Since both asynchronous and synchronous executors are executed at a fixed rate only, there's really no easy way to implement it in the wrapper. I've deprecated its use, for now.

    But overall, I think it's a useful and powerful abstraction, especially when coupled with Guava.
     
  6. Offline

    Cirno

    Jeeze, whenever you post something, it amazes me and makes my brain mush.
    Thanks for the resource; I might use it later :p
     
Thread Status:
Not open for further replies.

Share This Page