# particle_data_script.py
import os
import socket
import time
from datetime import datetime

import pandas as pd
  6. # === Configuration ===
  7. SENSOR_IP = '158.193.241.163'
  8. SENSOR_PORT = 10001
  9. LOG_INTERVAL = 10 # Save particle data every 10 seconds
  10. CYCLE_MINUTES = 6 # Trigger Telegram 8 cycle every 6 minutes
  11. # === Paths ===
  12. BASE_DIR = '/home/ranny/Particle_Data'
  13. PARTICLE_DIR = os.path.join(BASE_DIR, 'ParticleData')
  14. TELEGRAM8_DIR = os.path.join(BASE_DIR, 'Telegram8Data')
  15. ERROR_LOG_FILE = os.path.join(BASE_DIR, 'ErrorLog', 'error_log.txt')
  16. # === Command Constants ===
  17. CMD_KY1 = '00KY00001\r\n'
  18. CMD_KY0 = '00KY00000\r\n'
  19. CMD_TM3 = '00TM00003\r\n'
  20. CMD_TR8 = '00TR00008\r\n'
  21. ACK_KY1 = '!00KY00001'
  22. ACK_KY0 = '!00KY00000'
  23. # === Data Columns ===
  24. particle_columns = ['Timestamp', 'Amplitude', 'Duration', 'ITS',
  25. 'Diameter1 [mm]', 'Velocity [m/s]',
  26. 'Diameter Hamburger [mm]', 'Velocity Hamburger [m/s]', 'Temperature [C]']
  27. telegram8_columns = ["Timestamp", "ID", "Serial", "Intensity", "Date", "Time", "Code1", "Code2", "METAR1",
  28. "Value1", "Code3", "Code4", "METAR2", "Value2",
  29. "Diameter", "Velocity", "Amount_mm",
  30. "Visibility_m", "Reflectivity_dBZ", "Quality_pct",
  31. "Hail_mm", "End"]
  32. particle_df = pd.DataFrame(columns=particle_columns)
  33. # === Ensure directories exist ===
  34. for folder in [PARTICLE_DIR, TELEGRAM8_DIR, os.path.dirname(ERROR_LOG_FILE)]:
  35. os.makedirs(folder, exist_ok=True)
  36. def log_error(message):
  37. timestamp = datetime.now().strftime("[%Y-%m-%d %H:%M:%S]")
  38. with open(ERROR_LOG_FILE, 'a') as f:
  39. f.write(f"{timestamp} {message}\n")
  40. def communicate(command, expect_reply=True, expected_ack=None, retries=3):
  41. for attempt in range(retries):
  42. try:
  43. print(f">>> {command.strip()}")
  44. with socket.create_connection((SENSOR_IP, SENSOR_PORT), timeout=5) as s:
  45. s.sendall(command.encode())
  46. if expect_reply:
  47. response = s.recv(1024).decode(errors='ignore').strip()
  48. print(f"<<< {response}")
  49. if expected_ack is None or expected_ack in response:
  50. return response
  51. except Exception as e:
  52. msg = f"Attempt {attempt + 1} failed communicating command '{command.strip()}': {e}"
  53. print(f"[{datetime.now()}] {msg}")
  54. log_error(msg)
  55. time.sleep(1)
  56. return None
  57. def save_df_to_csv(df, path):
  58. file_exists = os.path.exists(path)
  59. df.to_csv(path, mode='a', index=False, header=not file_exists)
  60. def collect_telegram8():
  61. time.sleep(1)
  62. try:
  63. with socket.create_connection((SENSOR_IP, SENSOR_PORT), timeout=5) as s:
  64. s.sendall(CMD_TR8.encode())
  65. time.sleep(1)
  66. response = s.recv(2048).decode(errors='ignore').strip()
  67. print("<<< FULL RESPONSE (Telegram 8):")
  68. print(response)
  69. except Exception as e:
  70. log_error(f"TR8 communication error: {e}")
  71. return
  72. telegram_line = None
  73. if response:
  74. start = response.find("00;")
  75. if start != -1:
  76. cleaned = response[start:].replace('\x03', '').replace('\r', '').replace('\n', '')
  77. if cleaned.count(';') >= 20:
  78. telegram_line = cleaned
  79. if telegram_line:
  80. now = datetime.now()
  81. iso_timestamp = now.strftime('%Y-%m-%d %H:%M:%S')
  82. row = [iso_timestamp] + telegram_line.split(';')[:len(telegram8_columns) - 1]
  83. df = pd.DataFrame([row], columns=telegram8_columns)
  84. filename = os.path.join(TELEGRAM8_DIR, f"telegram8_{now.strftime('%Y-%m-%d')}.csv")
  85. save_df_to_csv(df, filename)
  86. print(f"Saved Telegram 8 to {filename}")
  87. else:
  88. print("No valid Telegram 8 found")
  89. if communicate(CMD_KY1, expected_ack=ACK_KY1):
  90. # DO NOT CHECK ACK_TM3, sensor returns just "!"
  91. communicate(CMD_TM3, expect_reply=True) # accept any !
  92. else:
  93. log_error("Failed to re-enter config mode after Telegram 8")
  94. # === MAIN LOOP ===
  95. while True:
  96. try:
  97. with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
  98. s.settimeout(30)
  99. s.connect((SENSOR_IP, SENSOR_PORT))
  100. buffer = ""
  101. print("Connected. Listening for particle data...")
  102. last_write_time = time.time()
  103. last_cycle_time = time.time()
  104. skip_first_line = True
  105. first_line_after_tr8 = False
  106. while True:
  107. try:
  108. data = s.recv(1024).decode('utf-8', errors='replace')
  109. if not data:
  110. break
  111. buffer += data
  112. while "\r\n" in buffer:
  113. line, buffer = buffer.split("\r\n", 1)
  114. if skip_first_line:
  115. print("Skipping first corrupted line.")
  116. skip_first_line = False
  117. continue
  118. if first_line_after_tr8:
  119. idx = line.find("!00KY00000")
  120. if idx != -1:
  121. line = line[idx + len("!00KY00000"):].strip()
  122. first_line_after_tr8 = False
  123. now = datetime.now()
  124. iso_timestamp = now.strftime('%Y-%m-%d %H:%M:%S')
  125. fields = line.strip().split(';')
  126. if len(fields) == 8:
  127. try:
  128. parsed = [int(fields[0]), int(fields[1]), int(fields[2])] + \
  129. [float(f) for f in fields[3:7]] + [int(fields[7])]
  130. except:
  131. parsed = fields
  132. row = [iso_timestamp] + parsed
  133. particle_df.loc[len(particle_df)] = row
  134. print(f"[{iso_timestamp}] {fields}")
  135. except socket.timeout:
  136. print("Timeout.")
  137. except Exception as e:
  138. log_error(f"Socket error: {e}")
  139. break
  140. if time.time() - last_write_time >= LOG_INTERVAL:
  141. if not particle_df.empty:
  142. filename = os.path.join(PARTICLE_DIR, f"particle_data_{datetime.now().strftime('%Y-%m-%d')}.csv")
  143. save_df_to_csv(particle_df, filename)
  144. print(f"Saved {len(particle_df)} new rows to {filename}")
  145. particle_df = pd.DataFrame(columns=particle_columns)
  146. last_write_time = time.time()
  147. if time.time() - last_cycle_time >= CYCLE_MINUTES * 60:
  148. print(f"\n[{datetime.now()}] Triggering Telegram 8 cycle...")
  149. if not particle_df.empty:
  150. filename = os.path.join(PARTICLE_DIR, f"particle_data_{datetime.now().strftime('%Y-%m-%d')}.csv")
  151. save_df_to_csv(particle_df, filename)
  152. particle_df = pd.DataFrame(columns=particle_columns)
  153. s.close()
  154. print("Closed particle socket. Waiting 2 seconds before switching to TR8 mode...")
  155. time.sleep(2)
  156. for attempt in range(5):
  157. try:
  158. first_line_after_tr8 = True
  159. skip_first_line = True
  160. collect_telegram8()
  161. break
  162. except Exception as e:
  163. log_error(f"Retry {attempt+1} Telegram 8 failed: {e}")
  164. time.sleep(2 + attempt)
  165. else:
  166. log_error("Failed to collect Telegram 8 after multiple retries.")
  167. last_cycle_time = time.time()
  168. break # reconnect to sensor
  169. except Exception as e:
  170. log_error(f"Top-level error: {e}")
  171. time.sleep(5)