加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
serviceprovider.go 18.89 KB
一键复制 编辑 原始数据 按行查看 历史
lixiaoxiao 提交于 2022-08-13 21:59 . init
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541
// This file defines ServiceProvider (i.e., a DICOM server).
package netdicom
import (
"crypto/tls"
"fmt"
"net"
"gitee.com/weili0115/go-netdicom/dimse"
"gitee.com/weili0115/go-netdicom/sopclass"
dicom "github.com/grailbio/go-dicom"
"github.com/grailbio/go-dicom/dicomio"
"github.com/grailbio/go-dicom/dicomlog"
)
// CMoveResult is an object streamed by a CMove (or CGet) callback. The C-MOVE
// and C-GET handlers in this file read one CMoveResult per dataset from the
// callback's channel.
type CMoveResult struct {
// Number of files remaining to be sent. Set -1 if unknown.
// NOTE(review): the handlers convert this with uint16(Remaining), so -1 is
// reported to the peer as 65535 -- confirm that this is intended.
Remaining int
// If non-nil, the handler stops reading results and reports the error text
// in the final response.
Err error
Path string // Path name of the DICOM file being copied. Used only in log messages.
DataSet *dicom.DataSet // Contents of the file.
}
// handleCStore services one C-STORE request. When "cb" is set it is invoked
// with the connection state, the transfer syntax of the presentation context,
// the SOP class/instance UIDs from the request, and the raw payload; its return
// value becomes the response status. Without a callback the response carries
// StatusUnrecognizedOperation. Exactly one C-STORE-RSP (no dataset) is sent.
func handleCStore(
	cb CStoreCallback,
	connState ConnectionState,
	c *dimse.CStoreRq, data []byte,
	cs *serviceCommandState) {
	var outcome dimse.Status
	if cb == nil {
		outcome = dimse.Status{Status: dimse.StatusUnrecognizedOperation}
	} else {
		outcome = cb(
			connState,
			cs.context.transferSyntaxUID,
			c.AffectedSOPClassUID,
			c.AffectedSOPInstanceUID,
			data)
	}
	cs.sendMessage(&dimse.CStoreRsp{
		AffectedSOPClassUID:       c.AffectedSOPClassUID,
		MessageIDBeingRespondedTo: c.MessageID,
		CommandDataSetType:        dimse.CommandDataSetTypeNull,
		AffectedSOPInstanceUID:    c.AffectedSOPInstanceUID,
		Status:                    outcome,
	}, nil)
}
// handleCFind services one C-FIND request.
//
// The request payload is decoded into filter elements and handed to
// params.CFind, which runs in its own goroutine and streams matches through a
// buffered channel. Each match is encoded and sent as a pending C-FIND-RSP with
// a dataset. The exchange always ends with one dataset-less C-FIND-RSP whose
// status is success, or CFindUnableToProcess if the callback reported an error
// or a match could not be encoded. A missing callback or an undecodable payload
// is answered with StatusUnrecognizedOperation.
func handleCFind(
	params ServiceProviderParams,
	connState ConnectionState,
	c *dimse.CFindRq, data []byte,
	cs *serviceCommandState) {
	// finish sends the terminal, dataset-less response carrying "st".
	finish := func(st dimse.Status) {
		cs.sendMessage(&dimse.CFindRsp{
			AffectedSOPClassUID:       c.AffectedSOPClassUID,
			MessageIDBeingRespondedTo: c.MessageID,
			CommandDataSetType:        dimse.CommandDataSetTypeNull,
			Status:                    st,
		}, nil)
	}

	if params.CFind == nil {
		finish(dimse.Status{Status: dimse.StatusUnrecognizedOperation, ErrorComment: "No callback found for C-FIND"})
		return
	}
	filters, err := readElementsInBytes(data, cs.context.transferSyntaxUID)
	if err != nil {
		finish(dimse.Status{Status: dimse.StatusUnrecognizedOperation, ErrorComment: err.Error()})
		return
	}
	dicomlog.Vprintf(1, "dicom.serviceProvider: C-FIND-RQ payload: %s", elementsString(filters))

	results := make(chan CFindResult, 128)
	go func() {
		params.CFind(connState, cs.context.transferSyntaxUID, c.AffectedSOPClassUID, filters, results)
	}()

	final := dimse.Status{Status: dimse.StatusSuccess}
	for match := range results {
		if match.Err != nil {
			final = dimse.Status{
				Status:       dimse.CFindUnableToProcess,
				ErrorComment: match.Err.Error(),
			}
			break
		}
		dicomlog.Vprintf(1, "dicom.serviceProvider: C-FIND-RSP: %s", elementsString(match.Elements))
		encoded, encodeErr := writeElementsToBytes(match.Elements, cs.context.transferSyntaxUID)
		if encodeErr != nil {
			dicomlog.Vprintf(0, "dicom.serviceProvider: C-FIND: encode error %v", encodeErr)
			final = dimse.Status{
				Status:       dimse.CFindUnableToProcess,
				ErrorComment: encodeErr.Error(),
			}
			break
		}
		cs.sendMessage(&dimse.CFindRsp{
			AffectedSOPClassUID:       c.AffectedSOPClassUID,
			MessageIDBeingRespondedTo: c.MessageID,
			CommandDataSetType:        dimse.CommandDataSetTypeNonNull,
			Status:                    dimse.Status{Status: dimse.StatusPending},
		}, encoded)
	}
	finish(final)

	// After an early break the callback may still be producing results; keep
	// reading until it closes the channel so that it is not left blocked on a
	// full buffer.
	for range results {
	}
}
// handleCMove services one C-MOVE request.
//
// The move destination AE title is resolved through params.RemoteAEs, the
// request payload is decoded into filter elements, and params.CMove is started
// in its own goroutine to stream datasets through a buffered channel. Every
// dataset is pushed to the destination with runCStoreOnNewAssociation, and a
// pending C-MOVE-RSP with the running success/failure counters is sent after
// each one. The exchange ends with a final C-MOVE-RSP carrying the counters
// and the overall status.
//
// A missing callback, an unregistered destination, or an undecodable payload is
// answered with a single StatusUnrecognizedOperation response.
func handleCMove(
	params ServiceProviderParams,
	connState ConnectionState,
	c *dimse.CMoveRq, data []byte,
	cs *serviceCommandState) {
	// reply sends one dataset-less C-MOVE-RSP with the given status and
	// sub-operation counters.
	reply := func(st dimse.Status, remaining, completed, failed uint16) {
		cs.sendMessage(&dimse.CMoveRsp{
			AffectedSOPClassUID:            c.AffectedSOPClassUID,
			MessageIDBeingRespondedTo:      c.MessageID,
			CommandDataSetType:             dimse.CommandDataSetTypeNull,
			NumberOfRemainingSuboperations: remaining,
			NumberOfCompletedSuboperations: completed,
			NumberOfFailedSuboperations:    failed,
			Status:                         st,
		}, nil)
	}
	// reject refuses the request before any sub-operation has started.
	reject := func(comment string) {
		reply(dimse.Status{Status: dimse.StatusUnrecognizedOperation, ErrorComment: comment}, 0, 0, 0)
	}

	if params.CMove == nil {
		reject("No callback found for C-MOVE")
		return
	}
	destAddr, known := params.RemoteAEs[c.MoveDestination]
	if !known {
		reject(fmt.Sprintf("C-MOVE destination '%v' not registered in the server", c.MoveDestination))
		return
	}
	filters, err := readElementsInBytes(data, cs.context.transferSyntaxUID)
	if err != nil {
		reject(err.Error())
		return
	}
	dicomlog.Vprintf(1, "dicom.serviceProvider: C-MOVE-RQ payload: %s", elementsString(filters))

	results := make(chan CMoveResult, 128)
	go func() {
		params.CMove(connState, cs.context.transferSyntaxUID, c.AffectedSOPClassUID, filters, results)
	}()

	final := dimse.Status{Status: dimse.StatusSuccess}
	var okCount, failCount uint16
	for item := range results {
		if item.Err != nil {
			final = dimse.Status{
				Status:       dimse.CFindUnableToProcess,
				ErrorComment: item.Err.Error(),
			}
			break
		}
		dicomlog.Vprintf(0, "dicom.serviceProvider: C-MOVE: Sending %v to %v(%s)", item.Path, c.MoveDestination, destAddr)
		// A failed sub-operation is counted and logged, but does not abort
		// the move.
		if storeErr := runCStoreOnNewAssociation(params.AETitle, c.MoveDestination, destAddr, item.DataSet); storeErr != nil {
			dicomlog.Vprintf(0, "dicom.serviceProvider: C-MOVE: C-store of %v to %v(%v) failed: %v", item.Path, c.MoveDestination, destAddr, storeErr)
			failCount++
		} else {
			okCount++
		}
		reply(dimse.Status{Status: dimse.StatusPending}, uint16(item.Remaining), okCount, failCount)
	}
	reply(final, 0, okCount, failCount)

	// After an early break the callback may still be producing results; keep
	// reading until it closes the channel so that it is not left blocked on a
	// full buffer.
	for range results {
	}
}
// handleCGet services one C-GET request.
//
// The request payload is decoded into filter elements and params.CGet is
// started in its own goroutine to stream datasets through a buffered channel.
// For every dataset a sub-command is allocated on this command's dispatcher,
// the dataset is sent with runCStoreOnAssociation, a pending C-GET-RSP with the
// running success/failure counters is emitted, and the sub-command is released.
// The exchange ends with a final C-GET-RSP carrying the counters and the
// overall status.
//
// A missing callback or an undecodable payload is answered with a single
// StatusUnrecognizedOperation response.
func handleCGet(
	params ServiceProviderParams,
	connState ConnectionState,
	c *dimse.CGetRq, data []byte, cs *serviceCommandState) {
	// reply sends one dataset-less C-GET-RSP with the given status and
	// sub-operation counters.
	reply := func(st dimse.Status, remaining, completed, failed uint16) {
		cs.sendMessage(&dimse.CGetRsp{
			AffectedSOPClassUID:            c.AffectedSOPClassUID,
			MessageIDBeingRespondedTo:      c.MessageID,
			CommandDataSetType:             dimse.CommandDataSetTypeNull,
			NumberOfRemainingSuboperations: remaining,
			NumberOfCompletedSuboperations: completed,
			NumberOfFailedSuboperations:    failed,
			Status:                         st,
		}, nil)
	}
	// reject refuses the request before any sub-operation has started.
	reject := func(comment string) {
		reply(dimse.Status{Status: dimse.StatusUnrecognizedOperation, ErrorComment: comment}, 0, 0, 0)
	}

	if params.CGet == nil {
		reject("No callback found for C-GET")
		return
	}
	filters, err := readElementsInBytes(data, cs.context.transferSyntaxUID)
	if err != nil {
		reject(err.Error())
		return
	}
	dicomlog.Vprintf(1, "dicom.serviceProvider: C-GET-RQ payload: %s", elementsString(filters))

	results := make(chan CMoveResult, 128)
	go func() {
		params.CGet(connState, cs.context.transferSyntaxUID, c.AffectedSOPClassUID, filters, results)
	}()

	final := dimse.Status{Status: dimse.StatusSuccess}
	var okCount, failCount uint16
	for item := range results {
		if item.Err != nil {
			final = dimse.Status{
				Status:       dimse.CFindUnableToProcess,
				ErrorComment: item.Err.Error(),
			}
			break
		}
		subCs, cmdErr := cs.disp.newCommand(cs.cm, cs.context /*not used*/)
		if cmdErr != nil {
			final = dimse.Status{
				Status:       dimse.CFindUnableToProcess,
				ErrorComment: cmdErr.Error(),
			}
			break
		}
		// A failed sub-operation is counted and logged, but does not abort
		// the transfer.
		if storeErr := runCStoreOnAssociation(subCs.upcallCh, subCs.disp.downcallCh, subCs.cm, subCs.messageID, item.DataSet); storeErr != nil {
			dicomlog.Vprintf(0, "dicom.serviceProvider: C-GET: C-store of %v failed: %v", item.Path, storeErr)
			failCount++
		} else {
			dicomlog.Vprintf(0, "dicom.serviceProvider: C-GET: Sent %v", item.Path)
			okCount++
		}
		reply(dimse.Status{Status: dimse.StatusPending}, uint16(item.Remaining), okCount, failCount)
		// The sub-command is released only after the progress response has
		// been queued.
		cs.disp.deleteCommand(subCs)
	}
	reply(final, 0, okCount, failCount)

	// After an early break the callback may still be producing results; keep
	// reading until it closes the channel so that it is not left blocked on a
	// full buffer.
	for range results {
	}
}
// handleCEcho services one C-ECHO request. When params.CEcho is set, its
// return value becomes the response status; otherwise the response carries
// StatusUnrecognizedOperation. Exactly one dataset-less C-ECHO-RSP is sent.
//
// "data" is accepted only so that the signature parallels the other handlers;
// it is not read.
func handleCEcho(
	params ServiceProviderParams,
	connState ConnectionState,
	c *dimse.CEchoRq, data []byte,
	cs *serviceCommandState) {
	status := dimse.Status{Status: dimse.StatusUnrecognizedOperation}
	if params.CEcho != nil {
		status = params.CEcho(connState)
	}
	// The operation is C-ECHO; the message previously said "E-ECHO", which made
	// the entry impossible to find when searching logs for the real name.
	dicomlog.Vprintf(0, "dicom.serviceProvider: Received C-ECHO: context: %+v, status: %+v", cs.context, status)
	cs.sendMessage(&dimse.CEchoRsp{
		MessageIDBeingRespondedTo: c.MessageID,
		CommandDataSetType:        dimse.CommandDataSetTypeNull,
		Status:                    status,
	}, nil)
}
// ServiceProviderParams defines parameters for ServiceProvider.
type ServiceProviderParams struct {
// The application-entity title of the server. Must be nonempty. It is used
// as the calling AE title for the C-STORE sub-operations of a C-MOVE.
AETitle string
// Names of remote AEs and their host:ports. Used only by C-MOVE. This
// map should be nonempty iff the server supports CMove. A C-MOVE request
// whose destination is not a key of this map is rejected.
RemoteAEs map[string]string
// Called on C_ECHO request. If nil, a C-ECHO call will produce an error response.
//
// TODO(saito) Support a default C-ECHO callback?
CEcho CEchoCallback
// Called on C_FIND request.
// If CFindCallback=nil, a C-FIND call will produce an error response.
CFind CFindCallback
// CMove is called on C_MOVE request.
// If nil, a C-MOVE call will produce an error response.
CMove CMoveCallback
// CGet is called on C_GET request. The only difference between cmove
// and cget is that cget uses the same connection to send images back to
// the requester. Generally you should set the same function to CMove
// and CGet. If nil, a C-GET call will produce an error response.
CGet CMoveCallback
// If CStoreCallback=nil, a C-STORE call will produce an error response.
CStore CStoreCallback
// TLSConfig, if non-nil, enables TLS on the connection. See
// https://gist.github.com/michaljemala/d6f4e01c4834bf47a9c4 for an
// example for creating a TLS config from x509 cert files.
TLSConfig *tls.Config
}
// DefaultMaxPDUSize is the PDU size advertised by go-netdicom
// (4 << 20 = 4194304; presumably a byte count -- confirm against the PDU code).
const DefaultMaxPDUSize = 4 << 20
// CStoreCallback is called on a C-STORE request. sopInstanceUID is the UID of
// the data. sopClassUID is the data type requested
// (e.g.,"1.2.840.10008.5.1.4.1.1.1.2"), and transferSyntaxUID is the encoding
// of the data (e.g., "1.2.840.10008.1.2.1"). These args are extracted from the
// request packet.
//
// "data" is the payload, i.e., a sequence of serialized dicom.DataElement
// objects in transferSyntaxUID. "data" does not contain metadata elements
// (elements whose Tag.Group=2 -- e.g., TransferSyntaxUID and
// MediaStorageSOPClassUID), since they are stripped by the requester (two key
// metadata are passed as sop{Class,Instance}UID).
//
// The function should encode the sop{Class,Instance}UID as the DICOM
// header, followed by data. It should return either dimse.Success0 on success,
// or one of CStoreStatus* error codes on errors. The returned status is sent
// back to the requester unchanged in the C-STORE response.
type CStoreCallback func(
conn ConnectionState,
transferSyntaxUID string,
sopClassUID string,
sopInstanceUID string,
data []byte) dimse.Status
// CFindCallback implements a C-FIND handler. sopClassUID is the data type
// requested (e.g.,"1.2.840.10008.5.1.4.1.1.1.2"), and transferSyntaxUID is the
// data encoding requested (e.g., "1.2.840.10008.1.2.1"). These args are
// extracted from the request packet. "filters" holds the elements decoded from
// the request payload.
//
// This function should stream CFindResult objects through "ch". The function
// may block. To report a matched DICOM dataset, the function should send one
// CFindResult with a nonempty Elements field. To report multiple DICOM-dataset
// matches, the callback should send multiple CFindResult objects, one for each
// dataset. To report a failure, the callback should send a CFindResult with a
// non-nil Err; the server then stops sending matches and ends the exchange with
// an error status. The callback must close the channel after it produces all
// the responses; the server keeps reading "ch" until it is closed.
type CFindCallback func(
conn ConnectionState,
transferSyntaxUID string,
sopClassUID string,
filters []*dicom.Element,
ch chan CFindResult)
// CMoveCallback implements a C-MOVE or C-GET handler. sopClassUID is the data
// type requested (e.g.,"1.2.840.10008.5.1.4.1.1.1.2"), and transferSyntaxUID is
// the data encoding requested (e.g., "1.2.840.10008.1.2.1"). These args are
// extracted from the request packet. "filters" holds the elements decoded from
// the request payload.
//
// The callback must stream datasets or an error to "ch", one CMoveResult per
// dataset. The callback may block. A CMoveResult with a non-nil Err makes the
// server stop transferring and end the exchange with an error status. The
// callback must close the channel after it produces all the datasets; the
// server keeps reading "ch" until it is closed.
type CMoveCallback func(
conn ConnectionState,
transferSyntaxUID string,
sopClassUID string,
filters []*dicom.Element,
ch chan CMoveResult)
// ConnectionState describes the state of the session and is passed to every
// callback.
type ConnectionState struct {
// TLS connection state. It is nonempty only when the connection is set up
// over TLS, i.e. when the underlying net.Conn is a *tls.Conn; otherwise it
// is left as the zero value.
TLS tls.ConnectionState
}
// CEchoCallback implements the C-ECHO callback. It typically just returns
// dimse.Success. The returned status is sent back to the requester unchanged in
// the C-ECHO response.
type CEchoCallback func(conn ConnectionState) dimse.Status
// ServiceProvider encapsulates the state for DICOM server (provider).
type ServiceProvider struct {
// params is the configuration handed to every accepted connection.
params ServiceProviderParams
// listener is the TCP (or TLS) listener created by NewServiceProvider.
listener net.Listener
// label is a unique string used in log messages to identify this provider.
label string
}
// writeElementsToBytes serializes "elems", in order, using the given transfer
// syntax and returns the resulting bytes. The encoder's error state is inspected
// once, after every element has been written; if it is set, the error is
// returned and no bytes are returned.
func writeElementsToBytes(elems []*dicom.Element, transferSyntaxUID string) ([]byte, error) {
	enc := dicomio.NewBytesEncoderWithTransferSyntax(transferSyntaxUID)
	for i := range elems {
		dicom.WriteElement(enc, elems[i])
	}
	err := enc.Error()
	if err == nil {
		return enc.Bytes(), nil
	}
	return nil, err
}
// readElementsInBytes decodes "data" as a sequence of DICOM elements in the
// given transfer syntax. Elements are read until the decoder reports EOF. If
// the decoder reports an error at any point, that error is returned and the
// elements read so far are discarded.
func readElementsInBytes(data []byte, transferSyntaxUID string) ([]*dicom.Element, error) {
	dec := dicomio.NewBytesDecoderWithTransferSyntax(data, transferSyntaxUID)
	var parsed []*dicom.Element
	for !dec.EOF() {
		elem := dicom.ReadElement(dec, dicom.ReadOptions{})
		readErr := dec.Error()
		dicomlog.Vprintf(1, "dicom.serviceProvider: C-FIND: Read elem: %v, err %v", elem, readErr)
		if readErr != nil {
			return nil, readErr
		}
		parsed = append(parsed, elem)
	}
	// The decoder is consulted once more after the loop, in case an error was
	// recorded without another element being read.
	if err := dec.Error(); err != nil {
		return nil, err
	}
	return parsed, nil
}
// elementsString renders "elems" for log messages as "[e1, e2, ...]", using
// each element's String method. An empty or nil slice yields "[]".
//
// The text is accumulated in a byte slice rather than by repeated string
// concatenation, so the cost stays linear in the size of the output.
func elementsString(elems []*dicom.Element) string {
	buf := []byte{'['}
	for i, elem := range elems {
		if i > 0 {
			buf = append(buf, ", "...)
		}
		buf = append(buf, elem.String()...)
	}
	buf = append(buf, ']')
	return string(buf)
}
// runCStoreOnNewAssociation sends "ds" to remoteHostPort using C-STORE. It is
// called as part of C-MOVE. A fresh ServiceUser is created with myAETitle as
// the calling AE title, remoteAETitle as the called AE title, and the storage
// SOP classes; it is released before the function returns. The result of the
// C-STORE is returned to the caller.
func runCStoreOnNewAssociation(myAETitle, remoteAETitle, remoteHostPort string, ds *dicom.DataSet) error {
	userParams := ServiceUserParams{
		CalledAETitle:  remoteAETitle,
		CallingAETitle: myAETitle,
		SOPClasses:     sopclass.StorageClasses,
	}
	client, err := NewServiceUser(userParams)
	if err != nil {
		return err
	}
	defer client.Release()

	client.Connect(remoteHostPort)
	storeErr := client.CStore(ds)
	dicomlog.Vprintf(1, "dicom.serviceProvider: C-STORE subop done: %v", storeErr)
	return storeErr
}
// NewServiceProvider creates a new DICOM server object. "port" is the TCP
// address to listen to. E.g., ":1234" will listen to port 1234 at all the IP
// addresses that this machine can bind to. The listener is opened here, with
// TLS when params.TLSConfig is non-nil and as plain TCP otherwise; Run() will
// actually start accepting connections.
//
// An error from opening the listener is returned as is, with a nil provider.
func NewServiceProvider(params ServiceProviderParams, port string) (*ServiceProvider, error) {
	// The label is generated before the listener is opened.
	label := newUID("sp")

	var (
		ln  net.Listener
		err error
	)
	switch {
	case params.TLSConfig != nil:
		ln, err = tls.Listen("tcp", port, params.TLSConfig)
	default:
		ln, err = net.Listen("tcp", port)
	}
	if err != nil {
		return nil, err
	}
	return &ServiceProvider{
		params:   params,
		listener: ln,
		label:    label,
	}, nil
}
// getConnState builds the ConnectionState handed to callbacks. The TLS field is
// filled in only when "conn" is a *tls.Conn; for any other connection type the
// zero ConnectionState is returned.
func getConnState(conn net.Conn) (cs ConnectionState) {
	if secure, isTLS := conn.(*tls.Conn); isTLS {
		cs.TLS = secure.ConnectionState()
	}
	return cs
}
// RunProviderForConn runs a DICOM server on "conn". It registers the C-STORE,
// C-FIND, C-MOVE, C-GET and C-ECHO handlers on a fresh dispatcher, starts the
// protocol state machine in a separate goroutine, and then dispatches upcall
// events in the calling goroutine.
//
// The function blocks until the upcall channel is closed (presumably by the
// state machine once the connection ends -- confirm in
// runStateMachineForServiceProvider), after which the dispatcher is closed.
// Callers that must not block should invoke it with "go", as Run does.
func RunProviderForConn(conn net.Conn, params ServiceProviderParams) {
	label := newUID("sc")
	upcallCh := make(chan upcallEvent, 128)
	disp := newServiceDispatcher(label)

	// The connection state is recomputed for every request, as each handler
	// invocation asks for it afresh.
	connState := func() ConnectionState { return getConnState(conn) }

	disp.registerCallback(dimse.CommandFieldCStoreRq,
		func(m dimse.Message, payload []byte, cmd *serviceCommandState) {
			handleCStore(params.CStore, connState(), m.(*dimse.CStoreRq), payload, cmd)
		})
	disp.registerCallback(dimse.CommandFieldCFindRq,
		func(m dimse.Message, payload []byte, cmd *serviceCommandState) {
			handleCFind(params, connState(), m.(*dimse.CFindRq), payload, cmd)
		})
	disp.registerCallback(dimse.CommandFieldCMoveRq,
		func(m dimse.Message, payload []byte, cmd *serviceCommandState) {
			handleCMove(params, connState(), m.(*dimse.CMoveRq), payload, cmd)
		})
	disp.registerCallback(dimse.CommandFieldCGetRq,
		func(m dimse.Message, payload []byte, cmd *serviceCommandState) {
			handleCGet(params, connState(), m.(*dimse.CGetRq), payload, cmd)
		})
	disp.registerCallback(dimse.CommandFieldCEchoRq,
		func(m dimse.Message, payload []byte, cmd *serviceCommandState) {
			handleCEcho(params, connState(), m.(*dimse.CEchoRq), payload, cmd)
		})

	go runStateMachineForServiceProvider(conn, upcallCh, disp.downcallCh, label)
	for {
		ev, open := <-upcallCh
		if !open {
			break
		}
		disp.handleEvent(ev)
	}
	dicomlog.Vprintf(0, "dicom.serviceProvider(%s): Finished connection %p (remote: %+v)", label, conn, conn.RemoteAddr())
	disp.close()
}
// Run accepts incoming connections on the provider's listener and serves the
// DICOM protocol on each of them in its own goroutine. This function never
// returns.
//
// NOTE(review): an Accept error is only logged and the loop retries at once,
// so a persistent listener error makes this loop spin.
func (sp *ServiceProvider) Run() {
	for {
		conn, err := sp.listener.Accept()
		if err != nil {
			dicomlog.Vprintf(0, "dicom.serviceProvider(%s): Accept error: %v", sp.label, err)
			continue
		}
		dicomlog.Vprintf(0, "dicom.serviceProvider(%s): Accepted connection %p (remote: %+v)", sp.label, conn, conn.RemoteAddr())
		go func(accepted net.Conn) {
			RunProviderForConn(accepted, sp.params)
		}(conn)
	}
}
// ListenAddr returns the address of the provider's listener. It is the address
// passed to NewServiceProvider(), except that if the value was of the form
// <name>:0, the ":0" part is replaced by the actual port number.
func (sp *ServiceProvider) ListenAddr() net.Addr {
	addr := sp.listener.Addr()
	return addr
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化