Example: Use asynchronous I/O

An application creates an I/O completion port using the QsoCreateIOCompletionPort() API.

This API returns a handle that can be used to schedule and wait for completion of asynchronous I/O requests. The application starts an input or an output function, specifying an I/O completion port handle. When the I/O is completed, status information and an application-defined handle are posted to the specified I/O completion port. The post to the I/O completion port wakes up exactly one of possibly many threads that are waiting. The application receives the following items:

This application handle can be the socket descriptor identifying the client connection, or a pointer to storage that contains extensive information about the state of the client connection. Since the operation was completed and the application handle was passed, the worker thread determines the next step to complete the client connection. Worker threads that process these completed asynchronous operations can handle many different client requests and are not tied to just one. Because copying to and from user buffers occurs asynchronously to the server processes, wait time for client request diminishes. This can be beneficial on systems where there are multiple processors.


This graphic diagrams the sample programs that use Asynchronous I/O.

Flow of socket events: Asynchronous I/O server

The following sequence of the socket calls provides a description of the graphic. It also describes the relationship between the server and worker examples. Each set of flows contain links to usage notes on specific APIs. If you need more details on the use of a particular API, you can use these links. This flow describes the socket calls in the following sample application. Use this server example with the generic client example.

  1. Master thread creates I/O completion port by calling QsoCreateIOCompletionPort()
  2. Master thread creates pool of worker thread(s) to process any I/O completion port requests with the pthread_create function.
  3. Worker thread(s) call QsoWaitForIOCompletionPort() which waits for client requests to process.
  4. The master thread accepts a client connection and proceeds to issue a QsoStartRecv() which specifies the I/O completion port upon which the worker threads are waiting.
    Note: You can also use accept asynchronously by using the QsoStartAccept().
  5. At some point, a client request arrives asynchronous to the server process. The sockets operating system loads the supplied user buffer and sends the completed QsoStartRecv() request to the specified I/O completion port. One worker thread is awoken and proceeds to process this request.
  6. The worker thread extracts the client socket descriptor from the application-defined handle and proceeds to echo the received data back to the client by performing a QsoStartSend() operation.
  7. If the data can be immediately sent, then QsoStartSend() returns indication of the fact; otherwise, the sockets operating system sends the data as soon as possible and posts indication of the fact to the specified I/O completion port. The worker thread gets indication of data sent and can wait on the I/O completion port for another request or end if instructed to do so. QsoPostIOCompletion() can be used by the master thread to post a worker thread termination event.
  8. Master thread waits for worker thread to finish and then destroys the I/O completion port by calling QsoDestroyIOCompletionPort().
Note: By using the code examples, you agree to the terms of the Code license and disclaimer information.
      #include <stdio.h>
      #include <stdlib.h>
      #include <string.h>
      #include <sys/time.h>
      #include <sys/types.h>
      #include <sys/socket.h>
      #include <netinet/in.h>
      #include <errno.h>
      #include <unistd.h>
      #define _MULTI_THREADED
      #include "pthread.h"
      #include "qsoasync.h"
      #define BufferLength 80
      #define Failure 0
      #define Success 1
      #define SERVPORT 12345

void *workerThread(void *arg);

/********************************************************************/
/*                                                                  */
/* Function Name: main                                              */
/*                                                                  */
/* Descriptive Name:  Master thread will establish a client         */
/*      connection and hand processing responsibility               */
/*      to a worker thread.                                         */
/* Note: Due to the thread attribute of this program, spawn() must  */
/*       be used to invoke.                                         */
/********************************************************************/

