83 int numErrs, numFixes, numLeft;
87 if (!fobP)
return true;
91 numLeft = fobP->
numOffs(&numErrs, &numFixes);
99 snprintf(ebuff,
sizeof(ebuff),
"%d uncorrected checksum errors",numLeft);
113int XrdXrootdProtocol::do_PgRead()
133 "pgread does not refer to an open file");
140 pathID =
static_cast<int>(rargs->
pathid);
173 if (!pathID) pP =
this;
174 else {
if (!(pP =
VerifyStream(rc, pathID,
false)))
return rc;
188 if (pathID)
return do_Offload(&XrdXrootdProtocol::do_PgRIO, pathID);
203int XrdXrootdProtocol::do_PgRIO()
208 static int iovmax = -1;
211 iovmax = sysconf(_SC_IOV_MAX);
220 static const int maxIOVZ = iovmax;
221 static const int maxCSSZ = iovmax/2 - 1;
222 static const int maxPGRD = maxCSSZ*pgPageSize;
223 static const int infoLen =
sizeof(
kXR_int64);
225 struct pgReadResponse
231 uint64_t pgrOpts = 0;
232 int dlen, fLen, lLen, rc, xframt, Quantum;
233 uint32_t csVec[maxCSSZ];
234 struct iovec
iov[maxIOVZ];
244 memset(pgrResp.rsp.bdy.reserved, 0,
sizeof(pgrResp.rsp.bdy.reserved));
249 int pgOff, rPages, rLen =
IO.
IOLen;
255 if (rPages < Quantum) Quantum = rPages;
261 {
if ((rc = getBuff(1, Quantum)) <= 0)
return rc;}
263 if (
argp->
bsize > maxPGRD) Quantum = maxPGRD;
269 int items = Quantum / pgPageSize;
275 uint32_t *csVP = csVec;
277 int i = 1, n = items * 2;
279 {
iov[i ].iov_base = csVP++;
280 iov[i++].iov_len =
sizeof(uint32_t);
281 iov[i ].iov_base = buff;
282 iov[i++].iov_len = pgPageSize;
290 if ((pgOff =
IO.Offset & pgPageMask))
291 {rLen = pgPageSize - pgOff;
293 iov[2].iov_base = buff;
294 iov[2].iov_len = rLen;
295 rLen += Quantum - pgPageSize;
306 long long ioOffset =
IO.Offset;
307 do {
if ((xframt = sfsP->
pgRead(
IO.Offset, buff, rLen, csVec, pgrOpts)) <= 0)
311 iov[2].iov_len = fLen;
312 if (items > 1)
iov[items<<1].iov_len = lLen;
314 if (xframt < rLen || xframt ==
IO.
IOLen)
322 for (
int i = 0; i < items; i++) csVec[i] = htonl(csVec[i]);
324 pgrResp.ofs = htonll(ioOffset);
329 dlen = xframt + (items *
sizeof(uint32_t));
330 if ((rc =
Response.
Send(pgrResp.rsp, infoLen,
iov, items*2+1, dlen)) < 0)
335 iov[2].iov_len = pgPageSize;
340 ioOffset =
IO.Offset;
345 if (xframt < 0)
return fsError(xframt, 0, sfsP->
error, 0, 0);
351 pgrResp.rsp.bdy.dlen = 0;
352 pgrResp.ofs = htonll(
IO.Offset);
362int XrdXrootdProtocol::do_PgWrite()
392 return do_WriteNone(pathID);
401 TRACEP(FSIO, pathID<<
" pgwrite "
403 <<
IO.IOLen<<
'@'<<
IO.Offset<<
" fn=" <<
IO.
File->FileKey);
419 if (pathID)
return do_Offload(&XrdXrootdProtocol::do_PgWIO, pathID);
423 return do_PgWIO(
true);
435bool XrdXrootdProtocol::do_PgWAIO(
int &rc)
469int XrdXrootdProtocol::do_PgWIO() {
return do_PgWIO(
true);}
471int XrdXrootdProtocol::do_PgWIO(
bool isFresh)
478 int n, rc, Quantum, iovLen, iovNum, csNum;
494 && !isRetry && do_PgWAIO(rc))
return rc;
495 if (isRetry && !do_PgWIORetry(rc))
return rc;
496 if (!do_PgWIOSetup(
pgwCtl))
return -1;
506 if ((rc =
getData(
this,
"pgwrite", ioV, iovNum)))
return rc;
516 for (
int i = 0; i < csNum; i++) csVec[i] = ntohl(csVec[i]);
533 if ((rc = sfsP->
pgWrite(
IO.Offset, buff, Quantum, csVec)) <= 0)
534 {
IO.EInfo[0] = rc;
IO.EInfo[1] = 0;
535 return do_WriteNone();
545 IO.Offset += Quantum;
566bool XrdXrootdProtocol::do_PgWIORetry(
int &rc)
568 static const int csLen =
sizeof(
kXR_unt32);
575 if (
IO.Offset & pgPageMask)
576 {
int n = pgPageSize - (
IO.Offset & pgPageMask);
577 isBad =
IO.
IOLen > (n + csLen);
578 }
else isBad =
IO.
IOLen > pgUnitSize;
584 "pgwrite retry of more than one page not allowed");
593 snprintf(buff,
sizeof(buff),
"retry %d@%lld",
IO.
IOLen-csLen,
IO.Offset);
625 if (!
argp || Quantum < halfBSize || argp->bsize < Quantum
627 {
if (getBuff(0, Quantum) <= 0)
return -1;}
struct ClientPgReadRequest pgread
struct ClientPgWriteRequest pgwrite
struct ClientRequestHdr header
XrdSysTrace XrdXrootdTrace
int setEtext(const char *text)
static bool csVer(dataInfo &dInfo, off_t &bado, int &badc)
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
static const uint64_t Verify
Options for pgRead() and pgWrite() as noted below.
virtual XrdSfsXferSize pgRead(XrdSfsFileOffset offset, char *buffer, XrdSfsXferSize rdlen, uint32_t *csvec, uint64_t opts=0)
virtual XrdSfsXferSize pgWrite(XrdSfsFileOffset offset, char *buffer, XrdSfsXferSize wrlen, uint32_t *csvec, uint64_t opts=0)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
void pgrOps(int rsz, bool isRetry=false)
void pgwOps(int wsz, bool isRetry=false)
void pgUpdt(int wErrs, int wFixd, int wUnc)
XrdXrootdFile * Get(int fnum)
void Add_rd(kXR_unt32 dictid, kXR_int32 rlen, kXR_int64 offset)
void Add_wr(kXR_unt32 dictid, kXR_int32 wlen, kXR_int64 offset)
void Read(long long offs, int dlen) override
static XrdXrootdPgrwAio * Alloc(XrdXrootdProtocol *protP, XrdXrootdResponse &resp, XrdXrootdFile *fP, XrdXrootdPgwBadCS *bcsP=0)
int Write(long long offs, int dlen) override
const char * boAdd(XrdXrootdFile *fP, kXR_int64 foffs, int dlen=XrdProto::kXR_pgPageSZ)
char * boInfo(int &boLen)
ServerResponseStatus resp
struct iovec * FrameInfo(int &iovn, int &rdlen)
static const int maxBSize
const char * Setup(XrdBuffer *buffP, kXR_int64 fOffs, int totlen)
ServerResponseBody_pgWrite info
bool hasOffs(kXR_int64 foffs, int dlen)
bool delOffs(kXR_int64 foffs, int dlen)
int numOffs(int *errs=0, int *fixs=0)
static XrdXrootdStats * SI
XrdXrootdProtocol * VerifyStream(int &rc, int pID, bool lok=true)
XrdXrootdProtocol * Stream[maxStreams]
XrdXrootdFileTable * FTab
static XrdSysError & eDest
int getData(gdCallBack *gdcbP, const char *dtype, char *buff, int blen)
XrdXrootdMonitor::User Monitor
XrdXrootdResponse Response
static const int maxStreams
static RAtomic_int srvrAioOps
static const int kXR_pgUnitSZ
static const int kXR_pgPageSZ
static const int kXR_pgRetry