1313import json
1414import pytz
1515from tzlocal import get_localzone
16+ import time
1617
1718# get local timezone
1819local_tz = get_localzone ()
@@ -61,8 +62,8 @@ def __init__(self, writer, log, args):
6162
6263 self .writer = writer
6364 j = journal .Reader ()
64- j .log_level (journal .LOG_DEBUG )
6565 self .log = log
66+ self .total_shipped = 0
6667 self .CURSOR_FILE = args .cursor
6768 self .COUNT_THRESHOLD = args .count
6869 self .SECOND_THRESHOLD = args .timeout
@@ -102,7 +103,7 @@ def check_bucket(self):
102103 Do we need to ship the log entries yet?
103104 """
104105
105- if not self .bucket :
106+ if not self .bucket or len ( self . bucket ) == 1 :
106107 self .log .debug ('Nothing in bucket' )
107108 return False
108109
@@ -119,14 +120,13 @@ def check_bucket(self):
119120
120121
121122 def run (self ):
122- """
123-
124- """
125123
126124 while True :
125+ time .sleep (1 )
127126
128127 #get all entries currently available
129128 for entry in self .journal :
129+ #print transform_entry(entry)
130130 self .bucket .append (transform_entry (entry ))
131131 self .cursor = entry ['__CURSOR' ]
132132
@@ -142,11 +142,6 @@ def run(self):
142142 else :
143143 self .poll .poll (max_delay * 1000 )
144144
145- if self .journal .process () != journal .APPEND :
146- #Ignore NOP and INVALIDATE entries
147- self .log .debug ('NOP or INVALIDATE' )
148- continue
149-
150145
151146 def ship_logs (self ):
152147 """
@@ -156,7 +151,11 @@ def ship_logs(self):
156151 self .writer .put (self .bucket )
157152 self .last_ship = datetime .datetime .now ()
158153 self .save_cursor ()
159- self .log .info ('SHIPPED count={}' .format (count ))
154+ self .log .debug ('SHIPPED count={}' .format (count ))
155+ self .total_shipped += count
156+ if self .total_shipped > 500 :
157+ self .log .info ('SHIPPED count={}' .format (self .total_shipped ))
158+ self .total_shipped = 0
160159 self .bucket = []
161160
162161
0 commit comments