32#include <netinet/in.h>
62 tident(tid), fPath(strdup(path)), ossDF(df),
63 calcP(cP), altcP(0), viaDel(delF), nextOff(0),
75 cksName = cP->
Type(sz);
82 if (cP->
Combinable() && (sz == (
int)
sizeof(uint32_t)))
83 {ProcessRTC = &XrdOfsCksFile::RTC_CB32;
86 ProcessRTC = &XrdOfsCksFile::RTC_NCXX;
87 if ((rc = posix_memalign(&ioBuff, 4096, ioBlen)))
88 {snprintf(eBuff, sizeof(eBuff),
89 "get buffer for real-time %s checksum for", cksName);
90 eLog.Emsg(
"ckscon", rc, eBuff, path);
104 if (ossDF)
delete ossDF;
105 if (calcP) calcP->Recycle();
106 if (altcP) altcP->Recycle();
107 if (fPath) free(fPath);
108 if (ioBuff) free(ioBuff);
137 {snprintf(eBuff,
sizeof(eBuff),
"File not properly closed; "
138 "real-time %s checksum was not set for", cksName);
139 eLog.
Emsg(
"ckscls", eBuff, fPath);
149 {
auto it = segMap.begin();
150 snprintf(eBuff,
sizeof(eBuff),
151 "%lld bytes missing at offset %lld; real-time %s",
152 (
long long)(it->second.segBeg - nextOff),
153 (
long long)nextOff, cksName);
154 eLog.
Emsg(
"ckcls", eBuff,
"checksum was not set for", fPath);
160 memset((
void*)&cksData, 0,
sizeof(cksData));
161 cksData.Set(calcP->Type(csSize));
162 cksData.Length = csSize;
163 memcpy(cksData.Value, calcP->Final(), csSize);
165 if ((rc = ossDF->Fstat(&
Stat)))
166 {eLog.
Emsg(
"clscls", rc,
"get real-time checksum mtime for", fPath);
170 cksData.fmTime =
static_cast<long long>(
Stat.st_mtime);
171 cksData.csTime =
static_cast<int>(time(0) -
Stat.st_mtime);
173 if ((rc = cksP->Set(fPath, cksData, 1)))
174 eLog.
Emsg(
"ckscls", rc,
"set real-time checksum for", fPath);
183 return wrapDF.Close(retsz);
201 int rc =
wrapDF.Ftruncate(flen);
207 {eLog.
Emsg(
"ckstrunc", rc,
"continue real-time checksum for", fPath);
211 {eLog.
Emsg(
"ckstrunc",
"Unable to continue real-time checksum for",
212 fPath,
"; truncate arg not 0.");
247 if (Dirty)
return -ENOTSUP;
261 if (rc) Dirty =
true;
279 ssize_t retval =
wrapDF.pgWrite(buffer, offset, wrlen, csvec,
opts);
285 {eLog.
Emsg(
"ckspgw", retval,
"continue real-time checksum for",fPath);
288 if ((eText = (this->*ProcessRTC)(buffer, offset, wrlen)))
289 {eLog.
Emsg(
"ckspgw",
"unable to continue real-time checksum for",
309 {
if ((eText = (this->*ProcessRTC)((
void *)aioparm->
sfsAio.
aio_buf,
312 {eLog.
Emsg(
"cksaiopw",
"Unable to continue real-time checksum for",
344 ssize_t retval =
wrapDF.Write(buff, offset, blen);
350 {eLog.
Emsg(
"cksw", retval,
"continue streaming checksum for", fPath);
353 if ((eText = (this->*ProcessRTC)(buff, offset, blen)))
354 {eLog.
Emsg(
"cksw",
"Unable to continue real-time checksum for",
375 {
if ((eText = (this->*ProcessRTC)((
void *)aioparm->
sfsAio.
aio_buf,
378 {eLog.
Emsg(
"cksaiopw",
"Unable to continue real-time checksum for",
386 return wrapDF.Write(aioparm);
399 {eLog.
Emsg(
"ckswv",
"Unable to continue streaming checksum for",
400 fPath,
"; WriteV() conflict.");
406 return wrapDF.WriteV(writeV, n);
418const char* XrdOfsCksFile::RTC_CB32(
const void* inBuff, off_t inOff,
int inLen)
424 if (inOff == nextOff)
425 {calcP->
Update((
const char*)inBuff, inLen);
426 nextOff = inOff + inLen;
427 auto it = segMap.begin();
428 while(it != segMap.end() && nextOff == it->second.segBeg)
430 calcP->
Combine((
const char*)&(it->second.segCks),it->second.segLen);
431 nextOff = it->second.segBeg + it->second.segLen;
432 it = segMap.erase(it);
437 if (it != segMap.end() && nextOff > it->second.segBeg)
438 return "; I/O segments overlap";
445 if (inOff < nextOff)
return "; ovewrite of previous data";
450 memcpy(&theCS, altcP->Calc((
const char*)inBuff, inLen),
sizeof(theCS));
454 inSeg newSeg(inOff, inLen, theCS);
458 auto it = segMap.insert(std::pair(inOff, newSeg));
459 if (it.second ==
false)
460 return "; duplicate write";
473const char* XrdOfsCksFile::RTC_NCXX(
const void* inBuff, off_t inOff,
int inLen)
475 XrdSysMutexHelper mHelp(cksMtx);
479 if (inOff == nextOff)
481 calcP->Update((
const char*)inBuff, inLen);
482 nextOff = inOff + inLen;
483 auto it = segMap.begin();
484 while(it != segMap.end() && nextOff == it->second.segBeg)
486 if ((eText = RTC_Updt(it->second.segBeg, it->second.segLen)))
488 nextOff = it->second.segBeg + it->second.segLen;
489 it = segMap.erase(it);
494 if (it != segMap.end() && nextOff > it->second.segBeg)
495 return "; I/O segments overlap";
502 if (inOff < nextOff)
return "; ovewrite of previous data";
506 inSeg newSeg(inOff, inLen, 0);
510 auto it = segMap.insert(std::pair(inOff, newSeg));
511 if (it.second ==
false)
512 return "; duplicate write";
523const char* XrdOfsCksFile::RTC_Updt(off_t inOff,
int inLen)
525 ssize_t ioLen, retval;
528 {ioLen = (inLen > ioBlen ? ioBlen : inLen);
529 if ((retval = ossDF->Read(ioBuff, inOff, ioLen)) != ioLen)
530 {
if (retval >= 0) retval = -
ENODATA;
533 calcP->Update((
const char*)ioBuff, ioLen);
virtual bool Combinable()
virtual void Update(const char *Buff, int BLen)=0
virtual const char * Combine(const char *Cksum, int DLen)
virtual const char * Type(int &csSize)=0
int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env) override
ssize_t Write(const void *buffer, off_t offset, size_t size) override
ssize_t WriteV(XrdOucIOVec *writeV, int wrvcnt) override
static void Init(XrdCks *cp, XrdOucEnv *ep)
XrdOfsCksFile(const char *tid, const char *path, XrdOssDF *df, XrdCksCalc *cP, bool &delFlag)
int Close(long long *retsz=0) override
ssize_t pgWrite(void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts) override
int Ftruncate(unsigned long long flen) override
XrdOssDF(const char *tid="", uint16_t dftype=0, int fdnum=-1)
XrdOssWrapDF(XrdOssDF &df2Wrap)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
static const char * ec2text(int ecode)