|
Lines 1-5
Link Here
|
| 1 |
/********************************************************************** |
1 |
/********************************************************************** |
| 2 |
* Copyright (c) 2005 IBM Corporation and others. |
2 |
* Copyright (c) 2005, 2007 IBM Corporation and others. |
| 3 |
* All rights reserved. This program and the accompanying materials |
3 |
* All rights reserved. This program and the accompanying materials |
| 4 |
* are made available under the terms of the Eclipse Public License v1.0 |
4 |
* are made available under the terms of the Eclipse Public License v1.0 |
| 5 |
* which accompanies this distribution, and is available at |
5 |
* which accompanies this distribution, and is available at |
|
Lines 49-55
Link Here
|
| 49 |
216026 Jul/23/2002 dns Added check for writers before detaching from shm buffer |
49 |
216026 Jul/23/2002 dns Added check for writers before detaching from shm buffer |
| 50 |
230611 Nov/19/2002 dns set numUsedSlots correctly when chunk is full |
50 |
230611 Nov/19/2002 dns set numUsedSlots correctly when chunk is full |
| 51 |
232468 Nov/19/2002 dns added code to handle case when continueDataSize > dataLength |
51 |
232468 Nov/19/2002 dns added code to handle case when continueDataSize > dataLength |
| 52 |
Last Changed = 02/12/12 12:15:11 |
52 |
94473 Jun/10/2005 dns increased the wait before stopping flushing because |
|
|
53 |
nobody else is attached to shm |
| 54 |
Last Changed = 05/06/10 12:15:11 |
| 53 |
|
55 |
|
| 54 |
*******************************************************************************/ |
56 |
*******************************************************************************/ |
| 55 |
|
57 |
|
|
Lines 62-68
Link Here
|
| 62 |
#ifdef SQLWINT |
64 |
#ifdef SQLWINT |
| 63 |
#include <limits.h> |
65 |
#include <limits.h> |
| 64 |
#include <io.h> |
66 |
#include <io.h> |
| 65 |
#include <winsock.h> |
67 |
#include <winsock2.h> /* 134924 */ |
| 66 |
#else |
68 |
#else |
| 67 |
#ifdef OSS_DEBUG |
69 |
#ifdef OSS_DEBUG |
| 68 |
#include <errno.h> /* 198075 */ |
70 |
#include <errno.h> /* 198075 */ |
|
Lines 76-81
Link Here
|
| 76 |
#include <unistd.h> |
78 |
#include <unistd.h> |
| 77 |
#endif |
79 |
#endif |
| 78 |
|
80 |
|
|
|
81 |
#include <jni.h> |
| 79 |
|
82 |
|
| 80 |
/* Macros to simplify the JNI code. */ |
83 |
/* Macros to simplify the JNI code. */ |
| 81 |
#if defined __cplusplus |
84 |
#if defined __cplusplus |
|
Lines 227-233
Link Here
|
| 227 |
} |
230 |
} |
| 228 |
} |
231 |
} |
| 229 |
} |
232 |
} |
| 230 |
/* Write data to file */ |
233 |
/* Write data to file */ |
| 231 |
else { |
234 |
else { |
| 232 |
bytesWritten = write( fileDescriptor, (char *)*pcontinueBuffer, totalRecordSize ) ; |
235 |
bytesWritten = write( fileDescriptor, (char *)*pcontinueBuffer, totalRecordSize ) ; |
| 233 |
if ( (ssize_t)-1 == bytesWritten ) |
236 |
if ( (ssize_t)-1 == bytesWritten ) |
|
Lines 285-291
Link Here
|
| 285 |
/* If the whole record doesn't fit in this buffer then allocate a temporary |
288 |
/* If the whole record doesn't fit in this buffer then allocate a temporary |
| 286 |
buffer for the record and copy the part that is included in this buffer |
289 |
buffer for the record and copy the part that is included in this buffer |
| 287 |
into the temp buffer*/ |
290 |
into the temp buffer*/ |
| 288 |
if (offset + OSS_RAMBO_DATA_HEADER_SIZE + recordLength >= dataLength) { |
291 |
if (offset + OSS_RAMBO_DATA_HEADER_SIZE + recordLength > dataLength) { |
| 289 |
|
292 |
|
| 290 |
continueDataSize = offset + OSS_RAMBO_DATA_HEADER_SIZE + recordLength - dataLength; |
293 |
continueDataSize = offset + OSS_RAMBO_DATA_HEADER_SIZE + recordLength - dataLength; |
| 291 |
bFlushLen = dataLength - offset; |
294 |
bFlushLen = dataLength - offset; |
|
Lines 827-832
Link Here
|
| 827 |
Uint32 slotIndex = 0 ; |
830 |
Uint32 slotIndex = 0 ; |
| 828 |
Uint32 loopIndex = 0 ; |
831 |
Uint32 loopIndex = 0 ; |
| 829 |
Uint32 timedOut = 0 ; |
832 |
Uint32 timedOut = 0 ; |
|
|
833 |
Uint32 attachCheckIndex = 0 ; /* bugzilla 94473 */ |
| 830 |
int flushDataLen; |
834 |
int flushDataLen; |
| 831 |
OSSRamboCB * pRamboCB ; |
835 |
OSSRamboCB * pRamboCB ; |
| 832 |
OSSRamboChunkCB * pChunkCB = NULL ; |
836 |
OSSRamboChunkCB * pChunkCB = NULL ; |
|
Lines 944-958
Link Here
|
| 944 |
|
948 |
|
| 945 |
loopIndex = 0; |
949 |
loopIndex = 0; |
| 946 |
timedOut = 0; |
950 |
timedOut = 0; |
|
|
951 |
attachCheckIndex = 0; /* 94473 */ |
| 947 |
/* Wait until the chunk is full or we have been asked to stop or we have waited too long */ |
952 |
/* Wait until the chunk is full or we have been asked to stop or we have waited too long */ |
| 948 |
for ( ; ; ) |
953 |
for ( ; ; ) |
| 949 |
{ |
954 |
{ |
|
|
955 |
/* If flushing is to be stopped then set the stopping flag and quit the loop */ |
| 950 |
if ( pRamboCB->l2.h2.status & OSS_RAMBO_AUTOF_STOP_NOW ) |
956 |
if ( pRamboCB->l2.h2.status & OSS_RAMBO_AUTOF_STOP_NOW ) |
| 951 |
{ |
957 |
{ |
| 952 |
stopping = true ; |
958 |
stopping = true ; |
| 953 |
break ; |
959 |
break ; |
| 954 |
} |
960 |
} |
| 955 |
|
961 |
|
|
|
962 |
/* If the chunk is full then quit the loop so it can be flushed. */ |
| 956 |
if ( ossAtomicPeek(&pChunkCB->fillCount) > 0 && (ossAtomicPeek(&pChunkCB->fillCount) % OSS_RAMBO_SLOTS_PER_CHUNK) == 0 ) |
963 |
if ( ossAtomicPeek(&pChunkCB->fillCount) > 0 && (ossAtomicPeek(&pChunkCB->fillCount) % OSS_RAMBO_SLOTS_PER_CHUNK) == 0 ) |
| 957 |
{ |
964 |
{ |
| 958 |
#ifdef OSS_DEBUG1 |
965 |
#ifdef OSS_DEBUG1 |
|
Lines 962-968
Link Here
|
| 962 |
#endif |
969 |
#endif |
| 963 |
break ; |
970 |
break ; |
| 964 |
} |
971 |
} |
|
|
972 |
/* Else If we have looped too long */ |
| 965 |
else if (loopIndex == 100) { |
973 |
else if (loopIndex == 100) { |
|
|
974 |
/* If there is any data in the chunk Then quit the loop to flush it */ |
| 966 |
if ((ossAtomicPeek(&pChunkCB->fillCount) % OSS_RAMBO_SLOTS_PER_CHUNK) - ossAtomicPeek(&pChunkCB->flushedCount) > 0 || |
975 |
if ((ossAtomicPeek(&pChunkCB->fillCount) % OSS_RAMBO_SLOTS_PER_CHUNK) - ossAtomicPeek(&pChunkCB->flushedCount) > 0 || |
| 967 |
(ossAtomicPeek(&pChunkCB->fillCount) > 0 && (ossAtomicPeek(&pChunkCB->fillCount) % OSS_RAMBO_SLOTS_PER_CHUNK) == 0)) { |
976 |
(ossAtomicPeek(&pChunkCB->fillCount) > 0 && (ossAtomicPeek(&pChunkCB->fillCount) % OSS_RAMBO_SLOTS_PER_CHUNK) == 0)) { |
| 968 |
timedOut = 1; |
977 |
timedOut = 1; |
|
Lines 973-984
Link Here
|
| 973 |
break ; |
982 |
break ; |
| 974 |
} |
983 |
} |
| 975 |
/* 205955 begin */ |
984 |
/* 205955 begin */ |
|
|
985 |
/* Else get the attach count to see if anyone else is attached */ |
| 976 |
else { |
986 |
else { |
| 977 |
int attcnt = 0; |
987 |
int attcnt = 0; |
| 978 |
OSSErr osserr = OSS_OK ; |
988 |
OSSErr osserr = OSS_OK ; |
| 979 |
osserr = ossIPCMemAttachCount( pRambo->ipcMemHandle, &attcnt ) ; |
989 |
osserr = ossIPCMemAttachCount( pRambo->ipcMemHandle, &attcnt ) ; |
| 980 |
/* If nobody is attached to the shared memory except for us then stop flushing */ |
990 |
attachCheckIndex++; /* bugzilla 94473 */ |
| 981 |
if ( osserr == OSS_OK && attcnt == 1 ) |
991 |
/* If nobody is attached to the shared memory except for us then stop flushing |
|
|
992 |
* bugzilla 94473 - only stop flushing if we have gone through the wait loop four |
| 993 |
* times to provide more time for the agent to attach to it (4 seconds instead of 1). */ |
| 994 |
if ( osserr == OSS_OK && attcnt == 1 && attachCheckIndex >= 4) |
| 982 |
{ |
995 |
{ |
| 983 |
/* set the user count to 1 so the shared memory gets deleted when it is destroyed */ |
996 |
/* set the user count to 1 so the shared memory gets deleted when it is destroyed */ |
| 984 |
ossIPCMemSetUserCount( pRambo->ipcMemHandle, attcnt ) ; |
997 |
ossIPCMemSetUserCount( pRambo->ipcMemHandle, attcnt ) ; |
|
Lines 1505-1510
Link Here
|
| 1505 |
} |
1518 |
} |
| 1506 |
|
1519 |
|
| 1507 |
|
1520 |
|
|
|
1521 |
/******************************************************************************* |
| 1522 |
|
| 1523 |
Function Name |
| 1524 |
dataToStrProcessor |
| 1525 |
|
| 1526 |
Function |
| 1527 |
Processes data from a RAMBO buffer and writes it out to a Java OutputStream |
| 1528 |
|
| 1529 |
Inputs |
| 1530 |
pTraceData - addr of trace data to process |
| 1531 |
dataLength - length of data to process |
| 1532 |
continueDataSize - size of data at the start of the buffer to send without processing because it is the remainder of a record |
| 1533 |
jenv - Java environment |
| 1534 |
jobj - Java OutputStream object |
| 1535 |
jmethod - the OutputStream write method ID |
| 1536 |
|
| 1537 |
Normal Return |
| 1538 |
returns size of last part of the data record that did not fit in this buffer |
| 1539 |
|
| 1540 |
Error Return |
| 1541 |
-1 |
| 1542 |
**********************************************************************************/ |
| 1543 |
|
| 1544 |
int dataToStrProcessor( |
| 1545 |
void *pTraceData, |
| 1546 |
int dataLength, |
| 1547 |
int continueDataSize, /* size of end of msg from previous buffer to send */ |
| 1548 |
JNIEnv *jenv, /* Java environemnt */ |
| 1549 |
jobject jobj, /* OutputStream object */ |
| 1550 |
jmethodID jmethod ) { /* write method ID */ |
| 1551 |
|
| 1552 |
char *pFlushData; |
| 1553 |
Uint32 recordLength; |
| 1554 |
int offset = 0; |
| 1555 |
int endOffset = dataLength - OSS_RAMBO_DATA_HEADER_SIZE; |
| 1556 |
int flushLen; /* length of data to flush to file descriptor */ |
| 1557 |
int bFlushLen; /* length of data to flush from buffer (includes rambo data header length) */ |
| 1558 |
|
| 1559 |
#ifdef OSS_DEBUG |
| 1560 |
ra_logServiceMessage(__FILE__, __LINE__,RA_DEBUG,"dataProcessor: pTraceData=%p *pTraceData=%x/%x/%x/%x/%x contSize=%d\n", |
| 1561 |
pTraceData, (*(char *)pTraceData & 0x000000ff),(*((char *)pTraceData+1) & 0x000000ff), |
| 1562 |
(*((char *)pTraceData+2) & 0x000000ff), (*((char *)pTraceData+3) & 0x000000ff), |
| 1563 |
(*((char *)pTraceData+4) & 0x000000ff), continueDataSize); |
| 1564 |
#endif |
| 1565 |
|
| 1566 |
/* If there was a record that started in the previous buffer but the buffer was not big |
| 1567 |
enough to contain the whole record, then the rest of the record is at the beginning |
| 1568 |
of this buffer so flush the remainder of the record. |
| 1569 |
*/ |
| 1570 |
if (continueDataSize > 0) { |
| 1571 |
int conFlushLen; |
| 1572 |
|
| 1573 |
/* The remainder of the record may be bigger than the size of this buffer */ |
| 1574 |
if (continueDataSize > dataLength) { |
| 1575 |
/* Flush this whole buffer to the Java OutputStream */ |
| 1576 |
conFlushLen = dataLength; |
| 1577 |
} |
| 1578 |
else { |
| 1579 |
/* Write the remainder of the previous record out to the Java OutputStream */ |
| 1580 |
conFlushLen = continueDataSize; |
| 1581 |
} |
| 1582 |
|
| 1583 |
/* Format the java input arguments */ |
| 1584 |
|
| 1585 |
/* Possible memory leak here if we create a new byte array every time |
| 1586 |
* - need to confirm this |
| 1587 |
*/ |
| 1588 |
if(conFlushLen > 0) { |
| 1589 |
jbyteArray jbarr; |
| 1590 |
jthrowable jexc; |
| 1591 |
jbyte *tmpBuffer; |
| 1592 |
|
| 1593 |
jbarr = ENV(jenv)->NewByteArray(ENVPARM(jenv) conFlushLen); |
| 1594 |
if(jbarr == NULL) { |
| 1595 |
return -1; |
| 1596 |
} |
| 1597 |
|
| 1598 |
tmpBuffer = (signed char*)malloc(sizeof(signed char) * conFlushLen); |
| 1599 |
memcpy(tmpBuffer, (signed char*)pTraceData, sizeof(signed char) * conFlushLen); |
| 1600 |
|
| 1601 |
ENV(jenv)->SetByteArrayRegion(ENVPARM(jenv) jbarr, 0, conFlushLen, tmpBuffer); |
| 1602 |
|
| 1603 |
/* Write the data out to the java OutputStream */ |
| 1604 |
ENV(jenv)->CallVoidMethod(ENVPARM(jenv) jobj, jmethod, jbarr, 0, conFlushLen); |
| 1605 |
|
| 1606 |
/* Check for a java exception */ |
| 1607 |
jexc = ENV(jenv)->ExceptionOccurred(ENVPARM1(jenv)); |
| 1608 |
if(jexc) { |
| 1609 |
ENV(jenv)->ExceptionClear(ENVPARM1(jenv)); |
| 1610 |
} |
| 1611 |
|
| 1612 |
// ENV(jenv)->ReleaseByteArrayElements(ENVPARM(jenv) jbarr, tmpBuffer, 0); |
| 1613 |
ENV(jenv)->DeleteLocalRef(ENVPARM(jenv) jbarr); |
| 1614 |
free(tmpBuffer); |
| 1615 |
|
| 1616 |
if(jexc) { |
| 1617 |
return -1; |
| 1618 |
} |
| 1619 |
} |
| 1620 |
|
| 1621 |
offset += OSS_RAMBO_ROUND_SLOT_SIZE( conFlushLen ); |
| 1622 |
continueDataSize -= conFlushLen; |
| 1623 |
} |
| 1624 |
|
| 1625 |
/* Process the data until we have reached the end of the buffer */ |
| 1626 |
while (offset < endOffset) { |
| 1627 |
|
| 1628 |
#ifdef OSS_DEBUG |
| 1629 |
ra_logServiceMessage(__FILE__, __LINE__,RA_DEBUG,"dataProcessor: *pTraceData at offset %d =%x/%x/%x/%x %x/%x/%x/%x/%x\n", |
| 1630 |
offset, (*((char *)pTraceData+offset) & 0x000000ff),(*((char *)pTraceData+offset+1) & 0x000000ff), |
| 1631 |
(*((char *)pTraceData+offset+2) & 0x000000ff), (*((char *)pTraceData+offset+3) & 0x000000ff), |
| 1632 |
(*((char *)pTraceData+offset+4) & 0x000000ff), (*((char *)pTraceData+offset+5) & 0x000000ff), |
| 1633 |
(*((char *)pTraceData+offset+6) & 0x000000ff), (*((char *)pTraceData+offset+7) & 0x000000ff), |
| 1634 |
(*((char *)pTraceData+offset+8) & 0x000000ff)); |
| 1635 |
#endif |
| 1636 |
|
| 1637 |
pFlushData = (char *)pTraceData + offset; |
| 1638 |
flushLen = 0; |
| 1639 |
bFlushLen = 0; |
| 1640 |
/* read the data length from the buffer */ |
| 1641 |
recordLength = *(Uint32 *)pFlushData; |
| 1642 |
|
| 1643 |
/* If the whole record doesn't fit in this buffer then flush what is included |
| 1644 |
in this buffer */ |
| 1645 |
if (offset + OSS_RAMBO_DATA_HEADER_SIZE + recordLength >= dataLength) { |
| 1646 |
continueDataSize = offset + OSS_RAMBO_DATA_HEADER_SIZE + recordLength - dataLength; |
| 1647 |
bFlushLen = dataLength - offset; |
| 1648 |
#ifdef OSS_DEBUG |
| 1649 |
ra_logServiceMessage(__FILE__, __LINE__,RA_DEBUG,"dataProcessor: Reached end of buffer recordLength=%d bflushLen=%d pFlushData=%p\n", |
| 1650 |
recordLength, bFlushLen, pFlushData); |
| 1651 |
#endif |
| 1652 |
} |
| 1653 |
/* Else the whole record is contained in this buffer so flush it */ |
| 1654 |
else { |
| 1655 |
bFlushLen = OSS_RAMBO_DATA_HEADER_SIZE + recordLength; |
| 1656 |
} |
| 1657 |
|
| 1658 |
flushLen = bFlushLen - OSS_RAMBO_DATA_HEADER_SIZE; |
| 1659 |
pFlushData = (char *)pFlushData + OSS_RAMBO_DATA_HEADER_SIZE; |
| 1660 |
|
| 1661 |
#ifdef OSS_DEBUG |
| 1662 |
ra_logServiceMessage(__FILE__, __LINE__,RA_DEBUG,"dataProcessor: recordLength=%d flushLen=%d bFlushLen=%d pFlushData=%p", |
| 1663 |
recordLength, flushLen, bFlushLen, pFlushData); |
| 1664 |
#endif |
| 1665 |
|
| 1666 |
/* Format the java input arguments */ |
| 1667 |
|
| 1668 |
/* Possible memory leak here if we create a new byte array every time |
| 1669 |
* - need to confirm this |
| 1670 |
*/ |
| 1671 |
if(flushLen > 0) { |
| 1672 |
jbyteArray jbarr; |
| 1673 |
jthrowable jexc; |
| 1674 |
jbyte *tmpBuffer; |
| 1675 |
|
| 1676 |
jbarr = ENV(jenv)->NewByteArray(ENVPARM(jenv) flushLen); |
| 1677 |
if(jbarr == NULL) { |
| 1678 |
return -1; |
| 1679 |
} |
| 1680 |
|
| 1681 |
tmpBuffer = (signed char*)malloc(sizeof(signed char) * flushLen); |
| 1682 |
memcpy(tmpBuffer, (signed char*)pFlushData, sizeof(signed char) * flushLen); |
| 1683 |
|
| 1684 |
ENV(jenv)->SetByteArrayRegion(ENVPARM(jenv) jbarr, 0, flushLen, tmpBuffer); |
| 1685 |
|
| 1686 |
/* Write the data out to the java OutputStream */ |
| 1687 |
ENV(jenv)->CallVoidMethod(ENVPARM(jenv) jobj, jmethod, jbarr, 0, flushLen); |
| 1688 |
|
| 1689 |
/* Check for a java exception */ |
| 1690 |
jexc = ENV(jenv)->ExceptionOccurred(ENVPARM1(jenv)); |
| 1691 |
if(jexc) { |
| 1692 |
ENV(jenv)->ExceptionClear(ENVPARM1(jenv)); |
| 1693 |
} |
| 1694 |
|
| 1695 |
// ENV(jenv)->ReleaseByteArrayElements(ENVPARM(jenv) jbarr, tmpBuffer, 0); |
| 1696 |
ENV(jenv)->DeleteLocalRef(ENVPARM(jenv) jbarr); |
| 1697 |
free(tmpBuffer); |
| 1698 |
|
| 1699 |
if(jexc) { |
| 1700 |
return -1; |
| 1701 |
} |
| 1702 |
} |
| 1703 |
|
| 1704 |
/* increase the offset by the number of slots that were flushed */ |
| 1705 |
offset += OSS_RAMBO_ROUND_SLOT_SIZE( bFlushLen ); |
| 1706 |
} |
| 1707 |
return continueDataSize; |
| 1708 |
} |
| 1709 |
|
| 1508 |
|
1710 |
|
| 1509 |
|
1711 |
|
| 1510 |
/******************************************************************************* |
1712 |
/******************************************************************************* |
|
Lines 1544-1549
Link Here
|
| 1544 |
Uint32 chunkIndex = 0 ; |
1746 |
Uint32 chunkIndex = 0 ; |
| 1545 |
Uint32 slotIndex = 0 ; |
1747 |
Uint32 slotIndex = 0 ; |
| 1546 |
Uint32 loopIndex = 0 ; |
1748 |
Uint32 loopIndex = 0 ; |
|
|
1749 |
Uint32 attachCheckIndex = 0 ; /* bugzilla 94473 */ |
| 1547 |
Uint32 timedOut = 0 ; |
1750 |
Uint32 timedOut = 0 ; |
| 1548 |
int flushDataLen; |
1751 |
int flushDataLen; |
| 1549 |
OSSRamboCB * pRamboCB ; |
1752 |
OSSRamboCB * pRamboCB ; |
|
Lines 1661-1669
Link Here
|
| 1661 |
|
1864 |
|
| 1662 |
loopIndex = 0; |
1865 |
loopIndex = 0; |
| 1663 |
timedOut = 0; |
1866 |
timedOut = 0; |
|
|
1867 |
attachCheckIndex = 0; /* 94473 */ |
| 1664 |
/* Wait until the chunk is full or we have been asked to stop or we have waited too long */ |
1868 |
/* Wait until the chunk is full or we have been asked to stop or we have waited too long */ |
| 1665 |
for ( ; ; ) |
1869 |
for ( ; ; ) |
| 1666 |
{ |
1870 |
{ |
|
|
1871 |
/* If flushing is to be stopped then set the stopping flag and quit the loop */ |
| 1667 |
if ( pRamboCB->l2.h2.status & OSS_RAMBO_AUTOF_STOP_NOW ) |
1872 |
if ( pRamboCB->l2.h2.status & OSS_RAMBO_AUTOF_STOP_NOW ) |
| 1668 |
{ |
1873 |
{ |
| 1669 |
stopping = true ; |
1874 |
stopping = true ; |
|
Lines 1673-1678
Link Here
|
| 1673 |
break ; |
1878 |
break ; |
| 1674 |
} |
1879 |
} |
| 1675 |
|
1880 |
|
|
|
1881 |
/* If the chunk is full then quit the loop so it can be flushed. */ |
| 1676 |
if ( ossAtomicPeek(&pChunkCB->fillCount) > 0 && (ossAtomicPeek(&pChunkCB->fillCount) % OSS_RAMBO_SLOTS_PER_CHUNK) == 0 ) |
1882 |
if ( ossAtomicPeek(&pChunkCB->fillCount) > 0 && (ossAtomicPeek(&pChunkCB->fillCount) % OSS_RAMBO_SLOTS_PER_CHUNK) == 0 ) |
| 1677 |
{ |
1883 |
{ |
| 1678 |
#ifdef OSS_DEBUG |
1884 |
#ifdef OSS_DEBUG |
|
Lines 1680-1686
Link Here
|
| 1680 |
#endif |
1886 |
#endif |
| 1681 |
break ; |
1887 |
break ; |
| 1682 |
} |
1888 |
} |
|
|
1889 |
/* Else If we have looped too long */ |
| 1683 |
else if (loopIndex == 100) { |
1890 |
else if (loopIndex == 100) { |
|
|
1891 |
/* If there is any data in the chunk Then quit the loop to flush it */ |
| 1684 |
if ((ossAtomicPeek(&pChunkCB->fillCount) % OSS_RAMBO_SLOTS_PER_CHUNK) - ossAtomicPeek(&pChunkCB->flushedCount) > 0 || |
1892 |
if ((ossAtomicPeek(&pChunkCB->fillCount) % OSS_RAMBO_SLOTS_PER_CHUNK) - ossAtomicPeek(&pChunkCB->flushedCount) > 0 || |
| 1685 |
(ossAtomicPeek(&pChunkCB->fillCount) > 0 && (ossAtomicPeek(&pChunkCB->fillCount) % OSS_RAMBO_SLOTS_PER_CHUNK) == 0)) { |
1893 |
(ossAtomicPeek(&pChunkCB->fillCount) > 0 && (ossAtomicPeek(&pChunkCB->fillCount) % OSS_RAMBO_SLOTS_PER_CHUNK) == 0)) { |
| 1686 |
timedOut = 1; |
1894 |
timedOut = 1; |
|
Lines 1690-1695
Link Here
|
| 1690 |
break ; |
1898 |
break ; |
| 1691 |
} |
1899 |
} |
| 1692 |
/* 205955 begin */ |
1900 |
/* 205955 begin */ |
|
|
1901 |
/* Else get the attach count to see if anyone else is attached */ |
| 1693 |
else { |
1902 |
else { |
| 1694 |
int attcnt = 0; |
1903 |
int attcnt = 0; |
| 1695 |
OSSErr osserr = OSS_OK ; |
1904 |
OSSErr osserr = OSS_OK ; |
|
Lines 1697-1704
Link Here
|
| 1697 |
#ifdef OSS_DEBUG |
1906 |
#ifdef OSS_DEBUG |
| 1698 |
ra_logServiceMessage(__FILE__, __LINE__,RA_DEBUG,"flushToFD: attach count = %d", attcnt); |
1907 |
ra_logServiceMessage(__FILE__, __LINE__,RA_DEBUG,"flushToFD: attach count = %d", attcnt); |
| 1699 |
#endif |
1908 |
#endif |
| 1700 |
/* If nobody is attached to the shared memory except for us then stop flushing */ |
1909 |
attachCheckIndex++; /* bugzilla 94473 */ |
| 1701 |
if ( osserr == OSS_OK && attcnt == 1 ) |
1910 |
/* If nobody is attached to the shared memory except for us then stop flushing |
|
|
1911 |
* bugzilla 94473 - only stop flushing if we have gone through the wait loop four |
| 1912 |
* times to provide more time for the agent to attach to it (4 seconds instead of 1). */ |
| 1913 |
if ( osserr == OSS_OK && attcnt == 1 && attachCheckIndex >= 4) |
| 1702 |
{ |
1914 |
{ |
| 1703 |
/* set the user count to 1 so the shared memory gets deleted when it is destroyed */ |
1915 |
/* set the user count to 1 so the shared memory gets deleted when it is destroyed */ |
| 1704 |
ossIPCMemSetUserCount( pRambo->ipcMemHandle, attcnt ) ; |
1916 |
ossIPCMemSetUserCount( pRambo->ipcMemHandle, attcnt ) ; |
|
Lines 1905-1930
Link Here
|
| 1905 |
goto errorProcessingFailed ; |
2117 |
goto errorProcessingFailed ; |
| 1906 |
} |
2118 |
} |
| 1907 |
|
2119 |
|
| 1908 |
/* Get a pointer to the chunk of memory */ |
2120 |
/* Get a pointer to the chunk of memory */ |
| 1909 |
pChunk = (void *)( (uintptr_t)pMemBuffer(pRamboCB) |
2121 |
pChunk = (void *)( (uintptr_t)pMemBuffer(pRamboCB) |
| 1910 |
+ ( chunkIndex * OSS_RAMBO_CHUNK_SIZE ) ) ; |
2122 |
+ ( chunkIndex * OSS_RAMBO_CHUNK_SIZE ) ) ; |
| 1911 |
if (ossAtomicPeek(&pChunkCB->flushedCount) > 0) |
2123 |
if (ossAtomicPeek(&pChunkCB->flushedCount) > 0) |
| 1912 |
pChunk = (void *)((char *)pChunk + ossAtomicPeek(&pChunkCB->flushedCount) * OSS_RAMBO_SLOT_SIZE); |
2124 |
pChunk = (void *)((char *)pChunk + ossAtomicPeek(&pChunkCB->flushedCount) * OSS_RAMBO_SLOT_SIZE); |
| 1913 |
OSS_ASSERT( pChunk != NULL ) ; |
2125 |
OSS_ASSERT( pChunk != NULL ) ; |
| 1914 |
OSS_ASSERT( (uintptr_t)pChunk >= (uintptr_t)pMemBuffer(pRamboCB) ) ; |
2126 |
OSS_ASSERT( (uintptr_t)pChunk >= (uintptr_t)pMemBuffer(pRamboCB) ) ; |
| 1915 |
|
2127 |
|
| 1916 |
/* Pass the chunk to the data processor, but only process the filled slots */ |
2128 |
/* Pass the chunk to the data processor, but only process the filled slots */ |
| 1917 |
|
2129 |
|
| 1918 |
err = dataProcessor( pChunk, numUsedSlots * OSS_RAMBO_SLOT_SIZE, continueDataSize, fileDescriptor, descriptorType ) ; |
2130 |
err = dataProcessor( pChunk, numUsedSlots * OSS_RAMBO_SLOT_SIZE, continueDataSize, fileDescriptor, descriptorType ) ; |
| 1919 |
if ( 0 > err ) { |
2131 |
if ( 0 > err ) { |
| 1920 |
#ifdef OSS_DEBUG1 |
2132 |
#ifdef OSS_DEBUG1 |
| 1921 |
fprintf(dbgfd,"ossRamboFlushToFD: failed writing the chunk %d of shared memory. error=%d\n", chunkIndex, err ) ; |
2133 |
fprintf(dbgfd,"ossRamboFlushToFD: failed writing the chunk %d of shared memory. error=%d\n", chunkIndex, err ) ; |
| 1922 |
fflush(dbgfd); |
2134 |
fflush(dbgfd); |
| 1923 |
#endif |
2135 |
#endif |
| 1924 |
goto errorProcessingFailed ; |
2136 |
goto errorProcessingFailed ; |
| 1925 |
} |
2137 |
} |
| 1926 |
else |
2138 |
else |
| 1927 |
continueDataSize = err; |
2139 |
continueDataSize = err; |
| 1928 |
|
2140 |
|
| 1929 |
#ifdef OSS_DEBUG1 |
2141 |
#ifdef OSS_DEBUG1 |
| 1930 |
fprintf(dbgfd,"Flushed unfilled chunk index %lu has fillCount %lu\n", |
2142 |
fprintf(dbgfd,"Flushed unfilled chunk index %lu has fillCount %lu\n", |
|
Lines 1982-2079
Link Here
|
| 1982 |
/* 198075 end */ |
2194 |
/* 198075 end */ |
| 1983 |
|
2195 |
|
| 1984 |
|
2196 |
|
| 1985 |
|
|
|
| 1986 |
|
| 1987 |
/******************************************************************************* |
2197 |
/******************************************************************************* |
| 1988 |
|
2198 |
|
| 1989 |
Name |
2199 |
Function Name |
| 1990 |
ossRamboStopFlusherAndDetach |
2200 |
ossRamboFlushToStream |
| 1991 |
|
2201 |
|
| 1992 |
Function |
2202 |
Function |
| 1993 |
Tells flusher routine to stop flushing buffer and detach from buffer. |
2203 |
Flushes the RAMBO buffer to a Java Stream on a timed basis. It |
|
|
2204 |
does not wait for a chunk to be filled before flushing the data in it. |
| 1994 |
|
2205 |
|
| 1995 |
Inputs |
2206 |
Inputs |
| 1996 |
1. pRambo |
2207 |
1. pRambo |
| 1997 |
Address of a RAMBO handle. |
2208 |
Address of a RAMBO buffer control block. |
|
|
2209 |
2. outStrObj |
| 2210 |
java OutputStream object |
| 1998 |
|
2211 |
|
| 1999 |
Normal Return |
2212 |
Normal Returns |
| 2000 |
OSS_OK |
2213 |
OSS_OK |
| 2001 |
|
2214 |
|
| 2002 |
Error Return |
2215 |
Error Returns |
| 2003 |
OSS_ERR_INVALID |
2216 |
OSS_ERR_INVALID |
| 2004 |
OSS_ERR_RAMBO_NOT_INIT |
2217 |
OSS_ERR_RAMBO_NOT_INIT |
|
|
2218 |
OSS_ERR_RAMBO_AUTOFLUSHER_ENABLED |
| 2219 |
OSS_ERR_RAMBO_IO_ERROR |
| 2005 |
|
2220 |
|
| 2006 |
******************************************************************************/ |
2221 |
******************************************************************************/ |
| 2007 |
OSS_EXTERNC OSSErr OSS_API ossRamboStopFlusherAndDetach( OSSRambo * pRambo ) |
2222 |
OSS_EXTERNC OSSErr OSS_API ossRamboFlushToStream( |
|
|
2223 |
OSSRambo *pRambo, |
| 2224 |
JNIEnv *jenv, |
| 2225 |
jobject jOutStrObj ) |
| 2008 |
{ |
2226 |
{ |
| 2009 |
OSSErr osserr = OSS_OK ; |
2227 |
OSSErr osserr = OSS_OK ; |
|
|
2228 |
ssize_t bytesWritten = 0 ; |
| 2229 |
Uint32 chunkIndex = 0 ; |
| 2230 |
Uint32 slotIndex = 0 ; |
| 2231 |
Uint32 loopIndex = 0 ; |
| 2232 |
Uint32 timedOut = 0 ; |
| 2233 |
Uint32 attachCheckIndex = 0 ; /* bugzilla 94473 */ |
| 2234 |
int flushDataLen; |
| 2235 |
OSSRamboCB * pRamboCB ; |
| 2236 |
OSSRamboChunkCB * pChunkCB = NULL ; |
| 2237 |
void * pChunk = NULL ; |
| 2238 |
bool wrappedChunk = false ; |
| 2239 |
bool flushUnfilledChunks = false ; |
| 2240 |
bool stopping = false; |
| 2241 |
Sint32 err = 0 ; |
| 2242 |
Sint32 continueDataSize = 0; |
| 2243 |
jclass jcls = NULL; |
| 2244 |
jmethodID jmethod = NULL; |
| 2010 |
|
2245 |
|
| 2011 |
/* Validate parameters */ |
2246 |
#ifdef OSS_DEBUG1 |
|
|
2247 |
FILE *dbgfd; |
| 2248 |
#endif |
| 2249 |
|
| 2250 |
#ifdef OSS_DEBUG1 |
| 2251 |
dbgfd = fopen("flushdbg.txt", "ac+"); |
| 2252 |
if (dbgfd == NULL) { |
| 2253 |
osserr = errno; |
| 2254 |
goto exit; |
| 2255 |
} |
| 2256 |
if ( 0 > fprintf(dbgfd,"ossRamboFlushToFD: Entered.\n" ) ) { |
| 2257 |
osserr = OSS_ERR_RAMBO_IO_ERROR; |
| 2258 |
goto exit; |
| 2259 |
} |
| 2260 |
fflush(dbgfd); |
| 2261 |
#endif |
| 2262 |
|
| 2263 |
/* Validate the parameters */ |
| 2012 |
OSS_ASSERT( !ossIsBadReadPtr ( pRambo, sizeof( *pRambo ) ) ) ; |
2264 |
OSS_ASSERT( !ossIsBadReadPtr ( pRambo, sizeof( *pRambo ) ) ) ; |
| 2013 |
OSS_ASSERT( !ossIsBadReadPtr ( pRambo->pRamboCB, sizeof( *pRambo->pRamboCB ) ) ) ; |
2265 |
OSS_ASSERT( !ossIsBadReadPtr ( pRambo->pRamboCB, sizeof( *pRambo->pRamboCB ) ) ) ; |
| 2014 |
OSS_ASSERT( !ossIsBadWritePtr( pRambo->pRamboCB, sizeof( *pRambo->pRamboCB ) ) ) ; |
2266 |
OSS_ASSERT( !ossIsBadWritePtr( pRambo->pRamboCB, sizeof( *pRambo->pRamboCB ) ) ) ; |
| 2015 |
if ( NULL == pRambo ) |
2267 |
|
|
|
2268 |
#ifdef OSS_DEBUG1 |
| 2269 |
fprintf(dbgfd,"ossRamboFlushToFD: After asserts.\n" ) ; |
| 2270 |
fflush(dbgfd); |
| 2271 |
#endif |
| 2272 |
|
| 2273 |
if ( ( NULL == pRambo ) || ( NULL == pRambo->pRamboCB ) ) |
| 2016 |
{ |
2274 |
{ |
| 2017 |
goto errorInvalid ; |
2275 |
goto errorInvalid ; |
| 2018 |
} |
2276 |
} |
| 2019 |
|
2277 |
|
| 2020 |
OSS_ASSERT( ossRamboIsInitialized( pRambo->pRamboCB ) ) ; |
2278 |
pRamboCB = pRambo->pRamboCB ; |
| 2021 |
if ( !ossRamboIsInitialized( pRambo->pRamboCB ) ) |
2279 |
|
|
|
2280 |
OSS_ASSERT( ossRamboIsInitialized( pRamboCB ) ) ; |
| 2281 |
|
| 2282 |
if ( !ossRamboIsInitialized( pRamboCB ) ) |
| 2022 |
{ |
2283 |
{ |
| 2023 |
goto errorNotInitialized ; |
2284 |
goto errorNotInitialized ; |
| 2024 |
} |
2285 |
} |
| 2025 |
|
2286 |
|
| 2026 |
/* Check if the auto flusher is started */ |
2287 |
/* If the auto flusher is on, don't allow manual flushing */ |
| 2027 |
if ( !ossRamboIsAutoFlusherStarted( pRambo->pRamboCB ) ) |
2288 |
if ( ossRamboIsAutoFlusherStarted( pRamboCB ) ) |
| 2028 |
{ |
2289 |
{ |
| 2029 |
osserr = OSS_OK ; |
2290 |
osserr = OSS_ERR_RAMBO_AUTOFLUSHER_ENABLED ; |
| 2030 |
goto exit ; |
2291 |
goto exit ; |
| 2031 |
} |
2292 |
} |
| 2032 |
|
2293 |
|
| 2033 |
/* Notify the auto-flusher to stop */ |
2294 |
/* Get java method ID */ |
| 2034 |
pRambo->pRamboCB->l2.h2.status |= OSS_RAMBO_AUTOF_STOP_NOW ; |
|
|
| 2035 |
|
| 2036 |
#ifdef OSS_DEBUG |
2295 |
#ifdef OSS_DEBUG |
| 2037 |
printf("After setting the stop flag\n"); |
2296 |
printf("ossRamboFlushToStream: Before trying to get java method ID\n"); |
| 2038 |
#endif |
2297 |
#endif |
|
|
2298 |
jcls = ENV(jenv)->GetObjectClass(ENVPARM(jenv) jOutStrObj); |
| 2299 |
if(jcls != NULL) { |
| 2300 |
jmethod = ENV(jenv)->GetMethodID(ENVPARM(jenv) jcls, "write", "([BII)V"); |
| 2301 |
} |
| 2039 |
|
2302 |
|
| 2040 |
/* 216026 begin */ |
2303 |
if (jmethod == NULL) { |
| 2041 |
/* Wait until there are no more writers to the shared memory */ |
2304 |
osserr = OSS_ERR_RAMBO_IO_ERROR ; |
| 2042 |
|
2305 |
goto exit ; |
| 2043 |
while ( ossAtomicPeek(&pRambo->writers) > 0 ) { |
2306 |
} |
| 2044 |
#ifdef OSS_DEBUG |
2307 |
#ifdef OSS_DEBUG |
| 2045 |
printf("ossRamboStopFlusher: waiting because there are still %d writers\n", ossAtomicPeek(&pRambo->writers)); |
2308 |
printf("ossRamboFlushToStream: After getting java method ID\n"); |
| 2046 |
#endif |
2309 |
#endif |
| 2047 |
ossYield() ; |
|
|
| 2048 |
} |
| 2049 |
/* 216026 end */ |
| 2050 |
|
| 2051 |
/* 186134 - Add a call to detach from the shared memory so it can be destroyed |
| 2052 |
by the server */ |
| 2053 |
ossIPCMemDetach( pRambo->ipcMemHandle ) ; |
| 2054 |
|
2310 |
|
| 2055 |
OSS_ASSERT( OSS_OK == osserr ) ; |
|
|
| 2056 |
|
2311 |
|
| 2057 |
exit: |
2312 |
/* If already flushing, wait. */ |
| 2058 |
return osserr ; |
2313 |
/* TODO */ |
|
|
2314 |
/* |
| 2315 |
* There is a timing hole here. The flushing status may have been |
| 2316 |
* cleared and then set by another process before we get to set it. |
| 2317 |
*/ |
| 2318 |
while ( ossRamboIsFlushing( pRamboCB ) ) |
| 2319 |
{ |
| 2320 |
ossYield() ; |
| 2321 |
} |
| 2322 |
// pRamboCB->l2.h2.status |= OSS_RAMBO_FLUSHING ; |
| 2323 |
/* Pretend we are auto flushing instead of manual flushing so we don't hold up writing |
| 2324 |
to the buffer */ |
| 2325 |
pRamboCB->l2.h2.status |= OSS_RAMBO_AUTOF_STARTED ; |
| 2326 |
/* TODO */ |
| 2059 |
|
2327 |
|
| 2060 |
errorInvalid : |
|
|
| 2061 |
osserr = OSS_ERR_INVALID ; |
| 2062 |
goto exit ; |
| 2063 |
|
2328 |
|
| 2064 |
errorNotInitialized : |
2329 |
/* Examine the first chunk control block to see if we wrapped. */ |
| 2065 |
osserr = OSS_ERR_RAMBO_NOT_INIT ; |
2330 |
if ( ossAtomicPeek(&(pFirstChunkCB(pRamboCB)->fillCount)) > OSS_RAMBO_SLOTS_PER_CHUNK ) |
| 2066 |
goto exit ; |
2331 |
{ |
| 2067 |
} |
2332 |
pOnDiskCB(pRamboCB)->bufferWrapped = 1 ; /* True */ |
|
|
2333 |
} |
| 2068 |
|
2334 |
|
|
|
2335 |
/* Write RAMBO storage header information */ |
| 2336 |
OSS_ASSERT( ossRamboIsValidOnDiskCB( pOnDiskCB(pRamboCB) ) ) ; |
| 2069 |
|
2337 |
|
| 2070 |
/******************************************************************************* |
|
|
| 2071 |
|
2338 |
|
| 2072 |
Name |
2339 |
/* TODO */ |
| 2073 |
ossRamboStopFlusher |
2340 |
/* Start flushing */ |
|
|
2341 |
#ifdef OSS_DEBUG1 |
| 2342 |
// printf("ossRamboFlushFD: Started flushing.\n" ) ; |
| 2343 |
fprintf(dbgfd,"ossRamboFlushToFD: Started flushing.\n" ) ; |
| 2344 |
fflush(dbgfd); |
| 2345 |
#endif |
| 2074 |
|
2346 |
|
| 2075 |
Function |
2347 |
/* Try to find the least recently filled chunk if the buffer has |
| 2076 |
Tells flusher routine to stop flushing buffer. |
2348 |
been wrapped and start flushing at it. */ |
|
|
2349 |
|
| 2350 |
if ( ossAtomicPeek(&(pFirstChunkCB(pRamboCB)->fillCount)) > OSS_RAMBO_SLOTS_PER_CHUNK ) |
| 2351 |
{ |
| 2352 |
slotIndex = ossAtomicPeek(&pRamboCB->l1.h1.nextSlotIndex) % pRamboCB->l2.h2.maxSlots ; |
| 2353 |
chunkIndex = slotIndex / OSS_RAMBO_SLOTS_PER_CHUNK + 1; |
| 2354 |
} |
| 2355 |
else |
| 2356 |
chunkIndex = 0; |
| 2357 |
|
| 2358 |
#ifdef OSS_DEBUG |
| 2359 |
printf("ossRamboFlushToStream: About to go into waiting loop\n"); |
| 2360 |
#endif |
| 2361 |
|
| 2362 |
for ( ; ; ) |
| 2363 |
{ |
| 2364 |
/* Get pointer to the chunk control block */ |
| 2365 |
pChunkCB = pFirstChunkCB(pRamboCB) + chunkIndex ; |
| 2366 |
OSS_ASSERT( ossRamboIsValidChunkCB( pChunkCB, pRamboCB ) ) ; |
| 2367 |
|
| 2368 |
loopIndex = 0; |
| 2369 |
timedOut = 0; |
| 2370 |
attachCheckIndex = 0; /* 94473 */ |
| 2371 |
/* Wait until the chunk is full or we have been asked to stop or we have waited too long */ |
| 2372 |
for ( ; ; ) |
| 2373 |
{ |
| 2374 |
/* If flushing is to be stopped then set the stopping flag and quit the loop */ |
| 2375 |
if ( pRamboCB->l2.h2.status & OSS_RAMBO_AUTOF_STOP_NOW ) |
| 2376 |
{ |
| 2377 |
stopping = true ; |
| 2378 |
break ; |
| 2379 |
} |
| 2380 |
|
| 2381 |
/* If the chunk is full then quit the loop so it can be flushed. */ |
| 2382 |
if ( ossAtomicPeek(&pChunkCB->fillCount) > 0 && (ossAtomicPeek(&pChunkCB->fillCount) % OSS_RAMBO_SLOTS_PER_CHUNK) == 0 ) |
| 2383 |
{ |
| 2384 |
#ifdef OSS_DEBUG1 |
| 2385 |
// printf("ossRamboFlushFD: Started flushing.\n" ) ; |
| 2386 |
fprintf(dbgfd,"ossRamboFlushToFD: Data found in buffer.\n" ) ; |
| 2387 |
fflush(dbgfd); |
| 2388 |
#endif |
| 2389 |
break ; |
| 2390 |
} |
| 2391 |
/* Else If we have looped too long */ |
| 2392 |
else if (loopIndex == 100) { |
| 2393 |
/* If there is any data in the chunk Then quit the loop to flush it */ |
| 2394 |
if ((ossAtomicPeek(&pChunkCB->fillCount) % OSS_RAMBO_SLOTS_PER_CHUNK) - ossAtomicPeek(&pChunkCB->flushedCount) > 0 || |
| 2395 |
(ossAtomicPeek(&pChunkCB->fillCount) > 0 && (ossAtomicPeek(&pChunkCB->fillCount) % OSS_RAMBO_SLOTS_PER_CHUNK) == 0)) { |
| 2396 |
timedOut = 1; |
| 2397 |
#ifdef OSS_DEBUG1 |
| 2398 |
fprintf(dbgfd,"ossRamboFlushToFD: We've waited too long, flush data found in chunk.\n" ) ; |
| 2399 |
fflush(dbgfd); |
| 2400 |
#endif |
| 2401 |
break ; |
| 2402 |
} |
| 2403 |
/* 205955 begin */ |
| 2404 |
/* Else get the attach count to see if anyone else is attached */ |
| 2405 |
else { |
| 2406 |
int attcnt = 0; |
| 2407 |
OSSErr osserr = OSS_OK ; |
| 2408 |
osserr = ossIPCMemAttachCount( pRambo->ipcMemHandle, &attcnt ) ; |
| 2409 |
attachCheckIndex++; /* bugzilla 94473 */ |
| 2410 |
/* If nobody is attached to the shared memory except for us then stop flushing |
| 2411 |
* bugzilla 94473 - only stop flushing if we have gone through the wait loop four |
| 2412 |
* times to provide more time for the agent to attach to it (4 seconds instead of 1). */ |
| 2413 |
if ( osserr == OSS_OK && attcnt == 1 && attachCheckIndex >= 4) |
| 2414 |
{ |
| 2415 |
#ifdef OSS_DEBUG |
| 2416 |
printf("ossRamboFlushToStream: Nobody is attached to write to the buffer yet.\n"); |
| 2417 |
#endif |
| 2418 |
/* set the user count to 1 so the shared memory gets deleted when it is destroyed */ |
| 2419 |
/* |
| 2420 |
ossIPCMemSetUserCount( pRambo->ipcMemHandle, attcnt ) ; |
| 2421 |
stopping = true ; |
| 2422 |
break ; |
| 2423 |
*/ |
| 2424 |
} |
| 2425 |
/* Else continue checking for writes to shared memory */ |
| 2426 |
loopIndex = 0; |
| 2427 |
} |
| 2428 |
/* 205955 end */ |
| 2429 |
} |
| 2430 |
|
| 2431 |
ossSleep( 10 ) ; |
| 2432 |
loopIndex++; |
| 2433 |
} |
| 2434 |
|
| 2435 |
/* If we should stop, break out of the flushing loop */ |
| 2436 |
if ( stopping ) |
| 2437 |
{ |
| 2438 |
#ifdef OSS_DEBUG1 |
| 2439 |
// printf("ossRamboFlushFD: Started flushing.\n" ) ; |
| 2440 |
fprintf(dbgfd,"ossRamboFlushToFD: We are stopping flushing so flush unfilled chunks.\n" ) ; |
| 2441 |
fflush(dbgfd); |
| 2442 |
#endif |
| 2443 |
flushUnfilledChunks = true ; |
| 2444 |
break ; |
| 2445 |
} |
| 2446 |
|
| 2447 |
/* If the chunk is full, flush it to disk */ |
| 2448 |
#ifdef ENW |
| 2449 |
if ( ossAtomicPeek(&pChunkCB->fillCount) > 0 && (ossAtomicPeek(&pChunkCB->fillCount) % OSS_RAMBO_SLOTS_PER_CHUNK) == 0 ) |
| 2450 |
#else |
| 2451 |
if ( pChunkCB->fillCount >= OSS_RAMBO_SLOTS_PER_CHUNK ) |
| 2452 |
#endif |
| 2453 |
{ |
| 2454 |
unsigned int i; |
| 2455 |
if (ossAtomicPeek(&pChunkCB->flushedCount) == 0) { |
| 2456 |
OSS_ASSERT( (Uint32)-1 == ossAtomicPeek(&pChunkCB->slotMap[0]) ) ; |
| 2457 |
} |
| 2458 |
OSS_ASSERT( (Uint32)-1 == ossAtomicPeek(&pChunkCB->slotMap[OSS_RAMBO_SLOT_BITMAP_ARRAY_SIZE-1]) ) ; |
| 2459 |
|
| 2460 |
if ( ossAtomicPeek(&pChunkCB->fillCount) > OSS_RAMBO_SLOTS_PER_CHUNK ) |
| 2461 |
{ |
| 2462 |
wrappedChunk = true ; |
| 2463 |
} |
| 2464 |
/* |
| 2465 |
if (!flushStarted) |
| 2466 |
flushStarted = true; |
| 2467 |
*/ |
| 2468 |
/* Get a pointer to the chunk of memory */ |
| 2469 |
pChunk = (void *)( (uintptr_t)pMemBuffer(pRamboCB) |
| 2470 |
+ ( chunkIndex * OSS_RAMBO_CHUNK_SIZE ) ) ; |
| 2471 |
if (ossAtomicPeek(&pChunkCB->flushedCount) > 0) { |
| 2472 |
pChunk = (void *)((char *)pChunk + ossAtomicPeek(&pChunkCB->flushedCount) * OSS_RAMBO_SLOT_SIZE); |
| 2473 |
flushDataLen = OSS_RAMBO_CHUNK_SIZE - ossAtomicPeek(&pChunkCB->flushedCount) * OSS_RAMBO_SLOT_SIZE; |
| 2474 |
} |
| 2475 |
else |
| 2476 |
flushDataLen = OSS_RAMBO_CHUNK_SIZE; |
| 2477 |
|
| 2478 |
OSS_ASSERT( pChunk != NULL ) ; |
| 2479 |
OSS_ASSERT( (uintptr_t)pChunk >= (uintptr_t)pMemBuffer(pRamboCB) ) ; |
| 2480 |
|
| 2481 |
/* Pass the chunk to the data processor */ |
| 2482 |
|
| 2483 |
err = dataToStrProcessor( pChunk, flushDataLen, continueDataSize, jenv, jOutStrObj, jmethod ) ; |
| 2484 |
if ( 0 > err ) { |
| 2485 |
// perror("ossRamboFlushFD failed writing the storage header info"); |
| 2486 |
#ifdef OSS_DEBUG1 |
| 2487 |
fprintf(dbgfd,"ossRamboFlushToFD: failed writing the chunk %d of shared memory. wsa error=%d\n", chunkIndex, WSAGetLastError() ) ; |
| 2488 |
fflush(dbgfd); |
| 2489 |
#endif |
| 2490 |
goto errorProcessingFailed ; |
| 2491 |
} |
| 2492 |
else |
| 2493 |
continueDataSize = err; |
| 2494 |
|
| 2495 |
if ( wrappedChunk ) |
| 2496 |
{ |
| 2497 |
/* |
| 2498 |
* The flusher has been lapped. The chunk has been filled |
| 2499 |
* more than once. Log this so we know it has happened but |
| 2500 |
* don't exit. |
| 2501 |
*/ |
| 2502 |
#ifdef OSS_DEBUG1 |
| 2503 |
// printf("RC = ERR_CHUNK_OVERWRITTEN: Flushed chunk %lu fillCount %lu\n", |
| 2504 |
// chunkIndex, pChunkCB->fillCount ) ; |
| 2505 |
fprintf(dbgfd,"RC = ERR_CHUNK_OVERWRITTEN: Flushed chunk %lu fillCount %lu\n", |
| 2506 |
chunkIndex, ossAtomicPeek(&pChunkCB->fillCount) ) ; |
| 2507 |
fflush(dbgfd); |
| 2508 |
#endif |
| 2509 |
wrappedChunk = false ; |
| 2510 |
} |
| 2511 |
#ifdef OSS_DEBUG1 |
| 2512 |
else { |
| 2513 |
// printf("Flushed chunk %lu fillCount %lu\n", |
| 2514 |
// chunkIndex, pChunkCB->fillCount ) ; |
| 2515 |
fprintf(dbgfd,"Flushed chunk %lu fillCount %lu\n", |
| 2516 |
chunkIndex, ossAtomicPeek(&pChunkCB->fillCount) ) ; |
| 2517 |
fflush(dbgfd); |
| 2518 |
} |
| 2519 |
#endif |
| 2520 |
|
| 2521 |
//pause() ; |
| 2522 |
/* Reset the chunk control block */ |
| 2523 |
/* 186134 begin |
| 2524 |
memset( (void *)&pChunkCB->slotMap[0], 0x00, sizeof( pChunkCB->slotMap ) ) ; |
| 2525 |
*/ |
| 2526 |
for(i=0; i < OSS_RAMBO_SLOT_BITMAP_ARRAY_SIZE; i++) { |
| 2527 |
ossAtomicPoke(&(pChunkCB->slotMap[i]), 0); |
| 2528 |
} |
| 2529 |
/* 186134 end */ |
| 2530 |
ossAtomicPoke(&pChunkCB->fillCount, 0) ; |
| 2531 |
ossAtomicPoke(&pChunkCB->flushedCount, 0) ; |
| 2532 |
} |
| 2533 |
else if ( timedOut ) { |
| 2534 |
/* Get a pointer to the chunk of memory */ |
| 2535 |
pChunk = (void *)( (uintptr_t)pMemBuffer(pRamboCB) |
| 2536 |
+ ( chunkIndex * OSS_RAMBO_CHUNK_SIZE ) ) ; |
| 2537 |
|
| 2538 |
slotIndex = ossAtomicPeek(&pChunkCB->fillCount) % OSS_RAMBO_SLOTS_PER_CHUNK; |
| 2539 |
if (ossAtomicPeek(&pChunkCB->flushedCount) > 0) { |
| 2540 |
pChunk = (void *)((char *)pChunk + ossAtomicPeek(&pChunkCB->flushedCount) * OSS_RAMBO_SLOT_SIZE); |
| 2541 |
flushDataLen = (slotIndex - ossAtomicPeek(&pChunkCB->flushedCount)) * OSS_RAMBO_SLOT_SIZE; |
| 2542 |
} |
| 2543 |
else |
| 2544 |
flushDataLen = slotIndex * OSS_RAMBO_SLOT_SIZE; |
| 2545 |
|
| 2546 |
OSS_ASSERT( pChunk != NULL ) ; |
| 2547 |
OSS_ASSERT( (uintptr_t)pChunk >= (uintptr_t)pMemBuffer(pRamboCB) ) ; |
| 2548 |
|
| 2549 |
/* Pass the chunk to the data processor */ |
| 2550 |
|
| 2551 |
err = dataToStrProcessor( pChunk, flushDataLen, continueDataSize, jenv, jOutStrObj, jmethod ) ; |
| 2552 |
if ( 0 > err ) { |
| 2553 |
// perror("ossRamboFlushFD failed writing the storage header info"); |
| 2554 |
#ifdef OSS_DEBUG1 |
| 2555 |
fprintf(dbgfd,"ossRamboFlushToFD: failed writing the chunk %d of shared memory. wsa error=%d\n", chunkIndex, WSAGetLastError() ) ; |
| 2556 |
fflush(dbgfd); |
| 2557 |
#endif |
| 2558 |
goto errorProcessingFailed ; |
| 2559 |
} |
| 2560 |
else |
| 2561 |
continueDataSize = err; |
| 2562 |
|
| 2563 |
/* Reset the chunk control block */ |
| 2564 |
ossRamboMarkSlotsEmpty(pChunkCB, ossAtomicPeek(&pChunkCB->flushedCount), slotIndex-ossAtomicPeek(&pChunkCB->flushedCount)); |
| 2565 |
} |
| 2566 |
#ifdef OSS_DEBUG1 |
| 2567 |
else if (ossAtomicPeek(&pChunkCB->fillCount) > 0) { |
| 2568 |
// printf("Unfilled chunk %lu fillCount %lu\n", |
| 2569 |
// chunkIndex, pChunkCB->fillCount ) ; |
| 2570 |
fprintf(dbgfd,"Unfilled chunk %lu fillCount %lu\n", |
| 2571 |
chunkIndex, ossAtomicPeek(&pChunkCB->fillCount) ) ; |
| 2572 |
fflush(dbgfd); |
| 2573 |
} |
| 2574 |
#endif |
| 2575 |
/* |
| 2576 |
else if (flushStarted) |
| 2577 |
printf("Empty chunk %lu\n", chunkIndex) ; |
| 2578 |
*/ |
| 2579 |
/* Move to next chunk if we didn't time out */ |
| 2580 |
if ( !timedOut ) |
| 2581 |
chunkIndex = (chunkIndex + 1) % pRamboCB->l2.h2.maxChunks ; |
| 2582 |
|
| 2583 |
/* Check if we have been asked to stop */ |
| 2584 |
if ( pRamboCB->l2.h2.status & OSS_RAMBO_AUTOF_STOP_NOW ) |
| 2585 |
{ |
| 2586 |
flushUnfilledChunks = true ; |
| 2587 |
break ; |
| 2588 |
} |
| 2589 |
#ifdef OSS_DEBUG |
| 2590 |
printf("ossRamboFlushToStream: About to do another iteration of the waiting loop\n"); |
| 2591 |
#endif |
| 2592 |
} |
| 2593 |
|
| 2594 |
/* Flush unfilled chunks */ |
| 2595 |
if ( flushUnfilledChunks ) |
| 2596 |
{ |
| 2597 |
int numUsedSlots; |
| 2598 |
|
| 2599 |
/* Start flushing at the current chunk index so the flushing continues in the correct order */ |
| 2600 |
unsigned int i; |
| 2601 |
#ifdef OSS_DEBUG |
| 2602 |
printf("ossRamboFlushToStream: Flushing unfilled chunks\n"); |
| 2603 |
#endif |
| 2604 |
for ( i = 0; i < pRamboCB->l2.h2.maxChunks; i++ ) |
| 2605 |
{ |
| 2606 |
/* Get pointer to the chunk control block */ |
| 2607 |
pChunkCB = pFirstChunkCB(pRamboCB) + chunkIndex ; |
| 2608 |
OSS_ASSERT( ossRamboIsValidChunkCB( pChunkCB, pRamboCB ) ) ; |
| 2609 |
|
| 2610 |
/* If the chunk contains data, flush it to disk */ |
| 2611 |
if ( ossAtomicPeek(&pChunkCB->fillCount) > 0 ) |
| 2612 |
{ |
| 2613 |
unsigned int j; |
| 2614 |
numUsedSlots = ossAtomicPeek(&pChunkCB->fillCount) % OSS_RAMBO_SLOTS_PER_CHUNK; |
| 2615 |
|
| 2616 |
/* Check if chunk is full */ |
| 2617 |
if (numUsedSlots == 0) { |
| 2618 |
numUsedSlots = OSS_RAMBO_SLOTS_PER_CHUNK; |
| 2619 |
} |
| 2620 |
|
| 2621 |
/* subtract the slots that have already been flushed */ |
| 2622 |
numUsedSlots -= ossAtomicPeek(&pChunkCB->flushedCount); |
| 2623 |
|
| 2624 |
/* if there are no used slots in this chunk */ |
| 2625 |
if (numUsedSlots == 0) { |
| 2626 |
/* Move to next chunk */ |
| 2627 |
chunkIndex = (chunkIndex + 1) % pRamboCB->l2.h2.maxChunks ; |
| 2628 |
continue; |
| 2629 |
} |
| 2630 |
else if (numUsedSlots < 0) { |
| 2631 |
/* Note: flushedCount should never be bigger than fillCount but we'll |
| 2632 |
catch this case and stop flushing because we are in an inconsistent |
| 2633 |
state |
| 2634 |
*/ |
| 2635 |
goto errorProcessingFailed ; |
| 2636 |
} |
| 2637 |
|
| 2638 |
/* Get a pointer to the chunk of memory */ |
| 2639 |
pChunk = (void *)( (uintptr_t)pMemBuffer(pRamboCB) |
| 2640 |
+ ( chunkIndex * OSS_RAMBO_CHUNK_SIZE ) ) ; |
| 2641 |
if (ossAtomicPeek(&pChunkCB->flushedCount) > 0) |
| 2642 |
pChunk = (void *)((char *)pChunk + ossAtomicPeek(&pChunkCB->flushedCount) * OSS_RAMBO_SLOT_SIZE); |
| 2643 |
OSS_ASSERT( pChunk != NULL ) ; |
| 2644 |
OSS_ASSERT( (uintptr_t)pChunk >= (uintptr_t)pMemBuffer(pRamboCB) ) ; |
| 2645 |
|
| 2646 |
/* Pass the chunk to the data processor, but only process the filled slots */ |
| 2647 |
|
| 2648 |
err = dataToStrProcessor( pChunk, numUsedSlots * OSS_RAMBO_SLOT_SIZE, continueDataSize, jenv, jOutStrObj, jmethod ) ; |
| 2649 |
if ( 0 > err ) { |
| 2650 |
#ifdef OSS_DEBUG1 |
| 2651 |
fprintf(dbgfd,"ossRamboFlushToFD: failed writing the chunk %d of shared memory. error=%d\n", chunkIndex, err ) ; |
| 2652 |
fflush(dbgfd); |
| 2653 |
#endif |
| 2654 |
goto errorProcessingFailed ; |
| 2655 |
} |
| 2656 |
else |
| 2657 |
continueDataSize = err; |
| 2658 |
|
| 2659 |
#ifdef OSS_DEBUG1 |
| 2660 |
// printf("Flushed unfilled chunk index %lu has fillCount %lu\n", |
| 2661 |
// chunkIndex, pChunkCB->fillCount ) ; |
| 2662 |
fprintf(dbgfd,"Flushed unfilled chunk index %lu has fillCount %lu\n", |
| 2663 |
chunkIndex, ossAtomicPeek(&pChunkCB->fillCount) ) ; |
| 2664 |
fflush(dbgfd); |
| 2665 |
#endif |
| 2666 |
|
| 2667 |
/* Reset the chunk control block */ |
| 2668 |
/* 186134 begin |
| 2669 |
memset( (void *)&pChunkCB->slotMap[0], 0x00, sizeof( pChunkCB->slotMap ) ) ; |
| 2670 |
*/ |
| 2671 |
for(j=0; j < OSS_RAMBO_SLOT_BITMAP_ARRAY_SIZE; j++) { |
| 2672 |
ossAtomicPoke(&(pChunkCB->slotMap[j]), 0); |
| 2673 |
} |
| 2674 |
/* 186134 end */ |
| 2675 |
ossAtomicPoke(&pChunkCB->fillCount, 0) ; |
| 2676 |
ossAtomicPoke(&pChunkCB->flushedCount, 0) ; |
| 2677 |
} |
| 2678 |
#ifdef OSS_DEBUG1 |
| 2679 |
else { |
| 2680 |
// printf("empty chunk index %lu\n", chunkIndex ) ; |
| 2681 |
fprintf(dbgfd,"empty chunk index %lu\n", chunkIndex ) ; |
| 2682 |
fflush(dbgfd); |
| 2683 |
} |
| 2684 |
#endif |
| 2685 |
/* Move to next chunk */ |
| 2686 |
chunkIndex = (chunkIndex + 1) % pRamboCB->l2.h2.maxChunks ; |
| 2687 |
|
| 2688 |
} |
| 2689 |
/* Clear the auto-flusher status flags */ |
| 2690 |
pRamboCB->l2.h2.status &= ~OSS_RAMBO_AUTOF_STOP_NOW ; |
| 2691 |
pRamboCB->l2.h2.status &= ~OSS_RAMBO_AUTOF_STARTED ; |
| 2692 |
} |
| 2693 |
#ifdef OSS_DEBUG |
| 2694 |
printf("ossRamboFlushToStream: Returning with osserr = %d\n", osserr); |
| 2695 |
#endif |
| 2696 |
|
| 2697 |
// pRamboCB->l2.h2.status &= ~OSS_RAMBO_FLUSHING ; |
| 2698 |
|
| 2699 |
OSS_ASSERT( OSS_OK == osserr ) ; |
| 2700 |
|
| 2701 |
exit : |
| 2702 |
#ifdef OSS_DEBUG1 |
| 2703 |
fclose(dbgfd); |
| 2704 |
#endif |
| 2705 |
OSS_ASSERT( !( pRamboCB->l2.h2.status & OSS_RAMBO_FLUSHING ) ) ; |
| 2706 |
return osserr ; |
| 2707 |
|
| 2708 |
errorInvalid : |
| 2709 |
osserr = OSS_ERR_INVALID ; |
| 2710 |
goto exit ; |
| 2711 |
|
| 2712 |
errorNotInitialized : |
| 2713 |
osserr = OSS_ERR_RAMBO_NOT_INIT ; |
| 2714 |
goto exit ; |
| 2715 |
|
| 2716 |
errorProcessingFailed : |
| 2717 |
// pRamboCB->l2.h2.status &= ~OSS_RAMBO_FLUSHING ; |
| 2718 |
pRamboCB->l2.h2.status &= ~OSS_RAMBO_AUTOF_STOP_NOW ; |
| 2719 |
pRamboCB->l2.h2.status &= ~OSS_RAMBO_AUTOF_STARTED ; |
| 2720 |
osserr = OSS_ERR_RAMBO_IO_ERROR ; |
| 2721 |
goto exit ; |
| 2722 |
} |
| 2723 |
|
| 2724 |
|
| 2725 |
/******************************************************************************* |
| 2726 |
|
| 2727 |
Name |
| 2728 |
ossRamboStopFlusherAndDetach |
| 2729 |
|
| 2730 |
Function |
| 2731 |
Tells flusher routine to stop flushing buffer and detach from buffer. |
| 2732 |
|
| 2733 |
Inputs |
| 2734 |
1. pRambo |
| 2735 |
Address of a RAMBO handle. |
| 2736 |
|
| 2737 |
Normal Return |
| 2738 |
OSS_OK |
| 2739 |
|
| 2740 |
Error Return |
| 2741 |
OSS_ERR_INVALID |
| 2742 |
OSS_ERR_RAMBO_NOT_INIT |
| 2743 |
|
| 2744 |
******************************************************************************/ |
| 2745 |
OSS_EXTERNC OSSErr OSS_API ossRamboStopFlusherAndDetach( OSSRambo * pRambo ) |
| 2746 |
{ |
| 2747 |
OSSErr osserr = OSS_OK ; |
| 2748 |
|
| 2749 |
/* Validate parameters */ |
| 2750 |
OSS_ASSERT( !ossIsBadReadPtr ( pRambo, sizeof( *pRambo ) ) ) ; |
| 2751 |
OSS_ASSERT( !ossIsBadReadPtr ( pRambo->pRamboCB, sizeof( *pRambo->pRamboCB ) ) ) ; |
| 2752 |
OSS_ASSERT( !ossIsBadWritePtr( pRambo->pRamboCB, sizeof( *pRambo->pRamboCB ) ) ) ; |
| 2753 |
if ( NULL == pRambo ) |
| 2754 |
{ |
| 2755 |
goto errorInvalid ; |
| 2756 |
} |
| 2757 |
|
| 2758 |
OSS_ASSERT( ossRamboIsInitialized( pRambo->pRamboCB ) ) ; |
| 2759 |
if ( !ossRamboIsInitialized( pRambo->pRamboCB ) ) |
| 2760 |
{ |
| 2761 |
goto errorNotInitialized ; |
| 2762 |
} |
| 2763 |
|
| 2764 |
/* Check if the auto flusher is started */ |
| 2765 |
if ( !ossRamboIsAutoFlusherStarted( pRambo->pRamboCB ) ) |
| 2766 |
{ |
| 2767 |
osserr = OSS_OK ; |
| 2768 |
goto exit ; |
| 2769 |
} |
| 2770 |
|
| 2771 |
/* Notify the auto-flusher to stop */ |
| 2772 |
pRambo->pRamboCB->l2.h2.status |= OSS_RAMBO_AUTOF_STOP_NOW ; |
| 2773 |
|
| 2774 |
#ifdef OSS_DEBUG |
| 2775 |
printf("After setting the stop flag\n"); |
| 2776 |
#endif |
| 2777 |
|
| 2778 |
/* 216026 begin */ |
| 2779 |
/* Wait until there are no more writers to the shared memory */ |
| 2780 |
|
| 2781 |
while ( ossAtomicPeek(&pRambo->writers) > 0 ) { |
| 2782 |
#ifdef OSS_DEBUG |
| 2783 |
printf("ossRamboStopFlusher: waiting because there are still %d writers\n", ossAtomicPeek(&pRambo->writers)); |
| 2784 |
#endif |
| 2785 |
ossYield() ; |
| 2786 |
} |
| 2787 |
/* 216026 end */ |
| 2788 |
|
| 2789 |
/* 186134 - Add a call to detach from the shared memory so it can be destroyed |
| 2790 |
by the server */ |
| 2791 |
ossIPCMemDetach( pRambo->ipcMemHandle ) ; |
| 2792 |
|
| 2793 |
OSS_ASSERT( OSS_OK == osserr ) ; |
| 2794 |
|
| 2795 |
exit: |
| 2796 |
return osserr ; |
| 2797 |
|
| 2798 |
errorInvalid : |
| 2799 |
osserr = OSS_ERR_INVALID ; |
| 2800 |
goto exit ; |
| 2801 |
|
| 2802 |
errorNotInitialized : |
| 2803 |
osserr = OSS_ERR_RAMBO_NOT_INIT ; |
| 2804 |
goto exit ; |
| 2805 |
} |
| 2806 |
|
| 2807 |
|
| 2808 |
/******************************************************************************* |
| 2809 |
|
| 2810 |
Name |
| 2811 |
ossRamboStopFlusher |
| 2812 |
|
| 2813 |
Function |
| 2814 |
Tells flusher routine to stop flushing buffer. |
| 2077 |
|
2815 |
|
| 2078 |
Inputs |
2816 |
Inputs |
| 2079 |
1. pRambo |
2817 |
1. pRambo |
|
Lines 2263-2266
Link Here
|
| 2263 |
return (ra_shm_err_t)shmerr; |
3001 |
return (ra_shm_err_t)shmerr; |
| 2264 |
} |
3002 |
} |
| 2265 |
/* 175248 end */ |
3003 |
/* 175248 end */ |
|
|
3004 |
|
| 3005 |
/*
 * ra_stopFlusher
 *
 * Forwards to ossRamboStopFlusher() for the given handle and returns its
 * result.  A NULL handle is treated as "nothing to stop": the function
 * returns OSS_OK without calling ossRamboStopFlusher().
 */
OSSErr ra_stopFlusher(OSSRambo *handle) {
	/* No handle means no flusher to stop. */
	if (NULL == handle) {
		return OSS_OK;
	}

	return ossRamboStopFlusher(handle);
}
| 2266 |
|
3015 |
|