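// LeaderSelectorParticipant.java
import java.io.Closeable;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;

@Slf4j
public class LeaderSelectorParticipant extends LeaderSelectorListenerAdapter implements Closeable {
    private final String name;
    private final LeaderSelector leaderSelector;
    private final AtomicInteger leaderCount = new AtomicInteger();

    public LeaderSelectorParticipant(CuratorFramework client, String path, String name) {
        this.name = name;
        // All participants in one election must use the same path.
        leaderSelector = new LeaderSelector(client, path, this);
        // Re-enter the election each time this instance gives up leadership.
        leaderSelector.autoRequeue();
    }

    public void start() {
        // The election runs in the background; this call returns immediately.
        leaderSelector.start();
    }

    @Override
    public void close() {
        leaderSelector.close();
    }

    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        // Leadership is held until this method returns.
        // nextInt(5) yields 0..4, so the wait is 1 to 5 seconds.
        final int waitSeconds = new Random().nextInt(5) + 1;
        log.info("{} is now the leader. Waiting {} seconds...", name, waitSeconds);
        log.info("{} has been leader {} time(s) before.", name, leaderCount.getAndIncrement());
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
        } catch (InterruptedException e) {
            log.info("{} was interrupted.", name);
            // Restore the interrupt flag for the calling thread.
            Thread.currentThread().interrupt();
        } finally {
            log.info("{} relinquishing leadership.\n", name);
        }
    }
}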
// LeaderSelectorExample.java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.List;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;

@Slf4j
public class LeaderSelectorExample {
    private static final int CLIENT_QTY = 10;
    private static final String PATH = "/examples/leader";

    public static void main(String[] args) throws Exception {
        log.info("Create {} clients, have each negotiate for leadership and then wait a random number of seconds before letting another leader election occur.", CLIENT_QTY);
        log.info("Notice that leader election is fair: all clients will become leader and will do so the same number of times.");

        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderSelectorParticipant> examples = Lists.newArrayList();
        // In-process ZooKeeper server, so the example needs no external ensemble.
        TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; ++i) {
                CuratorFramework client = CuratorFrameworkFactory.newClient(
                        server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                clients.add(client);

                LeaderSelectorParticipant example = new LeaderSelectorParticipant(client, PATH, "Client #" + i);
                examples.add(example);

                // Start the client before its selector joins the election.
                client.start();
                example.start();
            }

            log.info("Press enter/return to quit\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();
        } finally {
            log.info("Shutting down...");

            // Close selectors first, then their clients, then the server.
            for (LeaderSelectorParticipant leaderSelectorParticipant : examples) {
                CloseableUtils.closeQuietly(leaderSelectorParticipant);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }

            CloseableUtils.closeQuietly(server);
        }
    }
}
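Each participant holds leadership for a random number of seconds before giving it up. The sketch below isolates that wait-time calculation, assuming the intended range is 1 to 5 seconds. The class and method names (`WaitTimeSketch`, `randomWaitSeconds`, `degenerateWaitSeconds`) are hypothetical and exist only for illustration. It relies on the fact that `Random.nextInt(bound)` returns a value from 0 (inclusive) to `bound` (exclusive), so a bound of 1 can only ever produce 0.

```java
import java.util.Random;

class WaitTimeSketch {
    // Uniform wait in the range 1..5 seconds: nextInt(5) yields 0..4.
    static int randomWaitSeconds(Random rng) {
        return rng.nextInt(5) + 1;
    }

    // nextInt(1) can only return 0, so this expression is always 1.
    static int degenerateWaitSeconds(Random rng) {
        return 5 * rng.nextInt(1) + 1;
    }

    public static void main(String[] args) {
        Random rng = new Random();
        for (int i = 0; i < 5; i++) {
            System.out.println("wait = " + randomWaitSeconds(rng)
                    + "s, degenerate = " + degenerateWaitSeconds(rng) + "s");
        }
    }
}
```

With a varying wait, the log output shows leaders holding the lock for different durations, which makes the hand-off between clients easier to follow.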