Avatar billede dsj Nybegynder
28. maj 2003 - 10:32 Der er 12 kommentarer og
1 løsning

NIO: Selector og SocketChannel

Jeg har en server der servicerer en række klienter. For hver klient registrerer jeg en SocketChannel for både READ og WRITE i en Selector:

  public void registerClient(Client c, SocketChannel sc) throws ClosedChannelException {
    sc.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, c);
  }

Jeg lytter på Selector'en med koden:

  int keysAdded = 0;
  while ((keysAdded = selector.select(WAIT_TIME)) > 0) {
    Set readyKeys = selector.selectedKeys();
    Iterator i = readyKeys.iterator();
    while (i.hasNext()) {
      SelectionKey key = (SelectionKey)i.next();
      i.remove();
      if (key.isReadable()) {
        ...
      } else if (key.isWritable()) {
        ...
      }
    }
  }

Når nu en SocketChannel er klar til at blive læst fra er det jo nemt, man læser blot til en ByteBuffer og gør hvad man ellers vil. Mit problem er, når nu en anden tråd i min server skal sende til en given klient. Hvordan gør jeg en SocketChannel "writable", for der skal jo først skrives med SocketChannel.write(ByteBuffer), når Selector'en siger at en SocketChannel er writable, eller hvordan foregår det? Håber problemet er forståeligt.
Avatar billede ahj123 Nybegynder
28. maj 2003 - 13:41 #1
For det første:
Når en SocketChannel kun skal bruges til at læse fra (og der således ikke p.t. er nogen data der skal skrives til SocketChannel'en) så skal den IKKE være registreret med OP_WRITE. Det vil nemlig gøre at dit while loop vil blive ved med at køre og derfor vil resultere i et CPU forbrug på 100%.

Mht. til at sende data i en anden tråd, så kan det ikke lade sig gøre direkte. Det fornuftigste ville nok være at implementere en metode i client klassen som varetager denne funktion ved at sætte data der skal sendes i "kø" (brug f.eks. en LinkedList). Husk at metoden i client klassen skal registrere SocketChannel'en til OP_WRITE. Selector tråden kan så hente data'ene fra client objektet og sende dem. Husk at Selector tråden skal registrere SocketChannel'en til OP_READ only hvis der ikke er flere data, da der ellers spildes en masse CPU cykler.

Og husk at bruge synchronized når din LinkedList med data tilgås af to forskellige tråde!
Avatar billede dsj Nybegynder
28. maj 2003 - 14:49 #2
Det er også underligt, hvis ikke jeg angiver et timeout i select(), blokker den for evigt og registrere intet af at der er blevet tilfæjet nye SocketChannel's, som modtager en masse; en Selector.wakeup() får den heller ikke til at hoppe ud af select().

Hvordan finder Selector'en ud af, at der er noget i klientens data-kø der skal sendes?, eller det skal jeg selv vide ud fra hvad serveren lige har modtaget fra klienten?

Kan du uddybe: "Husk at metoden i client klassen skal registrere SocketChannel'en til OP_WRITE." ?

Vil det sige, at man så at sige ALDRIG skal registrere en SocketChannel med OP_WRITE? Jeg kan godt se at den går i seriøst busy-wait :)

Som du nok kan se er det her forholdsvis nyt for mig, og på nettet findes der langt flere spørgsmål en svar...
Avatar billede dsj Nybegynder
28. maj 2003 - 16:26 #3
Det synes at hjælpe, hvis den samme selector lytter på både den ServerSocketChannel der modtager forbindelsene og alle de modtagede SocketChannel's. I så tilfælde reagerer den fint på modtagede data fra klienten (OP_READ). Burde man ikke kunne køre dem i forskellige Selector's, eller bare køre ServerSocketChannel i normal blocking-mode?
Avatar billede ahj123 Nybegynder
28. maj 2003 - 16:51 #4
Man burde sagtens kunne køre dem i forskellige selectors eller køre ServerSocketChannel i normal blocking mode.

Jeg uddyber lige lidt.
Jeg ville lave en metode i client klassen f.eks. sådan:

sendData(byte[] data) {
  synchronized(linkedList) {
    linkedList.addFirst(data);
  }
  socketChannel.keyFor(selector).interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE));
}

