這是一個將軟件輪詢數據記錄到 csv 文件的示例應用程序。
使用 MCC 118,在后臺連續模式下使用 a_in_scan() 收集數據,并演示在 Python 中以塊的形式寫入數據。
文件名是根據日期和時間自動生成的,例如:yyyy_Month_dd_hhmmss.csv
數據存儲在 $ /home/pi/Documents/Measurement_Computing/log_files
-------------------------------------------------- -------------------------------------------------- ----------------------
#!/usr/bin/env python
# -*- 編碼:utf-8 -*-
從 __future__ 導入 print_function
從系統導入標準輸出
從時間導入睡眠
從 daqhats 導入 mcc118、OptionFlags、HatIDs、HatError
從 daqhats_utils 導入 select_hat_device,enum_mask_to_string,\
chan_list_to_mask
從日期時間導入日期時間,時間增量
導入操作系統
import csv #這是使 csv 文件創建簡單的導入。
導入錯誤號
READ_ALL_AVAILABLE = -1
CURSOR_BACK_2 = '\x1b[2D'
ERASE_TO_END_OF_LINE = '\x1b[0K'
定義主():
"""
該功能在模塊直接運行時自動執行。
"""
# 將頻道存儲在列表中,并將列表轉換為頻道掩碼
# 可以作為參數傳遞給 MCC 118 函數。
頻道 = [0, 1, 2, 3]
頻道掩碼 = chan_list_to_mask(頻道)
num_channels = len(頻道)
samples_per_channel = 0
選項 = OptionFlags.CONTINUOUS
掃描速率 = 1000.0
嘗試:
# 選擇要使用的 MCC 118 HAT 設備。
地址 = select_hat_device(HatIDs.MCC_118)
帽子 = mcc118(地址)
print('\n在地址處選擇的 MCC 118 HAT 設備', address)
actual_scan_rate = hat.a_in_scan_actual_rate(num_channels, scan_rate)
print('\nMCC 118 連續掃描示例')
print('函數演示:')
打印('mcc118.a_in_scan_start')
打印('mcc118.a_in_scan_read')
打印('mcc118.a_in_scan_stop')
print('頻道:', end='')
print(', '.join([str(chan) for chan in channels]))
print('請求的掃描速率:', scan_rate)
print('實際掃描速率:', actual_scan_rate)
print('選項:', enum_mask_to_string(OptionFlags, options))
嘗試:
input('\n按 ENTER 繼續...')
除了(名稱錯誤,語法錯誤):
經過
# 配置并開始掃描。
# 由于使用了連續選項,samples_per_channel
# 如果值小于默認的內部參數,則忽略該參數
# 緩沖區大小(在這種情況下為 10000 * num_channels)。如果更大的內部
# 需要緩沖區大小,相應地設置此參數的值。
hat.a_in_scan_start(channel_mask, samples_per_channel, scan_rate,
選項)
print('開始掃描...按 Ctrl-C 停止\n')
# 顯示數據表的標題行。
print('樣本讀取掃描計數', end='')
對于chan,枚舉中的項目(頻道):
print('頻道', item, sep='', end='')
打印('')
嘗試:
read_and_display_data(帽子,num_channels)
除了鍵盤中斷:
# 從顯示中清除'^C'。
打印(CURSOR_BACK_2,ERASE_TO_END_OF_LINE,'\n')
除了 (HatError, ValueError) 作為錯誤:
打印('\n',錯誤)
def read_and_display_data(帽子,num_channels):
"""
從指定 DAQ HAT 設備上的指定通道讀取數據,
更新終端顯示屏上的數據并將數據寫入 .csv
文件。讀取在循環中執行,直到用戶
停止掃描或檢測到溢出錯誤。
參數:
hat (mcc118):mcc118 HAT 設備對象。
num_channels (int):要顯示的通道數。
回報:
沒有任何
"""
total_samples_read = 0
read_request_size = READ_ALL_AVAILABLE
basepath = '/home/pi/Documents/Measurement_Computing'
mypath = basepath + '/Scanning_log_files'
# 進行連續掃描時,超時值將被忽略
# 調用 a_in_scan_read 因為我們將要求所有可用的
# 返回樣本(最多為默認緩沖區大小)。
超時 = 5.0
# 讀取所有可用的樣本(直到 read_buffer 的大小
# 由 user_buffer_size 指定)。由于設置了 read_request_size
# 為 -1 (READ_ALL_AVAILABLE),此函數立即返回
# 任何可用的樣本(最多 user_buffer_size)和超時
# 參數被忽略。
#=================================================== ==============================
# 如果掃描開始,根據當前日期和時間創建一個文件名。
# 生成完整路徑
# 將收集的數據寫入 .csv 文件的位置。打開文件
# 開始將數據寫入文件。完成后,關閉文件。
嘗試:
如果 os.path.exists(basepath):
如果不是(os.path.exists(mypath)):
os.mkdir(我的路徑)
別的:
os.mkdir(基本路徑)
os.chdir(基本路徑)
os.mkdir(我的路徑)
除了 OSError 作為 exc:
增加
os.chdir(mypath)
fileDateTime = datetime.strftime(datetime.now(), "%Y_%B_%d_%H%M%S")
fileDateTime = mypath + "/" + fileDateTime + ".csv"
csvfile = open(fileDateTime, "w+")
csvwriter = csv.writer(csvfile)
而真:
read_result = hat.a_in_scan_read(read_request_size, timeout)
# 檢查溢出錯誤
如果 read_result.hardware_overrun:
print('\n\n硬件溢出\n')
休息
elif read_result.buffer_overrun:
print('\n\n緩沖區溢出\n')
休息
samples_read_per_channel = int(len(read_result.data) / num_channels)
total_samples_read += samples_read_per_channel
totalSamples = len(read_result.data)
print("\r MyTotalSamples = %d\n" % totalSamples)
# 顯示每個通道的最后一個樣本。
print('\r{:12}'.format(samples_read_per_channel),
' {:12} '.format(total_samples_read), end='')
如果 samples_read_per_channel > 0:
索引 = samples_read_per_channel * num_channels - num_channels
print("\r 索引 = %d\n" % 索引)
新索引 = 0
myArray=[] #創建一個空數組
對于我在范圍內(0,totalSamples,num_channels):
myArray.append([]) #向數組中添加一行(COLUMN)
對于范圍內的 j (num_channels):
print('{:10.5f}'.format(read_result.data[j]), 'V',
結束='')
#追加一個num_channels的數據到數組(ROW)
myArray[new_index].append(read_result.data[i + j])
新索引+=1
打印(“\r”)
csvwriter.writerows(myArray) #將數組寫入文件
csvfile.flush
睡眠(0.1)
打印('\n')
csvfile.close()
如果 __name__ == '__main__':
主要的()