PAPI  5.0.1.0
libasync.c
Go to the documentation of this file.
00001 
00002 
00003 /* 
00004  * Library for Posix async read operations with hints.
00005  * Author: Don Capps
00006  * Company: Iozone
00007  * Date: 4/24/1998
00008  *
00009  * Two models are supported.  First model is a replacement for read() where the async
00010  * operations are performed and the requested data is bcopy()-ed back into the users 
00011  * buffer. The second model is a new version of read() where the caller does not 
00012  * supply the address of the buffer but instead is returned an address to the
00013  * location of the data. The second model eliminates a bcopy from the path.
00014  *
00015  * To use model #1:
00016  * 1. Call async_init(&pointer_on_stack,fd,direct_flag);
00017  *  The fd is the file descriptor for the async operations.
00018  *  The direct_flag sets VX_DIRECT 
00019  *
00020  * 2. Call async_read(gc, fd, ubuffer, offset, size, stride, max, depth)
00021  *      Where:
00022  *  gc ............ is the pointer on the stack
00023  *  fd ............ is the file descriptor
00024  *  ubuffer ....... is the address of the user buffer.
00025  *  offset ........ is the offset in the file to begin reading
00026  *  size .......... is the size of the transfer.
00027  *  stride ........ is the distance, in size units, to space the async reads.
00028  *  max ........... is the max size of the file to be read.
00029  *  depth ......... is the number of async operations to perform.
00030  *
00031  * 3. Call end_async(gc) when finished.
00032  *  Where:
00033  *  gc ............ is the pointer on the stack.
00034  *
00035  * To use model #2:
00036  * 1. Call async_init(&pointer_on_stack,fd,direct_flag);
00037  *  The fd is the file descriptor for the async operations.
00038  *  The direct_flag sets VX_DIRECT 
00039  * 2. Call async_read(gc, fd, &ubuffer, offset, size, stride, max, depth)
00040  *      Where:
00041  *  gc ............ is the pointer on the stack
00042  *  fd ............ is the file descriptor
00043  *  ubuffer ....... is the address of a pointer that will be filled in 
00044  *                      by the async library.
00045  *  offset ........ is the offset in the file to begin reading
00046  *  size .......... is the size of the transfer.
00047  *  stride ........ is the distance, in size units, to space the async reads.
00048  *  max ........... is the max size of the file to be read.
00049  *  depth ......... is the number of async operations to perform.
00050  *
00051  * 3. Call async_release(gc) when finished with the data that was returned.
00052  *    This allows the async library to reuse the memory that was filled in
00053  *    and returned to the user.
00054  *
00055  * 4. Call end_async(gc) when finished.
00056  *  Where:
00057  *  gc ............ is the pointer on the stack.
00058  *
00059  * To use model #1: (WRITES)
00060  * 1. Call async_init(&pointer_on_stack,fd,direct_flag);
00061  *  The fd is the file descriptor for the async operations.
00062  *
00063  * 2. Call async_write(gc, fd, ubuffer, size, offset, depth)
00064  *      Where:
00065  *  gc ............ is the pointer on the stack
00066  *  fd ............ is the file descriptor
00067  *  ubuffer ....... is the address of the user buffer.
00068  *  size .......... is the size of the transfer.
00069  *  offset ........ is the offset in the file to begin reading
00070  *  depth ......... is the number of async operations to perform.
00071  *
00072  * 4. Call end_async(gc) when finished.
00073  *  Where:
00074  *  gc ............ is the pointer on the stack.
00075  *
00076  * Notes:
00077  *  The intended use is to replace calls to read() with calls to
00078  *  async_read() and allow the user to make suggestions on 
00079  *  what kind of async read-ahead would be nice to have.
00080  *  The first transfer requested is guarenteed to be complete
00081  *  before returning to the caller. The async operations will
00082  *  be started and will also be guarenteed to have completed
00083  *  if the next call specifies its first request to be one
00084  *  that was previously performed with an async operation.
00085  *  
00086  *  The async_read_no_copy() function allows the async operations
00087  *  to return the data to the user and not have to perform 
00088  *  a bcopy of the data back into the user specified buffer 
00089  *  location. This model is faster but assumes that the user
00090  *  application has been modified to work with this model.
00091  *
00092  *  The async_write() is intended to enhance the performance of 
00093  *  initial writes to a file. This is the slowest case in the write
00094  *  path as it must perform meta-data allocations and wait.
00095  */
00096 
00097 #include <sys/types.h>
00098 #include <aio.h>
00099 #if defined(solaris) || defined(linux) || defined(SCO_Unixware_gcc)
00100 #else
00101 #include <sys/timers.h>
00102 #endif
00103 #include <sys/errno.h>
00104 #include <unistd.h>
00105 #ifndef bsd4_4
00106 #include <malloc.h>
00107 #endif
00108 #ifdef VXFS
00109 #include <sys/fs/vx_ioctl.h>
00110 #endif
00111 
00112 #if defined(OSFV5) || defined(linux)
00113 #include <string.h>
00114 #endif
00115 
00116 #if defined(linux)
00117 #include <unistd.h>
00118 #include <stdio.h>
00119 #include <stdlib.h>
00120 #endif
00121 
00122 #if (defined(solaris) && defined(__LP64__)) || defined(__s390x__) || defined(FreeBSD)
00123 /* If we are building for 64-bit Solaris, all functions that return pointers
00124  * must be declared before they are used; otherwise the compiler will assume
00125  * that they return ints and the top 32 bits of the pointer will be lost,
00126  * causing segmentation faults.  The following includes take care of this.
00127  * It should be safe to add these for all other OSs too, but we're only
00128  * doing it for Solaris now in case another OS turns out to be a special case.
00129  */
00130 #include <stdio.h>
00131 #include <stdlib.h>
00132 #include <strings.h> /* For the BSD string functions */
00133 #endif
00134 
00135 void mbcopy(char *source, char *dest, size_t len);
00136 
00137 
00138 #if !defined(solaris) && !defined(off64_t) && !defined(_OFF64_T) && !defined(__off64_t_defined) && !defined(SCO_Unixware_gcc)
00139 typedef long long off64_t;
00140 #endif
00141 #if defined(OSFV5)
00142 #include <string.h>
00143 #endif
00144 
00145 
00146 extern long long page_size;
00147 extern int one;
00148 /*
00149  * Internal cache entrys. Each entry on the global
00150  * cache, pointed to by async_init(gc) will be of
00151  * this structure type.
00152  */
00153 char version[] = "Libasync Version $Revision$";
00154 struct cache_ent {
00155     struct aiocb myaiocb;           /* For use in small file mode */
00156 #ifdef _LARGEFILE64_SOURCE 
00157 #if defined(__CrayX1__)
00158     aiocb64_t myaiocb64;        /* For use in large file mode */
00159 #else
00160     struct aiocb64 myaiocb64;       /* For use in large file mode */
00161 #endif 
00162 #endif 
00163     long long fd;               /* File descriptor */
00164     long long size;             /* Size of the transfer */
00165     struct cache_ent *forward;      /* link to next element on cache list */
00166     struct cache_ent *back;         /* link to previous element on the cache list */
00167     long long direct;           /* flag to indicate if the buffer should be */
00168                         /* de-allocated by library */
00169     char *real_address;         /* Real address to free */
00170     
00171     volatile void *oldbuf;          /* Used for firewall to prevent in flight */
00172                         /* accidents */
00173     int oldfd;              /* Used for firewall to prevent in flight */
00174                         /* accidents */
00175     size_t oldsize;             /* Used for firewall to prevent in flight */
00176                         /* accidents */
00177 };
00178 
00179 /*
00180  * Head of the cache list
00181  */
00182 struct cache {
00183     struct cache_ent *head;     /* Head of cache list */
00184     struct cache_ent *tail;     /* tail of cache list */
00185     struct cache_ent *inuse_head;   /* head of in-use list */
00186     long long count;        /* How many elements on the cache list */
00187     struct cache_ent *w_head;       /* Head of cache list */
00188     struct cache_ent *w_tail;       /* tail of cache list */
00189     long long w_count;      /* How many elements on the write list */
00190     };
00191 
00192 long long max_depth;
00193 extern int errno;
00194 struct cache_ent *alloc_cache();
00195 struct cache_ent *incache();
00196 void async_init();
00197 void end_async();
00198 int async_suspend();
00199 int async_read();
00200 void takeoff_cache();
00201 void del_cache();
00202 void async_release();
00203 void putoninuse();
00204 void takeoffinuse();
00205 struct cache_ent *allocate_write_buffer();
00206 size_t async_write();
00207 void async_wait_for_write();
00208 void async_put_on_write_queue();
00209 void async_write_finish();
00210 
00211 /* On Solaris _LP64 will be defined by <sys/types.h> if we're compiling
00212  * as a 64-bit binary.  Make sure that __LP64__ gets defined in this case,
00213  * too -- it should be defined on the compiler command line, but let's
00214  * not rely on this.
00215  */
00216 #if defined(_LP64)
00217 #if !defined(__LP64__)
00218 #define __LP64__
00219 #endif
00220 #endif
00221 
00222 
00223 /***********************************************/
00224 /* Initialization routine to setup the library */
00225 /***********************************************/
00226 void
00227 async_init(gc,fd,flag)
00228 struct cache **gc;
00229 int fd;
00230 int flag;
00231 {
00232 #ifdef VXFS
00233     if(flag)
00234         ioctl(fd,VX_SETCACHE,VX_DIRECT);
00235 #endif
00236     if(*gc)
00237     {
00238         printf("Warning calling async_init two times ?\n");
00239         return;
00240     }
00241     *gc=(struct cache *)malloc((size_t)sizeof(struct cache));
00242     if(*gc == 0)
00243     {
00244         printf("Malloc failed\n");
00245         exit(174);
00246     }
00247     bzero(*gc,sizeof(struct cache));
00248 #if defined(__AIX__) || defined(SCO_Unixware_gcc)
00249     max_depth=500;
00250 #else
00251     max_depth=sysconf(_SC_AIO_MAX);
00252 #endif
00253 }
00254 
00255 /***********************************************/
00256 /* Tear down routine to shutdown the library   */
00257 /***********************************************/
00258 void
00259 end_async(gc)
00260 struct cache *gc;
00261 {
00262     del_cache(gc);
00263     async_write_finish(gc);
00264     free((void *)gc);
00265 }
00266 
00267 /***********************************************/
00268 /* Wait for a request to finish                */
00269 /***********************************************/
00270 int
00271 async_suspend(struct cache_ent *ce)
00272 {
00273 #ifdef _LARGEFILE64_SOURCE 
00274 #ifdef __LP64__
00275     const struct aiocb * const cblist[1] = {&ce->myaiocb};
00276 #else
00277     const struct aiocb64 * const cblist[1] = {&ce->myaiocb64};
00278 #endif
00279 #else
00280     const struct aiocb * const cblist[1] = {&ce->myaiocb};
00281 #endif
00282 
00283 #ifdef _LARGEFILE64_SOURCE 
00284 #ifdef __LP64__
00285     return aio_suspend(cblist, 1, NULL);
00286 #else
00287     return aio_suspend64(cblist, 1, NULL);
00288 #endif
00289 #else
00290     return aio_suspend(cblist, 1, NULL);
00291 #endif
00292 }
00293 
00294 /*************************************************************************
00295  * This routine is a generic async reader assist funtion. It takes
00296  * the same calling parameters as read() but also extends the
00297  * interface to include:
00298  * stride ..... For the async reads, what is the distance, in size units, 
00299  *      to space the reads. Note: Stride of 0 indicates that
00300  *      you do not want any read-ahead.
00301  * max    ..... What is the maximum file offset for this operation.
00302  * depth  ..... How much read-ahead do you want.
00303  * 
00304  * The calls to this will guarentee to complete the read() operation
00305  * before returning to the caller. The completion may occur in two
00306  * ways. First the operation may be completed by calling aio_read()
00307  * and then waiting for it to complete. Second  the operation may be 
00308  * completed by copying the data from a cache of previously completed 
00309  * async operations. 
00310  * In the event the read to be satisfied is not in the cache then a 
00311  * series of async operations will be scheduled and then the first 
00312  * async read will be completed. In the event that the read() can be 
00313  * satisfied from the cache then the data is copied back to the 
00314  * user buffer and a series of async reads will be initiated.  If a 
00315  * read is issued and the cache contains data and the read can not 
00316  * be satisfied from the cache, then the cache is discarded, and 
00317  * a new cache is constructed.
00318  * Note: All operations are aio_read(). The series will be issued
00319  * as asyncs in the order requested. After all are in flight
00320  * then the code will wait for the manditory first read.
00321  *************************************************************************/
00322 
00323 int 
00324 async_read(gc, fd, ubuffer, offset, size, stride, max, depth)
00325 struct cache *gc;
00326 long long fd;
00327 char *ubuffer;
00328 off64_t offset;
00329 long long size;
00330 long long stride;
00331 off64_t max;
00332 long long depth;
00333 {
00334     off64_t a_offset,r_offset;
00335     long long a_size;
00336     struct cache_ent *ce,*first_ce=0;
00337     long long i;
00338     ssize_t retval=0;
00339     ssize_t ret;
00340     long long start = 0;
00341     long long del_read=0;
00342 
00343     a_offset=offset;
00344     a_size = size;
00345     /*
00346      * Check to see if it can be completed from the cache
00347      */
00348     if((ce=(struct cache_ent *)incache(gc,fd,offset,size)))
00349     {
00350 #ifdef _LARGEFILE64_SOURCE 
00351 #ifdef __LP64__
00352         while((ret=aio_error(&ce->myaiocb))== EINPROGRESS)
00353         {
00354             async_suspend(ce);
00355         }
00356 #else
00357         while((ret=aio_error64(&ce->myaiocb64))== EINPROGRESS)
00358         {
00359             async_suspend(ce);
00360         }
00361 #endif
00362 #else
00363         while((ret=aio_error(&ce->myaiocb))== EINPROGRESS)
00364         {
00365             async_suspend(ce);
00366         }
00367 #endif
00368         if(ret)
00369         {
00370             printf("aio_error 1: ret %d %d\n",ret,errno);
00371         }
00372 #ifdef _LARGEFILE64_SOURCE 
00373 #ifdef __LP64__
00374         retval=aio_return(&ce->myaiocb);
00375 #else
00376 #if defined(__CrayX1__)
00377         retval=aio_return64((aiocb64_t *)&ce->myaiocb64);
00378 #else
00379         retval=aio_return64((struct aiocb64 *)&ce->myaiocb64);
00380 #endif
00381 
00382 #endif
00383 #else
00384         retval=aio_return(&ce->myaiocb);
00385 #endif
00386         if(retval > 0)
00387         {
00388 #ifdef _LARGEFILE64_SOURCE 
00389 #ifdef __LP64__
00390             mbcopy((char *)ce->myaiocb.aio_buf,(char *)ubuffer,(size_t)retval);
00391 #else
00392             mbcopy((char *)ce->myaiocb64.aio_buf,(char *)ubuffer,(size_t)retval);
00393 #endif
00394 #else
00395             mbcopy((char *)ce->myaiocb.aio_buf,(char *)ubuffer,(size_t)retval);
00396 #endif
00397         }
00398 #ifdef _LARGEFILE64_SOURCE 
00399 #ifdef __LP64__
00400         if(retval < ce->myaiocb.aio_nbytes)
00401 #else
00402         if(retval < ce->myaiocb64.aio_nbytes)
00403 #endif
00404 #else
00405         if(retval < ce->myaiocb.aio_nbytes)
00406 #endif
00407         {
00408             printf("aio_return error1: ret %d %d\n",retval,errno);
00409 #ifdef _LARGEFILE64_SOURCE 
00410 #ifdef __LP64__
00411             printf("aio_return error1: fd %d offset %ld buffer %lx size %d Opcode %d\n",
00412                 ce->myaiocb.aio_fildes,
00413                 ce->myaiocb.aio_offset,
00414                 (long)(ce->myaiocb.aio_buf),
00415                 ce->myaiocb.aio_nbytes,
00416                 ce->myaiocb.aio_lio_opcode
00417 #else
00418             printf("aio_return error1: fd %d offset %lld buffer %lx size %d Opcode %d\n",
00419                 ce->myaiocb64.aio_fildes,
00420                 ce->myaiocb64.aio_offset,
00421                 (long)(ce->myaiocb64.aio_buf),
00422                 ce->myaiocb64.aio_nbytes,
00423                 ce->myaiocb64.aio_lio_opcode
00424 #endif
00425 #else
00426             printf("aio_return error1: fd %d offset %d buffer %lx size %d Opcode %d\n",
00427                 ce->myaiocb.aio_fildes,
00428                 ce->myaiocb.aio_offset,
00429                 (long)(ce->myaiocb.aio_buf),
00430                 ce->myaiocb.aio_nbytes,
00431                 ce->myaiocb.aio_lio_opcode
00432 #endif
00433                 );
00434         }
00435         ce->direct=0;
00436         takeoff_cache(gc,ce);
00437     }else
00438     {
00439         /*
00440          * Clear the cache and issue the first request async()
00441          */
00442         del_cache(gc);
00443         del_read++;
00444         first_ce=alloc_cache(gc,fd,offset,size,(long long)LIO_READ);
00445 again:
00446 #ifdef _LARGEFILE64_SOURCE 
00447 #ifdef __LP64__
00448         ret=aio_read(&first_ce->myaiocb);
00449 #else
00450         ret=aio_read64(&first_ce->myaiocb64);
00451 #endif
00452 #else
00453         ret=aio_read(&first_ce->myaiocb);
00454 #endif
00455         if(ret!=0)
00456         {
00457             if(errno==EAGAIN)
00458                 goto again;
00459             else
00460                 printf("error returned from aio_read(). Ret %d errno %d\n",ret,errno);
00461         }
00462     }
00463     if(stride==0)    /* User does not want read-ahead */
00464         goto out;
00465     if(a_offset<0)  /* Before beginning of file */
00466         goto out;
00467     if(a_offset+size>max)   /* After end of file */
00468         goto out;
00469     if(depth >=(max_depth-1))
00470         depth=max_depth-1;
00471     if(depth==0)
00472         goto out;
00473     if(gc->count > 1)
00474         start=depth-1;
00475     for(i=start;i<depth;i++)    /* Issue read-aheads for the depth specified */
00476     {
00477         r_offset=a_offset+((i+1)*(stride*a_size));
00478         if(r_offset<0)
00479             continue;
00480         if(r_offset+size > max)
00481             continue;
00482         if((ce=incache(gc,fd,r_offset,a_size)))
00483             continue;
00484         ce=alloc_cache(gc,fd,r_offset,a_size,(long long)LIO_READ);
00485 #ifdef _LARGEFILE64_SOURCE 
00486 #ifdef __LP64__
00487         ret=aio_read(&ce->myaiocb);
00488 #else
00489         ret=aio_read64(&ce->myaiocb64);
00490 #endif
00491 #else
00492         ret=aio_read(&ce->myaiocb);
00493 #endif
00494         if(ret!=0)
00495         {
00496             takeoff_cache(gc,ce);
00497             break;
00498         }
00499     }           
00500 out:
00501     if(del_read)    /* Wait for the first read to complete */
00502     {
00503 #ifdef _LARGEFILE64_SOURCE 
00504 #ifdef __LP64__
00505         while((ret=aio_error(&first_ce->myaiocb))== EINPROGRESS)
00506         {
00507             async_suspend(first_ce);
00508         }
00509 #else
00510         while((ret=aio_error64(&first_ce->myaiocb64))== EINPROGRESS)
00511         {
00512             async_suspend(first_ce);
00513         }
00514 #endif
00515 #else
00516         while((ret=aio_error(&first_ce->myaiocb))== EINPROGRESS)
00517         {
00518             async_suspend(first_ce);
00519         }
00520 #endif
00521         if(ret)
00522             printf("aio_error 2: ret %d %d\n",ret,errno);
00523 #ifdef _LARGEFILE64_SOURCE 
00524 #ifdef __LP64__
00525         retval=aio_return(&first_ce->myaiocb);
00526 #else
00527         retval=aio_return64(&first_ce->myaiocb64);
00528 #endif
00529 #else
00530         retval=aio_return(&first_ce->myaiocb);
00531 #endif
00532 #ifdef _LARGEFILE64_SOURCE 
00533 #ifdef __LP64__
00534         if(retval < first_ce->myaiocb.aio_nbytes)
00535 #else
00536         if(retval < first_ce->myaiocb64.aio_nbytes)
00537 #endif
00538 #else
00539         if(retval < first_ce->myaiocb.aio_nbytes)
00540 #endif
00541         {
00542             printf("aio_return error2: ret %d %d\n",retval,errno);
00543 #ifdef _LARGEFILE64_SOURCE 
00544 #ifdef __LP64__
00545             printf("aio_return error2: fd %d offset %lld buffer %lx size %d Opcode %d\n",
00546                 first_ce->myaiocb.aio_fildes,
00547                 first_ce->myaiocb.aio_offset,
00548                 (long)(first_ce->myaiocb.aio_buf),
00549                 first_ce->myaiocb.aio_nbytes,
00550                 first_ce->myaiocb.aio_lio_opcode
00551 #else
00552             printf("aio_return error2: fd %d offset %lld buffer %lx size %d Opcode %d\n",
00553                 first_ce->myaiocb64.aio_fildes,
00554                 first_ce->myaiocb64.aio_offset,
00555                 (long)(first_ce->myaiocb64.aio_buf),
00556                 first_ce->myaiocb64.aio_nbytes,
00557                 first_ce->myaiocb64.aio_lio_opcode
00558 #endif
00559 #else
00560             printf("aio_return error2: fd %d offset %d buffer %lx size %d Opcode %d\n",
00561                 first_ce->myaiocb.aio_fildes,
00562                 first_ce->myaiocb.aio_offset,
00563                 (long)(first_ce->myaiocb.aio_buf),
00564                 first_ce->myaiocb.aio_nbytes,
00565                 first_ce->myaiocb.aio_lio_opcode
00566 #endif
00567                 );
00568         }
00569         if(retval > 0)
00570         {
00571 #ifdef _LARGEFILE64_SOURCE 
00572 #ifdef __LP64__
00573             mbcopy((char *)first_ce->myaiocb.aio_buf,(char *)ubuffer,(size_t)retval);
00574 #else
00575             mbcopy((char *)first_ce->myaiocb64.aio_buf,(char *)ubuffer,(size_t)retval);
00576 #endif
00577 #else
00578             mbcopy((char *)first_ce->myaiocb.aio_buf,(char *)ubuffer,(size_t)retval);
00579 #endif
00580         }
00581         first_ce->direct=0;
00582         takeoff_cache(gc,first_ce);
00583     }
00584     return((int)retval);    
00585 }
00586 
00587 /************************************************************************
00588  * This routine allocates a cache_entry. It contains the 
00589  * aiocb block as well as linkage for use in the cache mechanism.
00590  * The space allocated here will be released after the cache entry
00591  * has been consumed. The routine takeoff_cache() will be called
00592  * after the data has been copied to user buffer or when the
00593  * cache is purged. The routine takeoff_cache() will also release
00594  * all memory associated with this cache entry.
00595  ************************************************************************/
00596 
00597 struct cache_ent *
00598 alloc_cache(gc,fd,offset,size,op)
00599 struct cache *gc;
00600 long long fd,size,op;
00601 off64_t offset;
00602 {
00603     struct cache_ent *ce;
00604     long temp;
00605     ce=(struct cache_ent *)malloc((size_t)sizeof(struct cache_ent));
00606     if(ce == (struct cache_ent *)0)
00607     {
00608         printf("Malloc failed\n");
00609         exit(175);
00610     }
00611     bzero(ce,sizeof(struct cache_ent));
00612 #ifdef _LARGEFILE64_SOURCE 
00613 #ifdef __LP64__
00614     ce->myaiocb.aio_fildes=(int)fd;
00615     ce->myaiocb.aio_offset=(off64_t)offset;
00616     ce->real_address = (char *)malloc((size_t)(size+page_size));
00617     temp=(long)ce->real_address;
00618     temp = (temp+page_size) & ~(page_size-1);
00619     ce->myaiocb.aio_buf=(volatile void *)temp;
00620     if(ce->myaiocb.aio_buf == 0)
00621 #else
00622     ce->myaiocb64.aio_fildes=(int)fd;
00623     ce->myaiocb64.aio_offset=(off64_t)offset;
00624     ce->real_address = (char *)malloc((size_t)(size+page_size));
00625     temp=(long)ce->real_address;
00626     temp = (temp+page_size) & ~(page_size-1);
00627     ce->myaiocb64.aio_buf=(volatile void *)temp;
00628     if(ce->myaiocb64.aio_buf == 0)
00629 #endif
00630 #else
00631     ce->myaiocb.aio_fildes=(int)fd;
00632     ce->myaiocb.aio_offset=(off_t)offset;
00633     ce->real_address = (char *)malloc((size_t)(size+page_size));
00634     temp=(long)ce->real_address;
00635     temp = (temp+page_size) & ~(page_size-1);
00636     ce->myaiocb.aio_buf=(volatile void *)temp;
00637     if(ce->myaiocb.aio_buf == 0)
00638 #endif
00639     {
00640         printf("Malloc failed\n");
00641         exit(176);
00642     }
00643     /*bzero(ce->myaiocb.aio_buf,(size_t)size);*/
00644 #ifdef _LARGEFILE64_SOURCE 
00645 #ifdef __LP64__
00646     ce->myaiocb.aio_reqprio=0;
00647     ce->myaiocb.aio_nbytes=(size_t)size;
00648     ce->myaiocb.aio_sigevent.sigev_notify=SIGEV_NONE;
00649     ce->myaiocb.aio_lio_opcode=(int)op;
00650 #else
00651     ce->myaiocb64.aio_reqprio=0;
00652     ce->myaiocb64.aio_nbytes=(size_t)size;
00653     ce->myaiocb64.aio_sigevent.sigev_notify=SIGEV_NONE;
00654     ce->myaiocb64.aio_lio_opcode=(int)op;
00655 #endif
00656 #else
00657     ce->myaiocb.aio_reqprio=0;
00658     ce->myaiocb.aio_nbytes=(size_t)size;
00659     ce->myaiocb.aio_sigevent.sigev_notify=SIGEV_NONE;
00660     ce->myaiocb.aio_lio_opcode=(int)op;
00661 #endif
00662     ce->fd=(int)fd;
00663     ce->forward=0;
00664     ce->back=gc->tail;
00665     if(gc->tail)
00666         gc->tail->forward = ce;
00667     gc->tail= ce;
00668     if(!gc->head)
00669         gc->head=ce;
00670     gc->count++;
00671     return(ce);
00672 }
00673 
00674 /************************************************************************
00675  * This routine checks to see if the requested data is in the
00676  * cache. 
00677 *************************************************************************/
00678 struct cache_ent *
00679 incache(gc,fd,offset,size)
00680 struct cache *gc;
00681 long long fd,size;
00682 off64_t offset;
00683 {
00684     struct cache_ent *move;
00685     if(gc->head==0)
00686     {
00687         return(0);
00688     }
00689     move=gc->head;
00690 #ifdef _LARGEFILE64_SOURCE 
00691 #ifdef __LP64__
00692     while(move)
00693     {
00694         if((move->fd == fd) && (move->myaiocb.aio_offset==(off64_t)offset) &&
00695             ((size_t)size==move->myaiocb.aio_nbytes))
00696             {
00697                 return(move);
00698             }
00699         move=move->forward;
00700     }
00701 #else
00702     while(move)
00703     {
00704         if((move->fd == fd) && (move->myaiocb64.aio_offset==(off64_t)offset) &&
00705             ((size_t)size==move->myaiocb64.aio_nbytes))
00706             {
00707                 return(move);
00708             }
00709         move=move->forward;
00710     }
00711 #endif
00712 #else
00713     while(move)
00714     {
00715         if((move->fd == fd) && (move->myaiocb.aio_offset==(off_t)offset) &&
00716             ((size_t)size==move->myaiocb.aio_nbytes))
00717             {
00718                 return(move);
00719             }
00720         move=move->forward;
00721     }
00722 #endif
00723     return(0);
00724 }
00725 
00726 /************************************************************************
00727  * This routine removes a specific cache entry from the cache, and
00728  * releases all memory associated witht the cache entry (if not direct).
00729 *************************************************************************/
00730 
00731 void
00732 takeoff_cache(gc,ce)
00733 struct cache *gc;
00734 struct cache_ent *ce;
00735 {
00736     struct cache_ent *move;
00737     long long found;
00738     move=gc->head;
00739     if(move==ce) /* Head of list */
00740     {
00741 
00742         gc->head=ce->forward;
00743         if(gc->head)
00744             gc->head->back=0;
00745         else
00746             gc->tail = 0;
00747         if(!ce->direct)
00748         {
00749             free((void *)(ce->real_address));
00750             free((void *)ce);
00751         }
00752         gc->count--;
00753         return;
00754     }
00755     found=0;
00756     while(move)
00757     {
00758         if(move==ce)
00759         {
00760             if(move->forward)
00761             {
00762                 move->forward->back=move->back;
00763             }
00764             if(move->back)
00765             {
00766                 move->back->forward=move->forward;
00767             }
00768             found=1;
00769             break;
00770         }
00771         else
00772         {
00773             move=move->forward;
00774         }
00775     }
00776     if(gc->head == ce)
00777         gc->tail = ce;
00778     if(!found)
00779         printf("Internal Error in takeoff cache\n");
00780     move=gc->head;
00781     if(!ce->direct)
00782     {
00783         free((void *)(ce->real_address));
00784         free((void *)ce);
00785     }
00786     gc->count--;
00787 }
00788 
00789 /************************************************************************
00790  * This routine is used to purge the entire cache. This is called when
00791  * the cache contains data but the incomming read was not able to 
00792  * be satisfied from the cache. This indicates that the previous
00793  * async read-ahead was not correct and a new pattern is emerging. 
00794  ************************************************************************/
00795 void
00796 del_cache(gc)
00797 struct cache *gc;
00798 {
00799     struct cache_ent *ce;
00800     ssize_t ret;
00801     ce=gc->head;
00802     while(1)
00803     {
00804         ce=gc->head;
00805         if(ce==0)
00806             return;
00807 #ifdef _LARGEFILE64_SOURCE 
00808 #ifdef __LP64__
00809         while((ret = aio_cancel(0,&ce->myaiocb))==AIO_NOTCANCELED)
00810 #else
00811         while((ret = aio_cancel64(0,&ce->myaiocb64))==AIO_NOTCANCELED)
00812 #endif
00813 #else
00814         while((ret = aio_cancel(0,&ce->myaiocb))==AIO_NOTCANCELED)
00815 #endif
00816             ; 
00817 
00818 #ifdef _LARGEFILE64_SOURCE 
00819 #ifdef __LP64__
00820         ret = aio_return(&ce->myaiocb);
00821 #else
00822         ret = aio_return64(&ce->myaiocb64);
00823 #endif
00824 #else
00825         ret = aio_return(&ce->myaiocb);
00826 #endif
00827         ce->direct=0;
00828         takeoff_cache(gc,ce);     /* remove from cache */
00829     }
00830 }
00831 
00832 /************************************************************************
00833  * Like its sister async_read() this function performs async I/O for 
00834  * all buffers but it differs in that it expects the caller to 
00835  * request a pointer to the data to be returned instead of handing
00836  * the function a location to put the data. This will allow the
00837  * async I/O to be performed and does not require any bcopy to be
00838  * done to put the data back into the location specified by the caller.
00839  ************************************************************************/
00840 int
00841 async_read_no_copy(gc, fd, ubuffer, offset, size, stride, max, depth)
00842 struct cache *gc;
00843 long long fd;
00844 char **ubuffer;
00845 off64_t offset;
00846 long long size;
00847 long long stride;
00848 off64_t max;
00849 long long depth;
00850 {
00851     off64_t a_offset,r_offset;
00852     long long a_size;
00853     struct cache_ent *ce,*first_ce=0;
00854     long long i;
00855     ssize_t retval=0;
00856     ssize_t ret;
00857     long long del_read=0;
00858     long long start=0;
00859 
00860     a_offset=offset;
00861     a_size = size;
00862     /*
00863      * Check to see if it can be completed from the cache
00864      */
00865     if((ce=(struct cache_ent *)incache(gc,fd,offset,size)))
00866     {
00867 #ifdef _LARGEFILE64_SOURCE 
00868 #ifdef __LP64__
00869         while((ret=aio_error(&ce->myaiocb))== EINPROGRESS)
00870         {
00871             async_suspend(ce);
00872         }
00873 #else
00874         while((ret=aio_error64(&ce->myaiocb64))== EINPROGRESS)
00875         {
00876             async_suspend(ce);
00877         }
00878 #endif
00879 #else
00880         while((ret=aio_error(&ce->myaiocb))== EINPROGRESS)
00881         {
00882             async_suspend(ce);
00883         }
00884 #endif
00885         if(ret)
00886             printf("aio_error 3: ret %d %d\n",ret,errno);
00887 #ifdef _LARGEFILE64_SOURCE 
00888 #ifdef __LP64__
00889         if(ce->oldbuf != ce->myaiocb.aio_buf ||
00890             ce->oldfd != ce->myaiocb.aio_fildes ||
00891             ce->oldsize != ce->myaiocb.aio_nbytes) 
00892 #else
00893         if(ce->oldbuf != ce->myaiocb64.aio_buf ||
00894             ce->oldfd != ce->myaiocb64.aio_fildes ||
00895             ce->oldsize != ce->myaiocb64.aio_nbytes) 
00896 #endif
00897 #else
00898         if(ce->oldbuf != ce->myaiocb.aio_buf ||
00899             ce->oldfd != ce->myaiocb.aio_fildes ||
00900             ce->oldsize != ce->myaiocb.aio_nbytes) 
00901 #endif
00902             printf("It changed in flight\n");
00903             
00904 #ifdef _LARGEFILE64_SOURCE 
00905 #ifdef __LP64__
00906         retval=aio_return(&ce->myaiocb);
00907 #else
00908         retval=aio_return64(&ce->myaiocb64);
00909 #endif
00910 #else
00911         retval=aio_return(&ce->myaiocb);
00912 #endif
00913         if(retval > 0)
00914         {
00915 #ifdef _LARGEFILE64_SOURCE 
00916 #ifdef __LP64__
00917             *ubuffer=(char *)ce->myaiocb.aio_buf;
00918 #else
00919             *ubuffer=(char *)ce->myaiocb64.aio_buf;
00920 #endif
00921 #else
00922             *ubuffer=(char *)ce->myaiocb.aio_buf;
00923 #endif
00924         }else
00925             *ubuffer=0;
00926 #ifdef _LARGEFILE64_SOURCE 
00927 #ifdef __LP64__
00928         if(retval < ce->myaiocb.aio_nbytes)
00929 #else
00930         if(retval < ce->myaiocb64.aio_nbytes)
00931 #endif
00932 #else
00933         if(retval < ce->myaiocb.aio_nbytes)
00934 #endif
00935         {
00936             printf("aio_return error4: ret %d %d\n",retval,errno);
00937 #ifdef _LARGEFILE64_SOURCE 
00938 #ifdef __LP64__
00939             printf("aio_return error4: fd %d offset %lld buffer %lx size %d Opcode %d\n",
00940                 ce->myaiocb.aio_fildes,
00941                 ce->myaiocb.aio_offset,
00942                 (long)(ce->myaiocb.aio_buf),
00943                 ce->myaiocb.aio_nbytes,
00944                 ce->myaiocb.aio_lio_opcode
00945 #else
00946             printf("aio_return error4: fd %d offset %lld buffer %lx size %d Opcode %d\n",
00947                 ce->myaiocb64.aio_fildes,
00948                 ce->myaiocb64.aio_offset,
00949                 (long)(ce->myaiocb64.aio_buf),
00950                 ce->myaiocb64.aio_nbytes,
00951                 ce->myaiocb64.aio_lio_opcode
00952 #endif
00953 #else
00954             printf("aio_return error4: fd %d offset %d buffer %lx size %d Opcode %d\n",
00955                 ce->myaiocb.aio_fildes,
00956                 ce->myaiocb.aio_offset,
00957                 (long)(ce->myaiocb.aio_buf),
00958                 ce->myaiocb.aio_nbytes,
00959                 ce->myaiocb.aio_lio_opcode
00960 #endif
00961                 );
00962         }
00963         ce->direct=1;
00964         takeoff_cache(gc,ce); /* do not delete buffer*/
00965         putoninuse(gc,ce);
00966     }else
00967     {
00968         /*
00969          * Clear the cache and issue the first request async()
00970          */
00971         del_cache(gc);
00972         del_read++;
00973         first_ce=alloc_cache(gc,fd,offset,size,(long long)LIO_READ); /* allocate buffer */
00974         /*printf("allocated buffer/read %x offset %d\n",first_ce->myaiocb.aio_buf,offset);*/
00975 again:
00976 #ifdef _LARGEFILE64_SOURCE 
00977 #ifdef __LP64__
00978         first_ce->oldbuf=first_ce->myaiocb.aio_buf;
00979         first_ce->oldfd=first_ce->myaiocb.aio_fildes;
00980         first_ce->oldsize=first_ce->myaiocb.aio_nbytes;
00981         ret=aio_read(&first_ce->myaiocb);
00982 #else
00983         first_ce->oldbuf=first_ce->myaiocb64.aio_buf;
00984         first_ce->oldfd=first_ce->myaiocb64.aio_fildes;
00985         first_ce->oldsize=first_ce->myaiocb64.aio_nbytes;
00986         ret=aio_read64(&first_ce->myaiocb64);
00987 #endif
00988 #else
00989         first_ce->oldbuf=first_ce->myaiocb.aio_buf;
00990         first_ce->oldfd=first_ce->myaiocb.aio_fildes;
00991         first_ce->oldsize=first_ce->myaiocb.aio_nbytes;
00992         ret=aio_read(&first_ce->myaiocb);
00993 #endif
00994         if(ret!=0)
00995         {
00996             if(errno==EAGAIN)
00997                 goto again;
00998             else
00999                 printf("error returned from aio_read(). Ret %d errno %d\n",ret,errno);
01000         }
01001     }
01002     if(stride==0)    /* User does not want read-ahead */
01003         goto out;
01004     if(a_offset<0)  /* Before beginning of file */
01005         goto out;
01006     if(a_offset+size>max)   /* After end of file */
01007         goto out;
01008     if(depth >=(max_depth-1))
01009         depth=max_depth-1;
01010     if(depth==0)
01011         goto out;
01012     if(gc->count > 1)
01013         start=depth-1;
01014     for(i=start;i<depth;i++)    /* Issue read-aheads for the depth specified */
01015     {
01016         r_offset=a_offset+((i+1)*(stride*a_size));
01017         if(r_offset<0)
01018             continue;
01019         if(r_offset+size > max)
01020             continue;
01021         if((ce=incache(gc,fd,r_offset,a_size)))
01022             continue;
01023         ce=alloc_cache(gc,fd,r_offset,a_size,(long long)LIO_READ);
01024 #ifdef _LARGEFILE64_SOURCE 
01025 #ifdef __LP64__
01026         ce->oldbuf=ce->myaiocb.aio_buf;
01027         ce->oldfd=ce->myaiocb.aio_fildes;
01028         ce->oldsize=ce->myaiocb.aio_nbytes;
01029         ret=aio_read(&ce->myaiocb);
01030 #else
01031         ce->oldbuf=ce->myaiocb64.aio_buf;
01032         ce->oldfd=ce->myaiocb64.aio_fildes;
01033         ce->oldsize=ce->myaiocb64.aio_nbytes;
01034         ret=aio_read64(&ce->myaiocb64);
01035 #endif
01036 #else
01037         ce->oldbuf=ce->myaiocb.aio_buf;
01038         ce->oldfd=ce->myaiocb.aio_fildes;
01039         ce->oldsize=ce->myaiocb.aio_nbytes;
01040         ret=aio_read(&ce->myaiocb);
01041 #endif
01042         if(ret!=0)
01043         {
01044             takeoff_cache(gc,ce);
01045             break;
01046         }
01047     }           
01048 out:
01049     if(del_read)    /* Wait for the first read to complete */
01050     {
01051 #ifdef _LARGEFILE64_SOURCE 
01052 #ifdef __LP64__
01053         while((ret=aio_error(&first_ce->myaiocb))== EINPROGRESS)
01054         {
01055             async_suspend(first_ce);
01056         }
01057 #else
01058         while((ret=aio_error64(&first_ce->myaiocb64))== EINPROGRESS)
01059         {
01060             async_suspend(first_ce);
01061         }
01062 #endif
01063 #else
01064         while((ret=aio_error(&first_ce->myaiocb))== EINPROGRESS)
01065         {
01066             async_suspend(first_ce);
01067         }
01068 #endif
01069         if(ret)
01070             printf("aio_error 4: ret %d %d\n",ret,errno);
01071 #ifdef _LARGEFILE64_SOURCE 
01072 #ifdef __LP64__
01073         if(first_ce->oldbuf != first_ce->myaiocb.aio_buf ||
01074             first_ce->oldfd != first_ce->myaiocb.aio_fildes ||
01075             first_ce->oldsize != first_ce->myaiocb.aio_nbytes) 
01076             printf("It changed in flight2\n");
01077         retval=aio_return(&first_ce->myaiocb);
01078 #else
01079         if(first_ce->oldbuf != first_ce->myaiocb64.aio_buf ||
01080             first_ce->oldfd != first_ce->myaiocb64.aio_fildes ||
01081             first_ce->oldsize != first_ce->myaiocb64.aio_nbytes) 
01082             printf("It changed in flight2\n");
01083         retval=aio_return64(&first_ce->myaiocb64);
01084 #endif
01085 #else
01086         if(first_ce->oldbuf != first_ce->myaiocb.aio_buf ||
01087             first_ce->oldfd != first_ce->myaiocb.aio_fildes ||
01088             first_ce->oldsize != first_ce->myaiocb.aio_nbytes) 
01089             printf("It changed in flight2\n");
01090         retval=aio_return(&first_ce->myaiocb);
01091 #endif
01092 #ifdef _LARGEFILE64_SOURCE 
01093 #ifdef __LP64__
01094         if(retval < first_ce->myaiocb.aio_nbytes)
01095 #else
01096         if(retval < first_ce->myaiocb64.aio_nbytes)
01097 #endif
01098 #else
01099         if(retval < first_ce->myaiocb.aio_nbytes)
01100 #endif
01101         {
01102             printf("aio_return error5: ret %d %d\n",retval,errno);
01103 #ifdef _LARGEFILE64_SOURCE 
01104 #ifdef __LP64__
01105             printf("aio_return error5: fd %d offset %lld buffer %lx size %d Opcode %d\n",
01106                 first_ce->myaiocb.aio_fildes,
01107                 first_ce->myaiocb.aio_offset,
01108                 (long)(first_ce->myaiocb.aio_buf),
01109                 first_ce->myaiocb.aio_nbytes,
01110                 first_ce->myaiocb.aio_lio_opcode
01111 #else
01112             printf("aio_return error5: fd %d offset %lld buffer %lx size %d Opcode %d\n",
01113                 first_ce->myaiocb64.aio_fildes,
01114                 first_ce->myaiocb64.aio_offset,
01115                 (long)(first_ce->myaiocb64.aio_buf),
01116                 first_ce->myaiocb64.aio_nbytes,
01117                 first_ce->myaiocb64.aio_lio_opcode
01118 #endif
01119 #else
01120             printf("aio_return error5: fd %d offset %ld buffer %lx size %d Opcode %d\n",
01121                 first_ce->myaiocb.aio_fildes,
01122                 first_ce->myaiocb.aio_offset,
01123                 (long)(first_ce->myaiocb.aio_buf),
01124                 first_ce->myaiocb.aio_nbytes,
01125                 first_ce->myaiocb.aio_lio_opcode
01126 #endif
01127                 );
01128         }
01129         if(retval > 0)
01130         {
01131 #ifdef _LARGEFILE64_SOURCE 
01132 #ifdef __LP64__
01133             *ubuffer=(char *)first_ce->myaiocb.aio_buf;
01134 #else
01135             *ubuffer=(char *)first_ce->myaiocb64.aio_buf;
01136 #endif
01137 #else
01138             *ubuffer=(char *)first_ce->myaiocb.aio_buf;
01139 #endif
01140         }else
01141             *ubuffer=(char *)0;
01142         first_ce->direct=1;  /* do not delete the buffer */
01143         takeoff_cache(gc,first_ce);
01144         putoninuse(gc,first_ce);
01145     }
01146     return((int)retval);    
01147 }
01148 
01149 /************************************************************************
01150  * The caller is now finished with the data that was provided so
01151  * the library is now free to return the memory to the pool for later
01152  * reuse.
01153  ************************************************************************/
01154 void
01155 async_release(gc)
01156 struct cache *gc;
01157 {
01158     takeoffinuse(gc);
01159 }
01160 
01161 
01162 /************************************************************************
01163  * Put the buffer on the inuse list. When the user is finished with 
01164  * the buffer it will call back into async_release and the items on the 
01165  * inuse list will be deallocated.
01166  ************************************************************************/
01167 void
01168 putoninuse(gc,entry)
01169 struct cache *gc;
01170 struct cache_ent *entry;
01171 {
01172     if(gc->inuse_head)
01173         entry->forward=gc->inuse_head;
01174     else
01175         entry->forward=0;
01176     gc->inuse_head=entry;
01177 }
01178 
01179 /************************************************************************
01180  * This is called when the application is finished with the data that
01181  * was provided. The memory may now be returned to the pool.
01182  ************************************************************************/
01183 void
01184 takeoffinuse(gc)
01185 struct cache *gc;
01186 {
01187     struct cache_ent *ce;
01188     if(gc->inuse_head==0)
01189         printf("Takeoffinuse error\n");
01190     ce=gc->inuse_head;
01191     gc->inuse_head=gc->inuse_head->forward;
01192     
01193     if(gc->inuse_head !=0)
01194         printf("Error in take off inuse\n");
01195     free((void*)(ce->real_address));
01196     free(ce);
01197 }
01198 
01199 /*************************************************************************
01200  * This routine is a generic async writer assist funtion. It takes
01201  * the same calling parameters as write() but also extends the
01202  * interface to include:
01203  * 
01204  * offset ..... offset in the file.
01205  * depth  ..... How much read-ahead do you want.
01206  * 
01207  *************************************************************************/
01208 size_t
01209 async_write(gc,fd,buffer,size,offset,depth)
01210 struct cache *gc;
01211 long long fd,size;
01212 char *buffer;
01213 off64_t offset;
01214 long long depth;
01215 {
01216     struct cache_ent *ce;
01217     size_t ret;
01218     ce=allocate_write_buffer(gc,fd,offset,size,(long long)LIO_WRITE,depth,0LL,(char *)0,(char *)0);
01219     ce->direct=0;    /* not direct. Lib supplies buffer and must free it */
01220 #ifdef _LARGEFILE64_SOURCE 
01221 #ifdef __LP64__
01222     mbcopy(buffer,(char *)(ce->myaiocb.aio_buf),(size_t)size);
01223 #else
01224     mbcopy(buffer,(char *)(ce->myaiocb64.aio_buf),(size_t)size);
01225 #endif
01226 #else
01227     mbcopy(buffer,(char *)(ce->myaiocb.aio_buf),(size_t)size);
01228 #endif
01229     async_put_on_write_queue(gc,ce);
01230     /*
01231     printf("asw: fd %d offset %lld, size %d\n",ce->myaiocb64.aio_fildes,
01232         ce->myaiocb64.aio_offset,
01233         ce->myaiocb64.aio_nbytes);
01234     */  
01235 
01236 again:
01237 #ifdef _LARGEFILE64_SOURCE 
01238 #ifdef __LP64__
01239     ret=aio_write(&ce->myaiocb);
01240 #else
01241     ret=aio_write64(&ce->myaiocb64);
01242 #endif
01243 #else
01244     ret=aio_write(&ce->myaiocb);
01245 #endif
01246     if(ret==-1)
01247     {
01248         if(errno==EAGAIN)
01249         {
01250             async_wait_for_write(gc);
01251             goto again;
01252         }
01253         if(errno==0)
01254         {
01255             /* Compensate for bug in async library */
01256             async_wait_for_write(gc);
01257             goto again;
01258         }
01259         else
01260         {
01261             printf("Error in aio_write: ret %d errno %d count %lld\n",ret,errno,gc->w_count);
01262             /*
01263             printf("aio_write_no_copy: fd %d buffer %x offset %lld size %d\n",
01264                 ce->myaiocb64.aio_fildes,
01265                 ce->myaiocb64.aio_buf,
01266                 ce->myaiocb64.aio_offset,
01267                 ce->myaiocb64.aio_nbytes);
01268             */
01269             exit(177);
01270         }
01271     } 
01272     return((ssize_t)size);
01273 }
01274 
01275 /*************************************************************************
01276  * Allocate a write aiocb and write buffer of the size specified. Also 
01277  * put some extra buffer padding so that VX_DIRECT can do its job when
01278  * needed.
01279  *************************************************************************/
01280 
01281 struct cache_ent *
01282 allocate_write_buffer(gc,fd,offset,size,op,w_depth,direct,buffer,free_addr)
01283 struct cache *gc;
01284 long long fd,size,op;
01285 off64_t offset;
01286 long long w_depth;
01287 long long direct;
01288 char *buffer,*free_addr;
01289 {
01290     struct cache_ent *ce;
01291     long temp;
01292     if(fd==0LL)
01293     {
01294         printf("Setting up write buffer insane\n");
01295         exit(178);
01296     }
01297     if(gc->w_count > w_depth)
01298         async_wait_for_write(gc);
01299     ce=(struct cache_ent *)malloc((size_t)sizeof(struct cache_ent));
01300     if(ce == (struct cache_ent *)0)
01301     {
01302         printf("Malloc failed 1\n");
01303         exit(179);
01304     }
01305     bzero(ce,sizeof(struct cache_ent));
01306 #ifdef _LARGEFILE64_SOURCE 
01307 #ifdef __LP64__
01308     ce->myaiocb.aio_fildes=(int)fd;
01309     ce->myaiocb.aio_offset=(off64_t)offset;
01310     if(!direct)
01311     {
01312         ce->real_address = (char *)malloc((size_t)(size+page_size));
01313         temp=(long)ce->real_address;
01314         temp = (temp+page_size) & ~(page_size-1);
01315         ce->myaiocb.aio_buf=(volatile void *)temp;
01316     }else
01317     {
01318         ce->myaiocb.aio_buf=(volatile void *)buffer;
01319         ce->real_address=(char *)free_addr;
01320     }
01321     if(ce->myaiocb.aio_buf == 0)
01322 #else
01323     ce->myaiocb64.aio_fildes=(int)fd;
01324     ce->myaiocb64.aio_offset=(off64_t)offset;
01325     if(!direct)
01326     {
01327         ce->real_address = (char *)malloc((size_t)(size+page_size));
01328         temp=(long)ce->real_address;
01329         temp = (temp+page_size) & ~(page_size-1);
01330         ce->myaiocb64.aio_buf=(volatile void *)temp;
01331     }
01332     else
01333     {
01334         ce->myaiocb64.aio_buf=(volatile void *)buffer;
01335         ce->real_address=(char *)free_addr;
01336     }
01337     if(ce->myaiocb64.aio_buf == 0)
01338 #endif
01339 #else
01340     ce->myaiocb.aio_fildes=(int)fd;
01341     ce->myaiocb.aio_offset=(off_t)offset;
01342     if(!direct)
01343     {
01344         ce->real_address = (char *)malloc((size_t)(size+page_size));
01345         temp=(long)ce->real_address;
01346         temp = (temp+page_size) & ~(page_size-1);
01347         ce->myaiocb.aio_buf=(volatile void *)temp;
01348     }
01349     else
01350     {
01351         ce->myaiocb.aio_buf=(volatile void *)buffer;
01352         ce->real_address=(char *)free_addr;
01353     }
01354     if(ce->myaiocb.aio_buf == 0)
01355 #endif
01356     {
01357         printf("Malloc failed 2\n");
01358         exit(180);
01359     }
01360 #ifdef _LARGEFILE64_SOURCE 
01361 #ifdef __LP64__
01362     ce->myaiocb.aio_reqprio=0;
01363     ce->myaiocb.aio_nbytes=(size_t)size;
01364     ce->myaiocb.aio_sigevent.sigev_notify=SIGEV_NONE;
01365     ce->myaiocb.aio_lio_opcode=(int)op;
01366 #else
01367     ce->myaiocb64.aio_reqprio=0;
01368     ce->myaiocb64.aio_nbytes=(size_t)size;
01369     ce->myaiocb64.aio_sigevent.sigev_notify=SIGEV_NONE;
01370     ce->myaiocb64.aio_lio_opcode=(int)op;
01371 #endif
01372 #else
01373     ce->myaiocb.aio_reqprio=0;
01374     ce->myaiocb.aio_nbytes=(size_t)size;
01375     ce->myaiocb.aio_sigevent.sigev_notify=SIGEV_NONE;
01376     ce->myaiocb.aio_lio_opcode=(int)op;
01377 #endif
01378     ce->fd=(int)fd;
01379     return(ce);
01380 }
01381 
01382 /*************************************************************************
01383  * Put it on the outbound queue.
01384  *************************************************************************/
01385 
01386 void
01387 async_put_on_write_queue(gc,ce)
01388 struct cache *gc;
01389 struct cache_ent *ce;
01390 {
01391     ce->forward=0;
01392     ce->back=gc->w_tail;
01393     if(gc->w_tail)
01394         gc->w_tail->forward = ce;
01395     gc->w_tail= ce;
01396     if(!gc->w_head)
01397         gc->w_head=ce;
01398     gc->w_count++;
01399     return;
01400 }
01401 
01402 /*************************************************************************
01403  * Cleanup all outstanding writes
01404  *************************************************************************/
01405 void
01406 async_write_finish(gc)
01407 struct cache *gc;
01408 {
01409     while(gc->w_head)
01410     {
01411         /*printf("async_write_finish: Waiting for buffer %x to finish\n",gc->w_head->myaiocb64.aio_buf);*/
01412         async_wait_for_write(gc);
01413     }
01414 }
01415 
01416 /*************************************************************************
01417  * Wait for an I/O to finish
01418  *************************************************************************/
01419 
01420 void
01421 async_wait_for_write(gc)
01422 struct cache *gc;
01423 {
01424     struct cache_ent *ce;
01425     size_t ret,retval;
01426     if(gc->w_head==0)
01427         return;
01428     ce=gc->w_head;
01429     gc->w_head=ce->forward;
01430     gc->w_count--;
01431     ce->forward=0;
01432     if(ce==gc->w_tail)
01433         gc->w_tail=0;
01434     /*printf("Wait for buffer %x  offset %lld  size %d to finish\n",
01435         ce->myaiocb64.aio_buf,
01436         ce->myaiocb64.aio_offset,
01437         ce->myaiocb64.aio_nbytes);
01438     printf("write count %lld \n",gc->w_count);
01439     */
01440 #ifdef _LARGEFILE64_SOURCE 
01441 #ifdef __LP64__
01442     while((ret=aio_error(&ce->myaiocb))== EINPROGRESS)
01443     {
01444         async_suspend(ce);
01445     }
01446 #else
01447     while((ret=aio_error64(&ce->myaiocb64))== EINPROGRESS)
01448     {
01449         async_suspend(ce);
01450     }
01451 #endif
01452 #else
01453     while((ret=aio_error(&ce->myaiocb))== EINPROGRESS)
01454     {
01455         async_suspend(ce);
01456     }
01457 #endif
01458     if(ret)
01459     {
01460         printf("aio_error 5: ret %d %d\n",ret,errno);
01461 #ifdef _LARGEFILE64_SOURCE 
01462 #ifdef __LP64__
01463         printf("fd %d offset %lld size %d\n",
01464             ce->myaiocb.aio_fildes,
01465             ce->myaiocb.aio_offset,
01466             ce->myaiocb.aio_nbytes);
01467 #else
01468         printf("fd %d offset %lld size %d\n",
01469             ce->myaiocb64.aio_fildes,
01470             ce->myaiocb64.aio_offset,
01471             ce->myaiocb64.aio_nbytes);
01472 #endif
01473 #else
01474         printf("fd %d offset %lld size %d\n",
01475             ce->myaiocb.aio_fildes,
01476             ce->myaiocb.aio_offset,
01477             ce->myaiocb.aio_nbytes);
01478 #endif
01479         exit(181);
01480     }
01481 
01482 #ifdef _LARGEFILE64_SOURCE 
01483 #ifdef __LP64__
01484     retval=aio_return(&ce->myaiocb);
01485 #else
01486 #if defined(__CrayX1__)
01487     retval=aio_return64((aiocb64_t *)&ce->myaiocb64);
01488 #else
01489     retval=aio_return64((struct aiocb64 *)&ce->myaiocb64);
01490 #endif
01491 
01492 #endif
01493 #else
01494     retval=aio_return(&ce->myaiocb);
01495 #endif
01496     if((int)retval < 0)
01497     {
01498         printf("aio_return error: %d\n",errno);
01499     }
01500 
01501     if(!ce->direct)
01502     {
01503         /* printf("Freeing buffer %x\n",ce->real_address);*/
01504         free((void *)(ce->real_address));
01505         free((void *)ce);
01506     }
01507 
01508 }
01509 
01510 /*************************************************************************
01511  * This routine is a generic async writer assist funtion. It takes
01512  * the same calling parameters as write() but also extends the
01513  * interface to include:
01514  * 
01515  * offset ..... offset in the file.
01516  * depth  ..... How much read-ahead do you want.
01517  * free_addr .. address of memory to free after write is completed.
01518  * 
01519  *************************************************************************/
01520 size_t
01521 async_write_no_copy(gc,fd,buffer,size,offset,depth,free_addr)
01522 struct cache *gc;
01523 long long fd,size;
01524 char *buffer;
01525 off64_t offset;
01526 long long depth;
01527 char *free_addr;
01528 {
01529     struct cache_ent *ce;
01530     size_t ret;
01531     long long direct = 1;
01532     ce=allocate_write_buffer(gc,fd,offset,size,(long long)LIO_WRITE,depth,direct,buffer,free_addr);
01533     ce->direct=0;   /* have library de-allocate the buffer */
01534     async_put_on_write_queue(gc,ce);
01535     /*
01536     printf("awnc: fd %d offset %lld, size %d\n",ce->myaiocb64.aio_fildes,
01537         ce->myaiocb64.aio_offset,
01538         ce->myaiocb64.aio_nbytes);
01539     */
01540 
01541 again:
01542 #ifdef _LARGEFILE64_SOURCE 
01543 #ifdef __LP64__
01544     ret=aio_write(&ce->myaiocb);
01545 #else
01546     ret=aio_write64(&ce->myaiocb64);
01547 #endif
01548 #else
01549     ret=aio_write(&ce->myaiocb);
01550 #endif
01551     if(ret==-1)
01552     {
01553         if(errno==EAGAIN)
01554         {
01555             async_wait_for_write(gc);
01556             goto again;
01557         }
01558         if(errno==0)
01559         {
01560             /* Compensate for bug in async library */
01561             async_wait_for_write(gc);
01562             goto again;
01563         }
01564         else
01565         {
01566             printf("Error in aio_write: ret %d errno %d\n",ret,errno);
01567 #ifdef _LARGEFILE64_SOURCE 
01568 #ifdef __LP64__
01569             printf("aio_write_no_copy: fd %d buffer %lx offset %lld size %d\n",
01570                 ce->myaiocb.aio_fildes,
01571                 (long)(ce->myaiocb.aio_buf),
01572                 ce->myaiocb.aio_offset,
01573                 ce->myaiocb.aio_nbytes);
01574 #else
01575             printf("aio_write_no_copy: fd %d buffer %lx offset %lld size %d\n",
01576                 ce->myaiocb64.aio_fildes,
01577                 (long)(ce->myaiocb64.aio_buf),
01578                 ce->myaiocb64.aio_offset,
01579                 ce->myaiocb64.aio_nbytes);
01580 #endif
01581 #else
01582             printf("aio_write_no_copy: fd %d buffer %lx offset %ld size %d\n",
01583                 ce->myaiocb.aio_fildes,
01584                 (long)(ce->myaiocb.aio_buf),
01585                 ce->myaiocb.aio_offset,
01586                 ce->myaiocb.aio_nbytes);
01587 #endif
01588             exit(182);
01589         }
01590     } 
01591     else    
01592     {
01593         return((ssize_t)size);
01594     }
01595 }
01596 
01597 void mbcopy(source, dest, len)
01598 char *source,*dest;
01599 size_t len;
01600 {
01601     int i;
01602     for(i=0;i<len;i++)
01603         *dest++=*source++;
01604 }
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Defines