int main()
{
       int listen_sd, client_sd, rc;
       int  on = 1, ioCompPort;
       pthread_t thr;
       void *status;
       char buffer[BufferLength];
       struct sockaddr_in serveraddr;
       Qso_OverlappedIO_t ioStruct;


       /*********************************************/
       /* Create an I/O completion port for this    */
       /* process.                                  */
       /*********************************************/
       if ((ioCompPort = QsoCreateIOCompletionPort()) < 0)
       {
         perror("QsoCreateIOCompletionPort() failed");
         exit(-1);
       }

       /*********************************************/
       /* Create a worker thread                    */
       /* to process all client requests. The       */
       /* worker thread will wait for client        */
       /* requests to arrive on the I/O completion  */
       /* port just created.                        */
       /*********************************************/
       rc = pthread_create(&thr, NULL, workerThread,
                          &ioCompPort);
       if (rc < 0)
       {
         perror("pthread_create() failed");
         QsoDestroyIOCompletionPort(ioCompPort);
         close(listen_sd);
         exit(-1);
       }

       /*********************************************/
       /* Create an AF_INET stream socket to receive*/
       /* incoming connections on                   */
       /*********************************************/
       if ((listen_sd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
       {
         perror("socket() failed");
         QsoDestroyIOCompletionPort(ioCompPort);
         exit(-1);
       }

       /*********************************************/
       /* Allow socket descriptor to be reuseable   */
       /*********************************************/
       if ((rc = setsockopt(listen_sd, SOL_SOCKET,
                            SO_REUSEADDR,
                            (char *)&on,
                            sizeof(on))) < 0)
       {
         perror("setsockopt() failed");
         QsoDestroyIOCompletionPort(ioCompPort);
         close(listen_sd);
         exit(-1);
       }

       /*********************************************/
       /* bind the socket                           */
       /*********************************************/
       memset(&serveraddr, 0x00, sizeof(struct sockaddr_in));
       serveraddr.sin_family        = AF_INET;
       serveraddr.sin_port          = htons(SERVPORT);
       serveraddr.sin_addr.s_addr   = htonl(INADDR_ANY);

       if ((rc = bind(listen_sd,
                     (struct sockaddr *)&serveraddr,
                      sizeof(serveraddr))) < 0)
       {
         perror("bind() failed");
         QsoDestroyIOCompletionPort(ioCompPort);
         close(listen_sd);
         exit(-1);
       }

       /*********************************************/
       /* Set listen backlog                        */
       /*********************************************/
       if ((rc = listen(listen_sd, 10)) < 0)
       {
         perror("listen() failed");
         QsoDestroyIOCompletionPort(ioCompPort);
         close(listen_sd);
         exit(-1);
       }

       printf("Waiting for client connection.\n");

       /*********************************************/
       /* accept an incoming client connection.     */
       /*********************************************/
       if ((client_sd = accept(listen_sd, (struct sockaddr *)NULL,
                         NULL)) < 0)
       {
         perror("accept() failed");
         QsoDestroyIOCompletionPort(ioCompPort);
         close(listen_sd);
         exit(-1);
       }

       /*********************************************/
       /* Issue QsoStartRecv() to receive client    */
       /* request.                                  */
       /* Note:                                     */
       /*    postFlag == on denoting request should */
       /*                posted to the I/O          */
       /*                completion port, even if   */
       /*                if request is immediately  */
       /*                available.  Worker thread  */
       /*                will process client        */
       /*                request.                   */
       /*********************************************/

       /*********************************************/
       /* initialize Qso_OverlappedIO_t structure - */
       /* reserved fields must be hex 00's.         */
       /*********************************************/
       memset(&ioStruct, '\0', sizeof(ioStruct));

       ioStruct.buffer = buffer;
       ioStruct.bufferLength = sizeof(buffer);


       /*********************************************/
       /* Store the client descriptor in the        */
       /* Qso_OverlappedIO_t descriptorHandle field.*/
       /* This area is used to house information    */
       /* defining the state of the client          */
       /* connection. Field descriptorHandle is     */
       /* defined as a (void *) to allow the server */
       /* to address more extensive client          */
       /* connection state if needed.               */
       /*********************************************/
       *((int*)&ioStruct.descriptorHandle) = client_sd;
       ioStruct.postFlag = 1;
       ioStruct.fillBuffer = 0;

       rc = QsoStartRecv(client_sd, ioCompPort, &ioStruct);
       if (rc == -1)
       {
         perror("QsoStartRecv() failed");
         QsoDestroyIOCompletionPort(ioCompPort);
         close(listen_sd);
         close(client_sd);
         exit(-1);
       }
       /*********************************************/
       /* close the server's listening socket.      */
       /*********************************************/
       close(listen_sd);

       /*********************************************/
       /* Wait for worker thread to finish          */
       /* processing client connection.             */
       /*********************************************/
       rc = pthread_join(thr, &status);

       QsoDestroyIOCompletionPort(ioCompPort);
       if ( rc == 0 && (rc = __INT(status)) == Success)
       {
         printf("Success.\n");
         exit(0);
       }
       else
       {
         perror("pthread_join() reported failure");
         exit(-1);
       }
}
/* end workerThread */



/********************************************************************/
/*                                                                  */
/* Function Name: workerThread                                      */
/*                                                                  */
/* Descriptive Name:  Process client connection.                    */
/********************************************************************/
void *workerThread(void *arg)
{
      struct timeval waitTime;
      int ioCompPort, clientfd;
      Qso_OverlappedIO_t ioStruct;
      int rc, tID;
      pthread_t thr;
      pthread_id_np_t t_id;
      t_id = pthread_getthreadid_np();
      tID = t_id.intId.lo;

      /*********************************************/
      /* I/O completion port is passed to this     */
      /* routine.                                  */
      /*********************************************/
      ioCompPort = *(int *)arg;

      /*********************************************/
      /* Wait on the supplied I/O completion port  */
      /* for a client request.                     */
      /*********************************************/
      waitTime.tv_sec = 500;
      waitTime.tv_usec = 0;
      rc = QsoWaitForIOCompletion(ioCompPort, &ioStruct, &waitTime);
      if (rc == 1 && ioStruct.returnValue != -1)
      /*********************************************/
      /* Client request has been received.         */
      /*********************************************/
        ;
      else
      {
        printf("QsoWaitForIOCompletion() or QsoStartRecv() failed.\n");
        perror("QsoWaitForIOCompletion() or QsoStartRecv() failed");
        return __VOID(Failure);
      }

      /*********************************************/
      /* Obtain the socket descriptor associated   */
      /* with the client connection.               */
      /*********************************************/
      clientfd = *((int *) &ioStruct.descriptorHandle);

      /*********************************************/
      /* Echo the data back to the client.         */
      /* Note: postFlag == 0. If write completes   */
      /* immediate then indication will be         */
      /* returned, otherwise once the              */
      /* write is performed the I/O Completion     */
      /* port will be posted.                      */
      /*********************************************/
      ioStruct.postFlag = 0;
      ioStruct.bufferLength = ioStruct.returnValue;
      rc = QsoStartSend(clientfd, ioCompPort, &ioStruct);

      if (rc == 0)
      /*********************************************/
      /* Operation complete  - data has been sent. */
      /*********************************************/
        ;
      else
      {
      /*********************************************/
      /* Two possibilities                         */
      /*    rc == -1                               */
      /*      Error on function call               */
      /*    rc ==  1                               */
      /*      Write cannot be immediately       */
      /*      performed. Once complete, the I/O    */
      /*      completion port will be posted.      */
      /*********************************************/

        if (rc == -1)
        {
          printf("QsoStartSend() failed.\n");
          perror("QsoStartSend() failed");
          close(clientfd);
          return __VOID(Failure);
        }
       /*********************************************/
       /* Wait for operation to complete.           */
       /*********************************************/
        rc = QsoWaitForIOCompletion(ioCompPort, &ioStruct, &waitTime);
        if (rc == 1 && ioStruct.returnValue != -1)
       /*********************************************/
       /* Send successful.                           */
       /*********************************************/
          ;
        else
        {
          printf("QsoWaitForIOCompletion() or QsoStartSend() failed.\n");
          perror("QsoWaitForIOCompletion() or QsoStartSend() failed");
          return __VOID(Failure);
        }
      }
      close(clientfd);
      return __VOID(Success);
} /* end workerThread */
Related concepts
Asynchronous I/O
Related reference
Socket application design recommendations
Examples: Connection-oriented designs
Example: Generic client
Example: Use signals with blocking socket APIs
Related information
QsoCreateIOCompletionPort()
pthread_create
QsoWaitForIoCompletionPort()
QsoStartRecv()
QsoStartAccept()
QsoStartSend()
QsoDestroyIOCompletionPort()