OpenJDK / jdk / jdk
changeset 36434:3fd8dee1b158
8151299: Http client SelectorManager overwriting read and write events
Reviewed-by: chegar
author | michaelm |
---|---|
date | Wed, 09 Mar 2016 13:37:30 +0000 |
parents | dcbc230cfa4f |
children | 0408881ad616 |
files | jdk/src/java.httpclient/share/classes/java/net/http/HttpClientImpl.java jdk/test/java/net/httpclient/whitebox/TEST.properties jdk/test/java/net/httpclient/whitebox/java/net/http/SelectorTest.java |
diffstat | 3 files changed, 300 insertions(+), 41 deletions(-) [+] |
line wrap: on
line diff
--- a/jdk/src/java.httpclient/share/classes/java/net/http/HttpClientImpl.java Wed Mar 09 12:11:31 2016 +0000 +++ b/jdk/src/java.httpclient/share/classes/java/net/http/HttpClientImpl.java Wed Mar 09 13:37:30 2016 +0000 @@ -30,19 +30,17 @@ import java.net.URI; import static java.net.http.Utils.BUFSIZE; import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import static java.nio.channels.SelectionKey.OP_CONNECT; import static java.nio.channels.SelectionKey.OP_READ; import static java.nio.channels.SelectionKey.OP_WRITE; import java.nio.channels.Selector; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; +import java.util.*; +import java.util.stream.Stream; import java.util.concurrent.ExecutorService; import java.security.NoSuchAlgorithmException; -import java.util.ListIterator; -import java.util.Optional; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import javax.net.ssl.SSLContext; @@ -72,12 +70,6 @@ private static final ThreadFactory defaultFactory = Executors.defaultThreadFactory(); private final LinkedList<TimeoutEvent> timeouts; - //@Override - void debugPrint() { - selmgr.debugPrint(); - client2.debugPrint(); - } - public static HttpClientImpl create(HttpClientBuilderImpl builder) { HttpClientImpl impl = new HttpClientImpl(builder); impl.start(); @@ -173,19 +165,15 @@ // Main loop for this client's selector class SelectorManager extends Thread { - final Selector selector; boolean closed; final List<AsyncEvent> readyList; final List<AsyncEvent> registrations; - List<AsyncEvent> debugList; - SelectorManager() throws IOException { readyList = new LinkedList<>(); registrations = new LinkedList<>(); - debugList = new LinkedList<>(); selector = Selector.open(); } @@ -216,13 +204,6 @@ return c; } - synchronized void debugPrint() { - System.err.println("Selecting on:"); - for (AsyncEvent e : debugList) { - System.err.println(opvals(e.interestOps())); - } - } - String opvals(int i) { StringBuilder sb = new StringBuilder(); if ((i & OP_READ) != 0) @@ -239,14 +220,18 @@ try { while (true) { synchronized (this) { - debugList = copy(registrations); for (AsyncEvent exchange : registrations) { SelectableChannel c = exchange.channel(); try { c.configureBlocking(false); - c.register(selector, - exchange.interestOps(), - exchange); + SelectionKey key = c.keyFor(selector); + SelectorAttachment sa; + if (key == null) { + sa = new SelectorAttachment(c, selector); + } else { + sa = (SelectorAttachment)key.attachment(); + } + sa.register(exchange); } catch (IOException e) { Log.logError("HttpClientImpl: " + e); c.close(); @@ -266,11 +251,10 @@ Set<SelectionKey> keys = selector.selectedKeys(); for (SelectionKey key : keys) { - if (key.isReadable() || key.isConnectable() || key.isWritable()) { - key.cancel(); - AsyncEvent exchange = (AsyncEvent) key.attachment(); - readyList.add(exchange); - } + SelectorAttachment sa = (SelectorAttachment)key.attachment(); + int eventsOccurred = key.readyOps(); + sa.events(eventsOccurred).forEach(readyList::add); + sa.resetInterestOps(eventsOccurred); } selector.selectNow(); // complete cancellation selector.selectedKeys().clear(); @@ -306,6 +290,80 @@ } /** + * Tracks multiple user level registrations associated with one NIO + * registration (SelectionKey). In this implementation, registrations + * are one-off and when an event is posted the registration is cancelled + * until explicitly registered again. + * + * <p> No external synchronization required as this class is only used + * by the SelectorManager thread. One of these objects required per + * connection. + */ + private static class SelectorAttachment { + private final SelectableChannel chan; + private final Selector selector; + private final ArrayList<AsyncEvent> pending; + private int interestops; + + SelectorAttachment(SelectableChannel chan, Selector selector) { + this.pending = new ArrayList<>(); + this.chan = chan; + this.selector = selector; + } + + void register(AsyncEvent e) throws ClosedChannelException { + int newops = e.interestOps(); + boolean reRegister = (interestops & newops) != newops; + interestops |= newops; + pending.add(e); + if (reRegister) { + // first time registration happens here also + chan.register(selector, interestops, this); + } + } + + int interestOps() { + return interestops; + } + + /** + * Returns a Stream<AsyncEvents> containing only events that are + * registered with the given {@code interestop}. + */ + Stream<AsyncEvent> events(int interestop) { + return pending.stream() + .filter(ev -> (ev.interestOps() & interestop) != 0); + } + + /** + * Removes any events with the given {@code interestop}, and if no + * events remaining, cancels the associated SelectionKey. + */ + void resetInterestOps(int interestop) { + int newops = 0; + + Iterator<AsyncEvent> itr = pending.iterator(); + while (itr.hasNext()) { + AsyncEvent event = itr.next(); + int evops = event.interestOps(); + if ((evops & interestop) != 0) { + itr.remove(); + } else { + newops |= evops; + } + } + + interestops = newops; + SelectionKey key = chan.keyFor(selector); + if (newops == 0) { + key.cancel(); + } else { + key.interestOps(newops); + } + } + } + + /** * Creates a HttpRequest associated with this group. * * @throws IllegalStateException if the group has been stopped @@ -425,18 +483,9 @@ } } iter.add(event); - //debugPrintList("register"); selmgr.wakeupSelector(); } - void debugPrintList(String s) { - System.err.printf("%s: {", s); - for (TimeoutEvent e : timeouts) { - System.err.printf("(%d,%d) ", e.delta, e.timeval); - } - System.err.println("}"); - } - synchronized void signalTimeouts(long then) { if (timeouts.isEmpty()) { return; @@ -462,7 +511,6 @@ break; } } - //debugPrintList("signalTimeouts"); } synchronized void cancelTimer(TimeoutEvent event) {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/jdk/test/java/net/httpclient/whitebox/TEST.properties Wed Mar 09 13:37:30 2016 +0000 @@ -0,0 +1,3 @@ +TestNG.dirs = . + +bootclasspath.dirs = /java/net/httpclient
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/jdk/test/java/net/httpclient/whitebox/java/net/http/SelectorTest.java Wed Mar 09 13:37:30 2016 +0000 @@ -0,0 +1,208 @@ +/* + * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/** + * @test + * @bug 8151299 + * @summary Http client SelectorManager overwriting read and write events + */ +package java.net.http; + +import java.net.*; +import java.io.*; +import java.nio.channels.*; +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import static java.lang.System.out; +import static java.nio.charset.StandardCharsets.US_ASCII; +import static java.util.concurrent.TimeUnit.SECONDS; + +import org.testng.annotations.Test; + +/** + * Whitebox test of selector mechanics. Currently only a simple test + * setting one read and one write event is done. It checks that the + * write event occurs first, followed by the read event and then no + * further events occur despite the conditions actually still existing. + */ +@Test +public class SelectorTest { + + AtomicInteger counter = new AtomicInteger(); + volatile boolean error; + static final CountDownLatch finishingGate = new CountDownLatch(1); + + String readSomeBytes(RawChannel chan) { + try { + ByteBuffer buf = ByteBuffer.allocate(1024); + int t = chan.read(buf); + if (t <= 0) { + out.printf("chan read returned %d\n", t); + return null; + } + byte[] bb = new byte[t]; + buf.get(bb); + return new String(bb, US_ASCII); + } catch (IOException ioe) { + throw new UncheckedIOException(ioe); + } + } + + @Test(timeOut = 10000) + public void test() throws Exception { + + try (ServerSocket server = new ServerSocket(0)) { + int port = server.getLocalPort(); + + out.println("Listening on port " + server.getLocalPort()); + + TestServer t = new TestServer(server); + t.start(); + out.println("Started server thread"); + + final RawChannel chan = getARawChannel(port); + + chan.registerEvent(new RawChannel.NonBlockingEvent() { + @Override + public int interestOps() { + return SelectionKey.OP_READ; + } + + @Override + public void handle() { + readSomeBytes(chan); + out.printf("OP_READ\n"); + if (counter.get() != 1) { + out.printf("OP_READ error counter = %d\n", counter); + error = true; + } + } + }); + + chan.registerEvent(new RawChannel.NonBlockingEvent() { + @Override + public int interestOps() { + return SelectionKey.OP_WRITE; + } + + @Override + public void handle() { + out.printf("OP_WRITE\n"); + if (counter.get() != 0) { + out.printf("OP_WRITE error counter = %d\n", counter); + error = true; + } else { + ByteBuffer bb = ByteBuffer.wrap(TestServer.INPUT); + counter.incrementAndGet(); + try { + chan.write(bb); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } + + }); + out.println("Events registered. Waiting"); + finishingGate.await(30, SECONDS); + if (error) + throw new RuntimeException("Error"); + else + out.println("No error"); + } + } + + static RawChannel getARawChannel(int port) throws Exception { + URI uri = URI.create("http://127.0.0.1:" + port + "/"); + out.println("client connecting to " + uri.toString()); + HttpRequest req = HttpRequest.create(uri).GET(); + HttpResponse r = req.response(); + r.body(HttpResponse.ignoreBody()); + return ((HttpResponseImpl) r).rawChannel(); + } + + static class TestServer extends Thread { + static final byte[] INPUT = "Hello world".getBytes(US_ASCII); + static final byte[] OUTPUT = "Goodbye world".getBytes(US_ASCII); + static final String FIRST_RESPONSE = "HTTP/1.1 200 OK\r\nContent-length: 0\r\n\r\n"; + final ServerSocket server; + + TestServer(ServerSocket server) throws IOException { + this.server = server; + } + + public void run() { + try (Socket s = server.accept(); + InputStream is = s.getInputStream(); + OutputStream os = s.getOutputStream()) { + + out.println("Got connection"); + readRequest(is); + os.write(FIRST_RESPONSE.getBytes()); + read(is); + write(os); + Thread.sleep(1000); + // send some more data, and make sure WRITE op does not get called + write(os); + out.println("TestServer exiting"); + SelectorTest.finishingGate.countDown(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + // consumes the HTTP request + static void readRequest(InputStream is) throws IOException { + out.println("starting readRequest"); + byte[] buf = new byte[1024]; + String s = ""; + while (true) { + int n = is.read(buf); + if (n <= 0) + throw new IOException("Error"); + s = s + new String(buf, 0, n); + if (s.indexOf("\r\n\r\n") != -1) + break; + } + out.println("returning from readRequest"); + } + + static void read(InputStream is) throws IOException { + out.println("starting read"); + for (int i = 0; i < INPUT.length; i++) { + int c = is.read(); + if (c == -1) + throw new IOException("closed"); + if (INPUT[i] != (byte) c) + throw new IOException("Error. Expected:" + INPUT[i] + ", got:" + c); + } + out.println("returning from read"); + } + + static void write(OutputStream os) throws IOException { + out.println("doing write"); + os.write(OUTPUT); + } + } +}