I dit while-loop skal du så have følgende:
if(key.isWritable()) {
  Client c = (Client) key.attachment();
  synchronized(c.linkedList) {
    if(c.linkedList.isEmpty()) {
      key.interestOps(SelectionKey.OP_READ);
    }
    else {
      byte[] data = c.linkedList.removeLast();
      // kode til at sende data her
    }
  }
}

Det er ikke færdig kode. Der skal bl.a. tages højde for at det måske ikke er muligt at sende hele data array'en på en gang.

Håber det hjalp!
Avatar billede dsj Nybegynder
28. maj 2003 - 21:09 #5
Jo, det hjalp - men hvordan tager man så højde for at data array'et ikke kan sendes på én gang? Og er der andre specielle ting man skal tage højde for ?
Avatar billede dsj Nybegynder
28. maj 2003 - 21:23 #6
Nu har jeg lavet en metode som kaldes ved "// kode til at sende data her":

  private void processWrite(SelectionKey key) {
    try {
      Log.echoDebug("ClientWorker.processWrite");
      key.interestOps(SelectionKey.OP_READ);
      SocketChannel sc = (SocketChannel)key.channel();
      Client c = (Client)key.attachment();
      QueueVector sendQueue = c.getSendQueue();
      Message m;
      ByteBuffer buf;
      while (!sendQueue.isEmpty()) {
        m = (Message)sendQueue.remove();
        buf = ByteBuffer.wrap(m.toString().getBytes());
        sc.write(buf);
        System.out.println("Har sendt besked");
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

"Har sendt besked" udskrives det korrekte antal gange, men klienten modtager absolut intet. Det er i øvrigt en java-klient der anvender den gammeldags socket-model - kan du se hvorfor der ikke sendes noget?
Avatar billede dsj Nybegynder
28. maj 2003 - 22:37 #7
"En masse masse grimme ord" - nu virker det, tak for hjælpen! Der er godt nok ikke meget hjælp at hente, der er stort set ingen der ved noget om NIO.

Til andre, så er løsningen at lave while-sætningen i metoden processWrite, som følger:

      while (!sendQueue.isEmpty()) {
        m = (Message)sendQueue.remove();
        System.out.println("Sender antal tegn: "+m.toString().length());

        buf.put((m.toString()+"\0").getBytes());
        buf.flip();
        while (buf.hasRemaining()) {
          sc.write(buf);
        }
        buf.clear();
        System.out.println("Har sendt besked");
      }

Utroligt, ingen steder kunne jeg finde, at bufferen skulle flippes - flip() - før den bliver sendt...
Avatar billede arne_v Ekspert
28. maj 2003 - 22:40 #8
Avatar billede dsj Nybegynder
28. maj 2003 - 23:05 #9
Ja der bliver flippet en del, men ikke når SocketChannel.write() anvendes. I artiklen skriver de kun CharBuffer's ikke ByteBuffer's, det er muligvis derfor, men selv i Sun's egne eksempler: http://java.sun.com/j2se/1.4.1/docs/guide/nio/example/index.html flipper de ikke ved write.

Hvis ikke der flippes starter den åbenbart i den forkerte ende af bufferen og sender alle de tomme felter (som 0'er) og stopper når den kommer til det der egentlig skulle sendes. Nu sender jeg fra win2k til win2k, så der skulle vel ikke være problemer med at der er forskel på hvilken vej den læser fra, eller hvad ved jeg - det er nok det fra faget Teknologi jeg synes er længst ude i skoven :)
Avatar billede ahj123 Nybegynder
29. maj 2003 - 02:36 #10
OK, nu har jeg godt nok fået et par øl, men jeg håber det jeg skriver giver mening alligevel!

Din kode er ikke OK. Der er ingen som helst garanti for at hele bufferens indhold sendes når du kalder sc.write(buf) - det kan være det hele, eller det kan være ingenting, eller det kan være noget af bufferen, det er det der er det vanskelige ved non-blocking IO. SOM REGEL vil alle bytes i bufferen blive sendt, men der er ingen GARANTI for at det sker. Det er man nødt til at tage højde for. Jeg skriver et kode-eksempel i morgen, når jeg er ædru!
Avatar billede ahj123 Nybegynder
29. maj 2003 - 11:07 #11
OK, jeg har fundet noget kode jeg selv har skrevet, som du måske kan bruge som inspiration! Her der det godt nok String's der sendes, men princippet er vist det samme. Min 'Connection' klasse svarer vist nogenlunde til din 'Client' klasse.

public class Connection {
  public static final int BUFFER_SIZE = 256;
  private SocketChannel channel;
  private ByteBuffer receiveBuf, sendBuf;
  private LinkedList pendingStrings;
  private String sendingString;
  private StringBuffer receivingString;
  private int sendingStringPos;

  public synchronized void send(String s) {
    pendingStrings.addFirst(s+"\n");
    SelectionKey selKey = channel.keyFor(channelManager.getSelector());
    if(selKey.interestOps() == SelectionKey.OP_READ) {
      selKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
    }
  }

  public void doRead() {
    try {
      int bytesRead = channel.read(receiveBuf);
      if (bytesRead == -1) {
        doClose();
        return;
      }
      receiveBuf.flip();
      if(receivingString != null) {
        while(receiveBuf.remaining() >= 2) {
          char receivedChar = receiveBuf.getChar();
          if(receivedChar == '\n') {
            connectionHandler.handleMessage(this, receivingString.toString());
            receivingString = null;
            break;
          }
          else {
            receivingString.append(receivedChar);
          }
        }
      }
      while(receivingString == null) {
        receivingString = new StringBuffer();
        while(receiveBuf.remaining() >= 2) {
          char receivedChar = receiveBuf.getChar();
          if(receivedChar == '\n') {
            // Kode til at gøre noget med den modtagede string her
            receivingString = null;
            break;
          }
          else {
            receivingString.append(receivedChar);
          }
        }
      }
      if(receivingString != null) {
      }
      receiveBuf.compact();
    }
    catch( IOException e ) {
      doClose();
    }
  }

  public synchronized void doWrite() {
    if(sendingString != null) {
      while(sendingStringPos != sendingString.length()) {
        if(sendBuf.remaining() >= 2) {
          sendBuf.putChar(sendingString.charAt(sendingStringPos++));
        }
      }
      if(sendingStringPos == sendingString.length()) {
        sendingString = null;
      }
    }
    while(!pendingStrings.isEmpty() && sendingString == null) {
      sendingString = (String) pendingStrings.removeLast();
      sendingStringPos = 0;
      while(sendingStringPos != sendingString.length() &&
            sendBuf.remaining() >= 2 )
      {
        sendBuf.putChar(sendingString.charAt(sendingStringPos++));
      }
      if(sendingStringPos == sendingString.length()) {
        sendingString = null;
      }
    }
    try {
      sendBuf.flip();
      channel.write(sendBuf);
      sendBuf.compact();
      if(pendingStrings.isEmpty() && sendingString == null) {
        SelectionKey selKey = channel.keyFor(channelManager.getSelector());
        selKey.interestOps(SelectionKey.OP_READ);
      }
    }
    catch( IOException e ) {
      doClose();
    }
  }
}
Avatar billede dsj Nybegynder
29. maj 2003 - 13:09 #12
Sørger denne her virkelig ikke for at alt bliver sendt?

        while (buf.hasRemaining()) {
          sc.write(buf);
        }
Avatar billede ahj123 Nybegynder
29. maj 2003 - 13:28 #13
Læs dokumentationen omkring SocketChannel.write():

http://java.sun.com/j2se/1.4.1/docs/api/java/nio/channels/SocketChannel.html#write(java.nio.ByteBuffer)
Avatar billede Ny bruger Nybegynder

Din løsning...

Tilladte BB-code-tags: [b]fed[/b] [i]kursiv[/i] [u]understreget[/u] Web- og emailadresser omdannes automatisk til links. Der sættes "nofollow" på alle links.

Loading billede Opret Preview
Kategori
Kurser inden for grundlæggende programmering

Log ind eller opret profil

Hov!

For at kunne deltage på Computerworld Eksperten skal du være logget ind.

Det er heldigvis nemt at oprette en bruger: Det tager to minutter og du kan vælge at bruge enten e-mail, Facebook eller Google som login.

Du kan også logge ind via nedenstående tjenester