32#define MY_DEBUG_SECTION 50
39 bufferCapacity(bufCap),
43 reconnectScheduled(false),
44 writeScheduled(false),
52 "WARNING: tcp:" <<
remote <<
" logger configured buffer " <<
55 "Using the minimum instead.");
76 " buffered: " << bufferedSize <<
77 " conn: " << conn <<
' ' << connectFailures);
87 const bool failingToConnect = !conn && connectFailures;
88 if (bufferedSize && !failingToConnect)
105 assert(inCall !=
nullptr);
113 flushDebt = bufferedSize;
120 appendRecord(buf, len);
130 if (flushDebt > 0 || buffers.size() > 1)
138 (bufferedSize > 0) << (conn !=
nullptr) <<
139 (conn !=
nullptr && !
fd_table[conn->fd].closing()) <<
" buffered: " <<
140 bufferedSize <<
'/' << buffers.size());
144 if (!writeScheduled && bufferedSize > 0 && conn !=
nullptr &&
152 writeScheduled =
true;
162 if (bufferedSize+len <= bufferCapacity) {
173 " logger stops dropping records after " << drops <<
" drops" <<
174 "; current buffer use: " << (bufferedSize+len) <<
175 " out of " << bufferCapacity <<
" bytes");
180 if (!drops || dieOnError) {
182 "ERROR: tcp:" << remote <<
" logger " << bufferCapacity <<
"-byte " <<
183 "buffer overflowed; cannot fit " <<
184 (bufferedSize+len-bufferCapacity) <<
" bytes");
188 fatal(
"tcp logger buffer overflowed");
192 " logger starts dropping records.");
214 for (
size_t off = 0;
off < len;
off += IoBufSize)
215 appendChunk(record +
off,
min(len -
off, IoBufSize));
222 Must(len <= IoBufSize);
224 bool addBuffer = buffers.empty() ||
225 (buffers.back()->size+len > IoBufSize);
227 addBuffer = addBuffer || (writeScheduled && buffers.size() == 1);
230 buffers.push_back(
new MemBlob(IoBufSize));
234 Must(!buffers.empty());
235 buffers.back()->append(chunk, len);
250 futureConn->
remote = remote;
258 connWait.start(cs, call);
268 const double delay = 0.5;
269 if (connectFailures++ % 100 == 0) {
271 " logger connection attempt #" << connectFailures <<
272 " failed. Will keep trying every " << delay <<
" seconds.");
275 if (!reconnectScheduled) {
276 reconnectScheduled =
true;
277 eventAdd(
"Log::TcpLogger::DelayedReconnect",
279 new Pointer(
this), 0.5, 0,
false);
282 if (connectFailures > 0) {
284 " logger connectivity restored after " <<
285 (connectFailures+1) <<
" attempts.");
323 Must(reconnectScheduled);
325 reconnectScheduled =
false;
333 writeScheduled =
false;
345 Must(!buffers.empty());
347 const size_t writtenSize =
static_cast<size_t>(written->
size);
350 Must(bufferedSize >= writtenSize);
351 bufferedSize -= writtenSize;
355 if (flushDebt > io.
size)
356 flushDebt -= io.
size;
369 assert(inCall !=
nullptr);
376 mustStop(
"Log::TcpLogger::handleClosure");
383 if (conn !=
nullptr) {
384 if (closer !=
nullptr) {
406 if (
TcpLogger *logger = StillLogging(lf))
413 if (
TcpLogger *logger = StillLogging(lf))
414 logger->logRecord(buf, len);
437 if (
TcpLogger *logger = StillLogging(lf)) {
438 debugs(50, 3,
"Closing " << logger);
454 assert(!StillLogging(lf));
455 debugs(5, 3,
"Tcp Open called");
459 if (strncmp(path,
"//", 2) == 0)
464 fatalf(
"Invalid TCP logging address '%s'\n", lf->
path);
#define ScheduleCallHere(call)
RefCount< AsyncCallT< Dialer > > asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
#define JobCallback(dbgSection, dbgLevel, Dialer, job, method)
Convenience macro to create a Dialer-based job callback.
bool GetHostWithPort(char *token, Ip::Address *ipa)
#define CBDATA_NAMESPACED_CLASS_INIT(namespace, type)
static void Start(const Pointer &job)
virtual bool doneAll() const
whether positive goal has been reached
Cbc * valid() const
was set and is valid
int xerrno
The last errno to occur. non-zero if flag is Comm::COMM_ERROR.
Comm::Flag flag
comm layer result status.
Comm::ConnectionPointer conn
void setAnyAddr()
NOTE: Does NOT clear the Port stored. Only the Address and Type.
void writeDone(const CommIoCbParams &io)
Comm::Write callback.
void writeIfNeeded()
starts writing if and only if it is time to write accumulated records
void disconnect()
close our connection now, without flushing
void logRecord(const char *buf, size_t len)
buffers record and possibly writes it to the remote logger
void writeIfPossible()
starts writing if possible
static void EndLine(Logfile *lf)
bool canFit(const size_t len) const
whether len more bytes can be buffered
void appendChunk(const char *chunk, const size_t len)
buffer a record chunk without splitting it across buffers
static void Flush(Logfile *lf)
void delayedReconnect()
"sleep a little before trying to connect again" event callback
static void DelayedReconnect(void *data)
Log::TcpLogger::delayedReconnect() wrapper.
TcpLogger(size_t, bool, Ip::Address)
static TcpLogger * StillLogging(Logfile *lf)
void handleClosure(const CommCloseCbParams &io)
Ip::Address remote
where the remote logger expects our records
static void WriteLine(Logfile *lf, const char *buf, size_t len)
static void Close(Logfile *lf)
void doConnect()
starts [re]connecting to the remote logger
static const size_t IoBufSize
fixed I/O buffer size
static const size_t BufferCapacityMin
minimum bufferCapacity value
void connectDone(const CommConnectCbParams &conn)
Comm::ConnOpener callback.
bool doneAll() const override
whether positive goal has been reached
void appendRecord(const char *buf, size_t len)
buffer a record that might exceed IoBufSize
static int Open(Logfile *lf, const char *path, size_t bufSz, int fatalFlag)
void flush()
write all currently buffered records ASAP
size_t bufferCapacity
bufferedSize limit
static void Rotate(Logfile *lf, const int16_t)
static void StartLine(Logfile *lf)
void start() override
called by AsyncStart; do not call directly
LOGLINESTART * f_linestart
struct Logfile::@63 flags
value_type * mem
raw allocated memory block
size_type size
maximum allocated memory in use by callers
struct SquidConfig::@90 onoff
AsyncCall::Pointer comm_add_close_handler(int fd, CLCB *handler, void *data)
void comm_remove_close_handler(int fd, CLCB *handler, void *data)
A const & min(A const &lhs, A const &rhs)
#define debugs(SECTION, LEVEL, CONTENT)
void eventAdd(const char *name, EVH *func, void *arg, double when, int weight, bool cbdata)
void fatal(const char *message)
void fatalf(const char *fmt,...)
void Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func)
const char * xstrerr(int error)